From adf13f04301afabcede19273eec6d28b32e1bb08 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 15 Mar 2026 10:46:26 +0100 Subject: [PATCH] Add 5-minute AggregatingMergeTree stats rollup for dashboard queries Pre-aggregate route execution stats into 5-minute buckets using a materialized view with -State/-Merge combinators. Rewrite stats() and timeseries() to query the rollup table instead of scanning the wide base table. Active count remains a real-time query since RUNNING is transient. Includes idempotent backfill migration. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/search/ClickHouseSearchEngine.java | 94 ++++++++++--------- .../resources/clickhouse/07-stats-rollup.sql | 31 ++++++ .../clickhouse/08-stats-rollup-backfill.sql | 16 ++++ 3 files changed, 97 insertions(+), 44 deletions(-) create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java index 7d095565..b2ae64ca 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java @@ -96,79 +96,87 @@ public class ClickHouseSearchEngine implements SearchEngine { @Override public ExecutionStats stats(Instant from, Instant to, String routeId, List agentIds) { + // Current period — read from rollup var conditions = new ArrayList(); var params = new ArrayList(); - conditions.add("start_time >= ?"); + conditions.add("bucket >= toStartOfFiveMinutes(?)"); params.add(Timestamp.from(from)); - conditions.add("start_time <= ?"); + conditions.add("bucket <= ?"); params.add(Timestamp.from(to)); 
addScopeFilters(routeId, agentIds, conditions, params); String where = " WHERE " + String.join(" AND ", conditions); - String aggregateSql = "SELECT count() AS total_count, " + - "countIf(status = 'FAILED') AS failed_count, " + - "toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " + - "toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " + - "countIf(status = 'RUNNING') AS active_count " + - "FROM route_executions" + where; + String rollupSql = "SELECT " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " + + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " + + "FROM route_execution_stats_5m" + where; - // Current period - record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs, long activeCount) {} - PeriodStats current = jdbcTemplate.queryForObject(aggregateSql, + record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {} + PeriodStats current = jdbcTemplate.queryForObject(rollupSql, (rs, rowNum) -> new PeriodStats( rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("avg_duration_ms"), - rs.getLong("p99_duration_ms"), - rs.getLong("active_count")), + rs.getLong("p99_duration_ms")), params.toArray()); - // Previous period (same window shifted back 24h) + // Active count — lightweight real-time query on base table (RUNNING is transient) + var activeConditions = new ArrayList(); + var activeParams = new ArrayList(); + activeConditions.add("status = 'RUNNING'"); + addScopeFilters(routeId, agentIds, activeConditions, activeParams); + String activeWhere = " WHERE " + String.join(" AND ", activeConditions); + Long activeCount = jdbcTemplate.queryForObject( + "SELECT count() FROM route_executions" + activeWhere, + Long.class, activeParams.toArray()); + + // Previous 
period (same window shifted back 24h) — read from rollup Duration window = Duration.between(from, to); Instant prevFrom = from.minus(Duration.ofHours(24)); Instant prevTo = prevFrom.plus(window); - var prevParams = new ArrayList(); var prevConditions = new ArrayList(); - prevConditions.add("start_time >= ?"); + var prevParams = new ArrayList(); + prevConditions.add("bucket >= toStartOfFiveMinutes(?)"); prevParams.add(Timestamp.from(prevFrom)); - prevConditions.add("start_time <= ?"); + prevConditions.add("bucket <= ?"); prevParams.add(Timestamp.from(prevTo)); addScopeFilters(routeId, agentIds, prevConditions, prevParams); String prevWhere = " WHERE " + String.join(" AND ", prevConditions); - String prevAggregateSql = "SELECT count() AS total_count, " + - "countIf(status = 'FAILED') AS failed_count, " + - "toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " + - "toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " + - "countIf(status = 'RUNNING') AS active_count " + - "FROM route_executions" + prevWhere; + String prevRollupSql = "SELECT " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " + + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " + + "FROM route_execution_stats_5m" + prevWhere; - PeriodStats prev = jdbcTemplate.queryForObject(prevAggregateSql, + PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql, (rs, rowNum) -> new PeriodStats( rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("avg_duration_ms"), - rs.getLong("p99_duration_ms"), - rs.getLong("active_count")), + rs.getLong("p99_duration_ms")), prevParams.toArray()); - // Today total (midnight UTC to now) with same scope + // Today total (midnight UTC to now) — read from rollup with same scope Instant todayStart = 
Instant.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS); var todayConditions = new ArrayList(); var todayParams = new ArrayList(); - todayConditions.add("start_time >= ?"); + todayConditions.add("bucket >= toStartOfFiveMinutes(?)"); todayParams.add(Timestamp.from(todayStart)); addScopeFilters(routeId, agentIds, todayConditions, todayParams); String todayWhere = " WHERE " + String.join(" AND ", todayConditions); Long totalToday = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions" + todayWhere, + "SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere, Long.class, todayParams.toArray()); return new ExecutionStats( current.totalCount, current.failedCount, current.avgDurationMs, - current.p99LatencyMs, current.activeCount, + current.p99LatencyMs, activeCount != null ? activeCount : 0L, totalToday != null ? totalToday : 0L, prev.totalCount, prev.failedCount, prev.avgDurationMs, prev.p99LatencyMs); } @@ -186,34 +194,32 @@ public class ClickHouseSearchEngine implements SearchEngine { var conditions = new ArrayList(); var params = new ArrayList(); - conditions.add("start_time >= ?"); + conditions.add("bucket >= toStartOfFiveMinutes(?)"); params.add(Timestamp.from(from)); - conditions.add("start_time <= ?"); + conditions.add("bucket <= ?"); params.add(Timestamp.from(to)); addScopeFilters(routeId, agentIds, conditions, params); String where = " WHERE " + String.join(" AND ", conditions); - // Use epoch-based bucketing for DateTime64 compatibility + // Re-aggregate 5-minute rollup buckets into the requested interval String sql = "SELECT " + - "toDateTime(intDiv(toUInt32(toDateTime(start_time)), " + intervalSeconds + ") * " + intervalSeconds + ") AS bucket, " + - "count() AS total_count, " + - "countIf(status = 'FAILED') AS failed_count, " + - "toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " + - "toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " + - "countIf(status = 'RUNNING') AS 
active_count " + - "FROM route_executions" + where + - " GROUP BY bucket " + - "ORDER BY bucket"; + "toDateTime(intDiv(toUInt32(bucket), " + intervalSeconds + ") * " + intervalSeconds + ") AS ts_bucket, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " + + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " + + "FROM route_execution_stats_5m" + where + + " GROUP BY ts_bucket ORDER BY ts_bucket"; List buckets = jdbcTemplate.query(sql, (rs, rowNum) -> new StatsTimeseries.TimeseriesBucket( - rs.getTimestamp("bucket").toInstant(), + rs.getTimestamp("ts_bucket").toInstant(), rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("avg_duration_ms"), rs.getLong("p99_duration_ms"), - rs.getLong("active_count") + 0L ), params.toArray()); diff --git a/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql b/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql new file mode 100644 index 00000000..2c70890b --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql @@ -0,0 +1,31 @@ +-- Pre-aggregated 5-minute stats rollup for route executions. +-- Uses AggregatingMergeTree with -State/-Merge combinators so intermediate +-- aggregates can be merged across arbitrary time windows and dimensions. 
+ +CREATE TABLE IF NOT EXISTS route_execution_stats_5m ( + bucket DateTime('UTC'), + route_id LowCardinality(String), + agent_id LowCardinality(String), + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt8), + duration_sum AggregateFunction(sum, UInt64), + p99_duration AggregateFunction(quantileTDigest(0.99), UInt64) +) +ENGINE = AggregatingMergeTree() +PARTITION BY toYYYYMMDD(bucket) +ORDER BY (agent_id, route_id, bucket) +TTL bucket + toIntervalDay(30) +SETTINGS ttl_only_drop_parts = 1; + +CREATE MATERIALIZED VIEW IF NOT EXISTS route_execution_stats_5m_mv +TO route_execution_stats_5m +AS SELECT + toStartOfFiveMinutes(start_time) AS bucket, + route_id, + agent_id, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + quantileTDigestState(0.99)(duration_ms) AS p99_duration +FROM route_executions +GROUP BY bucket, route_id, agent_id; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql b/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql new file mode 100644 index 00000000..5e80a23a --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql @@ -0,0 +1,16 @@ +-- One-time backfill of existing route_executions into the 5-minute stats +-- rollup. The emptiness guard in WHERE makes re-runs no-ops, but it also +-- skips backfill once the MV has written any rows — run before go-live. + +INSERT INTO route_execution_stats_5m +SELECT + toStartOfFiveMinutes(start_time) AS bucket, + route_id, + agent_id, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + quantileTDigestState(0.99)(duration_ms) AS p99_duration +FROM route_executions +WHERE (SELECT count() FROM route_execution_stats_5m) = 0 +GROUP BY bucket, route_id, agent_id;