From eb0d26814f4e3e3e863dca698ea7ce103b09f00e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:11:38 +0200 Subject: [PATCH] feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../clickhouse/V4__stats_tables_and_mvs.sql | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql new file mode 100644 index 00000000..fb87c2dc --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql @@ -0,0 +1,165 @@ +-- V4__stats_tables_and_mvs.sql +-- Materialized views replacing TimescaleDB continuous aggregates. +-- Tables use AggregatingMergeTree; MVs use -State combinators. + +-- stats_1m_all (global) + +CREATE TABLE IF NOT EXISTS stats_1m_all ( + tenant_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS +SELECT + tenant_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, bucket; + +-- stats_1m_app (per-application) + +CREATE TABLE IF NOT EXISTS stats_1m_app ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS +SELECT + tenant_id, + application_name, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, bucket; + +-- stats_1m_route (per-route) + +CREATE TABLE IF NOT EXISTS stats_1m_route ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS +SELECT + tenant_id, + application_name, + route_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, route_id, bucket; + +-- stats_1m_processor (per-processor-type) + +CREATE TABLE IF NOT EXISTS stats_1m_processor ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, processor_type, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS +SELECT + tenant_id, + application_name, + processor_type, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, processor_type, bucket; + +-- stats_1m_processor_detail (per-processor-id) + +CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + processor_id String, + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, processor_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS +SELECT + tenant_id, + application_name, + route_id, + processor_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, route_id, processor_id, bucket;