feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,165 @@
|
||||
-- V4__stats_tables_and_mvs.sql
|
||||
-- Materialized views replacing TimescaleDB continuous aggregates.
|
||||
-- Tables use AggregatingMergeTree; MVs use -State combinators.
|
||||
|
||||
-- stats_1m_all (global)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS stats_1m_all (
|
||||
tenant_id LowCardinality(String),
|
||||
bucket DateTime,
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
running_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
duration_sum AggregateFunction(sum, Nullable(Int64)),
|
||||
duration_max AggregateFunction(max, Nullable(Int64)),
|
||||
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY (tenant_id, toYYYYMM(bucket))
|
||||
ORDER BY (tenant_id, bucket)
|
||||
TTL bucket + INTERVAL 365 DAY DELETE;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
|
||||
SELECT
|
||||
tenant_id,
|
||||
toStartOfMinute(start_time) AS bucket,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
countIfState(status = 'RUNNING') AS running_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
maxState(duration_ms) AS duration_max,
|
||||
quantileState(0.99)(duration_ms) AS p99_duration
|
||||
FROM executions
|
||||
GROUP BY tenant_id, bucket;
|
||||
|
||||
-- stats_1m_app (per-application)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS stats_1m_app (
|
||||
tenant_id LowCardinality(String),
|
||||
application_name LowCardinality(String),
|
||||
bucket DateTime,
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
running_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
duration_sum AggregateFunction(sum, Nullable(Int64)),
|
||||
duration_max AggregateFunction(max, Nullable(Int64)),
|
||||
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY (tenant_id, toYYYYMM(bucket))
|
||||
ORDER BY (tenant_id, application_name, bucket)
|
||||
TTL bucket + INTERVAL 365 DAY DELETE;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
|
||||
SELECT
|
||||
tenant_id,
|
||||
application_name,
|
||||
toStartOfMinute(start_time) AS bucket,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
countIfState(status = 'RUNNING') AS running_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
maxState(duration_ms) AS duration_max,
|
||||
quantileState(0.99)(duration_ms) AS p99_duration
|
||||
FROM executions
|
||||
GROUP BY tenant_id, application_name, bucket;
|
||||
|
||||
-- stats_1m_route (per-route)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS stats_1m_route (
|
||||
tenant_id LowCardinality(String),
|
||||
application_name LowCardinality(String),
|
||||
route_id LowCardinality(String),
|
||||
bucket DateTime,
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
running_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
duration_sum AggregateFunction(sum, Nullable(Int64)),
|
||||
duration_max AggregateFunction(max, Nullable(Int64)),
|
||||
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY (tenant_id, toYYYYMM(bucket))
|
||||
ORDER BY (tenant_id, application_name, route_id, bucket)
|
||||
TTL bucket + INTERVAL 365 DAY DELETE;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
|
||||
SELECT
|
||||
tenant_id,
|
||||
application_name,
|
||||
route_id,
|
||||
toStartOfMinute(start_time) AS bucket,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
countIfState(status = 'RUNNING') AS running_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
maxState(duration_ms) AS duration_max,
|
||||
quantileState(0.99)(duration_ms) AS p99_duration
|
||||
FROM executions
|
||||
GROUP BY tenant_id, application_name, route_id, bucket;
|
||||
|
||||
-- stats_1m_processor (per-processor-type)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS stats_1m_processor (
|
||||
tenant_id LowCardinality(String),
|
||||
application_name LowCardinality(String),
|
||||
processor_type LowCardinality(String),
|
||||
bucket DateTime,
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
duration_sum AggregateFunction(sum, Nullable(Int64)),
|
||||
duration_max AggregateFunction(max, Nullable(Int64)),
|
||||
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY (tenant_id, toYYYYMM(bucket))
|
||||
ORDER BY (tenant_id, application_name, processor_type, bucket)
|
||||
TTL bucket + INTERVAL 365 DAY DELETE;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
|
||||
SELECT
|
||||
tenant_id,
|
||||
application_name,
|
||||
processor_type,
|
||||
toStartOfMinute(start_time) AS bucket,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
maxState(duration_ms) AS duration_max,
|
||||
quantileState(0.99)(duration_ms) AS p99_duration
|
||||
FROM processor_executions
|
||||
GROUP BY tenant_id, application_name, processor_type, bucket;
|
||||
|
||||
-- stats_1m_processor_detail (per-processor-id)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
|
||||
tenant_id LowCardinality(String),
|
||||
application_name LowCardinality(String),
|
||||
route_id LowCardinality(String),
|
||||
processor_id String,
|
||||
bucket DateTime,
|
||||
total_count AggregateFunction(count, UInt64),
|
||||
failed_count AggregateFunction(countIf, UInt64, UInt8),
|
||||
duration_sum AggregateFunction(sum, Nullable(Int64)),
|
||||
duration_max AggregateFunction(max, Nullable(Int64)),
|
||||
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree()
|
||||
PARTITION BY (tenant_id, toYYYYMM(bucket))
|
||||
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
|
||||
TTL bucket + INTERVAL 365 DAY DELETE;
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
|
||||
SELECT
|
||||
tenant_id,
|
||||
application_name,
|
||||
route_id,
|
||||
processor_id,
|
||||
toStartOfMinute(start_time) AS bucket,
|
||||
countState() AS total_count,
|
||||
countIfState(status = 'FAILED') AS failed_count,
|
||||
sumState(duration_ms) AS duration_sum,
|
||||
maxState(duration_ms) AS duration_max,
|
||||
quantileState(0.99)(duration_ms) AS p99_duration
|
||||
FROM processor_executions
|
||||
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
|
||||
Reference in New Issue
Block a user