diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index ab733408..c0cf7c5d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -2,6 +2,7 @@ package com.cameleer3.server.app.config; import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; import com.cameleer3.server.app.storage.ClickHouseMetricsStore; +import com.cameleer3.server.app.storage.ClickHouseStatsStore; import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; import com.cameleer3.server.app.storage.PostgresMetricsStore; import com.cameleer3.server.core.admin.AuditRepository; @@ -16,6 +17,7 @@ import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; +import com.cameleer3.server.core.storage.StatsStore; import com.cameleer3.server.core.storage.model.MetricsSnapshot; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -118,4 +120,13 @@ public class StorageBeanConfig { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseSearchIndex(clickHouseJdbc); } + + // ── ClickHouse Stats Store ───────────────────────────────────────── + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse", matchIfMissing = true) + public StatsStore clickHouseStatsStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseStatsStore(clickHouseJdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java 
b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java new file mode 100644 index 00000000..63fed6f6 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java @@ -0,0 +1,565 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; +import com.cameleer3.server.core.search.TopError; +import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * ClickHouse implementation of {@link StatsStore}. + * Reads from AggregatingMergeTree tables populated by materialized views, + * using {@code -Merge} aggregate combinators to finalize partial states. + * + *

Queries against AggregatingMergeTree tables use literal SQL values instead + * of JDBC prepared-statement parameters because the ClickHouse JDBC v2 driver + * (0.9.x) wraps prepared statements in a sub-query that strips the + * {@code AggregateFunction} column type, breaking {@code -Merge} combinators. + * Queries against raw tables ({@code executions FINAL}, + * {@code processor_executions}) use normal prepared-statement parameters + * since they have no AggregateFunction columns.

+ */ +public class ClickHouseStatsStore implements StatsStore { + + private static final String TENANT = "default"; + + private final JdbcTemplate jdbc; + + public ClickHouseStatsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + // ── Stats (aggregate) ──────────────────────────────────────────────── + + @Override + public ExecutionStats stats(Instant from, Instant to) { + return queryStats("stats_1m_all", from, to, List.of(), true); + } + + @Override + public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) { + return queryStats("stats_1m_app", from, to, List.of( + new Filter("application_name", applicationName)), true); + } + + @Override + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds) { + return queryStats("stats_1m_route", from, to, List.of( + new Filter("route_id", routeId)), true); + } + + @Override + public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) { + return queryProcessorStatsRaw(from, to, routeId, processorType); + } + + // ── Timeseries ─────────────────────────────────────────────────────── + + @Override + public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { + return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true); + } + + @Override + public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) { + return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of( + new Filter("application_name", applicationName)), true); + } + + @Override + public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, + String routeId, List agentIds) { + return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of( + new Filter("route_id", routeId)), true); + } + + @Override + public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount, + String routeId, String processorType) { 
+ return queryProcessorTimeseriesRaw(from, to, bucketCount, routeId, processorType); + } + + // ── Grouped timeseries ─────────────────────────────────────────────── + + @Override + public Map timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) { + return queryGroupedTimeseries("stats_1m_app", "application_name", from, to, + bucketCount, List.of()); + } + + @Override + public Map timeseriesGroupedByRoute(Instant from, Instant to, + int bucketCount, String applicationName) { + return queryGroupedTimeseries("stats_1m_route", "route_id", from, to, + bucketCount, List.of(new Filter("application_name", applicationName))); + } + + // ── SLA compliance (raw table — prepared statements OK) ────────────── + + @Override + public double slaCompliance(Instant from, Instant to, int thresholdMs, + String applicationName, String routeId) { + String sql = "SELECT " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ?"; + + List params = new ArrayList<>(); + params.add(thresholdMs); + params.add(TENANT); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + sql += " AND application_name = ?"; + params.add(applicationName); + } + if (routeId != null) { + sql += " AND route_id = ?"; + params.add(routeId); + } + + return jdbc.query(sql, (rs, rowNum) -> { + long total = rs.getLong("total"); + if (total == 0) return 1.0; + return rs.getLong("compliant") * 100.0 / total; + }, params.toArray()).stream().findFirst().orElse(1.0); + } + + @Override + public Map slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) { + String sql = "SELECT application_name, " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? 
" + + "GROUP BY application_name"; + + Map result = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + result.put(rs.getString("application_name"), + new long[]{rs.getLong("compliant"), rs.getLong("total")}); + }, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to)); + return result; + } + + @Override + public Map slaCountsByRoute(Instant from, Instant to, + String applicationName, int thresholdMs) { + String sql = "SELECT route_id, " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND application_name = ? GROUP BY route_id"; + + Map result = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + result.put(rs.getString("route_id"), + new long[]{rs.getLong("compliant"), rs.getLong("total")}); + }, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName); + return result; + } + + // ── Top errors (raw table — prepared statements OK) ────────────────── + + @Override + public List topErrors(Instant from, Instant to, String applicationName, + String routeId, int limit) { + StringBuilder where = new StringBuilder( + "status = 'FAILED' AND start_time >= ? 
AND start_time < ?"); + List params = new ArrayList<>(); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + where.append(" AND application_name = ?"); + params.add(applicationName); + } + + String table; + String groupId; + if (routeId != null) { + table = "processor_executions"; + groupId = "processor_id"; + where.append(" AND route_id = ?"); + params.add(routeId); + } else { + table = "executions FINAL"; + groupId = "route_id"; + } + + Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES); + Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES); + + String sql = "WITH counted AS (" + + " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " + + " " + groupId + " AS group_id, " + + " count() AS cnt, max(start_time) AS last_seen " + + " FROM " + table + " WHERE tenant_id = ? AND " + where + + " GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" + + "), velocity AS (" + + " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " + + " countIf(start_time >= ?) AS recent_5m, " + + " countIf(start_time >= ? AND start_time < ?) AS prev_5m " + + " FROM " + table + " WHERE tenant_id = ? 
AND " + where + + " GROUP BY error_key" + + ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " + + " COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " + + " CASE " + + " WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " + + " WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " + + " ELSE 'stable' END AS trend " + + "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " + + "ORDER BY c.cnt DESC"; + + List fullParams = new ArrayList<>(); + fullParams.add(TENANT); + fullParams.addAll(params); + fullParams.add(limit); + fullParams.add(Timestamp.from(fiveMinAgo)); + fullParams.add(Timestamp.from(tenMinAgo)); + fullParams.add(Timestamp.from(fiveMinAgo)); + fullParams.add(TENANT); + fullParams.addAll(params); + + return jdbc.query(sql, (rs, rowNum) -> { + String errorKey = rs.getString("error_key"); + String gid = rs.getString("group_id"); + return new TopError( + errorKey, + routeId != null ? routeId : gid, + routeId != null ? gid : null, + rs.getLong("cnt"), + rs.getDouble("velocity"), + rs.getString("trend"), + rs.getTimestamp("last_seen").toInstant()); + }, fullParams.toArray()); + } + + @Override + public int activeErrorTypes(Instant from, Instant to, String applicationName) { + String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?"; + + List params = new ArrayList<>(); + params.add(TENANT); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + sql += " AND application_name = ?"; + params.add(applicationName); + } + + Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray()); + return count != null ? 
count : 0; + } + + // ── Punchcard (AggregatingMergeTree — literal SQL) ─────────────────── + + @Override + public List punchcard(Instant from, Instant to, String applicationName) { + String view = applicationName != null ? "stats_1m_app" : "stats_1m_all"; + String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " + + "toHour(bucket) AS hour, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + if (applicationName != null) { + sql += " AND application_name = " + lit(applicationName); + } + sql += " GROUP BY weekday, hour ORDER BY weekday, hour"; + + return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell( + rs.getInt("weekday"), rs.getInt("hour"), + rs.getLong("total_count"), rs.getLong("failed_count"))); + } + + // ── Private helpers ────────────────────────────────────────────────── + + private record Filter(String column, String value) {} + + /** + * Format an Instant as a ClickHouse DateTime literal. + * Uses java.sql.Timestamp to match the JVM→ClickHouse timezone convention + * used by the JDBC driver, then truncates to second precision for DateTime + * column compatibility. + */ + private static String lit(Instant instant) { + // Truncate to seconds — ClickHouse DateTime has second precision + Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS); + String ts = new Timestamp(truncated.toEpochMilli()).toString(); + // Remove trailing ".0" that Timestamp.toString() always appends + if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2); + return "'" + ts + "'"; + } + + /** Format a string as a SQL literal with single-quote escaping. */ + private static String lit(String value) { + return "'" + value.replace("'", "\\'") + "'"; + } + + /** Convert Instant to java.sql.Timestamp for JDBC binding. 
*/ + private static Timestamp ts(Instant instant) { + return Timestamp.from(instant); + } + + /** + * Build -Merge combinator SQL for the given view and time range. + */ + private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo, + List filters, boolean hasRunning) { + String runningCol = hasRunning ? "countIfMerge(running_count)" : "0"; + String sql = "SELECT " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(rangeFrom) + + " AND bucket < " + lit(rangeTo); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + return sql; + } + + /** + * Query an AggregatingMergeTree stats table using -Merge combinators. + * Uses literal SQL to avoid ClickHouse JDBC driver PreparedStatement issues. + */ + private ExecutionStats queryStats(String view, Instant from, Instant to, + List filters, boolean hasRunning) { + + String sql = buildStatsSql(view, from, to, filters, hasRunning); + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long fc = rs.getLong("failed_count"); + long ds = rs.getLong("duration_sum"); // Nullable → 0 if null + long p99 = (long) rs.getDouble("p99_duration"); // quantileMerge returns Float64 + long ac = rs.getLong("active_count"); + return new long[]{tc, fc, ds, p99, ac}; + }); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; + avgDuration = totalCount > 0 ? 
r[2] / totalCount : 0; + p99Duration = r[3]; activeCount = r[4]; + } + + // Previous period (shifted back 24h) + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + String prevSql = buildStatsSql(view, prevFrom, prevTo, filters, hasRunning); + + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(prevSql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long fc = rs.getLong("failed_count"); + long ds = rs.getLong("duration_sum"); + long p99 = (long) rs.getDouble("p99_duration"); + return new long[]{tc, fc, ds, p99}; + }); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; + prevAvg = prevTotal > 0 ? r[2] / prevTotal : 0; + prevP99 = r[3]; + } + + // Today total + Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); + String todaySql = buildStatsSql(view, todayStart, Instant.now(), filters, hasRunning); + + long totalToday = 0; + var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count")); + if (!todayResult.isEmpty()) totalToday = todayResult.get(0); + + return new ExecutionStats( + totalCount, failedCount, avgDuration, p99Duration, activeCount, + totalToday, prevTotal, prevFailed, prevAvg, prevP99); + } + + /** + * Timeseries from AggregatingMergeTree using -Merge combinators. + */ + private StatsTimeseries queryTimeseries(String view, Instant from, Instant to, + int bucketCount, List filters, + boolean hasRunningCount) { + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String runningCol = hasRunningCount ? 
"countIfMerge(running_count)" : "0"; + + String sql = "SELECT " + + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + sql += " GROUP BY period ORDER BY period"; + + List buckets = jdbc.query(sql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long ds = rs.getLong("duration_sum"); + return new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + tc, rs.getLong("failed_count"), + tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"), + rs.getLong("active_count")); + }); + + return new StatsTimeseries(buckets); + } + + /** + * Grouped timeseries from AggregatingMergeTree. 
+ */ + private Map queryGroupedTimeseries( + String view, String groupCol, Instant from, Instant to, + int bucketCount, List filters) { + + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String sql = "SELECT " + + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + groupCol + " AS group_key, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + "countIfMerge(running_count) AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + sql += " GROUP BY period, group_key ORDER BY period, group_key"; + + Map> grouped = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + String key = rs.getString("group_key"); + long tc = rs.getLong("total_count"); + long ds = rs.getLong("duration_sum"); + TimeseriesBucket bucket = new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + tc, rs.getLong("failed_count"), + tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"), + rs.getLong("active_count")); + grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket); + }); + + Map result = new LinkedHashMap<>(); + grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets))); + return result; + } + + /** + * Direct aggregation on processor_executions for processor-level stats. 
+ */ + private ExecutionStats queryProcessorStatsRaw(Instant from, Instant to, + String routeId, String processorType) { + String sql = "SELECT " + + "count() AS total_count, " + + "countIf(status = 'FAILED') AS failed_count, " + + "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " + + "quantile(0.99)(duration_ms) AS p99_duration, " + + "0 AS active_count " + + "FROM processor_executions " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND route_id = ? AND processor_type = ?"; + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), + rs.getLong("active_count") + }, TENANT, ts(from), ts(to), routeId, processorType); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3]; + } + + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration") + }, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; + } + + Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); + long totalToday = 0; + var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"), + TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType); + if (!todayResult.isEmpty()) totalToday = todayResult.get(0); + + return new ExecutionStats( + totalCount, failedCount, avgDuration, 
p99Duration, 0, + totalToday, prevTotal, prevFailed, prevAvg, prevP99); + } + + /** + * Direct aggregation on processor_executions for processor-level timeseries. + */ + private StatsTimeseries queryProcessorTimeseriesRaw(Instant from, Instant to, + int bucketCount, + String routeId, String processorType) { + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String sql = "SELECT " + + "toStartOfInterval(start_time, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + "count() AS total_count, " + + "countIf(status = 'FAILED') AS failed_count, " + + "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " + + "quantile(0.99)(duration_ms) AS p99_duration, " + + "0 AS active_count " + + "FROM processor_executions " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND route_id = ? AND processor_type = ? " + + "GROUP BY period ORDER BY period"; + + List buckets = jdbc.query(sql, (rs, rowNum) -> + new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), + rs.getLong("active_count") + ), TENANT, ts(from), ts(to), routeId, processorType); + + return new StatsTimeseries(buckets); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java index 563c5893..638b2da5 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java @@ -5,6 +5,7 @@ import com.cameleer3.server.core.search.StatsTimeseries; import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; import 
com.cameleer3.server.core.search.TopError; import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -18,6 +19,7 @@ import java.util.List; import java.util.Map; @Repository +@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres") public class PostgresStatsStore implements StatsStore { private final JdbcTemplate jdbc; diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index 55cd27e1..fb33265c 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -51,6 +51,7 @@ cameleer: storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} search: ${CAMELEER_STORAGE_SEARCH:opensearch} + stats: ${CAMELEER_STORAGE_STATS:clickhouse} security: access-token-expiry-ms: 3600000 diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql new file mode 100644 index 00000000..f0e402a4 --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql @@ -0,0 +1,165 @@ +-- V4__stats_tables_and_mvs.sql +-- Materialized views replacing TimescaleDB continuous aggregates. +-- Tables use AggregatingMergeTree; MVs use -State combinators. 
-- NOTE(review): these MVs aggregate every INSERT into the base tables. The
-- Java store queries `executions` with FINAL, which suggests rows may be
-- re-inserted to update state; if so, counts here include every row version
-- (e.g. running_count never decrements). Confirm the ingestion write pattern.

-- stats_1m_all (global, 1-minute buckets)

CREATE TABLE IF NOT EXISTS stats_1m_all (
    tenant_id     LowCardinality(String),
    bucket        DateTime,
    total_count   AggregateFunction(count),
    failed_count  AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time)      AS bucket,
    countState()                     AS total_count,
    countIfState(status = 'FAILED')  AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms)            AS duration_sum,
    maxState(duration_ms)            AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;

-- stats_1m_app (per application)

CREATE TABLE IF NOT EXISTS stats_1m_app (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time)      AS bucket,
    countState()                     AS total_count,
    countIfState(status = 'FAILED')  AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms)            AS duration_sum,
    maxState(duration_ms)            AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;

-- stats_1m_route (per route)

CREATE TABLE IF NOT EXISTS stats_1m_route (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time)      AS bucket,
    countState()                     AS total_count,
    countIfState(status = 'FAILED')  AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms)            AS duration_sum,
    maxState(duration_ms)            AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;

-- stats_1m_processor (per processor type; no running_count — processor steps
-- are sourced from processor_executions, which carries no RUNNING state here)

CREATE TABLE IF NOT EXISTS stats_1m_processor (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    processor_type   LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time)      AS bucket,
    countState()                     AS total_count,
    countIfState(status = 'FAILED')  AS failed_count,
    sumState(duration_ms)            AS duration_sum,
    maxState(duration_ms)            AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;

-- stats_1m_processor_detail (per processor id)

CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    processor_id     String,
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time)      AS bucket,
    countState()                     AS total_count,
    countIfState(status = 'FAILED')  AS failed_count,
    sumState(duration_ms)            AS duration_sum,
    maxState(duration_ms)            AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java
b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java new file mode 100644 index 00000000..6e7eab7a --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java @@ -0,0 +1,375 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.search.TopError; +import com.cameleer3.server.core.storage.StatsStore.PunchcardCell; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseStatsStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseStatsStore store; + + // base time: 2026-03-31T10:00:00Z (a Tuesday) + private static final Instant BASE = Instant.parse("2026-03-31T10:00:00Z"); + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new 
ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + // Drop MVs first (they reference the stats tables), then recreate everything + jdbc.execute("DROP TABLE IF EXISTS stats_1m_all_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_app_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_route_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_all"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_app"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_route"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail"); + + // Strip SQL line comments first (they may contain semicolons), + // then split by ';' and execute non-empty statements. 
+ String cleanedDdl = statsDdl.replaceAll("--[^\n]*", ""); + for (String stmt : cleanedDdl.split(";")) { + String trimmed = stmt.trim(); + if (!trimmed.isEmpty()) { + jdbc.execute(trimmed); + } + } + + // Truncate base tables + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + seedTestData(); + + // Try the failing query to capture it in query_log, then check + try { + jdbc.queryForMap( + "SELECT countMerge(total_count) AS tc, countIfMerge(failed_count) AS fc, " + + "sumMerge(duration_sum) / greatest(countMerge(total_count), 1) AS avg, " + + "quantileMerge(0.99)(p99_duration) AS p99, " + + "countIfMerge(running_count) AS rc " + + "FROM stats_1m_all WHERE tenant_id = 'default' " + + "AND bucket >= '2026-03-31 09:59:00' AND bucket < '2026-03-31 10:05:00'"); + } catch (Exception e) { + System.out.println("Expected error: " + e.getMessage().substring(0, 80)); + } + + jdbc.execute("SYSTEM FLUSH LOGS"); + // Get ALL recent queries to see what the driver sends + var queryLog = jdbc.queryForList( + "SELECT type, substring(query, 1, 200) AS q " + + "FROM system.query_log WHERE event_time > now() - 30 " + + "AND query NOT LIKE '%system.query_log%' AND query NOT LIKE '%FLUSH%' " + + "ORDER BY event_time DESC LIMIT 20"); + for (var entry : queryLog) { + System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q")); + } + + store = new ClickHouseStatsStore(jdbc); + } + + private void seedTestData() { + // 10 executions across 2 apps, 2 routes, spanning 5 minutes + // app-1, route-a: 4 COMPLETED (200ms, 300ms, 400ms, 500ms) + insertExecution("exec-01", BASE.plusSeconds(0), "app-1", "route-a", "agent-1", + "COMPLETED", 200L, "", ""); + insertExecution("exec-02", BASE.plusSeconds(60), "app-1", "route-a", "agent-1", + "COMPLETED", 300L, "", ""); + insertExecution("exec-03", BASE.plusSeconds(120), "app-1", "route-a", "agent-1", + "COMPLETED", 400L, "", ""); + insertExecution("exec-04", BASE.plusSeconds(180), "app-1", 
"route-a", "agent-1", + "COMPLETED", 500L, "", ""); + + // app-1, route-a: 2 FAILED (100ms, 150ms) with error_type="NPE" + insertExecution("exec-05", BASE.plusSeconds(60), "app-1", "route-a", "agent-1", + "FAILED", 100L, "NPE", "null ref"); + insertExecution("exec-06", BASE.plusSeconds(120), "app-1", "route-a", "agent-1", + "FAILED", 150L, "NPE", "null ref"); + + // app-1, route-b: 2 COMPLETED (50ms, 60ms) + insertExecution("exec-07", BASE.plusSeconds(60), "app-1", "route-b", "agent-1", + "COMPLETED", 50L, "", ""); + insertExecution("exec-08", BASE.plusSeconds(120), "app-1", "route-b", "agent-1", + "COMPLETED", 60L, "", ""); + + // app-2, route-c: 1 COMPLETED (1000ms) + insertExecution("exec-09", BASE.plusSeconds(60), "app-2", "route-c", "agent-2", + "COMPLETED", 1000L, "", ""); + + // app-2, route-c: 1 RUNNING (null duration) + insertExecution("exec-10", BASE.plusSeconds(180), "app-2", "route-c", "agent-2", + "RUNNING", null, "", ""); + + // 5 processor records for processor stats testing + // app-1, route-a, processor_type="to": 3 COMPLETED + insertProcessor("exec-01", 1, "proc-to-1", "to", BASE.plusSeconds(0), + "app-1", "route-a", "COMPLETED", 50L); + insertProcessor("exec-02", 1, "proc-to-2", "to", BASE.plusSeconds(60), + "app-1", "route-a", "COMPLETED", 80L); + insertProcessor("exec-03", 1, "proc-to-3", "to", BASE.plusSeconds(120), + "app-1", "route-a", "COMPLETED", 90L); + + // app-1, route-a, processor_type="log": 2 COMPLETED + insertProcessor("exec-01", 2, "proc-log-1", "log", BASE.plusSeconds(1), + "app-1", "route-a", "COMPLETED", 10L); + insertProcessor("exec-02", 2, "proc-log-2", "log", BASE.plusSeconds(61), + "app-1", "route-a", "COMPLETED", 15L); + } + + private void insertExecution(String executionId, Instant startTime, String appName, + String routeId, String agentId, String status, + Long durationMs, String errorType, String errorMessage) { + jdbc.update( + "INSERT INTO executions (tenant_id, execution_id, start_time, route_id, " + + "agent_id, 
application_name, status, duration_ms, error_type, error_message) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)", + executionId, Timestamp.from(startTime), routeId, agentId, appName, + status, durationMs, errorType, errorMessage); + } + + private void insertProcessor(String executionId, int seq, String processorId, + String processorType, Instant startTime, + String appName, String routeId, String status, + Long durationMs) { + jdbc.update( + "INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, " + + "processor_type, start_time, route_id, application_name, status, duration_ms) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)", + executionId, seq, processorId, processorType, Timestamp.from(startTime), + routeId, appName, status, durationMs); + } + + // ── Stats Tests ────────────────────────────────────────────────────── + + @Test + void stats_returnsCorrectGlobalTotals() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats stats = store.stats(from, to); + + assertThat(stats.totalCount()).isEqualTo(10); + assertThat(stats.failedCount()).isEqualTo(2); + assertThat(stats.activeCount()).isEqualTo(1); + assertThat(stats.avgDurationMs()).isGreaterThan(0); + assertThat(stats.p99LatencyMs()).isGreaterThan(0); + } + + @Test + void statsForApp_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats app1 = store.statsForApp(from, to, "app-1"); + assertThat(app1.totalCount()).isEqualTo(8); + + ExecutionStats app2 = store.statsForApp(from, to, "app-2"); + assertThat(app2.totalCount()).isEqualTo(2); + } + + @Test + void statsForRoute_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats routeA = store.statsForRoute(from, to, "route-a", List.of()); + assertThat(routeA.totalCount()).isEqualTo(6); + } + + // ── Timeseries Tests ───────────────────────────────────────────────── 
+ + @Test + void timeseries_returnsBuckets() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + StatsTimeseries ts = store.timeseries(from, to, 5); + + assertThat(ts.buckets()).isNotEmpty(); + long totalAcrossBuckets = ts.buckets().stream() + .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum(); + assertThat(totalAcrossBuckets).isEqualTo(10); + } + + @Test + void timeseriesForApp_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + StatsTimeseries ts = store.timeseriesForApp(from, to, 5, "app-1"); + + long totalAcrossBuckets = ts.buckets().stream() + .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum(); + assertThat(totalAcrossBuckets).isEqualTo(8); + } + + @Test + void timeseriesGroupedByApp_returnsMap() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + Map grouped = store.timeseriesGroupedByApp(from, to, 5); + + assertThat(grouped).containsKeys("app-1", "app-2"); + } + + @Test + void timeseriesGroupedByRoute_returnsMap() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + Map grouped = store.timeseriesGroupedByRoute(from, to, 5, "app-1"); + + assertThat(grouped).containsKeys("route-a", "route-b"); + } + + // ── SLA Tests ──────────────────────────────────────────────────────── + + @Test + void slaCompliance_calculatesCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + // threshold=250ms: among 9 non-RUNNING executions: + // compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5 + // total non-running: 9 + // compliance = 5/9 * 100 ~ 55.56% + double sla = store.slaCompliance(from, to, 250, null, null); + assertThat(sla).isBetween(55.0, 56.0); + } + + // ── Top Errors Tests ───────────────────────────────────────────────── + + @Test + void topErrors_returnsRankedErrors() { + Instant from = BASE.minusSeconds(60); + 
Instant to = BASE.plusSeconds(300); + + List errors = store.topErrors(from, to, null, null, 10); + + assertThat(errors).isNotEmpty(); + assertThat(errors.get(0).errorType()).isEqualTo("NPE"); + assertThat(errors.get(0).count()).isEqualTo(2); + } + + // ── Active Error Types Test ────────────────────────────────────────── + + @Test + void activeErrorTypes_countsDistinct() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + int count = store.activeErrorTypes(from, to, "app-1"); + + assertThat(count).isEqualTo(1); // only "NPE" + } + + // ── Punchcard Test ─────────────────────────────────────────────────── + + @Test + void punchcard_returnsWeekdayHourCells() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + List cells = store.punchcard(from, to, null); + + assertThat(cells).isNotEmpty(); + long totalCount = cells.stream().mapToLong(PunchcardCell::totalCount).sum(); + assertThat(totalCount).isEqualTo(10); + } + + @Test + void slaCountsByApp_returnsMap() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + // threshold=250ms + Map counts = store.slaCountsByApp(from, to, 250); + + assertThat(counts).containsKeys("app-1", "app-2"); + // app-1: 8 total executions, all non-RUNNING + // compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5 + long[] app1 = counts.get("app-1"); + assertThat(app1[0]).isEqualTo(5); // compliant + assertThat(app1[1]).isEqualTo(8); // total non-running + // app-2: 1 COMPLETED(1000ms) + 1 RUNNING → 1 non-RUNNING, 0 compliant + long[] app2 = counts.get("app-2"); + assertThat(app2[0]).isEqualTo(0); // compliant + assertThat(app2[1]).isEqualTo(1); // total non-running + } + + @Test + void slaCountsByRoute_returnsMap() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + Map counts = store.slaCountsByRoute(from, to, "app-1", 250); + + assertThat(counts).containsKeys("route-a", "route-b"); 
+ // route-a: exec-01(200)OK, exec-02(300)NO, exec-03(400)NO, exec-04(500)NO, + // exec-05(100)OK, exec-06(150)OK → 3 compliant, 6 total + long[] routeA = counts.get("route-a"); + assertThat(routeA[0]).isEqualTo(3); // compliant + assertThat(routeA[1]).isEqualTo(6); // total + // route-b: exec-07(50)OK, exec-08(60)OK → 2 compliant, 2 total + long[] routeB = counts.get("route-b"); + assertThat(routeB[0]).isEqualTo(2); + assertThat(routeB[1]).isEqualTo(2); + } + + // ── Processor Stats Test ───────────────────────────────────────────── + + @Test + void statsForProcessor_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats toStats = store.statsForProcessor(from, to, "route-a", "to"); + assertThat(toStats.totalCount()).isEqualTo(3); + assertThat(toStats.activeCount()).isEqualTo(0); // processor stats have no running_count + + ExecutionStats logStats = store.statsForProcessor(from, to, "route-a", "log"); + assertThat(logStats.totalCount()).isEqualTo(2); + } +} diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 06c131a3..0f76b3fb 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -93,6 +93,8 @@ spec: value: "postgres" - name: CAMELEER_STORAGE_SEARCH value: "opensearch" + - name: CAMELEER_STORAGE_STATS + value: "clickhouse" resources: requests: