cameleer-server/docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md
# ClickHouse Phase 4: Remaining Tables — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Migrate route diagrams, agent events, and application logs from PostgreSQL/OpenSearch to ClickHouse.

**Architecture:** Three new ClickHouse stores implement existing interfaces. `ClickHouseDiagramStore` uses ReplacingMergeTree for content-hash dedup. `ClickHouseAgentEventRepository` uses MergeTree for append-only events. `ClickHouseLogStore` replaces `OpenSearchLogIndex` with SQL + ngram indexes. Feature flags control each store independently.

**Tech Stack:** ClickHouse 24.12, JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`

---
## File Structure
| File | Responsibility |
|------|----------------|
| `cameleer-server-app/.../resources/clickhouse/V5__route_diagrams.sql` | DDL for `route_diagrams` (ReplacingMergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V6__agent_events.sql` | DDL for `agent_events` (MergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V7__logs.sql` | DDL for `logs` (MergeTree with ngram indexes) |
| `cameleer-server-app/.../storage/ClickHouseDiagramStore.java` | DiagramStore impl for ClickHouse |
| `cameleer-server-app/.../storage/ClickHouseAgentEventRepository.java` | AgentEventRepository impl for ClickHouse |
| `cameleer-server-app/.../search/ClickHouseLogStore.java` | Replaces OpenSearchLogIndex |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add CH beans with feature flags |
| `cameleer-server-app/.../storage/PostgresDiagramStore.java` | Modified: add ConditionalOnProperty |
| `cameleer-server-app/.../storage/PostgresAgentEventRepository.java` | Modified: add ConditionalOnProperty |
| `cameleer-server-app/.../search/OpenSearchLogIndex.java` | Modified: add ConditionalOnProperty |
| `cameleer-server-app/.../resources/application.yml` | Modified: add feature flags |
| `deploy/base/server.yaml` | Modified: add env vars |
---
### Task 1: DDL Scripts
**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V6__agent_events.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V7__logs.sql`
- [ ] **Step 1: Create route_diagrams DDL**
```sql
CREATE TABLE IF NOT EXISTS route_diagrams (
    tenant_id LowCardinality(String) DEFAULT 'default',
    content_hash String,
    route_id LowCardinality(String),
    agent_id LowCardinality(String),
    application_name LowCardinality(String),
    definition String,
    created_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192
```
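Note that ReplacingMergeTree deduplication is asynchronous, which matters both for the store implementation and for its tests. A hedged illustration of the semantics (hash and values are made up):

```sql
-- Two inserts with the same (tenant_id, content_hash) coexist until a merge.
INSERT INTO route_diagrams (content_hash, route_id, agent_id, application_name, definition)
VALUES ('abc123', 'route-1', 'agent-1', 'orders', '{}');
INSERT INTO route_diagrams (content_hash, route_id, agent_id, application_name, definition)
VALUES ('abc123', 'route-1', 'agent-1', 'orders', '{}');

-- FINAL collapses duplicates at read time; tests can force a merge instead:
SELECT count() FROM route_diagrams FINAL WHERE content_hash = 'abc123';  -- 1
OPTIMIZE TABLE route_diagrams FINAL;
```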
- [ ] **Step 2: Create agent_events DDL**
```sql
CREATE TABLE IF NOT EXISTS agent_events (
    tenant_id LowCardinality(String) DEFAULT 'default',
    timestamp DateTime64(3) DEFAULT now64(3),
    agent_id LowCardinality(String),
    app_id LowCardinality(String),
    event_type LowCardinality(String),
    detail String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
```
- [ ] **Step 3: Create logs DDL**
```sql
CREATE TABLE IF NOT EXISTS logs (
    tenant_id LowCardinality(String) DEFAULT 'default',
    timestamp DateTime64(3),
    application LowCardinality(String),
    agent_id LowCardinality(String),
    level LowCardinality(String),
    logger_name LowCardinality(String) DEFAULT '',
    message String,
    thread_name LowCardinality(String) DEFAULT '',
    stack_trace String DEFAULT '',
    exchange_id String DEFAULT '',
    mdc Map(String, String) DEFAULT map(),
    INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192
```
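For reference, `ngrambf_v1(3, …)` splits text into 3-grams, so a `LIKE` pattern can only use the skip index when the searched substring is at least 3 characters long; shorter needles degrade to a full column scan. A hedged usage sketch (the application name is made up):

```sql
-- 'timeout' is >= 3 chars, so the idx_msg bloom filter can skip granules:
SELECT timestamp, level, message
FROM logs
WHERE tenant_id = 'default'
  AND application = 'orders'
  AND message LIKE '%timeout%'
ORDER BY timestamp DESC
LIMIT 100;
```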
- [ ] **Step 4: Compile and commit**
```bash
mvn clean compile -pl cameleer-server-app -f pom.xml
git commit -m "feat(clickhouse): add DDL for route_diagrams, agent_events, and logs tables"
```
---
### Task 2: ClickHouseDiagramStore
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseDiagramStoreIT.java`
Implements `DiagramStore` interface (5 methods). Read `PostgresDiagramStore.java` first and translate.
**Key differences from PG:**
- `INSERT INTO ... ON CONFLICT DO NOTHING` → just `INSERT INTO` (ReplacingMergeTree deduplicates by content_hash)
- `?::jsonb` → plain `?` (CH stores definition as String, not JSONB)
- `ORDER BY created_at DESC LIMIT 1` stays the same, but add `FINAL` to reads so ReplacingMergeTree returns the deduplicated row
- `findProcessorRouteMapping`: PG uses `jsonb_array_elements()` — CH has no direct equivalent of PG's JSONB unnesting, so keep the definition as a plain String column and parse in Java: query `route_diagrams FINAL` and deserialize definitions application-side. **Recommended:** Fetch all definitions for the application, deserialize in Java, and extract processor→route mappings. The result set is small (one row per route).
- SHA-256 content hash computation stays in Java (same as PG store)
- Add `WHERE tenant_id = 'default'` to all queries
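The content-hash computation can stay a small pure helper; a minimal sketch using only JDK APIs (class and method names here are illustrative, not the existing store's):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class ContentHash {
    // Hashes the canonical diagram definition; identical definitions yield
    // identical hashes, which ReplacingMergeTree then collapses to one row.
    static String sha256Hex(String definition) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(definition.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```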
**Tests:**
- `store_insertsNewDiagram`
- `store_duplicateHashIgnored` (ReplacingMergeTree dedup after OPTIMIZE FINAL)
- `findByContentHash_returnsGraph`
- `findContentHashForRoute_returnsMostRecent`
- `findProcessorRouteMapping_extractsMapping`
---
### Task 3: ClickHouseAgentEventRepository
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java`
Implements `AgentEventRepository` interface (2 methods: `insert` + `query`).
**Key differences from PG:**
- No `BIGSERIAL id` — CH has no auto-increment. `AgentEventRecord` has a `long id`; set it to 0 for CH rows.
- `INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)`
- `query` builds dynamic WHERE (same pattern as PG) with `ORDER BY timestamp DESC LIMIT ?`
- Add `WHERE tenant_id = 'default'`
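The dynamic WHERE pattern can be sketched as a pure helper that is easy to unit-test without a container (names and the exact filter set are illustrative):

```java
import java.time.Instant;
import java.util.List;

public class EventQueryBuilder {
    // Builds the agent_events query and the matching ordered parameter list.
    // Null filter values are simply omitted, mirroring the PG pattern.
    static String build(String appId, String agentId, Instant from, Instant to,
                        int limit, List<Object> params) {
        StringBuilder sql = new StringBuilder(
            "SELECT timestamp, agent_id, app_id, event_type, detail FROM agent_events"
            + " WHERE tenant_id = 'default'");
        if (appId != null)   { sql.append(" AND app_id = ?");     params.add(appId); }
        if (agentId != null) { sql.append(" AND agent_id = ?");   params.add(agentId); }
        if (from != null)    { sql.append(" AND timestamp >= ?"); params.add(from); }
        if (to != null)      { sql.append(" AND timestamp <= ?"); params.add(to); }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        return sql.toString();
    }
}
```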
**Tests:**
- `insert_writesEvent`
- `query_byAppId_filtersCorrectly`
- `query_byTimeRange_filtersCorrectly`
- `query_respectsLimit`
---
### Task 4: ClickHouseLogStore
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java`
Replaces `OpenSearchLogIndex`. Must have the same public API:
- `search(application, agentId, level, query, exchangeId, from, to, limit)` → returns `List<LogEntryResponse>`
- `indexBatch(agentId, application, List<LogEntry> entries)` → batch INSERT into `logs`
**Key implementation:**
`indexBatch`: Batch INSERT with `Map(String, String)` for MDC column. Extract `camel.exchangeId` from MDC into top-level `exchange_id` column.
```sql
INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, logger_name,
                  message, thread_name, stack_trace, exchange_id, mdc)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
```
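Extracting `camel.exchangeId` while keeping the full MDC for the Map column is simple pure logic; a sketch (helper names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class MdcMapper {
    static final String EXCHANGE_ID_KEY = "camel.exchangeId";

    // Value for the top-level exchange_id column: '' when absent, matching
    // the column's DEFAULT, so the query-side OR stays simple.
    static String exchangeId(Map<String, String> mdc) {
        return mdc == null ? "" : mdc.getOrDefault(EXCHANGE_ID_KEY, "");
    }

    // Defensive copy so a null MDC still binds as an empty CH map.
    static Map<String, String> mdcColumn(Map<String, String> mdc) {
        return mdc == null ? new HashMap<>() : new HashMap<>(mdc);
    }
}
```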
MDC Map type: pass a `java.util.HashMap` — ClickHouse JDBC 0.9.7 supports the native Map type (same approach `ClickHouseMetricsStore` uses for tags).
`search`: Build WHERE clause from params. Use `LIKE '%query%'` for message text search (ngram-accelerated). Return `LogEntryResponse` records.
```sql
SELECT timestamp, level, logger_name, message, thread_name, stack_trace
FROM logs
WHERE tenant_id = 'default' AND application = ?
[AND agent_id = ?]
[AND level = ?]
[AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)]
[AND message LIKE ?]
[AND timestamp >= ?]
[AND timestamp <= ?]
ORDER BY timestamp DESC
LIMIT ?
```
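One detail worth handling when binding the `message LIKE ?` parameter: user input may itself contain LIKE metacharacters. A hedged sketch (ClickHouse treats backslash as the LIKE escape character):

```java
public class LikePatterns {
    // Escapes LIKE metacharacters in user input, then wraps it in wildcards
    // so it matches as a plain substring.
    static String containsPattern(String userQuery) {
        String escaped = userQuery
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_");
        return "%" + escaped + "%";
    }
}
```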
**Tests:**
- `indexBatch_writesLogs`
- `search_byApplication_returnsLogs`
- `search_byLevel_filtersCorrectly`
- `search_byQuery_usesLikeSearch`
- `search_byExchangeId_matchesTopLevelAndMdc`
- `search_byTimeRange_filtersCorrectly`
- `indexBatch_storesMdc`
---
### Task 5: Feature Flag Wiring
**Files to modify:**
- `PostgresDiagramStore.java` — add `@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")`
- `PostgresAgentEventRepository.java` — add `@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")`
- `OpenSearchLogIndex.java` — add `@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")`
- `StorageBeanConfig.java` — add CH diagram, event, and log store beans (all default to clickhouse with `matchIfMissing = true`)
- `application.yml` — add `diagrams`, `events`, `logs` flags under `cameleer.storage`
- `deploy/base/server.yaml` — add env vars
**Feature flags (the new Phase 4 flags default to clickhouse; existing flags keep their defaults):**
```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
    stats: ${CAMELEER_STORAGE_STATS:clickhouse}
    diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
    events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
    logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
```
**Important for LogStore wiring:** The `OpenSearchLogIndex` is a `@Repository` used directly by controllers (not via an interface). The `ClickHouseLogStore` must be injectable in the same way. Options:
- Extract a `LogIndex` interface with `search()` + `indexBatch()` methods, used by both controllers
- Or make `ClickHouseLogStore` extend/implement the same type
**Recommended:** Create a `LogIndex` interface in the core module with the two methods, have both `OpenSearchLogIndex` and `ClickHouseLogStore` implement it, and update `LogIngestionController` + `LogQueryController` to inject `LogIndex` instead of `OpenSearchLogIndex`.
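A minimal sketch of what the extracted interface could look like, with an in-memory toy implementation just to show the contract round-trips (all types here are simplified placeholders — the real `LogEntry`/`LogEntryResponse` live in the core module and carry more fields):

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simplified illustrative types, not the project's real records.
record LogEntry(Instant timestamp, String level, String message) {}
record LogEntryResponse(Instant timestamp, String level, String message) {}

// The extracted contract: both OpenSearchLogIndex and ClickHouseLogStore
// implement it, and the controllers inject LogIndex instead of the
// concrete OpenSearch type.
interface LogIndex {
    List<LogEntryResponse> search(String application, String agentId, String level,
                                  String query, String exchangeId,
                                  Instant from, Instant to, int limit);
    void indexBatch(String agentId, String application, List<LogEntry> entries);
}

// Toy stand-in used only to exercise the contract; it honours just the
// level filter and the limit.
class InMemoryLogIndex implements LogIndex {
    private final List<LogEntryResponse> stored = new ArrayList<>();

    @Override
    public void indexBatch(String agentId, String application, List<LogEntry> entries) {
        for (LogEntry e : entries) {
            stored.add(new LogEntryResponse(e.timestamp(), e.level(), e.message()));
        }
    }

    @Override
    public List<LogEntryResponse> search(String application, String agentId, String level,
                                         String query, String exchangeId,
                                         Instant from, Instant to, int limit) {
        List<LogEntryResponse> out = new ArrayList<>();
        for (LogEntryResponse r : stored) {
            if (out.size() >= limit) break;
            if (level == null || level.equals(r.level())) out.add(r);
        }
        return out;
    }
}
```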
---
## Verification Checklist
1. **Diagrams**: Store + retrieve RouteGraph via ClickHouse, verify content-hash dedup
2. **Events**: Insert + query events with time range and app/agent filters
3. **Logs**: Batch insert + search with all filter types (level, query, exchangeId, time range)
4. **Feature flags**: Each store independently switchable between PG/OS and CH
5. **Defaults**: Default config uses ClickHouse for all Phase 4 stores; PG/OS remain selectable via the flags
6. **CI**: `mvn clean verify -DskipITs` passes