diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java index 7ab202cd..5d5dda8e 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java @@ -342,8 +342,8 @@ public class AgentRegistrationController { // that strip AggregateFunction column types, breaking -Merge combinators jdbc.query( "SELECT application_id, " + - "countMerge(total_count) AS total, " + - "countIfMerge(failed_count) AS failed, " + + "uniqMerge(total_count) AS total, " + + "uniqIfMerge(failed_count) AS failed, " + "COUNT(DISTINCT route_id) AS active_routes " + "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) + " GROUP BY application_id", diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java index cf77a7e0..09b4a134 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java @@ -133,7 +133,7 @@ public class CatalogController { String envFilter = (environment != null && !environment.isBlank()) ? " AND environment = " + lit(environment) : ""; jdbc.query( - "SELECT application_id, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " + + "SELECT application_id, route_id, uniqMerge(total_count) AS cnt, MAX(bucket) AS last_seen " + "FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) + envFilter + " GROUP BY application_id, route_id", rs -> { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java index 06157cf3..52cc8c14 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java @@ -98,7 +98,7 @@ public class RouteCatalogController { String envFilter = (environment != null && !environment.isBlank()) ? " AND environment = " + lit(environment) : ""; jdbc.query( - "SELECT application_id, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " + + "SELECT application_id, route_id, uniqMerge(total_count) AS cnt, MAX(bucket) AS last_seen " + "FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) + envFilter + " GROUP BY application_id, route_id", diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java index 76d42fc4..e4aa23b8 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java @@ -57,9 +57,9 @@ public class RouteMetricsController { // that strip AggregateFunction column types, breaking -Merge combinators var sql = new StringBuilder( "SELECT application_id, route_id, " + - "countMerge(total_count) AS total, " + - "countIfMerge(failed_count) AS failed, " + - "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " + + "uniqMerge(total_count) AS total, " + + "uniqIfMerge(failed_count) AS failed, " + + "CASE WHEN uniqMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / uniqMerge(total_count) ELSE 0 END AS avg_dur, " + "COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " + "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant)); @@ -102,7 +102,7 @@ public class RouteMetricsController { sparkWhere.append(" AND environment = " + lit(environment)); } String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " + - "COALESCE(countMerge(total_count), 0) AS cnt " + + "COALESCE(uniqMerge(total_count), 0) AS cnt " + sparkWhere + " GROUP BY period ORDER BY period"; List sparkline = jdbc.query(sparkSql, (rs, rowNum) -> rs.getDouble("cnt")); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java index ebe5ca3b..25cb7133 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java @@ -306,8 +306,8 @@ public class ClickHouseStatsStore implements StatsStore { String view = applicationId != null ? "stats_1m_app" : "stats_1m_all"; String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " + "toHour(bucket) AS hour, " + - "countMerge(total_count) AS total_count, " + - "countIfMerge(failed_count) AS failed_count " + + "uniqMerge(total_count) AS total_count, " + + "uniqIfMerge(failed_count) AS failed_count " + "FROM " + view + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(from) + @@ -356,10 +356,10 @@ public class ClickHouseStatsStore implements StatsStore { */ private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo, List filters, boolean hasRunning, String environment) { - String runningCol = hasRunning ? "countIfMerge(running_count)" : "0"; + String runningCol = hasRunning ? "uniqIfMerge(running_count)" : "0"; String sql = "SELECT " + - "countMerge(total_count) AS total_count, " + - "countIfMerge(failed_count) AS failed_count, " + + "uniqMerge(total_count) AS total_count, " + + "uniqIfMerge(failed_count) AS failed_count, " + "sumMerge(duration_sum) AS duration_sum, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + runningCol + " AS active_count " + @@ -443,12 +443,12 @@ public class ClickHouseStatsStore implements StatsStore { long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); if (intervalSeconds < 60) intervalSeconds = 60; - String runningCol = hasRunningCount ? "countIfMerge(running_count)" : "0"; + String runningCol = hasRunningCount ? "uniqIfMerge(running_count)" : "0"; String sql = "SELECT " + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + - "countMerge(total_count) AS total_count, " + - "countIfMerge(failed_count) AS failed_count, " + + "uniqMerge(total_count) AS total_count, " + + "uniqIfMerge(failed_count) AS failed_count, " + "sumMerge(duration_sum) AS duration_sum, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + runningCol + " AS active_count " + @@ -490,11 +490,11 @@ public class ClickHouseStatsStore implements StatsStore { String sql = "SELECT " + "toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " + groupCol + " AS group_key, " + - "countMerge(total_count) AS total_count, " + - "countIfMerge(failed_count) AS failed_count, " + + "uniqMerge(total_count) AS total_count, " + + "uniqIfMerge(failed_count) AS failed_count, " + "sumMerge(duration_sum) AS duration_sum, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " + - "countIfMerge(running_count) AS active_count " + + "uniqIfMerge(running_count) AS active_count " + "FROM " + view + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(from) + diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index e5e83955..8b980ce3 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -126,14 +126,19 @@ SETTINGS index_granularity = 8192; -- ── Stats: Materialized Views + AggregatingMergeTree ──────────────────── -- stats_1m_all (global) +-- All stats tables use uniq(execution_id) to deduplicate chunk retries +-- DROP + CREATE + backfill rebuilds from raw data on startup + +DROP VIEW IF EXISTS stats_1m_all_mv; +DROP TABLE IF EXISTS stats_1m_all; CREATE TABLE IF NOT EXISTS stats_1m_all ( tenant_id LowCardinality(String), bucket DateTime, environment LowCardinality(String) DEFAULT 'default', - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), + total_count AggregateFunction(uniq, String), + failed_count AggregateFunction(uniqIf, String, UInt8), + running_count AggregateFunction(uniqIf, String, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -143,14 +148,28 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment) TTL bucket + INTERVAL 365 DAY DELETE; +INSERT INTO stats_1m_all +SELECT + tenant_id, + toStartOfMinute(start_time) AS bucket, + environment, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, bucket, environment; + CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS SELECT tenant_id, toStartOfMinute(start_time) AS bucket, environment, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -159,14 +178,17 @@ GROUP BY tenant_id, bucket, environment; -- stats_1m_app (per-application) +DROP VIEW IF EXISTS stats_1m_app_mv; +DROP TABLE IF EXISTS stats_1m_app; + CREATE TABLE IF NOT EXISTS stats_1m_app ( tenant_id LowCardinality(String), application_id LowCardinality(String), bucket DateTime, environment LowCardinality(String) DEFAULT 'default', - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), + total_count AggregateFunction(uniq, String), + failed_count AggregateFunction(uniqIf, String, UInt8), + running_count AggregateFunction(uniqIf, String, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -176,15 +198,30 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id) TTL bucket + INTERVAL 365 DAY DELETE; +INSERT INTO stats_1m_app +SELECT + tenant_id, + application_id, + toStartOfMinute(start_time) AS bucket, + environment, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_id, bucket, environment; + CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS SELECT tenant_id, application_id, toStartOfMinute(start_time) AS bucket, environment, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -193,15 +230,18 @@ GROUP BY tenant_id, application_id, bucket, environment; -- stats_1m_route (per-route) +DROP VIEW IF EXISTS stats_1m_route_mv; +DROP TABLE IF EXISTS stats_1m_route; + CREATE TABLE IF NOT EXISTS stats_1m_route ( tenant_id LowCardinality(String), application_id LowCardinality(String), route_id LowCardinality(String), bucket DateTime, environment LowCardinality(String) DEFAULT 'default', - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), + total_count AggregateFunction(uniq, String), + failed_count AggregateFunction(uniqIf, String, UInt8), + running_count AggregateFunction(uniqIf, String, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -211,6 +251,22 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, route_id) TTL bucket + INTERVAL 365 DAY DELETE; +INSERT INTO stats_1m_route +SELECT + tenant_id, + application_id, + route_id, + toStartOfMinute(start_time) AS bucket, + environment, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_id, route_id, bucket, environment; + CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS SELECT tenant_id, @@ -218,9 +274,9 @@ SELECT route_id, toStartOfMinute(start_time) AS bucket, environment, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqIfState(execution_id, status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -258,8 +314,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqState(concat(execution_id, toString(seq))) AS total_count, + uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -273,8 +329,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqState(concat(execution_id, toString(seq))) AS total_count, + uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -314,8 +370,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqState(concat(execution_id, toString(seq))) AS total_count, + uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -331,8 +387,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, + uniqState(concat(execution_id, toString(seq))) AS total_count, + uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration