cameleer-server/docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md

ClickHouse Phase 4: Remaining Tables — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Migrate route diagrams, agent events, and application logs from PostgreSQL/OpenSearch to ClickHouse.

Architecture: Three new ClickHouse stores implement existing interfaces. ClickHouseDiagramStore uses ReplacingMergeTree for content-hash dedup. ClickHouseAgentEventRepository uses MergeTree for append-only events. ClickHouseLogStore replaces OpenSearchLogIndex with SQL + ngram indexes. Feature flags control each store independently.

Tech Stack: ClickHouse 24.12, JdbcTemplate, Testcontainers

Design Spec: docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md


File Structure

| File | Responsibility |
|------|----------------|
| cameleer-server-app/.../resources/clickhouse/V5__route_diagrams.sql | DDL for route_diagrams (ReplacingMergeTree) |
| cameleer-server-app/.../resources/clickhouse/V6__agent_events.sql | DDL for agent_events (MergeTree) |
| cameleer-server-app/.../resources/clickhouse/V7__logs.sql | DDL for logs (MergeTree with ngram indexes) |
| cameleer-server-app/.../storage/ClickHouseDiagramStore.java | DiagramStore impl for ClickHouse |
| cameleer-server-app/.../storage/ClickHouseAgentEventRepository.java | AgentEventRepository impl for ClickHouse |
| cameleer-server-app/.../search/ClickHouseLogStore.java | Replaces OpenSearchLogIndex |
| cameleer-server-app/.../config/StorageBeanConfig.java | Modified: add CH beans with feature flags |
| cameleer-server-app/.../storage/PostgresDiagramStore.java | Modified: add ConditionalOnProperty |
| cameleer-server-app/.../storage/PostgresAgentEventRepository.java | Modified: add ConditionalOnProperty |
| cameleer-server-app/.../search/OpenSearchLogIndex.java | Modified: add ConditionalOnProperty |
| cameleer-server-app/.../resources/application.yml | Modified: add feature flags |
| deploy/base/server.yaml | Modified: add env vars |

Task 1: DDL Scripts

Files:

  • Create: cameleer-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql

  • Create: cameleer-server-app/src/main/resources/clickhouse/V6__agent_events.sql

  • Create: cameleer-server-app/src/main/resources/clickhouse/V7__logs.sql

  • Step 1: Create route_diagrams DDL

```sql
CREATE TABLE IF NOT EXISTS route_diagrams (
    tenant_id        LowCardinality(String) DEFAULT 'default',
    content_hash     String,
    route_id         LowCardinality(String),
    agent_id         LowCardinality(String),
    application_name LowCardinality(String),
    definition       String,
    created_at       DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192
```
  • Step 2: Create agent_events DDL

```sql
CREATE TABLE IF NOT EXISTS agent_events (
    tenant_id   LowCardinality(String) DEFAULT 'default',
    timestamp   DateTime64(3) DEFAULT now64(3),
    agent_id    LowCardinality(String),
    app_id      LowCardinality(String),
    event_type  LowCardinality(String),
    detail      String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
```
  • Step 3: Create logs DDL

```sql
CREATE TABLE IF NOT EXISTS logs (
    tenant_id    LowCardinality(String) DEFAULT 'default',
    timestamp    DateTime64(3),
    application  LowCardinality(String),
    agent_id     LowCardinality(String),
    level        LowCardinality(String),
    logger_name  LowCardinality(String) DEFAULT '',
    message      String,
    thread_name  LowCardinality(String) DEFAULT '',
    stack_trace  String DEFAULT '',
    exchange_id  String DEFAULT '',
    mdc          Map(String, String) DEFAULT map(),

    INDEX idx_msg   message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192
```
  • Step 4: Compile and commit

```bash
mvn clean compile -pl cameleer-server-app -f pom.xml
git commit -m "feat(clickhouse): add DDL for route_diagrams, agent_events, and logs tables"
```

Task 2: ClickHouseDiagramStore

Files:

  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramStore.java
  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseDiagramStoreIT.java

Implements DiagramStore interface (5 methods). Read PostgresDiagramStore.java first and translate.

Key differences from PG:

  • INSERT INTO ... ON CONFLICT DO NOTHING → just INSERT INTO (ReplacingMergeTree deduplicates by content_hash)
  • ?::jsonb → plain ? (CH stores definition as String, not JSONB)
  • ORDER BY created_at DESC LIMIT 1 → same as in PG, but add FINAL for ReplacingMergeTree reads
  • findProcessorRouteMapping: PG uses jsonb_array_elements(). Since CH stores the definition as a plain String, skip SQL-side JSON handling: query route_diagrams FINAL for all of the application's definitions, deserialize them in Java, and extract the processor→route mappings there. This is a small result set (one row per route).
  • SHA-256 content hash computation stays in Java (same as PG store)
  • Add WHERE tenant_id = 'default' to all queries
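The SHA-256 content hash mentioned above needs nothing beyond the JDK; a minimal sketch (class and method names are illustrative, not the actual PostgresDiagramStore code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Illustrative helper: SHA-256 over the route definition bytes, hex-encoded,
// used as the dedup key for the ReplacingMergeTree table.
public final class ContentHash {
    public static String sha256Hex(String definition) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(definition.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest); // lowercase hex, 64 chars
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sha256Hex("abc"));
    }
}
```

Because the hash is computed identically on both sides, rows migrated from PG and rows written by the new store dedupe against each other.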

Tests:

  • store_insertsNewDiagram
  • store_duplicateHashIgnored (ReplacingMergeTree dedup after OPTIMIZE FINAL)
  • findByContentHash_returnsGraph
  • findContentHashForRoute_returnsMostRecent
  • findProcessorRouteMapping_extractsMapping

Task 3: ClickHouseAgentEventRepository

Files:

  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java
  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java

Implements AgentEventRepository interface (2 methods: insert + query).

Key differences from PG:

  • No BIGSERIAL id — CH has no auto-increment. AgentEventRecord has a long id field; set it to 0 for CH rows.
  • INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)
  • query builds dynamic WHERE (same pattern as PG) with ORDER BY timestamp DESC LIMIT ?
  • Add WHERE tenant_id = 'default'
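The dynamic WHERE construction can be sketched in plain Java (names are illustrative; the real repository would bind the collected parameters via JdbcTemplate):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the dynamic WHERE clause for agent_events.
// Nullable filters are appended only when present; their values go into a
// parallel parameter list so the SQL stays fully parameterized.
public final class AgentEventQueryBuilder {
    public static String build(String appId, String agentId,
                               Object from, Object to,
                               int limit, List<Object> params) {
        StringBuilder sql = new StringBuilder(
            "SELECT timestamp, agent_id, app_id, event_type, detail FROM agent_events"
            + " WHERE tenant_id = 'default'");
        if (appId != null)   { sql.append(" AND app_id = ?");     params.add(appId); }
        if (agentId != null) { sql.append(" AND agent_id = ?");   params.add(agentId); }
        if (from != null)    { sql.append(" AND timestamp >= ?"); params.add(from); }
        if (to != null)      { sql.append(" AND timestamp <= ?"); params.add(to); }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        return sql.toString();
    }

    public static void main(String[] args) {
        List<Object> params = new ArrayList<>();
        System.out.println(build("orders-app", null, null, null, 100, params));
    }
}
```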

Tests:

  • insert_writesEvent
  • query_byAppId_filtersCorrectly
  • query_byTimeRange_filtersCorrectly
  • query_respectsLimit

Task 4: ClickHouseLogStore

Files:

  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java
  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java

Replaces OpenSearchLogIndex. Must have the same public API:

  • search(application, agentId, level, query, exchangeId, from, to, limit) → returns List<LogEntryResponse>
  • indexBatch(agentId, application, List<LogEntry> entries) → batch INSERT into logs

Key implementation:

indexBatch: Batch INSERT with Map(String, String) for MDC column. Extract camel.exchangeId from MDC into top-level exchange_id column.

```sql
INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, logger_name,
                  message, thread_name, stack_trace, exchange_id, mdc)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
```

MDC Map type: pass as java.util.HashMap — ClickHouse JDBC 0.9.7 supports the native Map type (same as ClickHouseMetricsStore uses for tags).
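The exchange-id extraction described above is plain map handling; a sketch with illustrative names (the MDC key camel.exchangeId is taken from this plan):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: pull camel.exchangeId out of the MDC into the
// top-level exchange_id column, while the full MDC goes into the Map column.
public final class MdcExtractor {
    static final String EXCHANGE_ID_KEY = "camel.exchangeId";

    public static String exchangeId(Map<String, String> mdc) {
        if (mdc == null) return "";
        return mdc.getOrDefault(EXCHANGE_ID_KEY, "");
    }

    public static Map<String, String> mdcOrEmpty(Map<String, String> mdc) {
        // The Map(String, String) column defaults to map(), so never bind null.
        return mdc == null ? new HashMap<>() : mdc;
    }

    public static void main(String[] args) {
        Map<String, String> mdc = new HashMap<>();
        mdc.put(EXCHANGE_ID_KEY, "EX-1");
        System.out.println(exchangeId(mdc));
    }
}
```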

search: Build WHERE clause from params. Use LIKE '%query%' for message text search (ngram-accelerated). Return LogEntryResponse records.

```sql
-- bracketed clauses are optional, appended only when the filter is set
SELECT timestamp, level, logger_name, message, thread_name, stack_trace
FROM logs
WHERE tenant_id = 'default' AND application = ?
  [AND agent_id = ?]
  [AND level = ?]
  [AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)]
  [AND message LIKE ?]
  [AND timestamp >= ?]
  [AND timestamp <= ?]
ORDER BY timestamp DESC
LIMIT ?
```
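Since the query text is interpolated into a LIKE pattern, % and _ in user input should be escaped first; a small sketch (helper name is illustrative, and assumes backslash as the LIKE escape character, which is ClickHouse's default):

```java
// Illustrative helper: turn raw user input into a contains-style LIKE
// pattern. % and _ are LIKE metacharacters, so they are escaped, as is the
// backslash escape character itself.
public final class LikePatterns {
    public static String contains(String query) {
        String escaped = query
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_");
        return "%" + escaped + "%";
    }

    public static void main(String[] args) {
        System.out.println(contains("time_out"));
    }
}
```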

Tests:

  • indexBatch_writesLogs
  • search_byApplication_returnsLogs
  • search_byLevel_filtersCorrectly
  • search_byQuery_usesLikeSearch
  • search_byExchangeId_matchesTopLevelAndMdc
  • search_byTimeRange_filtersCorrectly
  • indexBatch_storesMdc

Task 5: Feature Flag Wiring

Files to modify:

  • PostgresDiagramStore.java — add @ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")
  • PostgresAgentEventRepository.java — add @ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")
  • OpenSearchLogIndex.java — add @ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")
  • StorageBeanConfig.java — add CH diagram, event, and log store beans (all default to clickhouse with matchIfMissing = true)
  • application.yml — add diagrams, events, logs flags under cameleer.storage
  • deploy/base/server.yaml — add env vars

Feature flags (the three new Phase 4 flags default to clickhouse; the existing metrics and search flags keep their current defaults):

```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
    stats: ${CAMELEER_STORAGE_STATS:clickhouse}
    diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
    events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
    logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
```
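The bean wiring follows the usual Spring conditional pattern; a sketch of what one StorageBeanConfig addition might look like (bean name, constructor, and injected JdbcTemplate are illustrative — only the property name and matchIfMissing behavior are specified by this plan):

```java
// Sketch only: one of the three Phase 4 beans, gated by its flag.
// matchIfMissing = true makes ClickHouse the default when no flag is set.
@Bean
@ConditionalOnProperty(name = "cameleer.storage.diagrams",
                       havingValue = "clickhouse", matchIfMissing = true)
DiagramStore clickHouseDiagramStore(JdbcTemplate clickHouseJdbcTemplate) {
    return new ClickHouseDiagramStore(clickHouseJdbcTemplate);
}
```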

Important for LogStore wiring: The OpenSearchLogIndex is a @Repository used directly by controllers (not via an interface). The ClickHouseLogStore must be injectable in the same way. Options:

  • Extract a LogIndex interface with search() + indexBatch() methods, used by both controllers
  • Or make ClickHouseLogStore extend/implement the same type

Recommended: Create a LogIndex interface in the core module with the two methods, have both OpenSearchLogIndex and ClickHouseLogStore implement it, and update LogIngestionController + LogQueryController to inject LogIndex instead of OpenSearchLogIndex.
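The recommended extraction can be sketched as a small interface. Signatures are approximated from the API description above; LogEntry and LogEntryResponse are placeholder records here (the real types live in the project), and the in-memory implementation exists only to show that both stores can sit behind the same type:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Placeholder records standing in for the project's real log types.
record LogEntry(Instant timestamp, String level, String message) {}
record LogEntryResponse(Instant timestamp, String level, String message) {}

// Sketch of the proposed core-module interface; OpenSearchLogIndex and
// ClickHouseLogStore would both implement it, and the two controllers
// would inject LogIndex instead of the concrete OpenSearch type.
interface LogIndex {
    List<LogEntryResponse> search(String application, String agentId, String level,
                                  String query, String exchangeId,
                                  Instant from, Instant to, int limit);
    void indexBatch(String agentId, String application, List<LogEntry> entries);
}

// Trivial in-memory stand-in, just to exercise the interface shape.
class InMemoryLogIndex implements LogIndex {
    private final List<LogEntryResponse> stored = new ArrayList<>();

    public void indexBatch(String agentId, String application, List<LogEntry> entries) {
        for (LogEntry e : entries)
            stored.add(new LogEntryResponse(e.timestamp(), e.level(), e.message()));
    }

    public List<LogEntryResponse> search(String application, String agentId, String level,
                                         String query, String exchangeId,
                                         Instant from, Instant to, int limit) {
        return stored.stream().limit(limit).toList();
    }

    public static void main(String[] args) {
        LogIndex idx = new InMemoryLogIndex();
        idx.indexBatch("a1", "app", List.of(new LogEntry(Instant.now(), "INFO", "hello")));
        System.out.println(idx.search("app", null, null, null, null, null, null, 10));
    }
}
```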


Verification Checklist

  1. Diagrams: Store + retrieve RouteGraph via ClickHouse, verify content-hash dedup
  2. Events: Insert + query events with time range and app/agent filters
  3. Logs: Batch insert + search with all filter types (level, query, exchangeId, time range)
  4. Feature flags: Each store independently switchable between PG/OS and CH
  5. Backward compat: Default config uses ClickHouse for all Phase 4 stores
  6. CI: mvn clean verify -DskipITs passes