# ClickHouse Phase 4: Remaining Tables — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Migrate route diagrams, agent events, and application logs from PostgreSQL/OpenSearch to ClickHouse.

**Architecture:** Three new ClickHouse stores implement existing interfaces. `ClickHouseDiagramStore` uses ReplacingMergeTree for content-hash deduplication, which is applied during background merges. `ClickHouseAgentEventRepository` uses MergeTree for append-only events. `ClickHouseLogStore` replaces `OpenSearchLogIndex` with SQL queries accelerated by ngram bloom-filter indexes. Feature flags control each store independently.

**Tech Stack:** ClickHouse 24.12, JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`

---

## File Structure
| File | Responsibility |
|------|----------------|
| `cameleer-server-app/.../resources/clickhouse/V5__route_diagrams.sql` | DDL for `route_diagrams` (ReplacingMergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V6__agent_events.sql` | DDL for `agent_events` (MergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V7__logs.sql` | DDL for `logs` (MergeTree with ngram indexes) |
| `cameleer-server-app/.../storage/ClickHouseDiagramStore.java` | DiagramStore impl for ClickHouse |
| `cameleer-server-app/.../storage/ClickHouseAgentEventRepository.java` | AgentEventRepository impl for ClickHouse |
| `cameleer-server-app/.../search/ClickHouseLogStore.java` | Replaces OpenSearchLogIndex |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add CH beans with feature flags |
| `cameleer-server-app/.../storage/PostgresDiagramStore.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../storage/PostgresAgentEventRepository.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../search/OpenSearchLogIndex.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../resources/application.yml` | Modified: add feature flags |
| `deploy/base/server.yaml` | Modified: add env vars |

---

### Task 1: DDL Scripts

**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V6__agent_events.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V7__logs.sql`

- [ ] **Step 1: Create route_diagrams DDL**

```sql
CREATE TABLE IF NOT EXISTS route_diagrams (
    tenant_id        LowCardinality(String) DEFAULT 'default',
    content_hash     String,
    route_id         LowCardinality(String),
    agent_id         LowCardinality(String),
    application_name LowCardinality(String),
    definition       String,
    created_at       DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192
```
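The following plain-Java model is not ClickHouse code. It shows what `ReplacingMergeTree(created_at)` does when parts merge: among rows that share the sorting key `(tenant_id, content_hash)`, one row survives. That row has the greatest `created_at`, and on a tie the most recently inserted row wins. `DiagramRow` is a hypothetical stand-in for a table row. Until a merge runs, or a query reads with `FINAL`, both copies of a re-sent diagram remain visible.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplacingMergeModel {

    // Hypothetical stand-in for one route_diagrams row; createdAtMillis plays the role of created_at.
    public record DiagramRow(String tenantId, String contentHash, String routeId, long createdAtMillis) {}

    // Models a merge: one row survives per sorting key (tenant_id, content_hash),
    // namely the one with the highest version column (created_at). On a version
    // tie, the row that appears later in insertion order wins.
    public static List<DiagramRow> merge(List<DiagramRow> inserted) {
        Map<List<String>, DiagramRow> survivors = new LinkedHashMap<>();
        for (DiagramRow row : inserted) {
            List<String> key = List.of(row.tenantId(), row.contentHash());
            DiagramRow current = survivors.get(key);
            if (current == null || row.createdAtMillis() >= current.createdAtMillis()) {
                survivors.put(key, row);
            }
        }
        return new ArrayList<>(survivors.values());
    }

    public static void main(String[] args) {
        List<DiagramRow> inserted = List.of(
                new DiagramRow("default", "abc123", "route-a", 1_000L),
                new DiagramRow("default", "abc123", "route-a", 2_000L), // same hash, re-sent later
                new DiagramRow("default", "def456", "route-b", 1_500L));
        System.out.println(merge(inserted).size()); // 2
    }
}
```

This is why the Task 2 tests either run `OPTIMIZE TABLE route_diagrams FINAL` or read with `FINAL` before asserting that only one row remains.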

- [ ] **Step 2: Create agent_events DDL**

```sql
CREATE TABLE IF NOT EXISTS agent_events (
    tenant_id  LowCardinality(String) DEFAULT 'default',
    timestamp  DateTime64(3) DEFAULT now64(3),
    agent_id   LowCardinality(String),
    app_id     LowCardinality(String),
    event_type LowCardinality(String),
    detail     String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
```

- [ ] **Step 3: Create logs DDL**

```sql
CREATE TABLE IF NOT EXISTS logs (
    tenant_id    LowCardinality(String) DEFAULT 'default',
    timestamp    DateTime64(3),
    application  LowCardinality(String),
    agent_id     LowCardinality(String),
    level        LowCardinality(String),
    logger_name  LowCardinality(String) DEFAULT '',
    message      String,
    thread_name  LowCardinality(String) DEFAULT '',
    stack_trace  String DEFAULT '',
    exchange_id  String DEFAULT '',
    mdc          Map(String, String) DEFAULT map(),

    INDEX idx_msg   message     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level       TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192
```
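`ngrambf_v1(3, 256, 2, 0)` stores the 3-grams of each indexed value in a 256-byte bloom filter, using 2 hash functions and random seed 0. With `GRANULARITY 4`, one filter covers 4 granules (4 × 8,192 rows). The sketch below is an illustration, not ClickHouse's own tokenizer, and it works on Java `char`s, so treat it as accurate for ASCII text only. It lists the 3-grams a `LIKE '%term%'` search term contributes. As we understand the index, a term shorter than 3 characters produces no n-grams, so `idx_msg` cannot skip any data for it and the query scans every granule in the selected partitions.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class NgramSketch {

    // Returns the distinct n-grams of a search term, in first-seen order.
    // Illustrative only: ClickHouse performs its own tokenization internally.
    public static Set<String> ngrams(String term, int n) {
        Set<String> grams = new LinkedHashSet<>();
        for (int i = 0; i + n <= term.length(); i++) {
            grams.add(term.substring(i, i + n));
        }
        return grams;
    }

    public static void main(String[] args) {
        System.out.println(ngrams("timeout", 3)); // [tim, ime, meo, eou, out]
        System.out.println(ngrams("id", 3));      // [] (too short for idx_msg to help)
    }
}
```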

- [ ] **Step 4: Compile and commit**

```bash
mvn clean compile -pl cameleer-server-app -f pom.xml
git add cameleer-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql \
        cameleer-server-app/src/main/resources/clickhouse/V6__agent_events.sql \
        cameleer-server-app/src/main/resources/clickhouse/V7__logs.sql
git commit -m "feat(clickhouse): add DDL for route_diagrams, agent_events, and logs tables"
```

---

### Task 2: ClickHouseDiagramStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseDiagramStoreIT.java`

Implements the `DiagramStore` interface (5 methods). Read `PostgresDiagramStore.java` first and translate it.

**Key differences from PG:**
- `INSERT INTO ... ON CONFLICT DO NOTHING` → plain `INSERT INTO`. ReplacingMergeTree collapses rows with the same `(tenant_id, content_hash)` only during background merges, so duplicates stay visible until a merge runs; reads must use `FINAL`.
- `?::jsonb` → plain `?` (CH stores `definition` as `String`, not JSONB)
- `ORDER BY created_at DESC LIMIT 1` is unchanged, but add `FINAL` after the table name (`FROM route_diagrams FINAL`) so reads see deduplicated rows
- `findProcessorRouteMapping`: PG uses `jsonb_array_elements()`, which has no direct CH equivalent. CH does offer `JSONExtract*` functions (e.g. `JSONExtractArrayRaw`) on `String` columns, but porting the nested traversal to them is more work than parsing in Java. **Recommended:** query `route_diagrams FINAL` for the application, deserialize each definition in Java, and extract the processor→route mappings there. The result set is small (one row per route).
- SHA-256 content hash computation stays in Java (same as PG store)
- Add a `tenant_id = 'default'` predicate to every query

**Tests:**
- `store_insertsNewDiagram`
- `store_duplicateHashIgnored` (run `OPTIMIZE TABLE route_diagrams FINAL` or read with `FINAL` before asserting, since ReplacingMergeTree only deduplicates on merge)
- `findByContentHash_returnsGraph`
- `findContentHashForRoute_returnsMostRecent`
- `findProcessorRouteMapping_extractsMapping`
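The two pieces of logic that stay in Java for this store can be sketched in plain Java: the SHA-256 content hash, and the application-side processor→route extraction that replaces `jsonb_array_elements()`. Both rest on assumptions. `RouteDefinition` and its `processorIds` list are a hypothetical simplification of the real `RouteGraph`, lowercase hex is an assumed encoding, and `PostgresDiagramStore.java` remains the reference for the actual hashing input and graph shape.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DiagramStoreSketch {

    // Hypothetical, simplified view of one deserialized `definition` value.
    public record RouteDefinition(String routeId, List<String> processorIds) {}

    // SHA-256 of the serialized definition, as lowercase hex (assumed encoding).
    public static String contentHash(String definitionJson) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(definitionJson.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // Application-side replacement for jsonb_array_elements(): walk each definition
    // fetched from `route_diagrams FINAL` and map processorId -> routeId.
    // On a duplicate processor ID the first route wins (an assumption; mirror the PG query).
    public static Map<String, String> processorRouteMapping(List<RouteDefinition> definitions) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (RouteDefinition def : definitions) {
            for (String processorId : def.processorIds()) {
                mapping.putIfAbsent(processorId, def.routeId());
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println(contentHash("abc"));
        // ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
        System.out.println(processorRouteMapping(List.of(
                new RouteDefinition("route-a", List.of("log1", "to1")),
                new RouteDefinition("route-b", List.of("choice1")))));
        // {log1=route-a, to1=route-a, choice1=route-b}
    }
}
```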

---

### Task 3: ClickHouseAgentEventRepository

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java`

Implements the `AgentEventRepository` interface (2 methods: `insert` + `query`).

**Key differences from PG:**
- No `BIGSERIAL id`: CH has no auto-increment. `AgentEventRecord` keeps its `long id` field, which is set to 0 for CH rows.
- `INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)` (`timestamp` falls back to its `now64(3)` default)
- `query` builds a dynamic WHERE clause (same pattern as PG) ending in `ORDER BY timestamp DESC LIMIT ?`
- Always include `tenant_id = 'default'` in the WHERE clause

**Tests:**
- `insert_writesEvent`
- `query_byAppId_filtersCorrectly`
- `query_byTimeRange_filtersCorrectly`
- `query_respectsLimit`
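The dynamic WHERE clause for `query` can be sketched as a small builder that pairs each optional filter with its bind parameter. Its output is ready to pass to `JdbcTemplate.query(sql, rowMapper, params.toArray())`. The filter set (`appId`, `agentId`, `from`, `to`) is an assumption drawn from the tests above, so check it against the real `AgentEventRepository.query` signature. Passing timestamps as `java.sql.Timestamp` is also an assumption about what the ClickHouse JDBC driver accepts for `DateTime64(3)`.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class AgentEventQuerySketch {

    // SQL text plus positional parameters, in the same order as the '?' placeholders.
    public record Query(String sql, List<Object> params) {}

    // Hypothetical filter set; a null argument means "no filter".
    public static Query build(String appId, String agentId, Instant from, Instant to, int limit) {
        StringBuilder sql = new StringBuilder(
                "SELECT timestamp, agent_id, app_id, event_type, detail FROM agent_events"
                + " WHERE tenant_id = 'default'");
        List<Object> params = new ArrayList<>();
        if (appId != null) {
            sql.append(" AND app_id = ?");
            params.add(appId);
        }
        if (agentId != null) {
            sql.append(" AND agent_id = ?");
            params.add(agentId);
        }
        if (from != null) {
            sql.append(" AND timestamp >= ?");
            params.add(Timestamp.from(from));
        }
        if (to != null) {
            sql.append(" AND timestamp <= ?");
            params.add(Timestamp.from(to));
        }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        return new Query(sql.toString(), params);
    }

    public static void main(String[] args) {
        Query q = build("orders-app", null, null, null, 50);
        System.out.println(q.sql());
        System.out.println(q.params()); // [orders-app, 50]
    }
}
```

The row mapper (not shown) would build each `AgentEventRecord` with `id = 0`, since the table has no ID column.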

---

### Task 4: ClickHouseLogStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java`

Replaces `OpenSearchLogIndex` and must expose the same public API:
- `search(application, agentId, level, query, exchangeId, from, to, limit)` → returns `List<LogEntryResponse>`
- `indexBatch(agentId, application, List<LogEntry> entries)` → batch INSERT into `logs`

**Key implementation:**

`indexBatch`: batch INSERT with a `Map(String, String)` value for the MDC column. Extract `camel.exchangeId` from the MDC into the top-level `exchange_id` column.

```sql
INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, logger_name,
                  message, thread_name, stack_trace, exchange_id, mdc)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
```
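Each entry's parameter row for that INSERT might be assembled as follows. The sketch reads `camel.exchangeId` from the MDC into the top-level `exchange_id` value. It copies the key rather than moving it, which is an assumption consistent with the search filter checking both places. It also sends `''` for the `DEFAULT ''` columns rather than relying on null-to-default conversion. `LogEntry` and its fields are hypothetical stand-ins for the real DTO, and `Timestamp` for `DateTime64(3)` is an assumption. The resulting rows would go to `JdbcTemplate.batchUpdate(sql, rows)`, with the MDC passed as a `java.util.HashMap`.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogBatchSketch {

    // Hypothetical stand-in for the real LogEntry DTO.
    public record LogEntry(Instant timestamp, String level, String loggerName, String message,
                           String threadName, String stackTrace, Map<String, String> mdc) {}

    public static final String EXCHANGE_ID_KEY = "camel.exchangeId";

    // One Object[] per entry, ordered like the INSERT column list after tenant_id:
    // timestamp, application, agent_id, level, logger_name, message,
    // thread_name, stack_trace, exchange_id, mdc
    public static List<Object[]> toRows(String agentId, String application, List<LogEntry> entries) {
        List<Object[]> rows = new ArrayList<>(entries.size());
        for (LogEntry e : entries) {
            Map<String, String> mdc = e.mdc() == null ? new HashMap<>() : new HashMap<>(e.mdc());
            // Copy (not move) the exchange id, so the mdc column keeps the full context.
            String exchangeId = mdc.getOrDefault(EXCHANGE_ID_KEY, "");
            rows.add(new Object[] {
                    Timestamp.from(e.timestamp()), application, agentId, e.level(),
                    orEmpty(e.loggerName()), e.message(), orEmpty(e.threadName()),
                    orEmpty(e.stackTrace()), exchangeId, mdc });
        }
        return rows;
    }

    // These columns are declared DEFAULT ''; send '' rather than null.
    private static String orEmpty(String s) {
        return s == null ? "" : s;
    }

    public static void main(String[] args) {
        LogEntry entry = new LogEntry(Instant.parse("2026-03-31T12:00:00Z"), "INFO", null,
                "Order received", "main", null, Map.of(EXCHANGE_ID_KEY, "EX-1"));
        Object[] row = toRows("agent-1", "orders-app", List.of(entry)).get(0);
        System.out.println(row[8]);     // EX-1
        System.out.println(row.length); // 10
    }
}
```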

MDC `Map` type: pass the MDC as a `java.util.HashMap`. ClickHouse JDBC 0.9.7 supports the native `Map` type (ClickHouseMetricsStore uses the same approach for tags).

`search`: build the WHERE clause from the parameters, appending each bracketed clause below only when its parameter is set. Use `LIKE '%query%'` for message text search, which the `idx_msg` ngram bloom-filter index can accelerate. Escape `%`, `_`, and `\` in the user-supplied text before wrapping it in `%...%`; otherwise ClickHouse treats them as LIKE metacharacters. Return `LogEntryResponse` records.

```sql
SELECT timestamp, level, logger_name, message, thread_name, stack_trace
FROM logs
WHERE tenant_id = 'default' AND application = ?
  [AND agent_id = ?]
  [AND level = ?]
  [AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)]
  [AND message LIKE ?]
  [AND timestamp >= ?]
  [AND timestamp <= ?]
ORDER BY timestamp DESC
LIMIT ?
```
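The bracketed clauses above can be assembled the same way as the agent-event query. The sketch covers two details that are easy to miss: the exchange ID is bound twice, once for each side of the `OR`, and the `query` text is escaped before being wrapped in `%...%`. Two points are assumptions to confirm against the driver and server version in use. The first is that ClickHouse `LIKE` uses a backslash as its escape character. The second is that the JDBC driver escapes the bound string literal itself.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class LogSearchSketch {

    public record Query(String sql, List<Object> params) {}

    // Escape LIKE metacharacters so user text matches literally.
    // The backslash is escaped first so the escapes added afterwards are not doubled.
    public static String escapeLike(String text) {
        return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
    }

    // A null argument means "no filter"; application is required, as in the SQL above.
    public static Query build(String application, String agentId, String level, String query,
                              String exchangeId, Instant from, Instant to, int limit) {
        StringBuilder sql = new StringBuilder(
                "SELECT timestamp, level, logger_name, message, thread_name, stack_trace"
                + " FROM logs WHERE tenant_id = 'default' AND application = ?");
        List<Object> params = new ArrayList<>();
        params.add(application);
        if (agentId != null) { sql.append(" AND agent_id = ?"); params.add(agentId); }
        if (level != null) { sql.append(" AND level = ?"); params.add(level); }
        if (exchangeId != null) {
            sql.append(" AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)");
            params.add(exchangeId);
            params.add(exchangeId); // bound once per side of the OR
        }
        if (query != null && !query.isBlank()) {
            sql.append(" AND message LIKE ?");
            params.add("%" + escapeLike(query) + "%");
        }
        if (from != null) { sql.append(" AND timestamp >= ?"); params.add(Timestamp.from(from)); }
        if (to != null) { sql.append(" AND timestamp <= ?"); params.add(Timestamp.from(to)); }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        return new Query(sql.toString(), params);
    }

    public static void main(String[] args) {
        Query q = build("orders-app", null, "ERROR", "50% off", "EX-1", null, null, 100);
        System.out.println(q.params()); // [orders-app, ERROR, EX-1, EX-1, %50\% off%, 100]
    }
}
```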

**Tests:**
- `indexBatch_writesLogs`
- `search_byApplication_returnsLogs`
- `search_byLevel_filtersCorrectly`
- `search_byQuery_usesLikeSearch`
- `search_byExchangeId_matchesTopLevelAndMdc`
- `search_byTimeRange_filtersCorrectly`
- `indexBatch_storesMdc`

---

### Task 5: Feature Flag Wiring

**Files to modify:**
- `PostgresDiagramStore.java`: add `@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")`
- `PostgresAgentEventRepository.java`: add `@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")`
- `OpenSearchLogIndex.java`: add `@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")`
- `StorageBeanConfig.java`: add CH diagram, event, and log store beans (all default to `clickhouse` via `matchIfMissing = true`)
- `application.yml`: add `diagrams`, `events`, `logs` flags under `cameleer.storage`
- `deploy/base/server.yaml`: add env vars

**Feature flags** (the three new Phase 4 flags default to `clickhouse`; `metrics`, `search`, and `stats` are existing flags shown for context):
```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
    stats: ${CAMELEER_STORAGE_STATS:clickhouse}
    diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
    events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
    logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
```

**Important for LogStore wiring:** `OpenSearchLogIndex` is a `@Repository` that controllers use directly (not via an interface), so `ClickHouseLogStore` must be injectable in the same way. Options:
- Extract a `LogIndex` interface with `search()` + `indexBatch()` methods, used by both controllers
- Or make `ClickHouseLogStore` extend/implement the same type

**Recommended:** Create a `LogIndex` interface in the core module with the two methods, have both `OpenSearchLogIndex` and `ClickHouseLogStore` implement it, and update `LogIngestionController` + `LogQueryController` to inject `LogIndex` instead of `OpenSearchLogIndex`.
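Here is a plain-Java sketch of the recommended shape. It has no Spring dependency, so `select` models the `@ConditionalOnProperty` conditions by hand. `LogIndex` mirrors the two methods listed for Task 4, and `LogEntry`, `LogEntryResponse`, and `NamedStub` are hypothetical placeholders. The model assumes the ClickHouse log bean uses `havingValue = "clickhouse"` together with `matchIfMissing = true`, while the OpenSearch bean uses `havingValue = "opensearch"`. Under that assumption, any other value leaves no `LogIndex` bean, and the controllers' injection fails at startup.

```java
import java.time.Instant;
import java.util.List;

public class LogIndexWiringSketch {

    // Placeholder DTOs; the real ones already exist in the codebase.
    public record LogEntry(Instant timestamp, String level, String message) {}
    public record LogEntryResponse(Instant timestamp, String level, String message) {}

    // Proposed core-module interface: both controllers inject this instead of OpenSearchLogIndex.
    public interface LogIndex {
        List<LogEntryResponse> search(String application, String agentId, String level, String query,
                                      String exchangeId, Instant from, Instant to, int limit);

        void indexBatch(String agentId, String application, List<LogEntry> entries);
    }

    // Stub standing in for OpenSearchLogIndex or ClickHouseLogStore.
    public record NamedStub(String backend) implements LogIndex {
        @Override
        public List<LogEntryResponse> search(String application, String agentId, String level, String query,
                                             String exchangeId, Instant from, Instant to, int limit) {
            return List.of(); // stub: the real stores query their backend here
        }

        @Override
        public void indexBatch(String agentId, String application, List<LogEntry> entries) {
            // stub: the real stores write to their backend here
        }
    }

    // Models the conditions on cameleer.storage.logs (a null flag = property not set):
    //   ClickHouse bean:  havingValue = "clickhouse", matchIfMissing = true  (assumed)
    //   OpenSearch bean:  havingValue = "opensearch"
    // Case-insensitive, like Spring Boot's property condition.
    public static LogIndex select(String flag) {
        if (flag == null || "clickhouse".equalsIgnoreCase(flag)) {
            return new NamedStub("clickhouse");
        }
        if ("opensearch".equalsIgnoreCase(flag)) {
            return new NamedStub("opensearch");
        }
        // Neither condition matches: no LogIndex bean, so controller injection would fail.
        throw new IllegalStateException("No LogIndex bean for cameleer.storage.logs=" + flag);
    }

    public static void main(String[] args) {
        System.out.println(((NamedStub) select(null)).backend());         // clickhouse
        System.out.println(((NamedStub) select("opensearch")).backend()); // opensearch
    }
}
```

Because `application.yml` always resolves `${CAMELEER_STORAGE_LOGS:clickhouse}`, the property is normally present. `matchIfMissing = true` only matters if that YAML entry is removed.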

---

## Verification Checklist

1. **Diagrams**: Store + retrieve RouteGraph via ClickHouse, verify content-hash dedup
2. **Events**: Insert + query events with time range and app/agent filters
3. **Logs**: Batch insert + search with all filter types (level, query, exchangeId, time range)
4. **Feature flags**: Each store independently switchable between PG/OS and CH
5. **Backward compat**: Default config uses ClickHouse for all Phase 4 stores
6. **CI**: `mvn clean verify -DskipITs` passes