Add 5-minute AggregatingMergeTree stats rollup for dashboard queries
Pre-aggregate route execution stats into 5-minute buckets using a materialized view with -State/-Merge combinators. Rewrite stats() and timeseries() to query the rollup table instead of scanning the wide base table. Active count remains a real-time query since RUNNING is transient. Includes idempotent backfill migration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -96,79 +96,87 @@ public class ClickHouseSearchEngine implements SearchEngine {
|
||||
|
||||
@Override
|
||||
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
|
||||
// Current period — read from rollup
|
||||
var conditions = new ArrayList<String>();
|
||||
var params = new ArrayList<Object>();
|
||||
conditions.add("start_time >= ?");
|
||||
conditions.add("bucket >= toStartOfFiveMinutes(?)");
|
||||
params.add(Timestamp.from(from));
|
||||
conditions.add("start_time <= ?");
|
||||
conditions.add("bucket <= ?");
|
||||
params.add(Timestamp.from(to));
|
||||
addScopeFilters(routeId, agentIds, conditions, params);
|
||||
|
||||
String where = " WHERE " + String.join(" AND ", conditions);
|
||||
|
||||
String aggregateSql = "SELECT count() AS total_count, " +
|
||||
"countIf(status = 'FAILED') AS failed_count, " +
|
||||
"toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " +
|
||||
"countIf(status = 'RUNNING') AS active_count " +
|
||||
"FROM route_executions" + where;
|
||||
String rollupSql = "SELECT " +
|
||||
"countMerge(total_count) AS total_count, " +
|
||||
"countIfMerge(failed_count) AS failed_count, " +
|
||||
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " +
|
||||
"FROM route_execution_stats_5m" + where;
|
||||
|
||||
// Current period
|
||||
record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs, long activeCount) {}
|
||||
PeriodStats current = jdbcTemplate.queryForObject(aggregateSql,
|
||||
record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {}
|
||||
PeriodStats current = jdbcTemplate.queryForObject(rollupSql,
|
||||
(rs, rowNum) -> new PeriodStats(
|
||||
rs.getLong("total_count"),
|
||||
rs.getLong("failed_count"),
|
||||
rs.getLong("avg_duration_ms"),
|
||||
rs.getLong("p99_duration_ms"),
|
||||
rs.getLong("active_count")),
|
||||
rs.getLong("p99_duration_ms")),
|
||||
params.toArray());
|
||||
|
||||
// Previous period (same window shifted back 24h)
|
||||
// Active count — lightweight real-time query on base table (RUNNING is transient)
|
||||
var activeConditions = new ArrayList<String>();
|
||||
var activeParams = new ArrayList<Object>();
|
||||
activeConditions.add("status = 'RUNNING'");
|
||||
addScopeFilters(routeId, agentIds, activeConditions, activeParams);
|
||||
String activeWhere = " WHERE " + String.join(" AND ", activeConditions);
|
||||
Long activeCount = jdbcTemplate.queryForObject(
|
||||
"SELECT count() FROM route_executions" + activeWhere,
|
||||
Long.class, activeParams.toArray());
|
||||
|
||||
// Previous period (same window shifted back 24h) — read from rollup
|
||||
Duration window = Duration.between(from, to);
|
||||
Instant prevFrom = from.minus(Duration.ofHours(24));
|
||||
Instant prevTo = prevFrom.plus(window);
|
||||
var prevParams = new ArrayList<Object>();
|
||||
var prevConditions = new ArrayList<String>();
|
||||
prevConditions.add("start_time >= ?");
|
||||
var prevParams = new ArrayList<Object>();
|
||||
prevConditions.add("bucket >= toStartOfFiveMinutes(?)");
|
||||
prevParams.add(Timestamp.from(prevFrom));
|
||||
prevConditions.add("start_time <= ?");
|
||||
prevConditions.add("bucket <= ?");
|
||||
prevParams.add(Timestamp.from(prevTo));
|
||||
addScopeFilters(routeId, agentIds, prevConditions, prevParams);
|
||||
String prevWhere = " WHERE " + String.join(" AND ", prevConditions);
|
||||
|
||||
String prevAggregateSql = "SELECT count() AS total_count, " +
|
||||
"countIf(status = 'FAILED') AS failed_count, " +
|
||||
"toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " +
|
||||
"countIf(status = 'RUNNING') AS active_count " +
|
||||
"FROM route_executions" + prevWhere;
|
||||
String prevRollupSql = "SELECT " +
|
||||
"countMerge(total_count) AS total_count, " +
|
||||
"countIfMerge(failed_count) AS failed_count, " +
|
||||
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " +
|
||||
"FROM route_execution_stats_5m" + prevWhere;
|
||||
|
||||
PeriodStats prev = jdbcTemplate.queryForObject(prevAggregateSql,
|
||||
PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql,
|
||||
(rs, rowNum) -> new PeriodStats(
|
||||
rs.getLong("total_count"),
|
||||
rs.getLong("failed_count"),
|
||||
rs.getLong("avg_duration_ms"),
|
||||
rs.getLong("p99_duration_ms"),
|
||||
rs.getLong("active_count")),
|
||||
rs.getLong("p99_duration_ms")),
|
||||
prevParams.toArray());
|
||||
|
||||
// Today total (midnight UTC to now) with same scope
|
||||
// Today total (midnight UTC to now) — read from rollup with same scope
|
||||
Instant todayStart = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS);
|
||||
var todayConditions = new ArrayList<String>();
|
||||
var todayParams = new ArrayList<Object>();
|
||||
todayConditions.add("start_time >= ?");
|
||||
todayConditions.add("bucket >= toStartOfFiveMinutes(?)");
|
||||
todayParams.add(Timestamp.from(todayStart));
|
||||
addScopeFilters(routeId, agentIds, todayConditions, todayParams);
|
||||
String todayWhere = " WHERE " + String.join(" AND ", todayConditions);
|
||||
|
||||
Long totalToday = jdbcTemplate.queryForObject(
|
||||
"SELECT count() FROM route_executions" + todayWhere,
|
||||
"SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere,
|
||||
Long.class, todayParams.toArray());
|
||||
|
||||
return new ExecutionStats(
|
||||
current.totalCount, current.failedCount, current.avgDurationMs,
|
||||
current.p99LatencyMs, current.activeCount,
|
||||
current.p99LatencyMs, activeCount != null ? activeCount : 0L,
|
||||
totalToday != null ? totalToday : 0L,
|
||||
prev.totalCount, prev.failedCount, prev.avgDurationMs, prev.p99LatencyMs);
|
||||
}
|
||||
@@ -186,34 +194,32 @@ public class ClickHouseSearchEngine implements SearchEngine {
|
||||
|
||||
var conditions = new ArrayList<String>();
|
||||
var params = new ArrayList<Object>();
|
||||
conditions.add("start_time >= ?");
|
||||
conditions.add("bucket >= toStartOfFiveMinutes(?)");
|
||||
params.add(Timestamp.from(from));
|
||||
conditions.add("start_time <= ?");
|
||||
conditions.add("bucket <= ?");
|
||||
params.add(Timestamp.from(to));
|
||||
addScopeFilters(routeId, agentIds, conditions, params);
|
||||
|
||||
String where = " WHERE " + String.join(" AND ", conditions);
|
||||
|
||||
// Use epoch-based bucketing for DateTime64 compatibility
|
||||
// Re-aggregate 5-minute rollup buckets into the requested interval
|
||||
String sql = "SELECT " +
|
||||
"toDateTime(intDiv(toUInt32(toDateTime(start_time)), " + intervalSeconds + ") * " + intervalSeconds + ") AS bucket, " +
|
||||
"count() AS total_count, " +
|
||||
"countIf(status = 'FAILED') AS failed_count, " +
|
||||
"toInt64(ifNotFinite(avg(duration_ms), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantile(0.99)(duration_ms), 0)) AS p99_duration_ms, " +
|
||||
"countIf(status = 'RUNNING') AS active_count " +
|
||||
"FROM route_executions" + where +
|
||||
" GROUP BY bucket " +
|
||||
"ORDER BY bucket";
|
||||
"toDateTime(intDiv(toUInt32(bucket), " + intervalSeconds + ") * " + intervalSeconds + ") AS ts_bucket, " +
|
||||
"countMerge(total_count) AS total_count, " +
|
||||
"countIfMerge(failed_count) AS failed_count, " +
|
||||
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_duration_ms, " +
|
||||
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_duration_ms " +
|
||||
"FROM route_execution_stats_5m" + where +
|
||||
" GROUP BY ts_bucket ORDER BY ts_bucket";
|
||||
|
||||
List<StatsTimeseries.TimeseriesBucket> buckets = jdbcTemplate.query(sql, (rs, rowNum) ->
|
||||
new StatsTimeseries.TimeseriesBucket(
|
||||
rs.getTimestamp("bucket").toInstant(),
|
||||
rs.getTimestamp("ts_bucket").toInstant(),
|
||||
rs.getLong("total_count"),
|
||||
rs.getLong("failed_count"),
|
||||
rs.getLong("avg_duration_ms"),
|
||||
rs.getLong("p99_duration_ms"),
|
||||
rs.getLong("active_count")
|
||||
0L
|
||||
),
|
||||
params.toArray());
|
||||
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
-- Pre-aggregated 5-minute stats rollup for route executions.
|
||||
-- Uses AggregatingMergeTree with -State/-Merge combinators so intermediate
|
||||
-- aggregates can be merged across arbitrary time windows and dimensions.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS route_execution_stats_5m (
|
||||
bucket DateTime('UTC'),
|
||||
route_id LowCardinality(String),
|
||||
agent_id LowCardinality(String),
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt8),
|
||||
duration_sum AggregateFunction(sum, UInt64),
|
||||
p99_duration AggregateFunction(quantileTDigest(0.99), UInt64)
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY toYYYYMMDD(bucket)
|
||||
ORDER BY (agent_id, route_id, bucket)
|
||||
TTL bucket + toIntervalDay(30)
|
||||
SETTINGS ttl_only_drop_parts = 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS route_execution_stats_5m_mv
|
||||
TO route_execution_stats_5m
|
||||
AS SELECT
|
||||
toStartOfFiveMinutes(start_time) AS bucket,
|
||||
route_id,
|
||||
agent_id,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
quantileTDigestState(0.99)(duration_ms) AS p99_duration
|
||||
FROM route_executions
|
||||
GROUP BY bucket, route_id, agent_id;
|
||||
@@ -0,0 +1,16 @@
|
||||
-- One-time idempotent backfill of existing route_executions into the
|
||||
-- 5-minute stats rollup table. Safe for repeated execution — the WHERE
|
||||
-- clause skips the INSERT if the target table already contains data.
|
||||
|
||||
INSERT INTO route_execution_stats_5m
|
||||
SELECT
|
||||
toStartOfFiveMinutes(start_time) AS bucket,
|
||||
route_id,
|
||||
agent_id,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
quantileTDigestState(0.99)(duration_ms) AS p99_duration
|
||||
FROM route_executions
|
||||
WHERE (SELECT count() FROM route_execution_stats_5m) = 0
|
||||
GROUP BY bucket, route_id, agent_id;
|
||||
Reference in New Issue
Block a user