Files
cameleer-server/docs/superpowers/plans/2026-03-31-clickhouse-phase3-stats-analytics.md
hsiegeln 574f82b731
docs: add historical implementation plans
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 15:45:49 +02:00


ClickHouse Phase 3: Stats & Analytics — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Replace TimescaleDB continuous aggregates with ClickHouse materialized views and implement a ClickHouseStatsStore that reads from them using -Merge aggregate functions.

Architecture: A DDL script creates 5 AggregatingMergeTree target tables + materialized views that trigger on INSERT to executions and processor_executions. A ClickHouseStatsStore implements the existing StatsStore interface, translating time_bucket() → toStartOfInterval(), SUM(total_count) → countMerge(total_count), approx_percentile → quantileMerge, etc. SLA and topErrors queries hit the raw executions / processor_executions tables with FINAL. The feature flag cameleer.storage.stats=postgres|clickhouse controls which implementation is active.

Tech Stack: ClickHouse 24.12, AggregatingMergeTree, -State/-Merge combinators, JdbcTemplate, Testcontainers

Design Spec: docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md (Materialized Views + Stats Query Translation sections)


File Structure

| File | Responsibility |
|------|----------------|
| cameleer3-server-app/.../resources/clickhouse/V4__stats_tables_and_mvs.sql | DDL for all 5 stats tables + 5 materialized views |
| cameleer3-server-app/.../storage/ClickHouseStatsStore.java | StatsStore impl using -Merge functions on AggregatingMergeTree tables |
| cameleer3-server-app/.../config/StorageBeanConfig.java | Modified: add CH stats store bean with feature flag |
| cameleer3-server-app/.../storage/PostgresStatsStore.java | Modified: add ConditionalOnProperty |
| cameleer3-server-app/.../resources/application.yml | Modified: add cameleer.storage.stats flag |
| deploy/base/server.yaml | Modified: add CAMELEER_STORAGE_STATS env var |
| cameleer3-server-app/...test.../storage/ClickHouseStatsStoreIT.java | Integration test for CH stats queries |

Query Translation Reference

| TimescaleDB (PostgresStatsStore) | ClickHouse (ClickHouseStatsStore) |
|----------------------------------|-----------------------------------|
| `time_bucket(N * INTERVAL '1 second', bucket)` | `toStartOfInterval(bucket, INTERVAL N SECOND)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `SUM(running_count)` | `countIfMerge(running_count)` |
| `SUM(duration_sum)` | `sumMerge(duration_sum)` |
| `MAX(p99_duration)` | `quantileMerge(0.99)(p99_duration)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `COUNT(*) FILTER (WHERE ...)` | `countIf(...)` |
| `EXTRACT(DOW FROM bucket)` | `toDayOfWeek(bucket, 1) % 7` (1=Mon in CH, shift to 0=Sun) |
| `EXTRACT(HOUR FROM bucket)` | `toHour(bucket)` |
| `LEFT(error_message, 200)` | `substring(error_message, 1, 200)` |
| `COUNT(DISTINCT ...)` | `uniq(...)` (approximate) or `COUNT(DISTINCT ...)` |
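The bucketing row at the top of the table is the one most worth internalizing. For fixed-length intervals, toStartOfInterval(bucket, INTERVAL N SECOND) simply floors an epoch-second timestamp to a multiple of N; a minimal Java sketch of that semantics (calendar units like MONTH do not reduce to this):

```java
public class IntervalBucketing {
    // Floors an epoch-seconds timestamp to the start of its N-second
    // interval, mirroring ClickHouse toStartOfInterval(t, INTERVAL N SECOND)
    // for fixed-length intervals only.
    public static long toStartOfInterval(long epochSeconds, long intervalSeconds) {
        return epochSeconds - (epochSeconds % intervalSeconds);
    }
}
```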

Task 1: DDL for Stats Tables and Materialized Views

Files:

  • Create: cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql

All 5 table+MV pairs in a single DDL file. Tables use AggregatingMergeTree(). MVs use -State combinators and trigger on INSERT to executions or processor_executions.

  • Step 1: Create the DDL file
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.

-- ── stats_1m_all (global) ────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS stats_1m_all (
    tenant_id     LowCardinality(String),
    bucket        DateTime,
    total_count   AggregateFunction(count),
    failed_count  AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time)           AS bucket,
    countState()                           AS total_count,
    countIfState(status = 'FAILED')        AS failed_count,
    countIfState(status = 'RUNNING')       AS running_count,
    sumState(duration_ms)                  AS duration_sum,
    maxState(duration_ms)                  AS duration_max,
    quantileState(0.99)(duration_ms)       AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;

-- ── stats_1m_app (per-application) ───────────────────────────────────

CREATE TABLE IF NOT EXISTS stats_1m_app (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time)           AS bucket,
    countState()                           AS total_count,
    countIfState(status = 'FAILED')        AS failed_count,
    countIfState(status = 'RUNNING')       AS running_count,
    sumState(duration_ms)                  AS duration_sum,
    maxState(duration_ms)                  AS duration_max,
    quantileState(0.99)(duration_ms)       AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;

-- ── stats_1m_route (per-route) ───────────────────────────────────────

CREATE TABLE IF NOT EXISTS stats_1m_route (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time)           AS bucket,
    countState()                           AS total_count,
    countIfState(status = 'FAILED')        AS failed_count,
    countIfState(status = 'RUNNING')       AS running_count,
    sumState(duration_ms)                  AS duration_sum,
    maxState(duration_ms)                  AS duration_max,
    quantileState(0.99)(duration_ms)       AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;

-- ── stats_1m_processor (per-processor-type) ──────────────────────────

CREATE TABLE IF NOT EXISTS stats_1m_processor (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    processor_type   LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time)           AS bucket,
    countState()                           AS total_count,
    countIfState(status = 'FAILED')        AS failed_count,
    sumState(duration_ms)                  AS duration_sum,
    maxState(duration_ms)                  AS duration_max,
    quantileState(0.99)(duration_ms)       AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;

-- ── stats_1m_processor_detail (per-processor-id) ─────────────────────

CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    processor_id     String,
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time)           AS bucket,
    countState()                           AS total_count,
    countIfState(status = 'FAILED')        AS failed_count,
    sumState(duration_ms)                  AS duration_sum,
    maxState(duration_ms)                  AS duration_max,
    quantileState(0.99)(duration_ms)       AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;

IMPORTANT: The ClickHouseSchemaInitializer may run each .sql file as a single statement. ClickHouse itself accepts multiple ;-separated statements in one call, but the JDBC driver may not. Read the initializer first: if it uses jdbc.execute(sql) per file, the semicolons between statements will cause failures. In that case, either split the DDL into separate files (V4a, V4b, etc.) or modify the initializer to split on ;. Verify during testing.
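If the initializer does need modifying, a naive splitter is enough for this DDL, since no ; appears inside string literals or comments in V4. A sketch (not the actual initializer code; a general SQL script would need a real tokenizer):

```java
import java.util.ArrayList;
import java.util.List;

public class SqlScriptSplitter {
    // Naive multi-statement splitter: breaks a DDL script on ';'.
    // Good enough for the V4 script, where no ';' occurs inside
    // string literals or comments; not a general SQL parser.
    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }
}
```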

  • Step 2: Check ClickHouseSchemaInitializer handles multi-statement

Read cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java. If it runs each file as a single jdbc.execute(), modify it to split on ; and run each statement separately. If it already handles this, proceed.

  • Step 3: Verify DDL loads in Testcontainers

Write a quick smoke test or manually verify that all 10 objects (5 tables + 5 MVs) are created:

mvn clean compile -pl cameleer3-server-app -f pom.xml
  • Step 4: Commit
git add cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
# also add ClickHouseSchemaInitializer if modified
git commit -m "feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)"

Task 2: ClickHouseStatsStore — Aggregate Queries

Files:

  • Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
  • Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java

The store implements StatsStore using ClickHouse -Merge functions. It follows the same pattern as PostgresStatsStore but with ClickHouse SQL syntax.

Key implementation patterns:

  1. Stats queries (queryStats): Read from stats_1m_* tables using -Merge combinators:

    SELECT
        countMerge(total_count) AS total_count,
        countIfMerge(failed_count) AS failed_count,
        CASE WHEN countMerge(total_count) > 0
             THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration,
        quantileMerge(0.99)(p99_duration) AS p99_duration,
        countIfMerge(running_count) AS active_count
    FROM stats_1m_all
    WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ?
    

    Same pattern for prev-24h and today queries (identical to PostgresStatsStore logic).

  2. Timeseries queries (queryTimeseries): Group by time period:

    SELECT
        toStartOfInterval(bucket, INTERVAL ? SECOND) AS period,
        countMerge(total_count) AS total_count,
        countIfMerge(failed_count) AS failed_count,
        CASE WHEN countMerge(total_count) > 0
             THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration,
        quantileMerge(0.99)(p99_duration) AS p99_duration,
        countIfMerge(running_count) AS active_count
    FROM stats_1m_app
    WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? AND application_name = ?
    GROUP BY period ORDER BY period
    
  3. Grouped timeseries: Same as timeseries but with extra GROUP BY column (application_name or route_id), returned as Map<String, StatsTimeseries>.

  4. SLA compliance: Query the raw executions table with FINAL:

    SELECT
        countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant,
        countIf(status != 'RUNNING') AS total
    FROM executions FINAL
    WHERE tenant_id = 'default' AND start_time >= ? AND start_time < ?
      AND application_name = ?
    
  5. SLA counts by app/route: Same pattern with GROUP BY.

  6. Top errors: Query the raw executions (with FINAL) or processor_executions table, using a CTE for counts + velocity. ClickHouse differences:

    • No FILTER (WHERE ...) → use countIf(...)
    • No LEFT(s, n) → use substring(s, 1, n)
    • CTE syntax is identical (WITH ... AS (...))
  7. Active error types: SELECT uniq(...) or COUNT(DISTINCT ...) from raw executions.

  8. Punchcard: ClickHouse day-of-week: toDayOfWeek(bucket, 1) returns 1=Mon..7=Sun. PG EXTRACT(DOW) returns 0=Sun..6=Sat. Conversion: toDayOfWeek(bucket, 1) % 7 gives 0=Sun..6=Sat.
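The punchcard weekday shift in item 8 is easy to get wrong; a small helper (name and placement hypothetical) showing the conversion:

```java
public class DowConversion {
    // ClickHouse toDayOfWeek(bucket, 1) yields 1=Mon..7=Sun;
    // Postgres EXTRACT(DOW) yields 0=Sun..6=Sat.
    // Taking modulo 7 maps Sunday (7) to 0 and leaves Mon..Sat as 1..6.
    public static int toPostgresDow(int clickHouseDow) {
        return clickHouseDow % 7;
    }
}
```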

Constructor: Takes @Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc.

Test approach: Seed data by inserting directly into executions and processor_executions tables (the MVs trigger automatically on INSERT). Then query via the StatsStore methods and verify results.

Test data seeding: Insert 10 executions across 2 apps, 3 routes, spanning 10 minutes. Include some FAILED, some COMPLETED, varying durations. Then verify:

  • stats() returns correct totals

  • statsForApp() filters correctly

  • timeseries() returns multiple buckets

  • slaCompliance() returns correct percentage

  • topErrors() returns ranked errors

  • punchcard() returns non-empty cells
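The seeding inserts target the raw executions table directly, and the MVs fan out from there. A sketch of the insert statement; the column list is inferred from what the V4 materialized views and topErrors read off executions (the authoritative schema is the V2 DDL):

```java
public class SeedSql {
    // Hypothetical helper for the IT. Columns are inferred from the
    // V4 MVs (tenant_id, application_name, route_id, status,
    // start_time, duration_ms) plus error_message used by topErrors;
    // check the V2 executions DDL for the real column list.
    public static String insertExecution() {
        return "INSERT INTO executions "
             + "(tenant_id, application_name, route_id, status, start_time, duration_ms, error_message) "
             + "VALUES (?, ?, ?, ?, ?, ?, ?)";
    }
}
```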

  • Step 1: Write the failing integration test

Create ClickHouseStatsStoreIT.java with:

  • Load all 4 DDL files (V2 executions, V3 processor_executions, V4 stats MVs)

  • Seed 10+ executions and 20+ processor records across 2 apps, 3 routes, 10 minutes

  • Test: stats_returnsCorrectTotals, statsForApp_filtersCorrectly, timeseries_returnsBuckets, timeseriesGroupedByApp_returnsMap, slaCompliance_calculatesCorrectly, topErrors_returnsRankedErrors, activeErrorTypes_countsDistinct, punchcard_returnsNonEmpty, slaCountsByApp_returnsMap

  • Step 2: Run test to verify it fails

mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false -f pom.xml
  • Step 3: Implement ClickHouseStatsStore

Follow the PostgresStatsStore structure closely. Same private Filter record, same queryStats/queryTimeseries/queryGroupedTimeseries helper methods. Replace PG-specific SQL with CH equivalents per the translation table above.
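As a sketch of how the translated queryStats SQL might be assembled (ClickHouseStatsSql and its parameters are illustrative, not the actual store code; the real store would bind tenant_id and the time range via JdbcTemplate):

```java
public class ClickHouseStatsSql {
    // Builds the -Merge aggregate query from the pattern above.
    // Table and column names follow the V4 DDL.
    public static String statsQuery(String table, boolean filterByApp) {
        String sql =
            "SELECT countMerge(total_count) AS total_count, " +
            "countIfMerge(failed_count) AS failed_count, " +
            "CASE WHEN countMerge(total_count) > 0 " +
            "THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration, " +
            "quantileMerge(0.99)(p99_duration) AS p99_duration " +
            "FROM " + table +
            " WHERE tenant_id = ? AND bucket >= ? AND bucket < ?";
        return filterByApp ? sql + " AND application_name = ?" : sql;
    }
}
```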

  • Step 4: Run test to verify it passes
mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -f pom.xml
  • Step 5: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java \
       cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries"

Task 3: Feature Flag Wiring

Files:

  • Modify: cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java

  • Modify: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java

  • Modify: cameleer3-server-app/src/main/resources/application.yml

  • Modify: deploy/base/server.yaml

  • Step 1: Add ConditionalOnProperty to PostgresStatsStore

@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres", matchIfMissing = true)
public class PostgresStatsStore implements StatsStore {
  • Step 2: Add CH StatsStore bean to StorageBeanConfig
@Bean
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse")
public StatsStore clickHouseStatsStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseStatsStore(clickHouseJdbc);
}
  • Step 3: Update application.yml

Add under cameleer.storage:

    stats: ${CAMELEER_STORAGE_STATS:postgres}
  • Step 4: Update deploy/base/server.yaml

Add env var:

            - name: CAMELEER_STORAGE_STATS
              value: "postgres"
  • Step 5: Compile and verify all tests pass
mvn clean verify -DskipITs -f pom.xml
  • Step 6: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java \
       cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java \
       cameleer3-server-app/src/main/resources/application.yml \
       deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag"

Verification Checklist

After all tasks are complete, verify:

  1. MVs trigger: Insert a row into executions, verify stats_1m_all has a row
  2. Aggregate correctness: Insert known data, verify countMerge/sumMerge/quantileMerge produce correct values
  3. Timeseries bucketing: Verify toStartOfInterval groups correctly across time ranges
  4. SLA compliance: Verify percentage calculation against raw data
  5. Top errors: Verify ranking and velocity trend detection
  6. Punchcard: Verify weekday/hour mapping (0=Sun..6=Sat convention)
  7. Feature flag: cameleer.storage.stats=postgres uses PG, =clickhouse uses CH
  8. Backward compat: With default config, everything uses PG
  9. CI: mvn clean verify -DskipITs passes