diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java
new file mode 100644
index 00000000..63fed6f6
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java
@@ -0,0 +1,565 @@
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.search.ExecutionStats;
+import com.cameleer3.server.core.search.StatsTimeseries;
+import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
+import com.cameleer3.server.core.search.TopError;
+import com.cameleer3.server.core.storage.StatsStore;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClickHouse implementation of {@link StatsStore}.
+ * Reads from AggregatingMergeTree tables populated by materialized views,
+ * using {@code -Merge} aggregate combinators to finalize partial states.
+ *
+ *
+ * <p>Queries against AggregatingMergeTree tables use literal SQL values instead
+ * of JDBC prepared-statement parameters because the ClickHouse JDBC v2 driver
+ * (0.9.x) wraps prepared statements in a sub-query that strips the
+ * {@code AggregateFunction} column type, breaking {@code -Merge} combinators.
+ * Queries against raw tables ({@code executions FINAL},
+ * {@code processor_executions}) use normal prepared-statement parameters
+ * since they have no AggregateFunction columns.
+ */
+public class ClickHouseStatsStore implements StatsStore {
+
+    // Single-tenant deployment: every query in this class is scoped to this id.
+    private static final String TENANT = "default";
+
+    private final JdbcTemplate jdbc;
+
+    /**
+     * @param jdbc Spring JDBC template bound to the ClickHouse data source
+     */
+    public ClickHouseStatsStore(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+ // ── Stats (aggregate) ────────────────────────────────────────────────
+
+    /** Aggregate stats across all applications for the given time window. */
+    @Override
+    public ExecutionStats stats(Instant from, Instant to) {
+        return queryStats("stats_1m_all", from, to, List.of(), true);
+    }
+
+    /** Aggregate stats for a single application. */
+    @Override
+    public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
+        return queryStats("stats_1m_app", from, to, List.of(
+                new Filter("application_name", applicationName)), true);
+    }
+
+    /**
+     * Aggregate stats for a single route.
+     * NOTE(review): {@code agentIds} is currently ignored by this
+     * implementation — confirm whether per-agent filtering is required here.
+     */
+    @Override
+    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
+        return queryStats("stats_1m_route", from, to, List.of(
+                new Filter("route_id", routeId)), true);
+    }
+
+    /** Processor-level stats; aggregated from the raw processor_executions table. */
+    @Override
+    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
+        return queryProcessorStatsRaw(from, to, routeId, processorType);
+    }
+
+ // ── Timeseries ───────────────────────────────────────────────────────
+
+    /** Timeseries across all applications, split into roughly {@code bucketCount} buckets. */
+    @Override
+    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
+        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
+    }
+
+    /** Timeseries for a single application. */
+    @Override
+    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
+        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
+                new Filter("application_name", applicationName)), true);
+    }
+
+    /**
+     * Timeseries for a single route.
+     * NOTE(review): {@code agentIds} is currently ignored by this
+     * implementation — confirm whether per-agent filtering is required here.
+     */
+    @Override
+    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
+                                              String routeId, List<String> agentIds) {
+        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
+                new Filter("route_id", routeId)), true);
+    }
+
+    /** Processor-level timeseries; aggregated from the raw processor_executions table. */
+    @Override
+    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
+                                                  String routeId, String processorType) {
+        return queryProcessorTimeseriesRaw(from, to, bucketCount, routeId, processorType);
+    }
+
+ // ── Grouped timeseries ───────────────────────────────────────────────
+
+    /** One timeseries per application, keyed by application name. */
+    @Override
+    public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
+        return queryGroupedTimeseries("stats_1m_app", "application_name", from, to,
+                bucketCount, List.of());
+    }
+
+    /** One timeseries per route within the given application, keyed by route id. */
+    @Override
+    public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
+                                                                 int bucketCount, String applicationName) {
+        return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
+                bucketCount, List.of(new Filter("application_name", applicationName)));
+    }
+
+ // ── SLA compliance (raw table — prepared statements OK) ──────────────
+
+    /**
+     * Percentage (0–100) of completed executions whose duration is within the
+     * SLA threshold. RUNNING executions are excluded from both counts.
+     * An empty window is treated as vacuously compliant (100.0).
+     *
+     * @param thresholdMs     SLA threshold in milliseconds
+     * @param applicationName optional filter, may be {@code null}
+     * @param routeId         optional filter, may be {@code null}
+     */
+    @Override
+    public double slaCompliance(Instant from, Instant to, int thresholdMs,
+                                String applicationName, String routeId) {
+        String sql = "SELECT " +
+                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
+                "countIf(status != 'RUNNING') AS total " +
+                "FROM executions FINAL " +
+                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ?";
+
+        List<Object> params = new ArrayList<>();
+        params.add(thresholdMs);
+        params.add(TENANT);
+        params.add(Timestamp.from(from));
+        params.add(Timestamp.from(to));
+        if (applicationName != null) {
+            sql += " AND application_name = ?";
+            params.add(applicationName);
+        }
+        if (routeId != null) {
+            sql += " AND route_id = ?";
+            params.add(routeId);
+        }
+
+        return jdbc.query(sql, (rs, rowNum) -> {
+            long total = rs.getLong("total");
+            // No completed executions: fully compliant. Was 1.0, which on a
+            // 0–100 percent scale meant 1 % — inconsistent with the non-empty case.
+            if (total == 0) return 100.0;
+            return rs.getLong("compliant") * 100.0 / total;
+        }, params.toArray()).stream().findFirst().orElse(100.0);
+    }
+
+    /**
+     * Per-application {@code [compliant, total]} execution counts for SLA
+     * reporting. RUNNING executions are excluded from both counts.
+     */
+    @Override
+    public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
+        String sql = "SELECT application_name, " +
+                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
+                "countIf(status != 'RUNNING') AS total " +
+                "FROM executions FINAL " +
+                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
+                "GROUP BY application_name";
+
+        Map<String, long[]> result = new LinkedHashMap<>();
+        jdbc.query(sql, (rs) -> {
+            result.put(rs.getString("application_name"),
+                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
+        }, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to));
+        return result;
+    }
+
+    /**
+     * Per-route {@code [compliant, total]} execution counts for one
+     * application. RUNNING executions are excluded from both counts.
+     */
+    @Override
+    public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
+                                                String applicationName, int thresholdMs) {
+        String sql = "SELECT route_id, " +
+                "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
+                "countIf(status != 'RUNNING') AS total " +
+                "FROM executions FINAL " +
+                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
+                "AND application_name = ? GROUP BY route_id";
+
+        Map<String, long[]> result = new LinkedHashMap<>();
+        jdbc.query(sql, (rs) -> {
+            result.put(rs.getString("route_id"),
+                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
+        }, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName);
+        return result;
+    }
+
+ // ── Top errors (raw table — prepared statements OK) ──────────────────
+
+    /**
+     * Top error groups in the window, each with a per-minute error velocity
+     * over the last 5 minutes and a trend label (accelerating / stable /
+     * decelerating) comparing the last 5 minutes to the 5 minutes before.
+     *
+     * <p>With a {@code routeId} errors are grouped by processor (from
+     * {@code processor_executions}); otherwise by route (from
+     * {@code executions FINAL}).
+     */
+    @Override
+    public List<TopError> topErrors(Instant from, Instant to, String applicationName,
+                                    String routeId, int limit) {
+        StringBuilder where = new StringBuilder(
+                "status = 'FAILED' AND start_time >= ? AND start_time < ?");
+        List<Object> params = new ArrayList<>();
+        params.add(Timestamp.from(from));
+        params.add(Timestamp.from(to));
+        if (applicationName != null) {
+            where.append(" AND application_name = ?");
+            params.add(applicationName);
+        }
+
+        String table;
+        String groupId;
+        if (routeId != null) {
+            table = "processor_executions";
+            groupId = "processor_id";
+            where.append(" AND route_id = ?");
+            params.add(routeId);
+        } else {
+            table = "executions FINAL";
+            groupId = "route_id";
+        }
+
+        Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES);
+        Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES);
+
+        String sql = "WITH counted AS (" +
+                " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
+                " " + groupId + " AS group_id, " +
+                " count() AS cnt, max(start_time) AS last_seen " +
+                " FROM " + table + " WHERE tenant_id = ? AND " + where +
+                " GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" +
+                "), velocity AS (" +
+                " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
+                " countIf(start_time >= ?) AS recent_5m, " +
+                " countIf(start_time >= ? AND start_time < ?) AS prev_5m " +
+                " FROM " + table + " WHERE tenant_id = ? AND " + where +
+                " GROUP BY error_key" +
+                ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " +
+                " COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " +
+                " CASE " +
+                " WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " +
+                " WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " +
+                " ELSE 'stable' END AS trend " +
+                "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " +
+                "ORDER BY c.cnt DESC";
+
+        // Placeholder order mirrors the CTEs: counted (tenant, where, limit),
+        // then velocity (5m window, 10m..5m window, tenant, where).
+        List<Object> fullParams = new ArrayList<>();
+        fullParams.add(TENANT);
+        fullParams.addAll(params);
+        fullParams.add(limit);
+        fullParams.add(Timestamp.from(fiveMinAgo));
+        fullParams.add(Timestamp.from(tenMinAgo));
+        fullParams.add(Timestamp.from(fiveMinAgo));
+        fullParams.add(TENANT);
+        fullParams.addAll(params);
+
+        return jdbc.query(sql, (rs, rowNum) -> {
+            String errorKey = rs.getString("error_key");
+            String gid = rs.getString("group_id");
+            // group_id is a processor id when routeId was given, else a route id.
+            return new TopError(
+                    errorKey,
+                    routeId != null ? routeId : gid,
+                    routeId != null ? gid : null,
+                    rs.getLong("cnt"),
+                    rs.getDouble("velocity"),
+                    rs.getString("trend"),
+                    rs.getTimestamp("last_seen").toInstant());
+        }, fullParams.toArray());
+    }
+
+ @Override
+ public int activeErrorTypes(Instant from, Instant to, String applicationName) {
+ String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " +
+ "FROM executions FINAL " +
+ "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?";
+
+ List params = new ArrayList<>();
+ params.add(TENANT);
+ params.add(Timestamp.from(from));
+ params.add(Timestamp.from(to));
+ if (applicationName != null) {
+ sql += " AND application_name = ?";
+ params.add(applicationName);
+ }
+
+ Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
+ return count != null ? count : 0;
+ }
+
+ // ── Punchcard (AggregatingMergeTree — literal SQL) ───────────────────
+
+    /**
+     * Weekday × hour punchcard from the pre-aggregated 1-minute views.
+     * NOTE(review): {@code toDayOfWeek(bucket, 1)} is already 0-based
+     * (Monday = 0 … Sunday = 6), so the {@code % 7} is a no-op — confirm
+     * whether a Sunday-first origin (mode 0 plus {@code % 7}) was intended.
+     */
+    @Override
+    public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
+        String view = applicationName != null ? "stats_1m_app" : "stats_1m_all";
+        String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " +
+                "toHour(bucket) AS hour, " +
+                "countMerge(total_count) AS total_count, " +
+                "countIfMerge(failed_count) AS failed_count " +
+                "FROM " + view +
+                " WHERE tenant_id = " + lit(TENANT) +
+                " AND bucket >= " + lit(from) +
+                " AND bucket < " + lit(to);
+        if (applicationName != null) {
+            sql += " AND application_name = " + lit(applicationName);
+        }
+        sql += " GROUP BY weekday, hour ORDER BY weekday, hour";
+
+        return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell(
+                rs.getInt("weekday"), rs.getInt("hour"),
+                rs.getLong("total_count"), rs.getLong("failed_count")));
+    }
+
+ // ── Private helpers ──────────────────────────────────────────────────
+
+    /** Equality filter on one dimension column of a stats view. */
+    private record Filter(String column, String value) {}
+
+    /**
+     * Format an Instant as a ClickHouse DateTime literal.
+     * Uses java.sql.Timestamp to match the JVM→ClickHouse timezone convention
+     * used by the JDBC driver, then truncates to second precision for DateTime
+     * column compatibility.
+     */
+    private static String lit(Instant instant) {
+        // Truncate to seconds — ClickHouse DateTime has second precision
+        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
+        // Timestamp.toString() renders in the JVM default timezone — the same
+        // convention the driver uses when binding Timestamp parameters.
+        String ts = new Timestamp(truncated.toEpochMilli()).toString();
+        // Remove trailing ".0" that Timestamp.toString() always appends
+        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
+        return "'" + ts + "'";
+    }
+
+    /**
+     * Format a string as a SQL literal with escaping.
+     * Backslashes are escaped before quotes so a value ending in {@code \}
+     * cannot neutralise the escaped quote and break out of the literal
+     * (these values flow into literal-built SQL — see class javadoc).
+     */
+    private static String lit(String value) {
+        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
+    }
+
+    /** Convert Instant to java.sql.Timestamp for JDBC binding. */
+    private static Timestamp ts(Instant instant) {
+        return Timestamp.from(instant);
+    }
+
+    /**
+     * Build -Merge combinator SQL for the given view and time range.
+     * All values are embedded as literals (see class javadoc for why).
+     *
+     * @param hasRunning whether the view has a {@code running_count} state column
+     */
+    private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo,
+                                 List<Filter> filters, boolean hasRunning) {
+        String runningCol = hasRunning ? "countIfMerge(running_count)" : "0";
+        String sql = "SELECT " +
+                "countMerge(total_count) AS total_count, " +
+                "countIfMerge(failed_count) AS failed_count, " +
+                "sumMerge(duration_sum) AS duration_sum, " +
+                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
+                runningCol + " AS active_count " +
+                "FROM " + view +
+                " WHERE tenant_id = " + lit(TENANT) +
+                " AND bucket >= " + lit(rangeFrom) +
+                " AND bucket < " + lit(rangeTo);
+        for (Filter f : filters) {
+            sql += " AND " + f.column() + " = " + lit(f.value());
+        }
+        return sql;
+    }
+
+    /**
+     * Query an AggregatingMergeTree stats table using -Merge combinators.
+     * Uses literal SQL to avoid ClickHouse JDBC driver PreparedStatement issues.
+     *
+     * <p>Runs three queries: the requested window, the same window shifted
+     * back 24 hours (trend comparison), and "today so far".
+     */
+    private ExecutionStats queryStats(String view, Instant from, Instant to,
+                                      List<Filter> filters, boolean hasRunning) {
+
+        String sql = buildStatsSql(view, from, to, filters, hasRunning);
+
+        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
+        var currentResult = jdbc.query(sql, (rs, rowNum) -> {
+            long tc = rs.getLong("total_count");
+            long fc = rs.getLong("failed_count");
+            long ds = rs.getLong("duration_sum"); // Nullable → 0 if null
+            long p99 = (long) rs.getDouble("p99_duration"); // quantileMerge returns Float64
+            long ac = rs.getLong("active_count");
+            return new long[]{tc, fc, ds, p99, ac};
+        });
+        if (!currentResult.isEmpty()) {
+            long[] r = currentResult.get(0);
+            totalCount = r[0]; failedCount = r[1];
+            avgDuration = totalCount > 0 ? r[2] / totalCount : 0;
+            p99Duration = r[3]; activeCount = r[4];
+        }
+
+        // Previous period (shifted back 24h)
+        Instant prevFrom = from.minus(Duration.ofHours(24));
+        Instant prevTo = to.minus(Duration.ofHours(24));
+        String prevSql = buildStatsSql(view, prevFrom, prevTo, filters, hasRunning);
+
+        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
+        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> {
+            long tc = rs.getLong("total_count");
+            long fc = rs.getLong("failed_count");
+            long ds = rs.getLong("duration_sum");
+            long p99 = (long) rs.getDouble("p99_duration");
+            return new long[]{tc, fc, ds, p99};
+        });
+        if (!prevResult.isEmpty()) {
+            long[] r = prevResult.get(0);
+            prevTotal = r[0]; prevFailed = r[1];
+            prevAvg = prevTotal > 0 ? r[2] / prevTotal : 0;
+            prevP99 = r[3];
+        }
+
+        // Today total — Instant.truncatedTo(DAYS) is UTC-midnight based
+        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
+        String todaySql = buildStatsSql(view, todayStart, Instant.now(), filters, hasRunning);
+
+        long totalToday = 0;
+        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"));
+        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
+
+        return new ExecutionStats(
+                totalCount, failedCount, avgDuration, p99Duration, activeCount,
+                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
+    }
+
+    /**
+     * Timeseries from AggregatingMergeTree using -Merge combinators.
+     * Bucket width is the window divided by {@code bucketCount}, floored at
+     * 60 seconds (the granularity of the 1-minute pre-aggregation).
+     */
+    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
+                                            int bucketCount, List<Filter> filters,
+                                            boolean hasRunningCount) {
+        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
+        if (intervalSeconds < 60) intervalSeconds = 60;
+
+        String runningCol = hasRunningCount ? "countIfMerge(running_count)" : "0";
+
+        String sql = "SELECT " +
+                "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
+                "countMerge(total_count) AS total_count, " +
+                "countIfMerge(failed_count) AS failed_count, " +
+                "sumMerge(duration_sum) AS duration_sum, " +
+                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
+                runningCol + " AS active_count " +
+                "FROM " + view +
+                " WHERE tenant_id = " + lit(TENANT) +
+                " AND bucket >= " + lit(from) +
+                " AND bucket < " + lit(to);
+        for (Filter f : filters) {
+            sql += " AND " + f.column() + " = " + lit(f.value());
+        }
+        sql += " GROUP BY period ORDER BY period";
+
+        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) -> {
+            long tc = rs.getLong("total_count");
+            long ds = rs.getLong("duration_sum");
+            return new TimeseriesBucket(
+                    rs.getTimestamp("period").toInstant(),
+                    tc, rs.getLong("failed_count"),
+                    tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
+                    rs.getLong("active_count"));
+        });
+
+        return new StatsTimeseries(buckets);
+    }
+
+    /**
+     * Grouped timeseries from AggregatingMergeTree: one {@link StatsTimeseries}
+     * per distinct value of {@code groupCol}, keyed in first-appearance order
+     * (result rows are ordered by period, then group key).
+     */
+    private Map<String, StatsTimeseries> queryGroupedTimeseries(
+            String view, String groupCol, Instant from, Instant to,
+            int bucketCount, List<Filter> filters) {
+
+        // Bucket width = window / bucketCount, floored at the 1-minute granularity
+        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
+        if (intervalSeconds < 60) intervalSeconds = 60;
+
+        String sql = "SELECT " +
+                "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
+                groupCol + " AS group_key, " +
+                "countMerge(total_count) AS total_count, " +
+                "countIfMerge(failed_count) AS failed_count, " +
+                "sumMerge(duration_sum) AS duration_sum, " +
+                "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
+                "countIfMerge(running_count) AS active_count " +
+                "FROM " + view +
+                " WHERE tenant_id = " + lit(TENANT) +
+                " AND bucket >= " + lit(from) +
+                " AND bucket < " + lit(to);
+        for (Filter f : filters) {
+            sql += " AND " + f.column() + " = " + lit(f.value());
+        }
+        sql += " GROUP BY period, group_key ORDER BY period, group_key";
+
+        Map<String, List<TimeseriesBucket>> grouped = new LinkedHashMap<>();
+        jdbc.query(sql, (rs) -> {
+            String key = rs.getString("group_key");
+            long tc = rs.getLong("total_count");
+            long ds = rs.getLong("duration_sum");
+            TimeseriesBucket bucket = new TimeseriesBucket(
+                    rs.getTimestamp("period").toInstant(),
+                    tc, rs.getLong("failed_count"),
+                    tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
+                    rs.getLong("active_count"));
+            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket);
+        });
+
+        Map<String, StatsTimeseries> result = new LinkedHashMap<>();
+        grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets)));
+        return result;
+    }
+
+    /**
+     * Direct aggregation on processor_executions for processor-level stats.
+     * Raw-table query, so normal prepared-statement parameters are used.
+     * Active count is always 0 here — the query selects a literal 0.
+     */
+    private ExecutionStats queryProcessorStatsRaw(Instant from, Instant to,
+                                                  String routeId, String processorType) {
+        String sql = "SELECT " +
+                "count() AS total_count, " +
+                "countIf(status = 'FAILED') AS failed_count, " +
+                "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
+                "quantile(0.99)(duration_ms) AS p99_duration, " +
+                "0 AS active_count " +
+                "FROM processor_executions " +
+                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
+                "AND route_id = ? AND processor_type = ?";
+
+        // Current window
+        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0;
+        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
+            rs.getLong("total_count"), rs.getLong("failed_count"),
+            (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
+            rs.getLong("active_count")
+        }, TENANT, ts(from), ts(to), routeId, processorType);
+        if (!currentResult.isEmpty()) {
+            long[] r = currentResult.get(0);
+            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3];
+        }
+
+        // Same window shifted back 24h, for trend comparison
+        Instant prevFrom = from.minus(Duration.ofHours(24));
+        Instant prevTo = to.minus(Duration.ofHours(24));
+        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
+        var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
+            rs.getLong("total_count"), rs.getLong("failed_count"),
+            (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration")
+        }, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType);
+        if (!prevResult.isEmpty()) {
+            long[] r = prevResult.get(0);
+            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
+        }
+
+        // Today so far — Instant.truncatedTo(DAYS) is UTC-midnight based
+        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
+        long totalToday = 0;
+        var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"),
+                TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType);
+        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
+
+        return new ExecutionStats(
+                totalCount, failedCount, avgDuration, p99Duration, 0,
+                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
+    }
+
+    /**
+     * Direct aggregation on processor_executions for processor-level
+     * timeseries. Raw-table query, so prepared-statement parameters are used.
+     */
+    private StatsTimeseries queryProcessorTimeseriesRaw(Instant from, Instant to,
+                                                        int bucketCount,
+                                                        String routeId, String processorType) {
+        // Bucket width = window / bucketCount, floored at 60 seconds
+        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
+        if (intervalSeconds < 60) intervalSeconds = 60;
+
+        String sql = "SELECT " +
+                "toStartOfInterval(start_time, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
+                "count() AS total_count, " +
+                "countIf(status = 'FAILED') AS failed_count, " +
+                "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
+                "quantile(0.99)(duration_ms) AS p99_duration, " +
+                "0 AS active_count " +
+                "FROM processor_executions " +
+                "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
+                "AND route_id = ? AND processor_type = ? " +
+                "GROUP BY period ORDER BY period";
+
+        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
+                new TimeseriesBucket(
+                        rs.getTimestamp("period").toInstant(),
+                        rs.getLong("total_count"), rs.getLong("failed_count"),
+                        (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
+                        rs.getLong("active_count")
+                ), TENANT, ts(from), ts(to), routeId, processorType);
+
+        return new StatsTimeseries(buckets);
+    }
+}
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
index fb87c2dc..f0e402a4 100644
--- a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
+++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql
@@ -7,9 +7,9 @@
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
- total_count AggregateFunction(count, UInt64),
- failed_count AggregateFunction(countIf, UInt64, UInt8),
- running_count AggregateFunction(countIf, UInt64, UInt8),
+ total_count AggregateFunction(count),
+ failed_count AggregateFunction(countIf, UInt8),
+ running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -38,9 +38,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
- total_count AggregateFunction(count, UInt64),
- failed_count AggregateFunction(countIf, UInt64, UInt8),
- running_count AggregateFunction(countIf, UInt64, UInt8),
+ total_count AggregateFunction(count),
+ failed_count AggregateFunction(countIf, UInt8),
+ running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -71,9 +71,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_route (
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
- total_count AggregateFunction(count, UInt64),
- failed_count AggregateFunction(countIf, UInt64, UInt8),
- running_count AggregateFunction(countIf, UInt64, UInt8),
+ total_count AggregateFunction(count),
+ failed_count AggregateFunction(countIf, UInt8),
+ running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -105,8 +105,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor (
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
- total_count AggregateFunction(count, UInt64),
- failed_count AggregateFunction(countIf, UInt64, UInt8),
+ total_count AggregateFunction(count),
+ failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -138,8 +138,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
- total_count AggregateFunction(count, UInt64),
- failed_count AggregateFunction(countIf, UInt64, UInt8),
+ total_count AggregateFunction(count),
+ failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
new file mode 100644
index 00000000..6e7eab7a
--- /dev/null
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
@@ -0,0 +1,358 @@
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.search.ExecutionStats;
+import com.cameleer3.server.core.search.StatsTimeseries;
+import com.cameleer3.server.core.search.TopError;
+import com.cameleer3.server.core.storage.StatsStore.PunchcardCell;
+import com.zaxxer.hikari.HikariDataSource;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+class ClickHouseStatsStoreIT {
+
+    @Container
+    static final ClickHouseContainer clickhouse =
+            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
+
+    private HikariDataSource ds;
+    private JdbcTemplate jdbc;
+    private ClickHouseStatsStore store;
+
+    // base time: 2026-03-31T10:00:00Z (a Tuesday)
+    private static final Instant BASE = Instant.parse("2026-03-31T10:00:00Z");
+
+    @BeforeEach
+    void setUp() throws Exception {
+        ds = new HikariDataSource();
+        ds.setJdbcUrl(clickhouse.getJdbcUrl());
+        ds.setUsername(clickhouse.getUsername());
+        ds.setPassword(clickhouse.getPassword());
+
+        jdbc = new JdbcTemplate(ds);
+
+        // Load DDL from classpath resources
+        String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
+                .getContentAsString(StandardCharsets.UTF_8);
+        String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
+                .getContentAsString(StandardCharsets.UTF_8);
+        String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql")
+                .getContentAsString(StandardCharsets.UTF_8);
+
+        jdbc.execute(executionsDdl);
+        jdbc.execute(processorsDdl);
+
+        // Drop MVs first (they reference the stats tables), then recreate everything
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_all_mv");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_app_mv");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_route_mv");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_mv");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail_mv");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_all");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_app");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_route");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor");
+        jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail");
+
+        // Strip SQL line comments first (they may contain semicolons),
+        // then split by ';' and execute non-empty statements.
+        String cleanedDdl = statsDdl.replaceAll("--[^\n]*", "");
+        for (String stmt : cleanedDdl.split(";")) {
+            String trimmed = stmt.trim();
+            if (!trimmed.isEmpty()) {
+                jdbc.execute(trimmed);
+            }
+        }
+
+        // Truncate base tables
+        jdbc.execute("TRUNCATE TABLE executions");
+        jdbc.execute("TRUNCATE TABLE processor_executions");
+
+        seedTestData();
+
+        store = new ClickHouseStatsStore(jdbc);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (ds != null) { ds.close(); } // release the per-test connection pool
+    }
+
+    private void seedTestData() {
+        // Seed 10 executions across 2 apps and 3 routes, spanning 5 minutes from BASE.
+        // app-1, route-a: 4 COMPLETED (durations 200ms, 300ms, 400ms, 500ms)
+        insertExecution("exec-01", BASE.plusSeconds(0), "app-1", "route-a", "agent-1",
+                "COMPLETED", 200L, "", "");
+        insertExecution("exec-02", BASE.plusSeconds(60), "app-1", "route-a", "agent-1",
+                "COMPLETED", 300L, "", "");
+        insertExecution("exec-03", BASE.plusSeconds(120), "app-1", "route-a", "agent-1",
+                "COMPLETED", 400L, "", "");
+        insertExecution("exec-04", BASE.plusSeconds(180), "app-1", "route-a", "agent-1",
+                "COMPLETED", 500L, "", "");
+
+        // app-1, route-a: 2 FAILED (100ms, 150ms), both with error_type "NPE"
+        insertExecution("exec-05", BASE.plusSeconds(60), "app-1", "route-a", "agent-1",
+                "FAILED", 100L, "NPE", "null ref");
+        insertExecution("exec-06", BASE.plusSeconds(120), "app-1", "route-a", "agent-1",
+                "FAILED", 150L, "NPE", "null ref");
+
+        // app-1, route-b: 2 COMPLETED (50ms, 60ms)
+        insertExecution("exec-07", BASE.plusSeconds(60), "app-1", "route-b", "agent-1",
+                "COMPLETED", 50L, "", "");
+        insertExecution("exec-08", BASE.plusSeconds(120), "app-1", "route-b", "agent-1",
+                "COMPLETED", 60L, "", "");
+
+        // app-2, route-c: 1 COMPLETED (1000ms)
+        insertExecution("exec-09", BASE.plusSeconds(60), "app-2", "route-c", "agent-2",
+                "COMPLETED", 1000L, "", "");
+
+        // app-2, route-c: 1 RUNNING (null duration — still in flight)
+        insertExecution("exec-10", BASE.plusSeconds(180), "app-2", "route-c", "agent-2",
+                "RUNNING", null, "", "");
+
+        // 5 processor rows for the processor-level stats tests.
+        // app-1, route-a, processor_type "to": 3 COMPLETED (50ms, 80ms, 90ms)
+        insertProcessor("exec-01", 1, "proc-to-1", "to", BASE.plusSeconds(0),
+                "app-1", "route-a", "COMPLETED", 50L);
+        insertProcessor("exec-02", 1, "proc-to-2", "to", BASE.plusSeconds(60),
+                "app-1", "route-a", "COMPLETED", 80L);
+        insertProcessor("exec-03", 1, "proc-to-3", "to", BASE.plusSeconds(120),
+                "app-1", "route-a", "COMPLETED", 90L);
+
+        // app-1, route-a, processor_type "log": 2 COMPLETED (10ms, 15ms)
+        insertProcessor("exec-01", 2, "proc-log-1", "log", BASE.plusSeconds(1),
+                "app-1", "route-a", "COMPLETED", 10L);
+        insertProcessor("exec-02", 2, "proc-log-2", "log", BASE.plusSeconds(61),
+                "app-1", "route-a", "COMPLETED", 15L);
+    }
+
+    private void insertExecution(String executionId, Instant startTime, String appName,
+                                 String routeId, String agentId, String status,
+                                 Long durationMs, String errorType, String errorMessage) {
+        // Inserts one row into the raw executions table for the fixed tenant.
+        String sql = "INSERT INTO executions (tenant_id, execution_id, start_time, route_id, "
+                + "agent_id, application_name, status, duration_ms, error_type, error_message) "
+                + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        jdbc.update(sql, executionId, Timestamp.from(startTime), routeId, agentId, appName,
+                status, durationMs, errorType, errorMessage);
+    }
+
+    private void insertProcessor(String executionId, int seq, String processorId,
+                                 String processorType, Instant startTime,
+                                 String appName, String routeId, String status,
+                                 Long durationMs) {
+        // Inserts one row into the raw processor_executions table for the fixed tenant.
+        String sql = "INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, "
+                + "processor_type, start_time, route_id, application_name, status, duration_ms) "
+                + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        jdbc.update(sql, executionId, seq, processorId, processorType, Timestamp.from(startTime),
+                routeId, appName, status, durationMs);
+    }
+
+ // ── Stats Tests ──────────────────────────────────────────────────────
+
+    @Test
+    void stats_returnsCorrectGlobalTotals() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        ExecutionStats aggregate = store.stats(windowStart, windowEnd);
+
+        assertThat(aggregate.totalCount()).isEqualTo(10);
+        assertThat(aggregate.failedCount()).isEqualTo(2);
+        assertThat(aggregate.activeCount()).isEqualTo(1);
+        assertThat(aggregate.avgDurationMs()).isGreaterThan(0);
+        assertThat(aggregate.p99LatencyMs()).isGreaterThan(0);
+    }
+
+    @Test
+    void statsForApp_filtersCorrectly() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        ExecutionStats app1Stats = store.statsForApp(windowStart, windowEnd, "app-1");
+        assertThat(app1Stats.totalCount()).isEqualTo(8);
+
+        ExecutionStats app2Stats = store.statsForApp(windowStart, windowEnd, "app-2");
+        assertThat(app2Stats.totalCount()).isEqualTo(2);
+    }
+
+    @Test
+    void statsForRoute_filtersCorrectly() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        ExecutionStats routeAStats = store.statsForRoute(windowStart, windowEnd, "route-a", List.of());
+        assertThat(routeAStats.totalCount()).isEqualTo(6);
+    }
+
+ // ── Timeseries Tests ─────────────────────────────────────────────────
+
+    @Test
+    void timeseries_returnsBuckets() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        StatsTimeseries series = store.timeseries(windowStart, windowEnd, 5);
+
+        assertThat(series.buckets()).isNotEmpty();
+        long bucketSum = series.buckets().stream()
+                .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
+        assertThat(bucketSum).isEqualTo(10);
+    }
+
+    @Test
+    void timeseriesForApp_filtersCorrectly() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        StatsTimeseries series = store.timeseriesForApp(windowStart, windowEnd, 5, "app-1");
+
+        long bucketSum = series.buckets().stream()
+                .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
+        assertThat(bucketSum).isEqualTo(8);
+    }
+
+    @Test
+    void timeseriesGroupedByApp_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByApp(from, to, 5);
+
+        assertThat(grouped).containsKeys("app-1", "app-2");
+    }
+
+    @Test
+    void timeseriesGroupedByRoute_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByRoute(from, to, 5, "app-1");
+
+        assertThat(grouped).containsKeys("route-a", "route-b");
+    }
+
+ // ── SLA Tests ────────────────────────────────────────────────────────
+
+    @Test
+    void slaCompliance_calculatesCorrectly() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        // With a 250ms threshold, the 9 non-RUNNING executions split into
+        // 5 compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150),
+        // exec-07(50), exec-08(60); the remaining 4 exceed the threshold,
+        // so the expected compliance is 5/9 * 100 ~ 55.56%.
+        double compliancePct = store.slaCompliance(windowStart, windowEnd, 250, null, null);
+        assertThat(compliancePct).isBetween(55.0, 56.0);
+    }
+
+ // ── Top Errors Tests ─────────────────────────────────────────────────
+
+    @Test
+    void topErrors_returnsRankedErrors() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        List<TopError> errors = store.topErrors(from, to, null, null, 10);
+
+        assertThat(errors).isNotEmpty();
+        assertThat(errors.get(0).errorType()).isEqualTo("NPE");
+        assertThat(errors.get(0).count()).isEqualTo(2);
+    }
+
+ // ── Active Error Types Test ──────────────────────────────────────────
+
+    @Test
+    void activeErrorTypes_countsDistinct() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        int distinctErrorTypes = store.activeErrorTypes(windowStart, windowEnd, "app-1");
+
+        assertThat(distinctErrorTypes).isEqualTo(1); // seed data contains only "NPE"
+    }
+
+ // ── Punchcard Test ───────────────────────────────────────────────────
+
+    @Test
+    void punchcard_returnsWeekdayHourCells() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        List<PunchcardCell> cells = store.punchcard(from, to, null);
+
+        assertThat(cells).isNotEmpty();
+        long totalCount = cells.stream().mapToLong(PunchcardCell::totalCount).sum();
+        assertThat(totalCount).isEqualTo(10);
+    }
+
+    @Test
+    void slaCountsByApp_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        // threshold=250ms; each value is {compliantCount, totalNonRunningCount}
+        Map<String, long[]> counts = store.slaCountsByApp(from, to, 250);
+
+        assertThat(counts).containsKeys("app-1", "app-2");
+        // app-1: 8 total executions, all non-RUNNING
+        // compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5
+        long[] app1 = counts.get("app-1");
+        assertThat(app1[0]).isEqualTo(5); // compliant
+        assertThat(app1[1]).isEqualTo(8); // total non-running
+        // app-2: 1 COMPLETED(1000ms) + 1 RUNNING → 1 non-RUNNING, 0 compliant
+        long[] app2 = counts.get("app-2");
+        assertThat(app2[0]).isEqualTo(0); // compliant
+        assertThat(app2[1]).isEqualTo(1); // total non-running
+    }
+
+    @Test
+    void slaCountsByRoute_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, long[]> counts = store.slaCountsByRoute(from, to, "app-1", 250);
+
+        assertThat(counts).containsKeys("route-a", "route-b");
+        // route-a: exec-01(200)OK, exec-02(300)NO, exec-03(400)NO, exec-04(500)NO,
+        //          exec-05(100)OK, exec-06(150)OK → 3 compliant, 6 total
+        long[] routeA = counts.get("route-a");
+        assertThat(routeA[0]).isEqualTo(3); // compliant
+        assertThat(routeA[1]).isEqualTo(6); // total
+        // route-b: exec-07(50)OK, exec-08(60)OK → 2 compliant, 2 total
+        long[] routeB = counts.get("route-b");
+        assertThat(routeB[0]).isEqualTo(2);
+        assertThat(routeB[1]).isEqualTo(2);
+    }
+
+ // ── Processor Stats Test ─────────────────────────────────────────────
+
+    @Test
+    void statsForProcessor_filtersCorrectly() {
+        Instant windowStart = BASE.minusSeconds(60);
+        Instant windowEnd = BASE.plusSeconds(300);
+
+        ExecutionStats toProcessorStats = store.statsForProcessor(windowStart, windowEnd, "route-a", "to");
+        assertThat(toProcessorStats.totalCount()).isEqualTo(3);
+        assertThat(toProcessorStats.activeCount()).isEqualTo(0); // processor stats have no running_count
+
+        ExecutionStats logProcessorStats = store.statsForProcessor(windowStart, windowEnd, "route-a", "log");
+        assertThat(logProcessorStats.totalCount()).isEqualTo(2);
+    }
+}