From eb0d26814f4e3e3e863dca698ea7ce103b09f00e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:11:38 +0200 Subject: [PATCH 1/3] feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../clickhouse/V4__stats_tables_and_mvs.sql | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql new file mode 100644 index 00000000..fb87c2dc --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql @@ -0,0 +1,165 @@ +-- V4__stats_tables_and_mvs.sql +-- Materialized views replacing TimescaleDB continuous aggregates. +-- Tables use AggregatingMergeTree; MVs use -State combinators. 
+ +-- stats_1m_all (global) + +CREATE TABLE IF NOT EXISTS stats_1m_all ( + tenant_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS +SELECT + tenant_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, bucket; + +-- stats_1m_app (per-application) + +CREATE TABLE IF NOT EXISTS stats_1m_app ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS +SELECT + tenant_id, + application_name, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS 
failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, bucket; + +-- stats_1m_route (per-route) + +CREATE TABLE IF NOT EXISTS stats_1m_route ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS +SELECT + tenant_id, + application_name, + route_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, route_id, bucket; + +-- stats_1m_processor (per-processor-type) + +CREATE TABLE IF NOT EXISTS stats_1m_processor ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration 
AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, processor_type, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS +SELECT + tenant_id, + application_name, + processor_type, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, processor_type, bucket; + +-- stats_1m_processor_detail (per-processor-id) + +CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + processor_id String, + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, processor_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS +SELECT + tenant_id, + application_name, + route_id, + processor_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, route_id, processor_id, bucket; From 
052990bb59744f66c0623cb028602960fe3dc46e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:49:22 +0200 Subject: [PATCH 2/3] feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries Implements StatsStore interface for ClickHouse using AggregatingMergeTree tables with -Merge combinators (countMerge, countIfMerge, sumMerge, quantileMerge). Uses literal SQL for aggregate table queries to avoid ClickHouse JDBC driver PreparedStatement issues with AggregateFunction columns. Raw table queries (SLA, topErrors, activeErrorTypes) use normal prepared statements. Includes 13 integration tests covering stats, timeseries, grouped timeseries, SLA compliance, SLA counts by app/route, top errors, active error types, punchcard, and processor stats. Also fixes AggregateFunction type signatures in V4 DDL (count() takes no args, countIf takes UInt8). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/ClickHouseStatsStore.java | 565 ++++++++++++++++++ .../clickhouse/V4__stats_tables_and_mvs.sql | 26 +- .../app/storage/ClickHouseStatsStoreIT.java | 375 ++++++++++++ 3 files changed, 953 insertions(+), 13 deletions(-) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java new file mode 100644 index 00000000..63fed6f6 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java @@ -0,0 +1,565 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import 
com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; +import com.cameleer3.server.core.search.TopError; +import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * ClickHouse implementation of {@link StatsStore}. + * Reads from AggregatingMergeTree tables populated by materialized views, + * using {@code -Merge} aggregate combinators to finalize partial states. + * + *

Queries against AggregatingMergeTree tables use literal SQL values instead + * of JDBC prepared-statement parameters because the ClickHouse JDBC v2 driver + * (0.9.x) wraps prepared statements in a sub-query that strips the + * {@code AggregateFunction} column type, breaking {@code -Merge} combinators. + * Queries against raw tables ({@code executions FINAL}, + * {@code processor_executions}) use normal prepared-statement parameters + * since they have no AggregateFunction columns.

+ */ +public class ClickHouseStatsStore implements StatsStore { + + private static final String TENANT = "default"; + + private final JdbcTemplate jdbc; + + public ClickHouseStatsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + // ── Stats (aggregate) ──────────────────────────────────────────────── + + @Override + public ExecutionStats stats(Instant from, Instant to) { + return queryStats("stats_1m_all", from, to, List.of(), true); + } + + @Override + public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) { + return queryStats("stats_1m_app", from, to, List.of( + new Filter("application_name", applicationName)), true); + } + + @Override + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds) { + return queryStats("stats_1m_route", from, to, List.of( + new Filter("route_id", routeId)), true); + } + + @Override + public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) { + return queryProcessorStatsRaw(from, to, routeId, processorType); + } + + // ── Timeseries ─────────────────────────────────────────────────────── + + @Override + public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { + return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true); + } + + @Override + public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) { + return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of( + new Filter("application_name", applicationName)), true); + } + + @Override + public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, + String routeId, List agentIds) { + return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of( + new Filter("route_id", routeId)), true); + } + + @Override + public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount, + String routeId, String processorType) { 
+ return queryProcessorTimeseriesRaw(from, to, bucketCount, routeId, processorType); + } + + // ── Grouped timeseries ─────────────────────────────────────────────── + + @Override + public Map timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) { + return queryGroupedTimeseries("stats_1m_app", "application_name", from, to, + bucketCount, List.of()); + } + + @Override + public Map timeseriesGroupedByRoute(Instant from, Instant to, + int bucketCount, String applicationName) { + return queryGroupedTimeseries("stats_1m_route", "route_id", from, to, + bucketCount, List.of(new Filter("application_name", applicationName))); + } + + // ── SLA compliance (raw table — prepared statements OK) ────────────── + + @Override + public double slaCompliance(Instant from, Instant to, int thresholdMs, + String applicationName, String routeId) { + String sql = "SELECT " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ?"; + + List params = new ArrayList<>(); + params.add(thresholdMs); + params.add(TENANT); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + sql += " AND application_name = ?"; + params.add(applicationName); + } + if (routeId != null) { + sql += " AND route_id = ?"; + params.add(routeId); + } + + return jdbc.query(sql, (rs, rowNum) -> { + long total = rs.getLong("total"); + if (total == 0) return 1.0; + return rs.getLong("compliant") * 100.0 / total; + }, params.toArray()).stream().findFirst().orElse(1.0); + } + + @Override + public Map slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) { + String sql = "SELECT application_name, " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? 
" + + "GROUP BY application_name"; + + Map result = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + result.put(rs.getString("application_name"), + new long[]{rs.getLong("compliant"), rs.getLong("total")}); + }, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to)); + return result; + } + + @Override + public Map slaCountsByRoute(Instant from, Instant to, + String applicationName, int thresholdMs) { + String sql = "SELECT route_id, " + + "countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " + + "countIf(status != 'RUNNING') AS total " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND application_name = ? GROUP BY route_id"; + + Map result = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + result.put(rs.getString("route_id"), + new long[]{rs.getLong("compliant"), rs.getLong("total")}); + }, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName); + return result; + } + + // ── Top errors (raw table — prepared statements OK) ────────────────── + + @Override + public List topErrors(Instant from, Instant to, String applicationName, + String routeId, int limit) { + StringBuilder where = new StringBuilder( + "status = 'FAILED' AND start_time >= ? 
AND start_time < ?"); + List params = new ArrayList<>(); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + where.append(" AND application_name = ?"); + params.add(applicationName); + } + + String table; + String groupId; + if (routeId != null) { + table = "processor_executions"; + groupId = "processor_id"; + where.append(" AND route_id = ?"); + params.add(routeId); + } else { + table = "executions FINAL"; + groupId = "route_id"; + } + + Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES); + Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES); + + String sql = "WITH counted AS (" + + " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " + + " " + groupId + " AS group_id, " + + " count() AS cnt, max(start_time) AS last_seen " + + " FROM " + table + " WHERE tenant_id = ? AND " + where + + " GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" + + "), velocity AS (" + + " SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " + + " countIf(start_time >= ?) AS recent_5m, " + + " countIf(start_time >= ? AND start_time < ?) AS prev_5m " + + " FROM " + table + " WHERE tenant_id = ? 
AND " + where + + " GROUP BY error_key" + + ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " + + " COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " + + " CASE " + + " WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " + + " WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " + + " ELSE 'stable' END AS trend " + + "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " + + "ORDER BY c.cnt DESC"; + + List fullParams = new ArrayList<>(); + fullParams.add(TENANT); + fullParams.addAll(params); + fullParams.add(limit); + fullParams.add(Timestamp.from(fiveMinAgo)); + fullParams.add(Timestamp.from(tenMinAgo)); + fullParams.add(Timestamp.from(fiveMinAgo)); + fullParams.add(TENANT); + fullParams.addAll(params); + + return jdbc.query(sql, (rs, rowNum) -> { + String errorKey = rs.getString("error_key"); + String gid = rs.getString("group_id"); + return new TopError( + errorKey, + routeId != null ? routeId : gid, + routeId != null ? gid : null, + rs.getLong("cnt"), + rs.getDouble("velocity"), + rs.getString("trend"), + rs.getTimestamp("last_seen").toInstant()); + }, fullParams.toArray()); + } + + @Override + public int activeErrorTypes(Instant from, Instant to, String applicationName) { + String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " + + "FROM executions FINAL " + + "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?"; + + List params = new ArrayList<>(); + params.add(TENANT); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + if (applicationName != null) { + sql += " AND application_name = ?"; + params.add(applicationName); + } + + Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray()); + return count != null ? 
count : 0; + } + + // ── Punchcard (AggregatingMergeTree — literal SQL) ─────────────────── + + @Override + public List punchcard(Instant from, Instant to, String applicationName) { + String view = applicationName != null ? "stats_1m_app" : "stats_1m_all"; + String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " + + "toHour(bucket) AS hour, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + if (applicationName != null) { + sql += " AND application_name = " + lit(applicationName); + } + sql += " GROUP BY weekday, hour ORDER BY weekday, hour"; + + return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell( + rs.getInt("weekday"), rs.getInt("hour"), + rs.getLong("total_count"), rs.getLong("failed_count"))); + } + + // ── Private helpers ────────────────────────────────────────────────── + + private record Filter(String column, String value) {} + + /** + * Format an Instant as a ClickHouse DateTime literal. + * Uses java.sql.Timestamp to match the JVM→ClickHouse timezone convention + * used by the JDBC driver, then truncates to second precision for DateTime + * column compatibility. + */ + private static String lit(Instant instant) { + // Truncate to seconds — ClickHouse DateTime has second precision + Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS); + String ts = new Timestamp(truncated.toEpochMilli()).toString(); + // Remove trailing ".0" that Timestamp.toString() always appends + if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2); + return "'" + ts + "'"; + } + + /** Format a string as a SQL literal with single-quote escaping. */ + private static String lit(String value) { + return "'" + value.replace("'", "\\'") + "'"; + } + + /** Convert Instant to java.sql.Timestamp for JDBC binding. 
*/ + private static Timestamp ts(Instant instant) { + return Timestamp.from(instant); + } + + /** + * Build -Merge combinator SQL for the given view and time range. + */ + private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo, + List filters, boolean hasRunning) { + String runningCol = hasRunning ? "countIfMerge(running_count)" : "0"; + String sql = "SELECT " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(rangeFrom) + + " AND bucket < " + lit(rangeTo); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + return sql; + } + + /** + * Query an AggregatingMergeTree stats table using -Merge combinators. + * Uses literal SQL to avoid ClickHouse JDBC driver PreparedStatement issues. + */ + private ExecutionStats queryStats(String view, Instant from, Instant to, + List filters, boolean hasRunning) { + + String sql = buildStatsSql(view, from, to, filters, hasRunning); + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long fc = rs.getLong("failed_count"); + long ds = rs.getLong("duration_sum"); // Nullable → 0 if null + long p99 = (long) rs.getDouble("p99_duration"); // quantileMerge returns Float64 + long ac = rs.getLong("active_count"); + return new long[]{tc, fc, ds, p99, ac}; + }); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; + avgDuration = totalCount > 0 ? 
r[2] / totalCount : 0; + p99Duration = r[3]; activeCount = r[4]; + } + + // Previous period (shifted back 24h) + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + String prevSql = buildStatsSql(view, prevFrom, prevTo, filters, hasRunning); + + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(prevSql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long fc = rs.getLong("failed_count"); + long ds = rs.getLong("duration_sum"); + long p99 = (long) rs.getDouble("p99_duration"); + return new long[]{tc, fc, ds, p99}; + }); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; + prevAvg = prevTotal > 0 ? r[2] / prevTotal : 0; + prevP99 = r[3]; + } + + // Today total + Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); + String todaySql = buildStatsSql(view, todayStart, Instant.now(), filters, hasRunning); + + long totalToday = 0; + var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count")); + if (!todayResult.isEmpty()) totalToday = todayResult.get(0); + + return new ExecutionStats( + totalCount, failedCount, avgDuration, p99Duration, activeCount, + totalToday, prevTotal, prevFailed, prevAvg, prevP99); + } + + /** + * Timeseries from AggregatingMergeTree using -Merge combinators. + */ + private StatsTimeseries queryTimeseries(String view, Instant from, Instant to, + int bucketCount, List filters, + boolean hasRunningCount) { + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String runningCol = hasRunningCount ? 
"countIfMerge(running_count)" : "0"; + + String sql = "SELECT " + + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + sql += " GROUP BY period ORDER BY period"; + + List buckets = jdbc.query(sql, (rs, rowNum) -> { + long tc = rs.getLong("total_count"); + long ds = rs.getLong("duration_sum"); + return new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + tc, rs.getLong("failed_count"), + tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"), + rs.getLong("active_count")); + }); + + return new StatsTimeseries(buckets); + } + + /** + * Grouped timeseries from AggregatingMergeTree. 
+ */ + private Map queryGroupedTimeseries( + String view, String groupCol, Instant from, Instant to, + int bucketCount, List filters) { + + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String sql = "SELECT " + + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + groupCol + " AS group_key, " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "sumMerge(duration_sum) AS duration_sum, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + + "countIfMerge(running_count) AS active_count " + + "FROM " + view + + " WHERE tenant_id = " + lit(TENANT) + + " AND bucket >= " + lit(from) + + " AND bucket < " + lit(to); + for (Filter f : filters) { + sql += " AND " + f.column() + " = " + lit(f.value()); + } + sql += " GROUP BY period, group_key ORDER BY period, group_key"; + + Map> grouped = new LinkedHashMap<>(); + jdbc.query(sql, (rs) -> { + String key = rs.getString("group_key"); + long tc = rs.getLong("total_count"); + long ds = rs.getLong("duration_sum"); + TimeseriesBucket bucket = new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + tc, rs.getLong("failed_count"), + tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"), + rs.getLong("active_count")); + grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket); + }); + + Map result = new LinkedHashMap<>(); + grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets))); + return result; + } + + /** + * Direct aggregation on processor_executions for processor-level stats. 
+ */ + private ExecutionStats queryProcessorStatsRaw(Instant from, Instant to, + String routeId, String processorType) { + String sql = "SELECT " + + "count() AS total_count, " + + "countIf(status = 'FAILED') AS failed_count, " + + "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " + + "quantile(0.99)(duration_ms) AS p99_duration, " + + "0 AS active_count " + + "FROM processor_executions " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND route_id = ? AND processor_type = ?"; + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), + rs.getLong("active_count") + }, TENANT, ts(from), ts(to), routeId, processorType); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3]; + } + + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration") + }, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; + } + + Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); + long totalToday = 0; + var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"), + TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType); + if (!todayResult.isEmpty()) totalToday = todayResult.get(0); + + return new ExecutionStats( + totalCount, failedCount, avgDuration, 
p99Duration, 0, + totalToday, prevTotal, prevFailed, prevAvg, prevP99); + } + + /** + * Direct aggregation on processor_executions for processor-level timeseries. + */ + private StatsTimeseries queryProcessorTimeseriesRaw(Instant from, Instant to, + int bucketCount, + String routeId, String processorType) { + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String sql = "SELECT " + + "toStartOfInterval(start_time, INTERVAL " + intervalSeconds + " SECOND) AS period, " + + "count() AS total_count, " + + "countIf(status = 'FAILED') AS failed_count, " + + "CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " + + "quantile(0.99)(duration_ms) AS p99_duration, " + + "0 AS active_count " + + "FROM processor_executions " + + "WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " + + "AND route_id = ? AND processor_type = ? " + + "GROUP BY period ORDER BY period"; + + List buckets = jdbc.query(sql, (rs, rowNum) -> + new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + rs.getLong("total_count"), rs.getLong("failed_count"), + (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), + rs.getLong("active_count") + ), TENANT, ts(from), ts(to), routeId, processorType); + + return new StatsTimeseries(buckets); + } +} diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql index fb87c2dc..f0e402a4 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql @@ -7,9 +7,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_all ( tenant_id LowCardinality(String), bucket DateTime, - total_count AggregateFunction(count, UInt64), - failed_count AggregateFunction(countIf, UInt64, UInt8), - running_count 
AggregateFunction(countIf, UInt64, UInt8), + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), + running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -38,9 +38,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_app ( tenant_id LowCardinality(String), application_name LowCardinality(String), bucket DateTime, - total_count AggregateFunction(count, UInt64), - failed_count AggregateFunction(countIf, UInt64, UInt8), - running_count AggregateFunction(countIf, UInt64, UInt8), + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), + running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -71,9 +71,9 @@ CREATE TABLE IF NOT EXISTS stats_1m_route ( application_name LowCardinality(String), route_id LowCardinality(String), bucket DateTime, - total_count AggregateFunction(count, UInt64), - failed_count AggregateFunction(countIf, UInt64, UInt8), - running_count AggregateFunction(countIf, UInt64, UInt8), + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), + running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -105,8 +105,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor ( application_name LowCardinality(String), processor_type LowCardinality(String), bucket DateTime, - total_count AggregateFunction(count, UInt64), - failed_count AggregateFunction(countIf, UInt64, UInt8), + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), duration_sum 
AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -138,8 +138,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( route_id LowCardinality(String), processor_id String, bucket DateTime, - total_count AggregateFunction(count, UInt64), - failed_count AggregateFunction(countIf, UInt64, UInt8), + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java new file mode 100644 index 00000000..6e7eab7a --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java @@ -0,0 +1,375 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.search.TopError; +import com.cameleer3.server.core.storage.StatsStore.PunchcardCell; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseStatsStoreIT { + + 
@Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseStatsStore store; + + // base time: 2026-03-31T10:00:00Z (a Tuesday) + private static final Instant BASE = Instant.parse("2026-03-31T10:00:00Z"); + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + // Drop MVs first (they reference the stats tables), then recreate everything + jdbc.execute("DROP TABLE IF EXISTS stats_1m_all_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_app_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_route_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail_mv"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_all"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_app"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_route"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor"); + jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail"); + + // Strip SQL line comments first (they may contain semicolons), + // then split by ';' and execute non-empty statements. 
+ String cleanedDdl = statsDdl.replaceAll("--[^\n]*", ""); + for (String stmt : cleanedDdl.split(";")) { + String trimmed = stmt.trim(); + if (!trimmed.isEmpty()) { + jdbc.execute(trimmed); + } + } + + // Truncate base tables + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + seedTestData(); + + // Try the failing query to capture it in query_log, then check + try { + jdbc.queryForMap( + "SELECT countMerge(total_count) AS tc, countIfMerge(failed_count) AS fc, " + + "sumMerge(duration_sum) / greatest(countMerge(total_count), 1) AS avg, " + + "quantileMerge(0.99)(p99_duration) AS p99, " + + "countIfMerge(running_count) AS rc " + + "FROM stats_1m_all WHERE tenant_id = 'default' " + + "AND bucket >= '2026-03-31 09:59:00' AND bucket < '2026-03-31 10:05:00'"); + } catch (Exception e) { + System.out.println("Expected error: " + e.getMessage().substring(0, 80)); + } + + jdbc.execute("SYSTEM FLUSH LOGS"); + // Get ALL recent queries to see what the driver sends + var queryLog = jdbc.queryForList( + "SELECT type, substring(query, 1, 200) AS q " + + "FROM system.query_log WHERE event_time > now() - 30 " + + "AND query NOT LIKE '%system.query_log%' AND query NOT LIKE '%FLUSH%' " + + "ORDER BY event_time DESC LIMIT 20"); + for (var entry : queryLog) { + System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q")); + } + + store = new ClickHouseStatsStore(jdbc); + } + + private void seedTestData() { + // 10 executions across 2 apps, 2 routes, spanning 5 minutes + // app-1, route-a: 4 COMPLETED (200ms, 300ms, 400ms, 500ms) + insertExecution("exec-01", BASE.plusSeconds(0), "app-1", "route-a", "agent-1", + "COMPLETED", 200L, "", ""); + insertExecution("exec-02", BASE.plusSeconds(60), "app-1", "route-a", "agent-1", + "COMPLETED", 300L, "", ""); + insertExecution("exec-03", BASE.plusSeconds(120), "app-1", "route-a", "agent-1", + "COMPLETED", 400L, "", ""); + insertExecution("exec-04", BASE.plusSeconds(180), "app-1", 
"route-a", "agent-1", + "COMPLETED", 500L, "", ""); + + // app-1, route-a: 2 FAILED (100ms, 150ms) with error_type="NPE" + insertExecution("exec-05", BASE.plusSeconds(60), "app-1", "route-a", "agent-1", + "FAILED", 100L, "NPE", "null ref"); + insertExecution("exec-06", BASE.plusSeconds(120), "app-1", "route-a", "agent-1", + "FAILED", 150L, "NPE", "null ref"); + + // app-1, route-b: 2 COMPLETED (50ms, 60ms) + insertExecution("exec-07", BASE.plusSeconds(60), "app-1", "route-b", "agent-1", + "COMPLETED", 50L, "", ""); + insertExecution("exec-08", BASE.plusSeconds(120), "app-1", "route-b", "agent-1", + "COMPLETED", 60L, "", ""); + + // app-2, route-c: 1 COMPLETED (1000ms) + insertExecution("exec-09", BASE.plusSeconds(60), "app-2", "route-c", "agent-2", + "COMPLETED", 1000L, "", ""); + + // app-2, route-c: 1 RUNNING (null duration) + insertExecution("exec-10", BASE.plusSeconds(180), "app-2", "route-c", "agent-2", + "RUNNING", null, "", ""); + + // 5 processor records for processor stats testing + // app-1, route-a, processor_type="to": 3 COMPLETED + insertProcessor("exec-01", 1, "proc-to-1", "to", BASE.plusSeconds(0), + "app-1", "route-a", "COMPLETED", 50L); + insertProcessor("exec-02", 1, "proc-to-2", "to", BASE.plusSeconds(60), + "app-1", "route-a", "COMPLETED", 80L); + insertProcessor("exec-03", 1, "proc-to-3", "to", BASE.plusSeconds(120), + "app-1", "route-a", "COMPLETED", 90L); + + // app-1, route-a, processor_type="log": 2 COMPLETED + insertProcessor("exec-01", 2, "proc-log-1", "log", BASE.plusSeconds(1), + "app-1", "route-a", "COMPLETED", 10L); + insertProcessor("exec-02", 2, "proc-log-2", "log", BASE.plusSeconds(61), + "app-1", "route-a", "COMPLETED", 15L); + } + + private void insertExecution(String executionId, Instant startTime, String appName, + String routeId, String agentId, String status, + Long durationMs, String errorType, String errorMessage) { + jdbc.update( + "INSERT INTO executions (tenant_id, execution_id, start_time, route_id, " + + "agent_id, 
application_name, status, duration_ms, error_type, error_message) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)", + executionId, Timestamp.from(startTime), routeId, agentId, appName, + status, durationMs, errorType, errorMessage); + } + + private void insertProcessor(String executionId, int seq, String processorId, + String processorType, Instant startTime, + String appName, String routeId, String status, + Long durationMs) { + jdbc.update( + "INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, " + + "processor_type, start_time, route_id, application_name, status, duration_ms) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)", + executionId, seq, processorId, processorType, Timestamp.from(startTime), + routeId, appName, status, durationMs); + } + + // ── Stats Tests ────────────────────────────────────────────────────── + + @Test + void stats_returnsCorrectGlobalTotals() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats stats = store.stats(from, to); + + assertThat(stats.totalCount()).isEqualTo(10); + assertThat(stats.failedCount()).isEqualTo(2); + assertThat(stats.activeCount()).isEqualTo(1); + assertThat(stats.avgDurationMs()).isGreaterThan(0); + assertThat(stats.p99LatencyMs()).isGreaterThan(0); + } + + @Test + void statsForApp_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats app1 = store.statsForApp(from, to, "app-1"); + assertThat(app1.totalCount()).isEqualTo(8); + + ExecutionStats app2 = store.statsForApp(from, to, "app-2"); + assertThat(app2.totalCount()).isEqualTo(2); + } + + @Test + void statsForRoute_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats routeA = store.statsForRoute(from, to, "route-a", List.of()); + assertThat(routeA.totalCount()).isEqualTo(6); + } + + // ── Timeseries Tests ───────────────────────────────────────────────── 
+
+    @Test
+    void timeseries_returnsBuckets() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        StatsTimeseries ts = store.timeseries(from, to, 5);
+
+        assertThat(ts.buckets()).isNotEmpty();
+        long totalAcrossBuckets = ts.buckets().stream()
+                .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
+        assertThat(totalAcrossBuckets).isEqualTo(10);
+    }
+
+    @Test
+    void timeseriesForApp_filtersCorrectly() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        StatsTimeseries ts = store.timeseriesForApp(from, to, 5, "app-1");
+
+        long totalAcrossBuckets = ts.buckets().stream()
+                .mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
+        assertThat(totalAcrossBuckets).isEqualTo(8);
+    }
+
+    @Test
+    void timeseriesGroupedByApp_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByApp(from, to, 5);
+
+        assertThat(grouped).containsKeys("app-1", "app-2");
+    }
+
+    @Test
+    void timeseriesGroupedByRoute_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByRoute(from, to, 5, "app-1");
+
+        assertThat(grouped).containsKeys("route-a", "route-b");
+    }
+
+    // ── SLA Tests ────────────────────────────────────────────────────────
+
+    @Test
+    void slaCompliance_calculatesCorrectly() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        // threshold=250ms: among 9 non-RUNNING executions:
+        // compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5
+        // total non-running: 9
+        // compliance = 5/9 * 100 ~ 55.56%
+        double sla = store.slaCompliance(from, to, 250, null, null);
+        assertThat(sla).isBetween(55.0, 56.0);
+    }
+
+    // ── Top Errors Tests ─────────────────────────────────────────────────
+
+    @Test
+    void topErrors_returnsRankedErrors() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        List<TopError> errors = store.topErrors(from, to, null, null, 10);
+
+        assertThat(errors).isNotEmpty();
+        assertThat(errors.get(0).errorType()).isEqualTo("NPE");
+        assertThat(errors.get(0).count()).isEqualTo(2);
+    }
+
+    // ── Active Error Types Test ──────────────────────────────────────────
+
+    @Test
+    void activeErrorTypes_countsDistinct() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        int count = store.activeErrorTypes(from, to, "app-1");
+
+        assertThat(count).isEqualTo(1); // only "NPE"
+    }
+
+    // ── Punchcard Test ───────────────────────────────────────────────────
+
+    @Test
+    void punchcard_returnsWeekdayHourCells() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        List<PunchcardCell> cells = store.punchcard(from, to, null);
+
+        assertThat(cells).isNotEmpty();
+        long totalCount = cells.stream().mapToLong(PunchcardCell::totalCount).sum();
+        assertThat(totalCount).isEqualTo(10);
+    }
+
+    @Test
+    void slaCountsByApp_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        // threshold=250ms
+        Map<String, long[]> counts = store.slaCountsByApp(from, to, 250);
+
+        assertThat(counts).containsKeys("app-1", "app-2");
+        // app-1: 8 total executions, all non-RUNNING
+        // compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5
+        long[] app1 = counts.get("app-1");
+        assertThat(app1[0]).isEqualTo(5); // compliant
+        assertThat(app1[1]).isEqualTo(8); // total non-running
+        // app-2: 1 COMPLETED(1000ms) + 1 RUNNING → 1 non-RUNNING, 0 compliant
+        long[] app2 = counts.get("app-2");
+        assertThat(app2[0]).isEqualTo(0); // compliant
+        assertThat(app2[1]).isEqualTo(1); // total non-running
+    }
+
+    @Test
+    void slaCountsByRoute_returnsMap() {
+        Instant from = BASE.minusSeconds(60);
+        Instant to = BASE.plusSeconds(300);
+
+        Map<String, long[]> counts = store.slaCountsByRoute(from, to, "app-1", 250);
+
+        assertThat(counts).containsKeys("route-a", "route-b");
+ // route-a: exec-01(200)OK, exec-02(300)NO, exec-03(400)NO, exec-04(500)NO, + // exec-05(100)OK, exec-06(150)OK → 3 compliant, 6 total + long[] routeA = counts.get("route-a"); + assertThat(routeA[0]).isEqualTo(3); // compliant + assertThat(routeA[1]).isEqualTo(6); // total + // route-b: exec-07(50)OK, exec-08(60)OK → 2 compliant, 2 total + long[] routeB = counts.get("route-b"); + assertThat(routeB[0]).isEqualTo(2); + assertThat(routeB[1]).isEqualTo(2); + } + + // ── Processor Stats Test ───────────────────────────────────────────── + + @Test + void statsForProcessor_filtersCorrectly() { + Instant from = BASE.minusSeconds(60); + Instant to = BASE.plusSeconds(300); + + ExecutionStats toStats = store.statsForProcessor(from, to, "route-a", "to"); + assertThat(toStats.totalCount()).isEqualTo(3); + assertThat(toStats.activeCount()).isEqualTo(0); // processor stats have no running_count + + ExecutionStats logStats = store.statsForProcessor(from, to, "route-a", "log"); + assertThat(logStats.totalCount()).isEqualTo(2); + } +} From 9df00fdde0a333dc232e7d66c7ed6a3320cba64e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:51:45 +0200 Subject: [PATCH 3/3] feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag (default: clickhouse) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/app/config/StorageBeanConfig.java | 11 +++++++++++ .../server/app/storage/PostgresStatsStore.java | 2 ++ .../src/main/resources/application.yml | 1 + deploy/base/server.yaml | 2 ++ 4 files changed, 16 insertions(+) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index ab733408..c0cf7c5d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ 
b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -2,6 +2,7 @@ package com.cameleer3.server.app.config; import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; import com.cameleer3.server.app.storage.ClickHouseMetricsStore; +import com.cameleer3.server.app.storage.ClickHouseStatsStore; import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; import com.cameleer3.server.app.storage.PostgresMetricsStore; import com.cameleer3.server.core.admin.AuditRepository; @@ -16,6 +17,7 @@ import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; +import com.cameleer3.server.core.storage.StatsStore; import com.cameleer3.server.core.storage.model.MetricsSnapshot; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -118,4 +120,13 @@ public class StorageBeanConfig { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseSearchIndex(clickHouseJdbc); } + + // ── ClickHouse Stats Store ───────────────────────────────────────── + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse", matchIfMissing = true) + public StatsStore clickHouseStatsStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseStatsStore(clickHouseJdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java index 563c5893..638b2da5 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java @@ -5,6 +5,7 @@ import 
com.cameleer3.server.core.search.StatsTimeseries; import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; import com.cameleer3.server.core.search.TopError; import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -18,6 +19,7 @@ import java.util.List; import java.util.Map; @Repository +@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres") public class PostgresStatsStore implements StatsStore { private final JdbcTemplate jdbc; diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index 55cd27e1..fb33265c 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -51,6 +51,7 @@ cameleer: storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} search: ${CAMELEER_STORAGE_SEARCH:opensearch} + stats: ${CAMELEER_STORAGE_STATS:clickhouse} security: access-token-expiry-ms: 3600000 diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 06c131a3..0f76b3fb 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -93,6 +93,8 @@ spec: value: "postgres" - name: CAMELEER_STORAGE_SEARCH value: "opensearch" + - name: CAMELEER_STORAGE_STATS + value: "clickhouse" resources: requests: