# ClickHouse Phase 3: Stats & Analytics — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace TimescaleDB continuous aggregates with ClickHouse materialized views and implement a `ClickHouseStatsStore` that reads from them using `-Merge` aggregate functions.

**Architecture:** A single DDL script creates 5 AggregatingMergeTree target tables plus the materialized views that populate them on every INSERT into `executions` and `processor_executions`. A `ClickHouseStatsStore` implements the existing `StatsStore` interface, translating `time_bucket()` → `toStartOfInterval()`, `SUM(total_count)` → `countMerge(total_count)`, `approx_percentile` → `quantileMerge`, etc. SLA and topErrors queries hit the raw `executions` / `processor_executions` tables with `FINAL`. Feature flag `cameleer.storage.stats=postgres|clickhouse` controls which implementation is active.
**Tech Stack:** ClickHouse 24.12, AggregatingMergeTree, `-State`/`-Merge` combinators, JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` (Materialized Views + Stats Query Translation sections)

---

## File Structure

| File | Responsibility |
|------|----------------|
| `cameleer3-server-app/.../resources/clickhouse/V4__stats_tables_and_mvs.sql` | DDL for all 5 stats tables + 5 materialized views |
| `cameleer3-server-app/.../storage/ClickHouseStatsStore.java` | StatsStore impl using `-Merge` functions on AggregatingMergeTree tables |
| `cameleer3-server-app/.../config/StorageBeanConfig.java` | Modified: add CH stats store bean with feature flag |
| `cameleer3-server-app/.../storage/PostgresStatsStore.java` | Modified: add `ConditionalOnProperty` |
| `cameleer3-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.stats` flag |
| `deploy/base/server.yaml` | Modified: add `CAMELEER_STORAGE_STATS` env var |
| `cameleer3-server-app/...test.../storage/ClickHouseStatsStoreIT.java` | Integration test for CH stats queries |

---

## Query Translation Reference

| TimescaleDB (PostgresStatsStore) | ClickHouse (ClickHouseStatsStore) |
|----------------------------------|-----------------------------------|
| `time_bucket(N * INTERVAL '1 second', bucket)` | `toStartOfInterval(bucket, INTERVAL N SECOND)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `SUM(running_count)` | `countIfMerge(running_count)` |
| `SUM(duration_sum)` | `sumMerge(duration_sum)` |
| `MAX(p99_duration)` | `quantileMerge(0.99)(p99_duration)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `COUNT(*) FILTER (WHERE ...)` | `countIf(...)` |
| `EXTRACT(DOW FROM bucket)` | `toDayOfWeek(bucket) % 7` (default mode: 1=Mon..7=Sun in CH; `% 7` shifts to 0=Sun) |
| `EXTRACT(HOUR FROM bucket)` | `toHour(bucket)` |
| `LEFT(error_message, 200)` | `substring(error_message, 1, 200)` |
| `COUNT(DISTINCT ...)` | `uniq(...)` or `COUNT(DISTINCT ...)` |

---

### Task 1: DDL for Stats Tables and Materialized Views

**Files:**
- Create: `cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql`

All 5 table+MV pairs live in a single DDL file. Tables use `AggregatingMergeTree()`. MVs use `-State` combinators and populate the target tables on every INSERT into `executions` or `processor_executions`.

- [ ] **Step 1: Create the DDL file**

```sql
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.

-- ── stats_1m_all (global) ────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_all (
    tenant_id     LowCardinality(String),
    bucket        DateTime,
    total_count   AggregateFunction(count),
    failed_count  AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;

-- ── stats_1m_app (per-application) ───────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_app (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    bucket           DateTime,
    total_count
                     AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;

-- ── stats_1m_route (per-route) ───────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_route (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;

-- ── stats_1m_processor (per-processor-type) ──────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    processor_type   LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;

-- ── stats_1m_processor_detail (per-processor-id) ─────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    processor_id     String,
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
```

**IMPORTANT:** `ClickHouseSchemaInitializer` runs each `.sql` file as a single statement. The ClickHouse server accepts multiple statements separated by `;` in one call, but the JDBC driver may not. Read the initializer first: if it calls `jdbc.execute(sql)` once per file, the semicolons between statements will cause failures. In that case, either split this DDL into separate files (V4a, V4b, etc.) or modify the initializer to split on `;` and run each statement separately. Verify during testing.

- [ ] **Step 2: Check ClickHouseSchemaInitializer handles multi-statement files**

Read `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java`. If it runs each file as a single `jdbc.execute()`, modify it to split on `;` and run each statement separately. If it already handles this, proceed.
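If the initializer does need modifying, the splitting logic can be sketched in plain Java. The class and method names below are illustrative, not the actual initializer's API:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for running multi-statement DDL files.
class DdlSplitter {
    // Naive split on ';', dropping blank fragments. This breaks if a ';'
    // ever appears inside a string literal or comment; the V4 DDL above
    // contains none, so it is sufficient here.
    static List<String> split(String sql) {
        return Arrays.stream(sql.split(";"))
                     .map(String::trim)
                     .filter(s -> !s.isEmpty())
                     .toList();
    }
}
```

Each fragment can then be passed to `jdbc.execute(...)` in turn.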
- [ ] **Step 3: Verify DDL loads in Testcontainers**

Write a quick smoke test or manually verify that all 10 objects (5 tables + 5 MVs) are created:

```bash
mvn clean compile -pl cameleer3-server-app -f pom.xml
```

- [ ] **Step 4: Commit**

```bash
git add cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
# also add ClickHouseSchemaInitializer if modified
git commit -m "feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)"
```

---

### Task 2: ClickHouseStatsStore — Aggregate Queries

**Files:**
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java`

The store implements `StatsStore` using ClickHouse `-Merge` functions. It follows the same pattern as `PostgresStatsStore` but with ClickHouse SQL syntax.

**Key implementation patterns:**

1. **Stats queries** (`queryStats`): Read from the `stats_1m_*` tables using `-Merge` combinators:

```sql
SELECT
    countMerge(total_count) AS total_count,
    countIfMerge(failed_count) AS failed_count,
    CASE WHEN countMerge(total_count) > 0
         THEN sumMerge(duration_sum) / countMerge(total_count)
         ELSE 0 END AS avg_duration,
    quantileMerge(0.99)(p99_duration) AS p99_duration,
    countIfMerge(running_count) AS active_count
FROM stats_1m_all
WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ?
```

The same pattern applies to the prev-24h and today queries (the logic is identical to PostgresStatsStore).

2. **Timeseries queries** (`queryTimeseries`): Group by time period:

```sql
SELECT
    toStartOfInterval(bucket, INTERVAL ? SECOND) AS period,
    countMerge(total_count) AS total_count,
    countIfMerge(failed_count) AS failed_count,
    CASE WHEN countMerge(total_count) > 0
         THEN sumMerge(duration_sum) / countMerge(total_count)
         ELSE 0 END AS avg_duration,
    quantileMerge(0.99)(p99_duration) AS p99_duration,
    countIfMerge(running_count) AS active_count
FROM stats_1m_app
WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? AND application_name = ?
GROUP BY period
ORDER BY period
```

3. **Grouped timeseries**: Same as timeseries but with an extra GROUP BY column (`application_name` or `route_id`), returned as a `Map`.

4. **SLA compliance**: Hit the raw `executions` table with `FINAL`:

```sql
SELECT
    countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant,
    countIf(status != 'RUNNING') AS total
FROM executions FINAL
WHERE tenant_id = 'default' AND start_time >= ? AND start_time < ? AND application_name = ?
```

5. **SLA counts by app/route**: Same pattern with GROUP BY.

6. **Top errors**: Hit the raw `executions FINAL` or `processor_executions` table with a CTE for counts + velocity. ClickHouse differences:
   - No `FILTER (WHERE ...)` → use `countIf(...)`
   - No `LEFT(s, n)` → use `substring(s, 1, n)`
   - CTE syntax is identical (`WITH ... AS (...)`)

7. **Active error types**: `SELECT uniq(...)` or `COUNT(DISTINCT ...)` from raw executions.

8. **Punchcard**: ClickHouse `toDayOfWeek(bucket)` (default mode) returns 1=Mon..7=Sun, while PG `EXTRACT(DOW)` returns 0=Sun..6=Sat. Conversion: `toDayOfWeek(bucket) % 7` gives 0=Sun..6=Sat.

**Constructor:** Takes `@Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc`.

**Test approach:** Seed data by inserting directly into `executions` and `processor_executions` (the MVs trigger automatically on INSERT), then query via the StatsStore methods and verify the results.

**Test data seeding:** Insert 10 executions across 2 apps and 3 routes, spanning 10 minutes. Include some FAILED, some COMPLETED, and varying durations.
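With a fixed set of seed durations, the expected assertion values can be derived up front in plain Java. This is a sketch: the duration values below are illustrative, not mandated by the plan:

```java
import java.util.Arrays;

// Expected-value derivation for the stats assertions. countMerge/sumMerge
// over the MV states reduce to the plain count/sum of the seeded rows.
class ExpectedStats {
    // Illustrative seed durations (ms) for the 10 executions.
    static final long[] DURATIONS = {100, 120, 140, 160, 180, 200, 220, 240, 260, 280};

    static long expectedCount() { return DURATIONS.length; }               // countMerge(total_count)
    static long expectedSum()   { return Arrays.stream(DURATIONS).sum(); } // sumMerge(duration_sum)
    static double expectedAvg() { return (double) expectedSum() / expectedCount(); }
}
```

The integration test can then assert the StatsStore results against these precomputed values instead of magic numbers.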
Then verify:
- `stats()` returns correct totals
- `statsForApp()` filters correctly
- `timeseries()` returns multiple buckets
- `slaCompliance()` returns the correct percentage
- `topErrors()` returns ranked errors
- `punchcard()` returns non-empty cells

- [ ] **Step 1: Write the failing integration test**

Create `ClickHouseStatsStoreIT.java` with:
- Load the DDL files (V2 executions, V3 processor_executions, V4 stats MVs)
- Seed 10+ executions and 20+ processor records across 2 apps, 3 routes, and 10 minutes
- Tests: `stats_returnsCorrectTotals`, `statsForApp_filtersCorrectly`, `timeseries_returnsBuckets`, `timeseriesGroupedByApp_returnsMap`, `slaCompliance_calculatesCorrectly`, `topErrors_returnsRankedErrors`, `activeErrorTypes_countsDistinct`, `punchcard_returnsNonEmpty`, `slaCountsByApp_returnsMap`

- [ ] **Step 2: Run the test to verify it fails**

```bash
mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false -f pom.xml
```

- [ ] **Step 3: Implement ClickHouseStatsStore**

Follow the `PostgresStatsStore` structure closely: the same private `Filter` record and the same `queryStats`/`queryTimeseries`/`queryGroupedTimeseries` helper methods. Replace PG-specific SQL with CH equivalents per the translation table above.
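One translation worth a plain unit check is the punchcard day-of-week shift. This is a Java sketch of the arithmetic only (the real query does it in SQL), assuming a 1=Mon..7=Sun encoding on the ClickHouse side as described above:

```java
// Day-of-week translation used by the punchcard query.
class DowShift {
    // ClickHouse side (1=Mon .. 7=Sun)  ->  Postgres EXTRACT(DOW) (0=Sun .. 6=Sat).
    // % 7 sends Sunday (7) to 0 and leaves Mon..Sat (1..6) unchanged.
    static int chDowToPgDow(int chDow) {
        return chDow % 7;
    }
}
```

Asserting this mapping in isolation makes a punchcard off-by-one immediately obvious, instead of burying it inside a full integration test.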
- [ ] **Step 4: Run the test to verify it passes**

```bash
mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -f pom.xml
```

- [ ] **Step 5: Commit**

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java \
        cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries"
```

---

### Task 3: Feature Flag Wiring

**Files:**
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java`
- Modify: `cameleer3-server-app/src/main/resources/application.yml`
- Modify: `deploy/base/server.yaml`

- [ ] **Step 1: Add ConditionalOnProperty to PostgresStatsStore**

```java
@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres", matchIfMissing = true)
public class PostgresStatsStore implements StatsStore {
```

- [ ] **Step 2: Add a CH StatsStore bean to StorageBeanConfig**

```java
@Bean
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse")
public StatsStore clickHouseStatsStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseStatsStore(clickHouseJdbc);
}
```

- [ ] **Step 3: Update application.yml**

Add under `cameleer.storage`:

```yaml
stats: ${CAMELEER_STORAGE_STATS:postgres}
```

- [ ] **Step 4: Update deploy/base/server.yaml**

Add the env var:

```yaml
- name: CAMELEER_STORAGE_STATS
  value: "postgres"
```

- [ ] **Step 5: Compile and verify all tests pass**

```bash
mvn clean verify -DskipITs -f pom.xml
```

- [ ] **Step 6: Commit**

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java \
        cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java \
        cameleer3-server-app/src/main/resources/application.yml \
        deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag"
```

---

## Verification Checklist

After all tasks are complete, verify:

1. **MVs trigger**: Insert a row into `executions`; verify `stats_1m_all` has a row
2. **Aggregate correctness**: Insert known data; verify `countMerge`/`sumMerge`/`quantileMerge` produce correct values
3. **Timeseries bucketing**: Verify `toStartOfInterval` groups correctly across time ranges
4. **SLA compliance**: Verify the percentage calculation against raw data
5. **Top errors**: Verify ranking and velocity trend detection
6. **Punchcard**: Verify the weekday/hour mapping (0=Sun..6=Sat convention)
7. **Feature flag**: `cameleer.storage.stats=postgres` uses PG; `=clickhouse` uses CH
8. **Backward compat**: With default config, everything uses PG
9. **CI**: `mvn clean verify -DskipITs` passes
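For checklist item 4, the expected percentage can be recomputed outside SQL. This sketch mirrors the `countIf` pair in the SLA query; the helper name is illustrative, not part of the planned StatsStore API:

```java
// SLA percentage as computed from the (compliant, total) countIf pair,
// with a zero-total guard analogous to the CASE expression in queryStats.
class SlaMath {
    static double compliance(long compliant, long total) {
        return total == 0 ? 0.0 : (double) compliant / total;
    }
}
```

Cross-checking the ClickHouse result against this plain division over the raw seeded rows catches both filtering mistakes (e.g. RUNNING rows leaking in) and divide-by-zero regressions.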