feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries

Implements StatsStore interface for ClickHouse using AggregatingMergeTree
tables with -Merge combinators (countMerge, countIfMerge, sumMerge,
quantileMerge). Uses literal SQL for aggregate table queries to avoid
ClickHouse JDBC driver PreparedStatement issues with AggregateFunction
columns. Raw table queries (SLA, topErrors, activeErrorTypes) use normal
prepared statements.

Includes 13 integration tests covering stats, timeseries, grouped
timeseries, SLA compliance, SLA counts by app/route, top errors, active
error types, punchcard, and processor stats. Also fixes AggregateFunction
type signatures in V4 DDL (count() takes no args, countIf takes UInt8).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 21:49:22 +02:00
parent eb0d26814f
commit 052990bb59
3 changed files with 953 additions and 13 deletions

View File

@@ -0,0 +1,565 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* ClickHouse implementation of {@link StatsStore}.
* Reads from AggregatingMergeTree tables populated by materialized views,
* using {@code -Merge} aggregate combinators to finalize partial states.
*
* <p>Queries against AggregatingMergeTree tables use literal SQL values instead
* of JDBC prepared-statement parameters because the ClickHouse JDBC v2 driver
* (0.9.x) wraps prepared statements in a sub-query that strips the
* {@code AggregateFunction} column type, breaking {@code -Merge} combinators.
* Queries against raw tables ({@code executions FINAL},
* {@code processor_executions}) use normal prepared-statement parameters
* since they have no AggregateFunction columns.</p>
*/
public class ClickHouseStatsStore implements StatsStore {

    /** Single-tenant deployment: every query is scoped to this tenant id. */
    private static final String TENANT = "default";

    private final JdbcTemplate jdbc;

    public ClickHouseStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    // ── Stats (aggregate) ────────────────────────────────────────────────

    /** Global stats over [from, to) from the tenant-wide 1-minute rollup. */
    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of(), true);
    }

    /** Per-application stats over [from, to). */
    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
        return queryStats("stats_1m_app", from, to, List.of(
                new Filter("application_name", applicationName)), true);
    }

    /** Per-route stats over [from, to). */
    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // NOTE(review): agentIds is currently ignored — stats_1m_route has no agent
        // dimension, so this returns route-wide stats. Confirm callers expect that.
        return queryStats("stats_1m_route", from, to, List.of(
                new Filter("route_id", routeId)), true);
    }

    /** Processor-level stats; computed from the raw table, not a rollup. */
    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryProcessorStatsRaw(from, to, routeId, processorType);
    }

    // ── Timeseries ───────────────────────────────────────────────────────

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
                new Filter("application_name", applicationName)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                              String routeId, List<String> agentIds) {
        // NOTE(review): agentIds is currently ignored here as well — see statsForRoute.
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
                new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                  String routeId, String processorType) {
        return queryProcessorTimeseriesRaw(from, to, bucketCount, routeId, processorType);
    }

    // ── Grouped timeseries ───────────────────────────────────────────────

    /** One timeseries per application name over [from, to). */
    @Override
    public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
        return queryGroupedTimeseries("stats_1m_app", "application_name", from, to,
                bucketCount, List.of());
    }

    /** One timeseries per route id, restricted to one application. */
    @Override
    public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
                                                                 int bucketCount, String applicationName) {
        return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
                bucketCount, List.of(new Filter("application_name", applicationName)));
    }

    // ── SLA compliance (raw table — prepared statements OK) ──────────────

    /**
     * Percentage (0..100) of finished executions in [from, to) whose duration
     * is within {@code thresholdMs}. RUNNING executions are excluded from both
     * numerator and denominator. Optional app/route filters narrow the scope.
     * An empty window counts as fully compliant (100.0).
     */
    @Override
    public double slaCompliance(Instant from, Instant to, int thresholdMs,
                                String applicationName, String routeId) {
        String sql = "SELECT " +
                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "countIf(status != 'RUNNING') AS total " +
                "FROM executions FINAL " +
                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ?";
        List<Object> params = new ArrayList<>();
        params.add(thresholdMs);
        params.add(TENANT);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationName != null) {
            sql += " AND application_name = ?";
            params.add(applicationName);
        }
        if (routeId != null) {
            sql += " AND route_id = ?";
            params.add(routeId);
        }
        return jdbc.query(sql, (rs, rowNum) -> {
            long total = rs.getLong("total");
            // FIX: empty window previously returned 1.0, which on this method's
            // percent scale (see the division below) read as 1% compliance.
            if (total == 0) return 100.0;
            return rs.getLong("compliant") * 100.0 / total;
        }, params.toArray()).stream().findFirst().orElse(100.0);
    }

    /** Per-application {compliant, total} counts (long[2]) within [from, to). */
    @Override
    public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
        String sql = "SELECT application_name, " +
                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "countIf(status != 'RUNNING') AS total " +
                "FROM executions FINAL " +
                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
                "GROUP BY application_name";
        Map<String, long[]> result = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            result.put(rs.getString("application_name"),
                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
        }, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to));
        return result;
    }

    /** Per-route {compliant, total} counts (long[2]) for one application. */
    @Override
    public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
                                                String applicationName, int thresholdMs) {
        String sql = "SELECT route_id, " +
                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "countIf(status != 'RUNNING') AS total " +
                "FROM executions FINAL " +
                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
                "AND application_name = ? GROUP BY route_id";
        Map<String, long[]> result = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            result.put(rs.getString("route_id"),
                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
        }, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName);
        return result;
    }

    // ── Top errors (raw table — prepared statements OK) ──────────────────

    /**
     * Top failing error keys in [from, to), ordered by count.
     * With a routeId the query groups processor_executions by processor_id;
     * otherwise it groups executions by route_id. The velocity CTE classifies
     * each error key as accelerating/decelerating/stable by comparing the last
     * five minutes against the five minutes before that.
     * NOTE(review): the velocity counts are also restricted to [from, to), so
     * for purely historical windows recent_5m/prev_5m are 0 — confirm intended.
     */
    @Override
    public List<TopError> topErrors(Instant from, Instant to, String applicationName,
                                    String routeId, int limit) {
        StringBuilder where = new StringBuilder(
                "status = 'FAILED' AND start_time >= ? AND start_time < ?");
        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationName != null) {
            where.append(" AND application_name = ?");
            params.add(applicationName);
        }
        String table;
        String groupId;
        if (routeId != null) {
            table = "processor_executions";
            groupId = "processor_id";
            where.append(" AND route_id = ?");
            params.add(routeId);
        } else {
            table = "executions FINAL";
            groupId = "route_id";
        }
        Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES);
        Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES);
        // Error key: prefer the structured error_type, fall back to a message prefix.
        String sql = "WITH counted AS (" +
                " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
                " " + groupId + " AS group_id, " +
                " count() AS cnt, max(start_time) AS last_seen " +
                " FROM " + table + " WHERE tenant_id = ? AND " + where +
                " GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" +
                "), velocity AS (" +
                " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
                " countIf(start_time >= ?) AS recent_5m, " +
                " countIf(start_time >= ? AND start_time < ?) AS prev_5m " +
                " FROM " + table + " WHERE tenant_id = ? AND " + where +
                " GROUP BY error_key" +
                ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " +
                " COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " +
                " CASE " +
                " WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " +
                " WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " +
                " ELSE 'stable' END AS trend " +
                "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " +
                "ORDER BY c.cnt DESC";
        // Bind order must mirror placeholder order: counted CTE first, then
        // the velocity CTE (two window timestamps, tenant, shared filters).
        List<Object> fullParams = new ArrayList<>();
        fullParams.add(TENANT);
        fullParams.addAll(params);
        fullParams.add(limit);
        fullParams.add(Timestamp.from(fiveMinAgo));
        fullParams.add(Timestamp.from(tenMinAgo));
        fullParams.add(Timestamp.from(fiveMinAgo));
        fullParams.add(TENANT);
        fullParams.addAll(params);
        return jdbc.query(sql, (rs, rowNum) -> {
            String errorKey = rs.getString("error_key");
            String gid = rs.getString("group_id");
            // group_id is a processor id when filtered by route, else a route id.
            return new TopError(
                    errorKey,
                    routeId != null ? routeId : gid,
                    routeId != null ? gid : null,
                    rs.getLong("cnt"),
                    rs.getDouble("velocity"),
                    rs.getString("trend"),
                    rs.getTimestamp("last_seen").toInstant());
        }, fullParams.toArray());
    }

    /** Number of distinct error keys among FAILED executions in [from, to). */
    @Override
    public int activeErrorTypes(Instant from, Instant to, String applicationName) {
        String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " +
                "FROM executions FINAL " +
                "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?";
        List<Object> params = new ArrayList<>();
        params.add(TENANT);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationName != null) {
            sql += " AND application_name = ?";
            params.add(applicationName);
        }
        Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
        return count != null ? count : 0;
    }

    // ── Punchcard (AggregatingMergeTree — literal SQL) ───────────────────

    /**
     * Weekday/hour activity grid over [from, to), optionally per application.
     * NOTE(review): weekday is toDayOfWeek(bucket, 1) % 7 — confirm which day
     * maps to 0 against ClickHouse's mode-1 numbering before relying on it.
     */
    @Override
    public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
        String view = applicationName != null ? "stats_1m_app" : "stats_1m_all";
        String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " +
                "toHour(bucket) AS hour, " +
                "countMerge(total_count) AS total_count, " +
                "countIfMerge(failed_count) AS failed_count " +
                "FROM " + view +
                " WHERE tenant_id = " + lit(TENANT) +
                " AND bucket >= " + lit(from) +
                " AND bucket < " + lit(to);
        if (applicationName != null) {
            sql += " AND application_name = " + lit(applicationName);
        }
        sql += " GROUP BY weekday, hour ORDER BY weekday, hour";
        return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell(
                rs.getInt("weekday"), rs.getInt("hour"),
                rs.getLong("total_count"), rs.getLong("failed_count")));
    }

    // ── Private helpers ──────────────────────────────────────────────────

    /** A single "column = value" equality filter appended to a WHERE clause. */
    private record Filter(String column, String value) {}

    /**
     * Format an Instant as a ClickHouse DateTime literal.
     * Uses java.sql.Timestamp to match the JVM→ClickHouse timezone convention
     * used by the JDBC driver, then truncates to second precision for DateTime
     * column compatibility.
     */
    private static String lit(Instant instant) {
        // Truncate to seconds — ClickHouse DateTime has second precision
        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
        String ts = new Timestamp(truncated.toEpochMilli()).toString();
        // Remove trailing ".0" that Timestamp.toString() always appends
        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
        return "'" + ts + "'";
    }

    /** Format a string as a SQL literal with backslash and single-quote escaping. */
    private static String lit(String value) {
        // FIX: escape backslashes before quotes. Previously a value ending in
        // '\' produced "...\'" — an unterminated literal — and "\', ..." could
        // break out of the string entirely (ClickHouse uses C-style escapes).
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    /** Convert Instant to java.sql.Timestamp for JDBC binding. */
    private static Timestamp ts(Instant instant) {
        return Timestamp.from(instant);
    }

    /**
     * Build -Merge combinator SQL for the given view and time range.
     * All values are inlined via lit() because the ClickHouse JDBC driver's
     * prepared-statement wrapping breaks AggregateFunction columns (see class doc).
     */
    private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo,
                                 List<Filter> filters, boolean hasRunning) {
        // Views without a running_count column get a constant 0 active count.
        String runningCol = hasRunning ? "countIfMerge(running_count)" : "0";
        String sql = "SELECT " +
                "countMerge(total_count) AS total_count, " +
                "countIfMerge(failed_count) AS failed_count, " +
                "sumMerge(duration_sum) AS duration_sum, " +
                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view +
                " WHERE tenant_id = " + lit(TENANT) +
                " AND bucket >= " + lit(rangeFrom) +
                " AND bucket < " + lit(rangeTo);
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = " + lit(f.value());
        }
        return sql;
    }

    /**
     * Query an AggregatingMergeTree stats table using -Merge combinators.
     * Uses literal SQL to avoid ClickHouse JDBC driver PreparedStatement issues.
     * Runs three queries: the requested window, the same window shifted back
     * 24h (for trend comparison), and "today so far" (UTC day start).
     */
    private ExecutionStats queryStats(String view, Instant from, Instant to,
                                      List<Filter> filters, boolean hasRunning) {
        String sql = buildStatsSql(view, from, to, filters, hasRunning);
        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> {
            long tc = rs.getLong("total_count");
            long fc = rs.getLong("failed_count");
            long ds = rs.getLong("duration_sum"); // Nullable → 0 if null
            long p99 = (long) rs.getDouble("p99_duration"); // quantileMerge returns Float64
            long ac = rs.getLong("active_count");
            return new long[]{tc, fc, ds, p99, ac};
        });
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1];
            avgDuration = totalCount > 0 ? r[2] / totalCount : 0;
            p99Duration = r[3]; activeCount = r[4];
        }
        // Previous period (shifted back 24h)
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        String prevSql = buildStatsSql(view, prevFrom, prevTo, filters, hasRunning);
        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> {
            long tc = rs.getLong("total_count");
            long fc = rs.getLong("failed_count");
            long ds = rs.getLong("duration_sum");
            long p99 = (long) rs.getDouble("p99_duration");
            return new long[]{tc, fc, ds, p99};
        });
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1];
            prevAvg = prevTotal > 0 ? r[2] / prevTotal : 0;
            prevP99 = r[3];
        }
        // Today total — truncatedTo(DAYS) is the UTC day start, not local midnight.
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        String todaySql = buildStatsSql(view, todayStart, Instant.now(), filters, hasRunning);
        long totalToday = 0;
        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"));
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
        return new ExecutionStats(
                totalCount, failedCount, avgDuration, p99Duration, activeCount,
                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    /**
     * Timeseries from AggregatingMergeTree using -Merge combinators.
     * Buckets are re-aggregated into at most bucketCount intervals, never
     * finer than the 1-minute rollup granularity.
     */
    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
                                            int bucketCount, List<Filter> filters,
                                            boolean hasRunningCount) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60; // rollup is 1-minute grained
        String runningCol = hasRunningCount ? "countIfMerge(running_count)" : "0";
        String sql = "SELECT " +
                "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
                "countMerge(total_count) AS total_count, " +
                "countIfMerge(failed_count) AS failed_count, " +
                "sumMerge(duration_sum) AS duration_sum, " +
                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view +
                " WHERE tenant_id = " + lit(TENANT) +
                " AND bucket >= " + lit(from) +
                " AND bucket < " + lit(to);
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = " + lit(f.value());
        }
        sql += " GROUP BY period ORDER BY period";
        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) -> {
            long tc = rs.getLong("total_count");
            long ds = rs.getLong("duration_sum");
            return new TimeseriesBucket(
                    rs.getTimestamp("period").toInstant(),
                    tc, rs.getLong("failed_count"),
                    tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
                    rs.getLong("active_count"));
        });
        return new StatsTimeseries(buckets);
    }

    /**
     * Grouped timeseries from AggregatingMergeTree: one StatsTimeseries per
     * distinct value of groupCol, insertion-ordered by (period, group_key).
     */
    private Map<String, StatsTimeseries> queryGroupedTimeseries(
            String view, String groupCol, Instant from, Instant to,
            int bucketCount, List<Filter> filters) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;
        String sql = "SELECT " +
                "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
                groupCol + " AS group_key, " +
                "countMerge(total_count) AS total_count, " +
                "countIfMerge(failed_count) AS failed_count, " +
                "sumMerge(duration_sum) AS duration_sum, " +
                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
                "countIfMerge(running_count) AS active_count " +
                "FROM " + view +
                " WHERE tenant_id = " + lit(TENANT) +
                " AND bucket >= " + lit(from) +
                " AND bucket < " + lit(to);
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = " + lit(f.value());
        }
        sql += " GROUP BY period, group_key ORDER BY period, group_key";
        Map<String, List<TimeseriesBucket>> grouped = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            String key = rs.getString("group_key");
            long tc = rs.getLong("total_count");
            long ds = rs.getLong("duration_sum");
            TimeseriesBucket bucket = new TimeseriesBucket(
                    rs.getTimestamp("period").toInstant(),
                    tc, rs.getLong("failed_count"),
                    tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
                    rs.getLong("active_count"));
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket);
        });
        Map<String, StatsTimeseries> result = new LinkedHashMap<>();
        grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets)));
        return result;
    }

    /**
     * Direct aggregation on processor_executions for processor-level stats.
     * No rollup exists at this granularity; the raw table has no
     * AggregateFunction columns, so prepared-statement parameters are safe.
     * Active count is always 0 (processor rows have no RUNNING notion here).
     */
    private ExecutionStats queryProcessorStatsRaw(Instant from, Instant to,
                                                  String routeId, String processorType) {
        String sql = "SELECT " +
                "count() AS total_count, " +
                "countIf(status = 'FAILED') AS failed_count, " +
                "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
                "quantile(0.99)(duration_ms) AS p99_duration, " +
                "0 AS active_count " +
                "FROM processor_executions " +
                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
                "AND route_id = ? AND processor_type = ?";
        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
                rs.getLong("active_count")
        }, TENANT, ts(from), ts(to), routeId, processorType);
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3];
        }
        // Same query, shifted back 24h, for trend comparison.
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration")
        }, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType);
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
        }
        // "Today" window starts at the UTC day boundary.
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        long totalToday = 0;
        var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"),
                TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType);
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
        return new ExecutionStats(
                totalCount, failedCount, avgDuration, p99Duration, 0,
                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    /**
     * Direct aggregation on processor_executions for processor-level timeseries.
     * Bucketing mirrors queryTimeseries (≤ bucketCount intervals, ≥ 60s each).
     */
    private StatsTimeseries queryProcessorTimeseriesRaw(Instant from, Instant to,
                                                        int bucketCount,
                                                        String routeId, String processorType) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;
        String sql = "SELECT " +
                "toStartOfInterval(start_time, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
                "count() AS total_count, " +
                "countIf(status = 'FAILED') AS failed_count, " +
                "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
                "quantile(0.99)(duration_ms) AS p99_duration, " +
                "0 AS active_count " +
                "FROM processor_executions " +
                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
                "AND route_id = ? AND processor_type = ? " +
                "GROUP BY period ORDER BY period";
        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
                new TimeseriesBucket(
                        rs.getTimestamp("period").toInstant(),
                        rs.getLong("total_count"), rs.getLong("failed_count"),
                        (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
                        rs.getLong("active_count")
                ), TENANT, ts(from), ts(to), routeId, processorType);
        return new StatsTimeseries(buckets);
    }
}

View File

@@ -7,9 +7,9 @@
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count, UInt64),
failed_count AggregateFunction(countIf, UInt64, UInt8),
running_count AggregateFunction(countIf, UInt64, UInt8),
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -38,9 +38,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count, UInt64),
failed_count AggregateFunction(countIf, UInt64, UInt8),
running_count AggregateFunction(countIf, UInt64, UInt8),
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -71,9 +71,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_route (
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count, UInt64),
failed_count AggregateFunction(countIf, UInt64, UInt8),
running_count AggregateFunction(countIf, UInt64, UInt8),
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -105,8 +105,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor (
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count, UInt64),
failed_count AggregateFunction(countIf, UInt64, UInt8),
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -138,8 +138,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
total_count AggregateFunction(count, UInt64),
failed_count AggregateFunction(countIf, UInt64, UInt8),
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))

View File

@@ -0,0 +1,375 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore.PunchcardCell;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseStatsStoreIT {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseStatsStore store;
// base time: 2026-03-31T10:00:00Z (a Tuesday)
private static final Instant BASE = Instant.parse("2026-03-31T10:00:00Z");
@BeforeEach
// Provisions a fresh schema and seed data against the shared ClickHouse
// container before every test, then builds the store under test.
// NOTE(review): a new HikariDataSource is created per test and never closed —
// pools accumulate across the test run; consider closing in an @AfterEach.
void setUp() throws Exception {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
// Load DDL from classpath resources
String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql")
.getContentAsString(StandardCharsets.UTF_8);
jdbc.execute(executionsDdl);
jdbc.execute(processorsDdl);
// Drop MVs first (they reference the stats tables), then recreate everything
jdbc.execute("DROP TABLE IF EXISTS stats_1m_all_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_app_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_route_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_all");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_app");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_route");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail");
// Strip SQL line comments first (they may contain semicolons),
// then split by ';' and execute non-empty statements.
// NOTE(review): the regex also strips "--" occurring inside string literals;
// safe for the current DDL, but fragile if V4 ever embeds '--' in a string.
String cleanedDdl = statsDdl.replaceAll("--[^\n]*", "");
for (String stmt : cleanedDdl.split(";")) {
String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
jdbc.execute(trimmed);
}
}
// Truncate base tables
jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions");
seedTestData();
// Try the failing query to capture it in query_log, then check
// NOTE(review): the block below (probe query + SYSTEM FLUSH LOGS + query_log
// dump to stdout) looks like leftover driver-debugging scaffolding; it slows
// every test and could be removed once the -Merge/JDBC issue is settled.
try {
jdbc.queryForMap(
"SELECT countMerge(total_count) AS tc, countIfMerge(failed_count) AS fc, " +
"sumMerge(duration_sum) / greatest(countMerge(total_count), 1) AS avg, " +
"quantileMerge(0.99)(p99_duration) AS p99, " +
"countIfMerge(running_count) AS rc " +
"FROM stats_1m_all WHERE tenant_id = 'default' " +
"AND bucket >= '2026-03-31 09:59:00' AND bucket < '2026-03-31 10:05:00'");
} catch (Exception e) {
System.out.println("Expected error: " + e.getMessage().substring(0, 80));
}
jdbc.execute("SYSTEM FLUSH LOGS");
// Get ALL recent queries to see what the driver sends
var queryLog = jdbc.queryForList(
"SELECT type, substring(query, 1, 200) AS q " +
"FROM system.query_log WHERE event_time > now() - 30 " +
"AND query NOT LIKE '%system.query_log%' AND query NOT LIKE '%FLUSH%' " +
"ORDER BY event_time DESC LIMIT 20");
for (var entry : queryLog) {
System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q"));
}
store = new ClickHouseStatsStore(jdbc);
}
/**
 * Seeds the raw tables with a fixed fixture: 10 executions (2 apps, 3 routes,
 * 2 FAILED, 1 RUNNING) spread over 5 minutes from BASE, plus 5 COMPLETED
 * processor rows on app-1/route-a for processor-level queries.
 */
private void seedTestData() {
    // One row per execution: id, offset from BASE (s), app, route, agent,
    // status, duration (null while RUNNING), error type, error message.
    record ExecRow(String id, int offset, String app, String route, String agent,
                   String status, Long durationMs, String errorType, String errorMessage) {}
    List<ExecRow> executions = List.of(
        new ExecRow("exec-01", 0, "app-1", "route-a", "agent-1", "COMPLETED", 200L, "", ""),
        new ExecRow("exec-02", 60, "app-1", "route-a", "agent-1", "COMPLETED", 300L, "", ""),
        new ExecRow("exec-03", 120, "app-1", "route-a", "agent-1", "COMPLETED", 400L, "", ""),
        new ExecRow("exec-04", 180, "app-1", "route-a", "agent-1", "COMPLETED", 500L, "", ""),
        new ExecRow("exec-05", 60, "app-1", "route-a", "agent-1", "FAILED", 100L, "NPE", "null ref"),
        new ExecRow("exec-06", 120, "app-1", "route-a", "agent-1", "FAILED", 150L, "NPE", "null ref"),
        new ExecRow("exec-07", 60, "app-1", "route-b", "agent-1", "COMPLETED", 50L, "", ""),
        new ExecRow("exec-08", 120, "app-1", "route-b", "agent-1", "COMPLETED", 60L, "", ""),
        new ExecRow("exec-09", 60, "app-2", "route-c", "agent-2", "COMPLETED", 1000L, "", ""),
        new ExecRow("exec-10", 180, "app-2", "route-c", "agent-2", "RUNNING", null, "", ""));
    for (ExecRow e : executions) {
        insertExecution(e.id(), BASE.plusSeconds(e.offset()), e.app(), e.route(), e.agent(),
            e.status(), e.durationMs(), e.errorType(), e.errorMessage());
    }
    // Processor rows: execution id, seq, processor id, type, offset (s), duration.
    record ProcRow(String execId, int seq, String procId, String type, int offset, long durationMs) {}
    List<ProcRow> processors = List.of(
        new ProcRow("exec-01", 1, "proc-to-1", "to", 0, 50L),
        new ProcRow("exec-02", 1, "proc-to-2", "to", 60, 80L),
        new ProcRow("exec-03", 1, "proc-to-3", "to", 120, 90L),
        new ProcRow("exec-01", 2, "proc-log-1", "log", 1, 10L),
        new ProcRow("exec-02", 2, "proc-log-2", "log", 61, 15L));
    for (ProcRow p : processors) {
        insertProcessor(p.execId(), p.seq(), p.procId(), p.type(), BASE.plusSeconds(p.offset()),
            "app-1", "route-a", "COMPLETED", p.durationMs());
    }
}
/**
 * Inserts one row into the raw {@code executions} table under tenant "default".
 * A null {@code durationMs} models a still-running execution.
 */
private void insertExecution(String executionId, Instant startTime, String appName,
                             String routeId, String agentId, String status,
                             Long durationMs, String errorType, String errorMessage) {
    String sql =
        "INSERT INTO executions (tenant_id, execution_id, start_time, route_id, " +
        "agent_id, application_name, status, duration_ms, error_type, error_message) " +
        "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    Object[] args = {
        executionId, Timestamp.from(startTime), routeId, agentId, appName,
        status, durationMs, errorType, errorMessage
    };
    jdbc.update(sql, args);
}
/**
 * Inserts one row into the raw {@code processor_executions} table under
 * tenant "default".
 */
private void insertProcessor(String executionId, int seq, String processorId,
                             String processorType, Instant startTime,
                             String appName, String routeId, String status,
                             Long durationMs) {
    String sql =
        "INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, " +
        "processor_type, start_time, route_id, application_name, status, duration_ms) " +
        "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    Object[] args = {
        executionId, seq, processorId, processorType, Timestamp.from(startTime),
        routeId, appName, status, durationMs
    };
    jdbc.update(sql, args);
}
// ── Stats Tests ──────────────────────────────────────────────────────
@Test
void stats_returnsCorrectGlobalTotals() {
    // Window covers every seeded execution: BASE-60s .. BASE+300s.
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    ExecutionStats stats = store.stats(windowStart, windowEnd);
    // Fixture: 10 executions total, 2 FAILED, 1 RUNNING.
    assertThat(stats.totalCount()).isEqualTo(10);
    assertThat(stats.failedCount()).isEqualTo(2);
    assertThat(stats.activeCount()).isEqualTo(1);
    // Durations are seeded non-zero, so the averages must come out positive.
    assertThat(stats.avgDurationMs()).isGreaterThan(0);
    assertThat(stats.p99LatencyMs()).isGreaterThan(0);
}
/** Per-application stats must split the 10 fixture executions 8 (app-1) / 2 (app-2). */
@Test
void statsForApp_filtersCorrectly() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    ExecutionStats firstApp = store.statsForApp(windowStart, windowEnd, "app-1");
    ExecutionStats secondApp = store.statsForApp(windowStart, windowEnd, "app-2");
    assertThat(firstApp.totalCount()).isEqualTo(8);
    assertThat(secondApp.totalCount()).isEqualTo(2);
}
/** Route-level stats with no agent filter: route-a owns 6 of the fixture executions. */
@Test
void statsForRoute_filtersCorrectly() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    ExecutionStats routeStats = store.statsForRoute(windowStart, windowEnd, "route-a", List.of());
    assertThat(routeStats.totalCount()).isEqualTo(6);
}
// ── Timeseries Tests ─────────────────────────────────────────────────
/** Bucketed timeseries must be non-empty and its per-bucket totals must sum to all 10 executions. */
@Test
void timeseries_returnsBuckets() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    StatsTimeseries series = store.timeseries(windowStart, windowEnd, 5);
    assertThat(series.buckets()).isNotEmpty();
    long summed = 0;
    for (StatsTimeseries.TimeseriesBucket bucket : series.buckets()) {
        summed += bucket.totalCount();
    }
    assertThat(summed).isEqualTo(10);
}
/** App-filtered timeseries: bucket totals for app-1 must sum to its 8 executions. */
@Test
void timeseriesForApp_filtersCorrectly() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    StatsTimeseries series = store.timeseriesForApp(windowStart, windowEnd, 5, "app-1");
    long summed = 0;
    for (StatsTimeseries.TimeseriesBucket bucket : series.buckets()) {
        summed += bucket.totalCount();
    }
    assertThat(summed).isEqualTo(8);
}
/** Grouping by application must yield one timeseries entry per fixture app. */
@Test
void timeseriesGroupedByApp_returnsMap() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    Map<String, StatsTimeseries> byApp = store.timeseriesGroupedByApp(windowStart, windowEnd, 5);
    assertThat(byApp).containsKeys("app-1", "app-2");
}
/** Grouping app-1 by route must yield one timeseries entry per fixture route. */
@Test
void timeseriesGroupedByRoute_returnsMap() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    Map<String, StatsTimeseries> byRoute =
            store.timeseriesGroupedByRoute(windowStart, windowEnd, 5, "app-1");
    assertThat(byRoute).containsKeys("route-a", "route-b");
}
// ── SLA Tests ────────────────────────────────────────────────────────
/**
 * SLA compliance at a 250 ms threshold with no app/route filter.
 * Of the 9 finished (non-RUNNING) executions, 5 are at or under 250 ms
 * (exec-01 200, exec-05 100, exec-06 150, exec-07 50, exec-08 60),
 * so compliance is 5/9 * 100 ≈ 55.56 %.
 */
@Test
void slaCompliance_calculatesCorrectly() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    double compliancePct = store.slaCompliance(windowStart, windowEnd, 250, null, null);
    assertThat(compliancePct).isBetween(55.0, 56.0);
}
// ── Top Errors Tests ─────────────────────────────────────────────────
/** Top-errors ranking: "NPE" occurs twice in the fixture and must lead the list. */
@Test
void topErrors_returnsRankedErrors() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    List<TopError> ranked = store.topErrors(windowStart, windowEnd, null, null, 10);
    assertThat(ranked).isNotEmpty();
    TopError mostFrequent = ranked.get(0);
    assertThat(mostFrequent.errorType()).isEqualTo("NPE");
    assertThat(mostFrequent.count()).isEqualTo(2);
}
// ── Active Error Types Test ──────────────────────────────────────────
/** app-1 has a single distinct error type ("NPE") in the window. */
@Test
void activeErrorTypes_countsDistinct() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    int distinctErrorTypes = store.activeErrorTypes(windowStart, windowEnd, "app-1");
    assertThat(distinctErrorTypes).isEqualTo(1);
}
// ── Punchcard Test ───────────────────────────────────────────────────
/** Punchcard cells must be non-empty and their counts must sum to all 10 executions. */
@Test
void punchcard_returnsWeekdayHourCells() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    List<PunchcardCell> cells = store.punchcard(windowStart, windowEnd, null);
    assertThat(cells).isNotEmpty();
    long summed = 0;
    for (PunchcardCell cell : cells) {
        summed += cell.totalCount();
    }
    assertThat(summed).isEqualTo(10);
}
/**
 * Per-app SLA counts at a 250 ms threshold. Each map value is a pair of
 * {@code [compliantCount, finishedCount]} (RUNNING executions excluded).
 */
@Test
void slaCountsByApp_returnsMap() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    Map<String, long[]> countsByApp = store.slaCountsByApp(windowStart, windowEnd, 250);
    assertThat(countsByApp).containsKeys("app-1", "app-2");
    // app-1: all 8 executions finished; 5 are <= 250 ms
    // (exec-01 200, exec-05 100, exec-06 150, exec-07 50, exec-08 60).
    long[] app1Counts = countsByApp.get("app-1");
    assertThat(app1Counts[0]).isEqualTo(5);
    assertThat(app1Counts[1]).isEqualTo(8);
    // app-2: one COMPLETED at 1000 ms plus one RUNNING -> 1 finished, none compliant.
    long[] app2Counts = countsByApp.get("app-2");
    assertThat(app2Counts[0]).isEqualTo(0);
    assertThat(app2Counts[1]).isEqualTo(1);
}
/**
 * Per-route SLA counts for app-1 at a 250 ms threshold. Each map value is a
 * pair of {@code [compliantCount, finishedCount]}.
 */
@Test
void slaCountsByRoute_returnsMap() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    Map<String, long[]> countsByRoute = store.slaCountsByRoute(windowStart, windowEnd, "app-1", 250);
    assertThat(countsByRoute).containsKeys("route-a", "route-b");
    // route-a: exec-01(200) OK, exec-02(300), exec-03(400), exec-04(500),
    // exec-05(100) OK, exec-06(150) OK -> 3 compliant of 6.
    long[] routeACounts = countsByRoute.get("route-a");
    assertThat(routeACounts[0]).isEqualTo(3);
    assertThat(routeACounts[1]).isEqualTo(6);
    // route-b: exec-07(50) and exec-08(60) are both compliant -> 2 of 2.
    long[] routeBCounts = countsByRoute.get("route-b");
    assertThat(routeBCounts[0]).isEqualTo(2);
    assertThat(routeBCounts[1]).isEqualTo(2);
}
// ── Processor Stats Test ─────────────────────────────────────────────
/**
 * Processor stats filter by route and processor type: route-a has 3 "to"
 * and 2 "log" processor executions in the fixture.
 */
@Test
void statsForProcessor_filtersCorrectly() {
    Instant windowStart = BASE.minusSeconds(60);
    Instant windowEnd = BASE.plusSeconds(300);
    ExecutionStats toProcessorStats = store.statsForProcessor(windowStart, windowEnd, "route-a", "to");
    assertThat(toProcessorStats.totalCount()).isEqualTo(3);
    // Processor aggregate tables carry no running_count, so active is always 0.
    assertThat(toProcessorStats.activeCount()).isEqualTo(0);
    ExecutionStats logProcessorStats = store.statsForProcessor(windowStart, windowEnd, "route-a", "log");
    assertThat(logProcessorStats.totalCount()).isEqualTo(2);
}
}