# ClickHouse Phase 3: Stats & Analytics — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace TimescaleDB continuous aggregates with ClickHouse materialized views and implement a `ClickHouseStatsStore` that reads from them using `-Merge` aggregate functions.
**Architecture:** A single DDL script creates 5 AggregatingMergeTree target tables plus materialized views that trigger on INSERT into `executions` and `processor_executions`. A `ClickHouseStatsStore` implements the existing `StatsStore` interface, translating `time_bucket()` → `toStartOfInterval()`, `SUM(total_count)` → `countMerge(total_count)`, `approx_percentile` → `quantileMerge`, etc. SLA and topErrors queries hit the raw `executions` / `processor_executions` tables with `FINAL`. The feature flag `cameleer.storage.stats=postgres|clickhouse` controls which implementation is active.
**Tech Stack:** ClickHouse 24.12, AggregatingMergeTree, `-State`/`-Merge` combinators, JdbcTemplate, Testcontainers
**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` (Materialized Views + Stats Query Translation sections)
---
## File Structure
| File | Responsibility |
|------|----------------|
| `cameleer-server-app/.../resources/clickhouse/V4__stats_tables_and_mvs.sql` | DDL for all 5 stats tables + 5 materialized views |
| `cameleer-server-app/.../storage/ClickHouseStatsStore.java` | StatsStore impl using -Merge functions on AggregatingMergeTree tables |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add CH stats store bean with feature flag |
| `cameleer-server-app/.../storage/PostgresStatsStore.java` | Modified: add ConditionalOnProperty |
| `cameleer-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.stats` flag |
| `deploy/base/server.yaml` | Modified: add `CAMELEER_STORAGE_STATS` env var |
| `cameleer-server-app/...test.../storage/ClickHouseStatsStoreIT.java` | Integration test for CH stats queries |
---
## Query Translation Reference
| TimescaleDB (PostgresStatsStore) | ClickHouse (ClickHouseStatsStore) |
|----------------------------------|-----------------------------------|
| `time_bucket(N * INTERVAL '1 second', bucket)` | `toStartOfInterval(bucket, INTERVAL N SECOND)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `SUM(running_count)` | `countIfMerge(running_count)` |
| `SUM(duration_sum)` | `sumMerge(duration_sum)` |
| `MAX(p99_duration)` | `quantileMerge(0.99)(p99_duration)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `COUNT(*) FILTER (WHERE ...)` | `countIf(...)` |
| `EXTRACT(DOW FROM bucket)` | `toDayOfWeek(bucket) % 7` (CH default is 1=Mon..7=Sun; `% 7` maps Sunday to 0, giving 0=Sun..6=Sat) |
| `EXTRACT(HOUR FROM bucket)` | `toHour(bucket)` |
| `LEFT(error_message, 200)` | `substring(error_message, 1, 200)` |
| `COUNT(DISTINCT ...)` | `uniq(...)` or `COUNT(DISTINCT ...)` |
---
### Task 1: DDL for Stats Tables and Materialized Views
**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql`
All 5 table+MV pairs in a single DDL file. Tables use `AggregatingMergeTree()`. MVs use `-State` combinators and trigger on INSERT to `executions` or `processor_executions`.
- [ ] **Step 1: Create the DDL file**
```sql
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.
-- ── stats_1m_all (global) ────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
tenant_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;
-- ── stats_1m_app (per-application) ───────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
tenant_id,
application_name,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;
-- ── stats_1m_route (per-route) ───────────────────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_route (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
tenant_id,
application_name,
route_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;
-- ── stats_1m_processor (per-processor-type) ──────────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
application_name,
processor_type,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;
-- ── stats_1m_processor_detail (per-processor-id) ─────────────────────
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
application_name,
route_id,
processor_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
```
**IMPORTANT**: `ClickHouseSchemaInitializer` may run each `.sql` file as a single statement. ClickHouse itself accepts multiple `;`-separated statements in one call, but the JDBC driver may not. Read the initializer first: if it calls `jdbc.execute(sql)` once per file, the semicolons between statements will cause failures. In that case, either split the DDL into separate files (V4a, V4b, etc.) or modify the initializer to split on `;` and run each statement separately. Verify during testing.
- [ ] **Step 2: Check ClickHouseSchemaInitializer handles multi-statement**
Read `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseSchemaInitializer.java`. If it runs each file as a single `jdbc.execute()`, modify it to split on `;` and run each statement separately. If it already handles this, proceed.
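If the initializer does need modifying, the split can be as simple as the following sketch. The class and method names are illustrative, not the project's, and it assumes the DDL contains no semicolons inside string literals (true for the V4 file above); `--` line comments are stripped first so a `;` inside a comment cannot split a statement.

```java
import java.util.ArrayList;
import java.util.List;

public class SqlSplitter {
    // Naive multi-statement splitter: strip "--" line comments, then
    // split on ';' and drop blank fragments. Assumes no ';' or "--"
    // occurs inside a string literal.
    public static List<String> split(String sql) {
        StringBuilder stripped = new StringBuilder();
        for (String line : sql.split("\n", -1)) {
            int comment = line.indexOf("--");
            stripped.append(comment >= 0 ? line.substring(0, comment) : line).append('\n');
        }
        List<String> statements = new ArrayList<>();
        for (String stmt : stripped.toString().split(";")) {
            if (!stmt.isBlank()) {
                statements.add(stmt.trim());
            }
        }
        return statements;
    }
}
```

Each returned statement can then be passed to its own `jdbc.execute(...)` call.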
- [ ] **Step 3: Verify DDL loads in Testcontainers**
Write a quick smoke test (or rely on the Task 2 integration test) to confirm that all 10 objects (5 tables + 5 MVs) are created. At minimum, verify the module still compiles:
```bash
mvn clean compile -pl cameleer-server-app -f pom.xml
```
- [ ] **Step 4: Commit**
```bash
git add cameleer-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
# also add ClickHouseSchemaInitializer if modified
git commit -m "feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)"
```
---
### Task 2: ClickHouseStatsStore — Aggregate Queries
**Files:**
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java`
The store implements `StatsStore` using ClickHouse `-Merge` functions. It follows the same pattern as `PostgresStatsStore` but with ClickHouse SQL syntax.
**Key implementation patterns:**
1. **Stats queries** (queryStats): Read from `stats_1m_*` tables using `-Merge` combinators:
```sql
SELECT
countMerge(total_count) AS total_count,
countIfMerge(failed_count) AS failed_count,
CASE WHEN countMerge(total_count) > 0
THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration,
quantileMerge(0.99)(p99_duration) AS p99_duration,
countIfMerge(running_count) AS active_count
FROM stats_1m_all
WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ?
```
Same pattern for prev-24h and today queries (identical to PostgresStatsStore logic).
2. **Timeseries queries** (queryTimeseries): Group by time period:
```sql
SELECT
toStartOfInterval(bucket, INTERVAL ? SECOND) AS period,
countMerge(total_count) AS total_count,
countIfMerge(failed_count) AS failed_count,
CASE WHEN countMerge(total_count) > 0
THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration,
quantileMerge(0.99)(p99_duration) AS p99_duration,
countIfMerge(running_count) AS active_count
FROM stats_1m_app
WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? AND application_name = ?
GROUP BY period ORDER BY period
```
3. **Grouped timeseries**: Same as timeseries but with extra GROUP BY column (application_name or route_id), returned as `Map<String, StatsTimeseries>`.
4. **SLA compliance**: Hit raw `executions FINAL` table:
```sql
SELECT
countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant,
countIf(status != 'RUNNING') AS total
FROM executions FINAL
WHERE tenant_id = 'default' AND start_time >= ? AND start_time < ?
AND application_name = ?
```
5. **SLA counts by app/route**: Same pattern with GROUP BY.
6. **Top errors**: Hit raw `executions FINAL` or `processor_executions` table with CTE for counts + velocity. ClickHouse differences:
- No `FILTER (WHERE ...)` → use `countIf(...)`
- No `LEFT(s, n)` → use `substring(s, 1, n)`
- CTE syntax is identical (`WITH ... AS (...)`)
7. **Active error types**: `SELECT uniq(...)` or `COUNT(DISTINCT ...)` from raw executions.
8. **Punchcard**: ClickHouse `toDayOfWeek(bucket)` (default mode) returns 1=Mon..7=Sun; PG `EXTRACT(DOW)` returns 0=Sun..6=Sat. Conversion: `toDayOfWeek(bucket) % 7` maps Sunday (7) to 0, giving 0=Sun..6=Sat.
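The weekday shift can be sketched in a line of Java (illustrative helper, mirroring the `% 7` conversion the punchcard query applies in SQL):

```java
// Maps ClickHouse's default weekday numbering (1=Mon..7=Sun) to the
// Postgres EXTRACT(DOW) convention (0=Sun..6=Sat) that the punchcard uses.
public class Punchcard {
    public static int toPostgresDow(int clickhouseDow) {
        return clickhouseDow % 7; // 7 (Sunday) -> 0; 1..6 unchanged
    }
}
```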
**Constructor**: Takes `@Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc`.
**Test approach**: Seed data by inserting directly into `executions` and `processor_executions` tables (the MVs trigger automatically on INSERT). Then query via the StatsStore methods and verify results.
**Test data seeding**: Insert 10 executions across 2 apps, 3 routes, spanning 10 minutes. Include some FAILED, some COMPLETED, varying durations. Then verify:
- `stats()` returns correct totals
- `statsForApp()` filters correctly
- `timeseries()` returns multiple buckets
- `slaCompliance()` returns correct percentage
- `topErrors()` returns ranked errors
- `punchcard()` returns non-empty cells
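One way to keep those assertions honest is to compute the expected numbers from the seed data instead of hard-coding them. A minimal sketch, where the record and method names are illustrative stand-ins for whatever the IT actually uses:

```java
import java.util.List;

public class ExpectedStats {
    // Illustrative seed row; the real IT would insert these into ClickHouse.
    record Seed(String app, String route, String status, long durationMs) {}

    public static long total(List<Seed> seeds) {
        return seeds.size();
    }

    public static long failed(List<Seed> seeds) {
        return seeds.stream().filter(s -> s.status().equals("FAILED")).count();
    }

    public static double avgDuration(List<Seed> seeds) {
        return seeds.stream().mapToLong(Seed::durationMs).average().orElse(0);
    }
}
```

The IT can then assert that `stats()` matches `total(seeds)`, `failed(seeds)`, and `avgDuration(seeds)` for the same time window.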
- [ ] **Step 1: Write the failing integration test**
Create `ClickHouseStatsStoreIT.java` with:
- Load the DDL files: V2 (executions), V3 (processor_executions), V4 (stats MVs)
- Seed 10+ executions and 20+ processor records across 2 apps, 3 routes, 10 minutes
- Test: `stats_returnsCorrectTotals`, `statsForApp_filtersCorrectly`, `timeseries_returnsBuckets`, `timeseriesGroupedByApp_returnsMap`, `slaCompliance_calculatesCorrectly`, `topErrors_returnsRankedErrors`, `activeErrorTypes_countsDistinct`, `punchcard_returnsNonEmpty`, `slaCountsByApp_returnsMap`
- [ ] **Step 2: Run test to verify it fails**
```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false -f pom.xml
```
- [ ] **Step 3: Implement ClickHouseStatsStore**
Follow the `PostgresStatsStore` structure closely. Same private `Filter` record, same `queryStats`/`queryTimeseries`/`queryGroupedTimeseries` helper methods. Replace PG-specific SQL with CH equivalents per the translation table above.
- [ ] **Step 4: Run test to verify it passes**
```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -f pom.xml
```
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java \
cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries"
```
---
### Task 3: Feature Flag Wiring
**Files:**
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java`
- Modify: `cameleer-server-app/src/main/resources/application.yml`
- Modify: `deploy/base/server.yaml`
- [ ] **Step 1: Add ConditionalOnProperty to PostgresStatsStore**
```java
@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres", matchIfMissing = true)
public class PostgresStatsStore implements StatsStore {
```
- [ ] **Step 2: Add CH StatsStore bean to StorageBeanConfig**
```java
@Bean
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse")
public StatsStore clickHouseStatsStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseStatsStore(clickHouseJdbc);
}
```
- [ ] **Step 3: Update application.yml**
Add under `cameleer.storage`:
```yaml
stats: ${CAMELEER_STORAGE_STATS:postgres}
```
- [ ] **Step 4: Update deploy/base/server.yaml**
Add env var:
```yaml
- name: CAMELEER_STORAGE_STATS
value: "postgres"
```
- [ ] **Step 5: Compile and verify all tests pass**
```bash
mvn clean verify -DskipITs -f pom.xml
```
- [ ] **Step 6: Commit**
```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java \
cameleer-server-app/src/main/resources/application.yml \
deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag"
```
---
## Verification Checklist
After all tasks are complete, verify:
1. **MVs trigger**: Insert a row into `executions`, verify `stats_1m_all` has a row
2. **Aggregate correctness**: Insert known data, verify countMerge/sumMerge/quantileMerge produce correct values
3. **Timeseries bucketing**: Verify `toStartOfInterval` groups correctly across time ranges
4. **SLA compliance**: Verify percentage calculation against raw data
5. **Top errors**: Verify ranking and velocity trend detection
6. **Punchcard**: Verify weekday/hour mapping (0=Sun..6=Sat convention)
7. **Feature flag**: `cameleer.storage.stats=postgres` uses PG, `=clickhouse` uses CH
8. **Backward compat**: With default config, everything uses PG
9. **CI**: `mvn clean verify -DskipITs` passes