diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java index 3a870474..cf77a7e0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/CatalogController.java @@ -347,7 +347,8 @@ public class CatalogController { private void deleteClickHouseData(String tenantId, String applicationId) { String[] tablesWithAppId = { "executions", "processor_executions", "route_diagrams", "agent_events", - "stats_1m_app", "stats_1m_route", "stats_1m_processor_type", "stats_1m_processor" + "stats_1m_app", "stats_1m_route", "stats_1m_processor_type", "stats_1m_processor", + "stats_1m_processor_detail" }; for (String table : tablesWithAppId) { try { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java index 6b64aa27..76d42fc4 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java @@ -155,13 +155,14 @@ public class RouteMetricsController { // Literal SQL for AggregatingMergeTree -Merge combinators. // Aliases (tc, fc) must NOT shadow column names (total_count, failed_count) — // ClickHouse 24.12 new analyzer resolves subsequent uniqMerge(total_count) // to the alias (UInt64) instead of the AggregateFunction column. + // total_count/failed_count use uniq(execution_id) to deduplicate repeated inserts; note duration states are NOT deduplicated — sumMerge(duration_sum) still includes duplicate rows, so avg_duration_ms can be overstated when duplicates exist.
var sql = new StringBuilder( "SELECT processor_id, processor_type, route_id, application_id, " + - "countMerge(total_count) AS tc, " + - "countIfMerge(failed_count) AS fc, " + - "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " + + "uniqMerge(total_count) AS tc, " + + "uniqIfMerge(failed_count) AS fc, " + + "CASE WHEN uniqMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / uniqMerge(total_count) ELSE 0 END AS avg_duration_ms, " + "quantileMerge(0.99)(p99_duration) AS p99_duration_ms " + "FROM stats_1m_processor_detail " + "WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) + diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index 735d1526..9dc9d885 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -228,6 +228,11 @@ FROM executions GROUP BY tenant_id, application_id, route_id, bucket, environment; -- stats_1m_processor (per-processor-type) +-- Migration: count() double-counted duplicate inserts; replaced with uniq(execution_id). +-- DROP + CREATE ensures schema migration; backfill rebuilds from raw data on startup. Note: only the count columns are deduplicated — sum/max/quantile states still aggregate every inserted row, so duplicates continue to skew avg_duration_ms and p99.
+ +DROP VIEW IF EXISTS stats_1m_processor_mv; +DROP TABLE IF EXISTS stats_1m_processor; CREATE TABLE IF NOT EXISTS stats_1m_processor ( tenant_id LowCardinality(String), @@ -235,8 +240,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor ( processor_type LowCardinality(String), bucket DateTime, environment LowCardinality(String) DEFAULT 'default', - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), + total_count AggregateFunction(uniq, String), + failed_count AggregateFunction(uniqIf, String, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -246,6 +251,21 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; +INSERT INTO stats_1m_processor +SELECT + tenant_id, + application_id, + processor_type, + toStartOfMinute(start_time) AS bucket, + environment, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_id, processor_type, bucket, environment; + CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS SELECT tenant_id, @@ -253,8 +273,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration @@ -263,6 +283,9 @@ GROUP BY tenant_id, application_id, processor_type, bucket, environment; -- 
stats_1m_processor_detail (per-processor-id) +DROP VIEW IF EXISTS stats_1m_processor_detail_mv; +DROP TABLE IF EXISTS stats_1m_processor_detail; + CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( tenant_id LowCardinality(String), application_id LowCardinality(String), @@ -271,8 +294,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( processor_type LowCardinality(String), bucket DateTime, environment LowCardinality(String) DEFAULT 'default', - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), + total_count AggregateFunction(uniq, String), + failed_count AggregateFunction(uniqIf, String, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) @@ -282,6 +305,23 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; +INSERT INTO stats_1m_processor_detail +SELECT + tenant_id, + application_id, + route_id, + processor_id, + processor_type, + toStartOfMinute(start_time) AS bucket, + environment, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment; + CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS SELECT tenant_id, @@ -291,8 +331,8 @@ SELECT processor_type, toStartOfMinute(start_time) AS bucket, environment, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, + uniqState(execution_id) AS total_count, + uniqIfState(execution_id, status = 'FAILED') AS failed_count, sumState(duration_ms) AS 
duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration