ClickHouse Phase 3: Stats & Analytics — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Replace TimescaleDB continuous aggregates with ClickHouse materialized views and implement a ClickHouseStatsStore that reads from them using -Merge aggregate functions.
Architecture: 5 DDL scripts create AggregatingMergeTree target tables + materialized views that trigger on INSERT to executions and processor_executions. A ClickHouseStatsStore implements the existing StatsStore interface, translating time_bucket() → toStartOfInterval(), SUM(total_count) → countMerge(total_count), approx_percentile → quantileMerge, etc. SLA and topErrors queries hit the raw executions / processor_executions tables with FINAL. Feature flag cameleer.storage.stats=postgres|clickhouse controls which implementation is active.
Tech Stack: ClickHouse 24.12, AggregatingMergeTree, -State/-Merge combinators, JdbcTemplate, Testcontainers
Design Spec: docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md (Materialized Views + Stats Query Translation sections)
File Structure
| File | Responsibility |
|---|---|
| `cameleer-server-app/.../resources/clickhouse/V4__stats_tables_and_mvs.sql` | DDL for all 5 stats tables + 5 materialized views |
| `cameleer-server-app/.../storage/ClickHouseStatsStore.java` | StatsStore impl using -Merge functions on AggregatingMergeTree tables |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add CH stats store bean with feature flag |
| `cameleer-server-app/.../storage/PostgresStatsStore.java` | Modified: add ConditionalOnProperty |
| `cameleer-server-app/.../resources/application.yml` | Modified: add cameleer.storage.stats flag |
| `deploy/base/server.yaml` | Modified: add CAMELEER_STORAGE_STATS env var |
| `cameleer-server-app/...test.../storage/ClickHouseStatsStoreIT.java` | Integration test for CH stats queries |
Query Translation Reference
| TimescaleDB (PostgresStatsStore) | ClickHouse (ClickHouseStatsStore) |
|---|---|
| `time_bucket(N * INTERVAL '1 second', bucket)` | `toStartOfInterval(bucket, INTERVAL N SECOND)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `SUM(running_count)` | `countIfMerge(running_count)` |
| `SUM(duration_sum)` | `sumMerge(duration_sum)` |
| `MAX(p99_duration)` | `quantileMerge(0.99)(p99_duration)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `COUNT(*) FILTER (WHERE ...)` | `countIf(...)` |
| `EXTRACT(DOW FROM bucket)` | `toDayOfWeek(bucket) % 7` (CH default returns 1=Mon..7=Sun; `% 7` shifts to 0=Sun..6=Sat) |
| `EXTRACT(HOUR FROM bucket)` | `toHour(bucket)` |
| `LEFT(error_message, 200)` | `substring(error_message, 1, 200)` |
| `COUNT(DISTINCT ...)` | `uniq(...)` (approximate) or `COUNT(DISTINCT ...)` (exact) |
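The day-of-week shift in the table above is easy to get wrong, so here is a standalone arithmetic check. The class name is illustrative; it only verifies that taking ClickHouse's default 1=Mon..7=Sun value modulo 7 reproduces Postgres's 0=Sun..6=Sat numbering:

```java
// Standalone check of the DOW shift used in the translation table.
// ClickHouse toDayOfWeek (default mode) returns 1=Mon..7=Sun;
// Postgres EXTRACT(DOW) returns 0=Sun..6=Sat.
public class DowShiftCheck {
    static int chToPgDow(int chDow) {   // chDow in 1..7, 1=Mon
        return chDow % 7;               // 0=Sun..6=Sat
    }

    public static void main(String[] args) {
        // Monday (1) stays 1, Sunday (7) wraps to 0, matching PG.
        for (int ch = 1; ch <= 7; ch++) {
            System.out.println("CH " + ch + " -> PG " + chToPgDow(ch));
        }
    }
}
```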
Task 1: DDL for Stats Tables and Materialized Views
Files:
- Create: `cameleer-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql`
All 5 table+MV pairs in a single DDL file. Tables use AggregatingMergeTree(). MVs use -State combinators and trigger on INSERT to executions or processor_executions.
- Step 1: Create the DDL file
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.
-- ── stats_1m_all (global) ────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),           -- matches countState()
failed_count AggregateFunction(countIf, UInt8), -- matches countIfState(cond)
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
tenant_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;
-- ── stats_1m_app (per-application) ───────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
tenant_id,
application_name,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;
-- ── stats_1m_route (per-route) ───────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_route (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
tenant_id,
application_name,
route_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;
-- ── stats_1m_processor (per-processor-type) ──────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
application_name,
processor_type,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;
-- ── stats_1m_processor_detail (per-processor-id) ─────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
application_name,
route_id,
processor_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
Note: The ClickHouseSchemaInitializer runs each .sql file as a single statement. ClickHouse itself accepts multiple `;`-separated statements per call, but the JDBC driver may not. Read the initializer first — if it passes each file to a single `jdbc.execute(sql)`, either split this DDL into separate files (V4a, V4b, etc.) or modify the initializer to split on `;` and execute each statement separately. Verify during testing.
- Step 2: Check ClickHouseSchemaInitializer handles multi-statement
Read cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseSchemaInitializer.java. If it runs each file as a single jdbc.execute(), modify it to split on ; and run each statement separately. If it already handles this, proceed.
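If the initializer does need modifying, a minimal splitter could look like the sketch below. The class name is hypothetical; this naive version strips `--` line comments and then splits on `;`, which is sufficient for this DDL but would break on semicolons inside string literals:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for ClickHouseSchemaInitializer: turn a multi-statement
// .sql file into individual statements. Strips "--" line comments first so a
// semicolon inside a comment cannot split a statement, then splits on ";".
public class SqlStatementSplitter {
    public static List<String> split(String sql) {
        StringBuilder noComments = new StringBuilder();
        for (String line : sql.split("\n", -1)) {
            int idx = line.indexOf("--");
            noComments.append(idx >= 0 ? line.substring(0, idx) : line).append('\n');
        }
        List<String> statements = new ArrayList<>();
        for (String stmt : noComments.toString().split(";")) {
            if (!stmt.trim().isEmpty()) {
                statements.add(stmt.trim());
            }
        }
        return statements;
    }
}
```

The initializer would then loop over `split(fileContents)` and call `jdbc.execute(stmt)` once per statement.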
- Step 3: Verify DDL loads in Testcontainers
Write a quick smoke test or manually verify that all 10 objects (5 tables + 5 MVs) are created. At minimum, confirm the module still compiles:
mvn clean compile -pl cameleer-server-app -f pom.xml
(The Task 2 integration test provides full verification of the DDL.)
- Step 4: Commit
git add cameleer-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
# also add ClickHouseSchemaInitializer if modified
git commit -m "feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)"
Task 2: ClickHouseStatsStore — Aggregate Queries
Files:
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java`
The store implements StatsStore using ClickHouse -Merge functions. It follows the same pattern as PostgresStatsStore but with ClickHouse SQL syntax.
Key implementation patterns:
- Stats queries (`queryStats`): read from the `stats_1m_*` tables using `-Merge` combinators:

  SELECT countMerge(total_count) AS total_count,
         countIfMerge(failed_count) AS failed_count,
         CASE WHEN countMerge(total_count) > 0
              THEN sumMerge(duration_sum) / countMerge(total_count)
              ELSE 0 END AS avg_duration,
         quantileMerge(0.99)(p99_duration) AS p99_duration,
         countIfMerge(running_count) AS active_count
  FROM stats_1m_all
  WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ?

  Same pattern for the prev-24h and today queries (identical to the PostgresStatsStore logic).
- Timeseries queries (`queryTimeseries`): group by time period:

  SELECT toStartOfInterval(bucket, INTERVAL ? SECOND) AS period,
         countMerge(total_count) AS total_count,
         countIfMerge(failed_count) AS failed_count,
         CASE WHEN countMerge(total_count) > 0
              THEN sumMerge(duration_sum) / countMerge(total_count)
              ELSE 0 END AS avg_duration,
         quantileMerge(0.99)(p99_duration) AS p99_duration,
         countIfMerge(running_count) AS active_count
  FROM stats_1m_app
  WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? AND application_name = ?
  GROUP BY period
  ORDER BY period
- Grouped timeseries: same as timeseries but with an extra GROUP BY column (`application_name` or `route_id`), returned as `Map<String, StatsTimeseries>`.
- SLA compliance: hit the raw `executions` table with `FINAL`:

  SELECT countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant,
         countIf(status != 'RUNNING') AS total
  FROM executions FINAL
  WHERE tenant_id = 'default' AND start_time >= ? AND start_time < ? AND application_name = ?
- SLA counts by app/route: same pattern with GROUP BY.
- Top errors: hit the raw `executions FINAL` or `processor_executions` table with a CTE for counts + velocity. ClickHouse differences:
  - No `FILTER (WHERE ...)` → use `countIf(...)`
  - No `LEFT(s, n)` → use `substring(s, 1, n)`
  - CTE syntax is identical (`WITH ... AS (...)`)
- Active error types: `SELECT uniq(...)` or `COUNT(DISTINCT ...)` from the raw `executions` table.
- Punchcard: ClickHouse `toDayOfWeek(bucket)` returns 1=Mon..7=Sun, while PG `EXTRACT(DOW)` returns 0=Sun..6=Sat. Conversion: `toDayOfWeek(bucket) % 7` gives 0=Sun..6=Sat.
Constructor: Takes @Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc.
Test approach: Seed data by inserting directly into executions and processor_executions tables (the MVs trigger automatically on INSERT). Then query via the StatsStore methods and verify results.
Test data seeding: Insert 10 executions across 2 apps, 3 routes, spanning 10 minutes. Include some FAILED, some COMPLETED, varying durations. Then verify:
- `stats()` returns correct totals
- `statsForApp()` filters correctly
- `timeseries()` returns multiple buckets
- `slaCompliance()` returns correct percentage
- `topErrors()` returns ranked errors
- `punchcard()` returns non-empty cells
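One way to generate the seed rows described above, as a self-contained sketch. The Row shape and all names are assumptions about the IT, not the real schema binding:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical seed-data generator for ClickHouseStatsStoreIT: 10 executions,
// one per minute, spread across 2 apps and 3 routes, with a mix of FAILED and
// COMPLETED statuses and varying durations. Field names are illustrative.
public class StatsSeedData {
    record Row(String app, String route, Instant startTime, String status, long durationMs) {}

    static List<Row> rows(Instant base) {
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add(new Row(
                "app-" + (i % 2),                    // 2 applications
                "route-" + (i % 3),                  // 3 routes
                base.plus(i, ChronoUnit.MINUTES),    // spans 10 minutes
                i % 4 == 0 ? "FAILED" : "COMPLETED", // some failures
                100L + 50L * i));                    // varying durations
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(rows(Instant.now()).size() + " seed rows generated");
    }
}
```

The IT would insert these rows into `executions`; the MVs fire on INSERT, so no extra step is needed to populate the stats tables.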
- Step 1: Write the failing integration test
Create ClickHouseStatsStoreIT.java with:
  - Load all 4 DDL files (V2 executions, V3 processor_executions, V4 stats MVs)
  - Seed 10+ executions and 20+ processor records across 2 apps, 3 routes, 10 minutes
  - Tests: `stats_returnsCorrectTotals`, `statsForApp_filtersCorrectly`, `timeseries_returnsBuckets`, `timeseriesGroupedByApp_returnsMap`, `slaCompliance_calculatesCorrectly`, `topErrors_returnsRankedErrors`, `activeErrorTypes_countsDistinct`, `punchcard_returnsNonEmpty`, `slaCountsByApp_returnsMap`
Step 2: Run test to verify it fails
mvn test -pl cameleer-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false -f pom.xml
- Step 3: Implement ClickHouseStatsStore
Follow the PostgresStatsStore structure closely. Same private Filter record, same queryStats/queryTimeseries/queryGroupedTimeseries helper methods. Replace PG-specific SQL with CH equivalents per the translation table above.
- Step 4: Run test to verify it passes
mvn test -pl cameleer-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -f pom.xml
- Step 5: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java \
cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries"
Task 3: Feature Flag Wiring
Files:
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java`
- Modify: `cameleer-server-app/src/main/resources/application.yml`
- Modify: `deploy/base/server.yaml`
- Step 1: Add @ConditionalOnProperty to PostgresStatsStore
@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres", matchIfMissing = true)
public class PostgresStatsStore implements StatsStore {
- Step 2: Add CH StatsStore bean to StorageBeanConfig
@Bean
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse")
public StatsStore clickHouseStatsStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseStatsStore(clickHouseJdbc);
}
- Step 3: Update application.yml
Add under cameleer.storage:
stats: ${CAMELEER_STORAGE_STATS:postgres}
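For orientation, the resulting block would sit under the existing `cameleer.storage` keys; the surrounding keys are assumed here, only `stats` is new:

```yaml
cameleer:
  storage:
    stats: ${CAMELEER_STORAGE_STATS:postgres}   # postgres (default) | clickhouse
```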
- Step 4: Update deploy/base/server.yaml
Add env var:
- name: CAMELEER_STORAGE_STATS
value: "postgres"
- Step 5: Compile and verify all tests pass
mvn clean verify -DskipITs -f pom.xml
- Step 6: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java \
cameleer-server-app/src/main/resources/application.yml \
deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag"
Verification Checklist
After all tasks are complete, verify:
- MVs trigger: insert a row into `executions`, verify `stats_1m_all` has a row
- Aggregate correctness: insert known data, verify `countMerge`/`sumMerge`/`quantileMerge` produce correct values
- Timeseries bucketing: verify `toStartOfInterval` groups correctly across time ranges
- SLA compliance: verify the percentage calculation against raw data
- Top errors: verify ranking and velocity trend detection
- Punchcard: verify weekday/hour mapping (0=Sun..6=Sat convention)
- Feature flag: `cameleer.storage.stats=postgres` uses PG, `=clickhouse` uses CH
- Backward compat: with the default config, everything uses PG
- CI: `mvn clean verify -DskipITs` passes