# ClickHouse Phase 4: Remaining Tables — Implementation Plan
**For agentic workers:** REQUIRED SUB-SKILL: use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Migrate route diagrams, agent events, and application logs from PostgreSQL/OpenSearch to ClickHouse.

**Architecture:** Three new ClickHouse stores implement existing interfaces. `ClickHouseDiagramStore` uses ReplacingMergeTree for content-hash dedup. `ClickHouseAgentEventRepository` uses MergeTree for append-only events. `ClickHouseLogStore` replaces `OpenSearchLogIndex` with SQL + ngram indexes. Feature flags control each store independently.

**Tech Stack:** ClickHouse 24.12, JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`
## File Structure

| File | Responsibility |
|---|---|
| `cameleer-server-app/.../resources/clickhouse/V5__route_diagrams.sql` | DDL for `route_diagrams` (ReplacingMergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V6__agent_events.sql` | DDL for `agent_events` (MergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V7__logs.sql` | DDL for `logs` (MergeTree with ngram indexes) |
| `cameleer-server-app/.../storage/ClickHouseDiagramStore.java` | `DiagramStore` impl for ClickHouse |
| `cameleer-server-app/.../storage/ClickHouseAgentEventRepository.java` | `AgentEventRepository` impl for ClickHouse |
| `cameleer-server-app/.../search/ClickHouseLogStore.java` | Replaces `OpenSearchLogIndex` |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add CH beans with feature flags |
| `cameleer-server-app/.../storage/PostgresDiagramStore.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../storage/PostgresAgentEventRepository.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../search/OpenSearchLogIndex.java` | Modified: add `@ConditionalOnProperty` |
| `cameleer-server-app/.../resources/application.yml` | Modified: add feature flags |
| `deploy/base/server.yaml` | Modified: add env vars |
## Task 1: DDL Scripts

**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V6__agent_events.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V7__logs.sql`
- [ ] **Step 1: Create `route_diagrams` DDL**

```sql
CREATE TABLE IF NOT EXISTS route_diagrams (
    tenant_id LowCardinality(String) DEFAULT 'default',
    content_hash String,
    route_id LowCardinality(String),
    agent_id LowCardinality(String),
    application_name LowCardinality(String),
    definition String,
    created_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192
```
- [ ] **Step 2: Create `agent_events` DDL**

```sql
CREATE TABLE IF NOT EXISTS agent_events (
    tenant_id LowCardinality(String) DEFAULT 'default',
    timestamp DateTime64(3) DEFAULT now64(3),
    agent_id LowCardinality(String),
    app_id LowCardinality(String),
    event_type LowCardinality(String),
    detail String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
```
- [ ] **Step 3: Create `logs` DDL**

```sql
CREATE TABLE IF NOT EXISTS logs (
    tenant_id LowCardinality(String) DEFAULT 'default',
    timestamp DateTime64(3),
    application LowCardinality(String),
    agent_id LowCardinality(String),
    level LowCardinality(String),
    logger_name LowCardinality(String) DEFAULT '',
    message String,
    thread_name LowCardinality(String) DEFAULT '',
    stack_trace String DEFAULT '',
    exchange_id String DEFAULT '',
    mdc Map(String, String) DEFAULT map(),
    INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192
```
- [ ] **Step 4: Compile and commit**

```bash
mvn clean compile -pl cameleer-server-app -f pom.xml
git commit -m "feat(clickhouse): add DDL for route_diagrams, agent_events, and logs tables"
```
## Task 2: ClickHouseDiagramStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseDiagramStoreIT.java`
Implements the `DiagramStore` interface (5 methods). Read `PostgresDiagramStore.java` first and translate.

Key differences from PG:
- `INSERT INTO ... ON CONFLICT DO NOTHING` → just `INSERT INTO` (ReplacingMergeTree deduplicates by `content_hash`)
- `?::jsonb` → plain `?` (CH stores `definition` as String, not JSONB)
- `ORDER BY created_at DESC LIMIT 1` stays the same, but add `FINAL` for ReplacingMergeTree reads
- `findProcessorRouteMapping`: PG uses `jsonb_array_elements()`; CH has no native JSON array functions. Instead, store the definition as a string and parse in Java, OR query `route_diagrams FINAL` and deserialize definitions application-side. Recommended: fetch all definitions for the application, deserialize in Java, extract processor→route mappings. This is a small result set (one row per route).
- SHA-256 content hash computation stays in Java (same as PG store)
- Add `WHERE tenant_id = 'default'` to all queries
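The content-hash computation mentioned above can be sketched as follows. This is a minimal stand-alone version; the helper name `sha256Hex` is illustrative, and the real store should reuse whatever hashing logic the PG store already has:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public final class ContentHash {

    // Computes the lowercase hex SHA-256 of a route definition string.
    // Helper name is illustrative; the PG store's existing hashing can be reused as-is.
    static String sha256Hex(String definition) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(definition.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory in every JRE, so this cannot happen in practice.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sha256Hex("from('timer:tick').to('log:out')"));
    }
}
```

The hex digest doubles as the ReplacingMergeTree sort key, so two inserts of the same definition collapse to one row after a merge.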
**Tests:**
- `store_insertsNewDiagram`
- `store_duplicateHashIgnored` (ReplacingMergeTree dedup after `OPTIMIZE FINAL`)
- `findByContentHash_returnsGraph`
- `findContentHashForRoute_returnsMostRecent`
- `findProcessorRouteMapping_extractsMapping`
## Task 3: ClickHouseAgentEventRepository

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java`
Implements the `AgentEventRepository` interface (2 methods: insert + query).

Key differences from PG:
- No `BIGSERIAL id` — CH doesn't have auto-increment. `AgentEventRecord` has `long id` but set it to 0 for CH rows.
- Insert: `INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)`
- `query` builds a dynamic WHERE clause (same pattern as PG) with `ORDER BY timestamp DESC LIMIT ?`
- Add `WHERE tenant_id = 'default'` to all queries
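The dynamic-WHERE pattern can be sketched like this. The parameter set and the `BoundQuery` record are assumptions; the real repository should mirror whatever filters the PG implementation exposes:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public final class AgentEventQuery {

    // Holds the generated SQL plus its positional bind values.
    record BoundQuery(String sql, List<Object> params) {}

    // Appends each condition only when its filter value is present,
    // mirroring the PG repository's dynamic-WHERE pattern.
    static BoundQuery build(String appId, String agentId, Instant from, Instant to, int limit) {
        StringBuilder sql = new StringBuilder(
            "SELECT timestamp, agent_id, app_id, event_type, detail "
            + "FROM agent_events WHERE tenant_id = 'default'");
        List<Object> params = new ArrayList<>();
        if (appId != null)   { sql.append(" AND app_id = ?");     params.add(appId); }
        if (agentId != null) { sql.append(" AND agent_id = ?");   params.add(agentId); }
        if (from != null)    { sql.append(" AND timestamp >= ?"); params.add(from); }
        if (to != null)      { sql.append(" AND timestamp <= ?"); params.add(to); }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        return new BoundQuery(sql.toString(), params);
    }
}
```

The resulting SQL and parameter list would then be handed to `JdbcTemplate.query(sql, rowMapper, params.toArray())`.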
**Tests:**
- `insert_writesEvent`
- `query_byAppId_filtersCorrectly`
- `query_byTimeRange_filtersCorrectly`
- `query_respectsLimit`
## Task 4: ClickHouseLogStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java`
Replaces `OpenSearchLogIndex`. Must have the same public API:
- `search(application, agentId, level, query, exchangeId, from, to, limit)` → returns `List<LogEntryResponse>`
- `indexBatch(agentId, application, List<LogEntry> entries)` → batch INSERT into `logs`
Key implementation:

`indexBatch`: batch INSERT with `Map(String, String)` for the MDC column. Extract `camel.exchangeId` from the MDC into the top-level `exchange_id` column.

```sql
INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, logger_name,
                  message, thread_name, stack_trace, exchange_id, mdc)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
```

MDC `Map` type: pass a `java.util.HashMap` — ClickHouse JDBC 0.9.7 supports the native Map type (same as `ClickHouseMetricsStore` uses for tags).
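One way to sketch the per-row bind values for that INSERT (the `LogEntry` record below is a stand-in, not the project's real DTO, and the field order assumes the column order shown above):

```java
import java.time.Instant;
import java.util.Map;

public final class LogRowMapper {

    // Stand-in for the real log entry DTO; field names are assumptions.
    record LogEntry(Instant timestamp, String level, String loggerName, String message,
                    String threadName, String stackTrace, Map<String, String> mdc) {}

    // Bind values for one row of the batch INSERT (tenant_id is the literal 'default').
    // camel.exchangeId is promoted to the top-level exchange_id column; it also stays in
    // the Map column, which is why search can match exchange_id OR mdc['camel.exchangeId'].
    static Object[] toRow(String application, String agentId, LogEntry e) {
        String exchangeId = e.mdc().getOrDefault("camel.exchangeId", "");
        return new Object[] {
            e.timestamp(), application, agentId, e.level(), e.loggerName(),
            e.message(), e.threadName(), e.stackTrace(), exchangeId, e.mdc()
        };
    }
}
```

Each `Object[]` row would be collected into the batch list passed to `JdbcTemplate.batchUpdate`.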
`search`: build the WHERE clause from params. Use `LIKE '%query%'` for message text search (ngram-accelerated). Return `LogEntryResponse` records.

```sql
SELECT timestamp, level, logger_name, message, thread_name, stack_trace
FROM logs
WHERE tenant_id = 'default' AND application = ?
  [AND agent_id = ?]
  [AND level = ?]
  [AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)]
  [AND message LIKE ?]
  [AND timestamp >= ?]
  [AND timestamp <= ?]
ORDER BY timestamp DESC
LIMIT ?
```
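Since the user's query string ends up inside a LIKE pattern, it is worth escaping LIKE metacharacters before wrapping it. A small sketch, assuming backslash escaping (which ClickHouse's `LIKE` supports, like most SQL dialects):

```java
public final class LikePatterns {

    // Escapes LIKE metacharacters so '%' and '_' in user input match literally,
    // then wraps the result for substring search. Bind the result as the ? for
    // `message LIKE ?` rather than concatenating it into the SQL string.
    static String toLikePattern(String userQuery) {
        String escaped = userQuery
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_");
        return "%" + escaped + "%";
    }
}
```

A leading/trailing `%` keeps the pattern a substring match, which the `ngrambf_v1` index can still accelerate.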
**Tests:**
- `indexBatch_writesLogs`
- `search_byApplication_returnsLogs`
- `search_byLevel_filtersCorrectly`
- `search_byQuery_usesLikeSearch`
- `search_byExchangeId_matchesTopLevelAndMdc`
- `search_byTimeRange_filtersCorrectly`
- `indexBatch_storesMdc`
## Task 5: Feature Flag Wiring

**Files to modify:**
- `PostgresDiagramStore.java` — add `@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")`
- `PostgresAgentEventRepository.java` — add `@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")`
- `OpenSearchLogIndex.java` — add `@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")`
- `StorageBeanConfig.java` — add CH diagram, event, and log store beans (all default to clickhouse with `matchIfMissing = true`)
- `application.yml` — add `diagrams`, `events`, `logs` flags under `cameleer.storage`
- `deploy/base/server.yaml` — add env vars
Feature flags (the three new Phase 4 flags default to clickhouse; `metrics` and `search` keep their existing defaults):

```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
    stats: ${CAMELEER_STORAGE_STATS:clickhouse}
    diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
    events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
    logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
```
**Important for LogStore wiring:** `OpenSearchLogIndex` is a `@Repository` used directly by controllers (not via an interface). `ClickHouseLogStore` must be injectable in the same way. Options:

- Extract a `LogIndex` interface with `search()` + `indexBatch()` methods, used by both controllers
- Or make `ClickHouseLogStore` extend/implement the same type

Recommended: create a `LogIndex` interface in the core module with the two methods, have both `OpenSearchLogIndex` and `ClickHouseLogStore` implement it, and update `LogIngestionController` + `LogQueryController` to inject `LogIndex` instead of `OpenSearchLogIndex`.
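The recommended extraction could look like the sketch below. Signatures are trimmed for brevity (the real `search` takes all eight parameters and the real DTOs replace the placeholder record); the in-memory implementation exists only to show both stores can share the contract:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Proposed shared contract; the real version would live in the core module
// and use the project's LogEntry/LogEntryResponse types.
interface LogIndex {
    record LogEntry(Instant timestamp, String level, String message) {}

    void indexBatch(String agentId, String application, List<LogEntry> entries);

    // Trimmed to two filters for brevity; the real signature carries all eight params.
    List<LogEntry> search(String application, String level, int limit);
}

// Throwaway in-memory implementation demonstrating the shared contract.
final class InMemoryLogIndex implements LogIndex {
    private record Row(String agentId, String application, LogEntry entry) {}
    private final List<Row> rows = new ArrayList<>();

    @Override
    public void indexBatch(String agentId, String application, List<LogEntry> entries) {
        entries.forEach(e -> rows.add(new Row(agentId, application, e)));
    }

    @Override
    public List<LogEntry> search(String application, String level, int limit) {
        return rows.stream()
            .filter(r -> r.application().equals(application))
            .filter(r -> level == null || r.entry().level().equals(level))
            .map(Row::entry)
            .limit(limit)
            .toList();
    }
}
```

With this in place, the controllers depend only on `LogIndex`, and the `@ConditionalOnProperty` flags decide which implementation Spring injects.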
## Verification Checklist

- [ ] Diagrams: store + retrieve `RouteGraph` via ClickHouse, verify content-hash dedup
- [ ] Events: insert + query events with time range and app/agent filters
- [ ] Logs: batch insert + search with all filter types (level, query, exchangeId, time range)
- [ ] Feature flags: each store independently switchable between PG/OS and CH
- [ ] Backward compat: default config uses ClickHouse for all Phase 4 stores
- [ ] CI: `mvn clean verify -DskipITs` passes