fix: deduplicate processor stats using uniq(execution_id)
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m17s
CI / docker (push) Successful in 1m10s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 38s

Processor execution counts were inflated by duplicate inserts into the
plain MergeTree processor_executions table (chunk retries, reconnects).
Replace count()/countIf() with uniq(execution_id)/uniqIf() in both
stats_1m_processor and stats_1m_processor_detail MVs so each exchange
is counted once per processor regardless of duplicates.

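Both stats tables are dropped and rebuilt from raw data on startup. Each
MV is created only after its table has been backfilled, so that rows are
not aggregated twice (once by the backfill and once by the MV).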

Also adds stats_1m_processor_detail to the catalog purge list (was
missing).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-12 23:12:00 +02:00
parent 27f2503640
commit e2f784bf82
3 changed files with 55 additions and 13 deletions

View File

@@ -228,6 +228,11 @@ FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;
-- stats_1m_processor (per-processor-type)
-- Migration: count() double-counted duplicate inserts; replaced with uniq(execution_id).
-- DROP + CREATE ensures schema migration; backfill rebuilds from raw data on startup.
-- The view is dropped before the table it writes TO.
-- NOTE(review): both DROPs are unconditional, so every run of this script
-- discards the existing aggregates and relies on processor_executions still
-- holding the full history. This table keeps rows for 365 days; confirm the
-- raw table's retention is at least as long, or older stats are lost.
DROP VIEW IF EXISTS stats_1m_processor_mv;
DROP TABLE IF EXISTS stats_1m_processor;
CREATE TABLE IF NOT EXISTS stats_1m_processor (
tenant_id LowCardinality(String),
@@ -235,8 +240,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor (
processor_type LowCardinality(String),
bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
total_count AggregateFunction(uniq, String),
failed_count AggregateFunction(uniqIf, String, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -246,6 +251,21 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill stats_1m_processor from the raw processor_executions rows: one
-- aggregate-state row per (tenant, application, processor type, minute,
-- environment).
-- The target columns are listed explicitly so the insert does not depend on
-- the physical column order of the table.
-- NOTE(review): only total_count / failed_count are deduplicated by
-- execution_id. duration_sum and p99_duration still aggregate every raw row,
-- so duplicate inserts continue to skew them; confirm this is acceptable.
INSERT INTO stats_1m_processor (
    tenant_id,
    application_id,
    processor_type,
    bucket,
    environment,
    total_count,
    failed_count,
    duration_sum,
    duration_max,
    p99_duration
)
SELECT
    tenant_id,
    application_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
@@ -253,8 +273,8 @@ SELECT
processor_type,
toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
uniqState(execution_id) AS total_count,
uniqIfState(execution_id, status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
@@ -263,6 +283,9 @@ GROUP BY tenant_id, application_id, processor_type, bucket, environment;
-- stats_1m_processor_detail (per-processor-id)
-- Migration: total_count / failed_count change from count()/countIf() states
-- to uniq(execution_id)/uniqIf() states, so the table and its view are
-- dropped and recreated. The view is dropped before the table it writes TO.
-- NOTE(review): both DROPs are unconditional, so every run of this script
-- discards the existing aggregates and relies on processor_executions still
-- holding the full history; confirm raw-data retention covers what is needed.
DROP VIEW IF EXISTS stats_1m_processor_detail_mv;
DROP TABLE IF EXISTS stats_1m_processor_detail;
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_id LowCardinality(String),
@@ -271,8 +294,8 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
processor_type LowCardinality(String),
bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
total_count AggregateFunction(uniq, String),
failed_count AggregateFunction(uniqIf, String, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
@@ -282,6 +305,23 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill stats_1m_processor_detail from the raw processor_executions rows:
-- one aggregate-state row per (tenant, application, route, processor id,
-- processor type, minute, environment).
-- The target columns are listed explicitly so the insert does not depend on
-- the physical column order of the table.
-- NOTE(review): only total_count / failed_count are deduplicated by
-- execution_id. duration_sum and p99_duration still aggregate every raw row,
-- so duplicate inserts continue to skew them; confirm this is acceptable.
INSERT INTO stats_1m_processor_detail (
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    bucket,
    environment,
    total_count,
    failed_count,
    duration_sum,
    duration_max,
    p99_duration
)
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
@@ -291,8 +331,8 @@ SELECT
processor_type,
toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
uniqState(execution_id) AS total_count,
uniqIfState(execution_id, status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration