From 3f2392b8f71cc44a842557c37f57d81c72acbd21 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:49:53 +0200 Subject: [PATCH] refactor: consolidate ClickHouse init.sql as clean idempotent schema Rewrite init.sql as a pure CREATE IF NOT EXISTS file with no DROP or INSERT statements. Safe for repeated runs on every startup without corrupting aggregated stats data. Old deployments with count()-based stats tables are migrated automatically: ClickHouseSchemaInitializer checks system.columns for the old AggregateFunction(count) type and drops those tables before init.sql recreates them with the correct uniq() schema. This runs once per table and is a no-op on fresh installs or already-migrated deployments. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../config/ClickHouseSchemaInitializer.java | 34 ++++++ .../src/main/resources/clickhouse/init.sql | 102 +----------------- 2 files changed, 38 insertions(+), 98 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java index 85171872..f666de39 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java @@ -27,6 +27,8 @@ public class ClickHouseSchemaInitializer { @EventListener(ApplicationReadyEvent.class) public void initializeSchema() { try { + migrateStatsTablesIfNeeded(); + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); Resource script = resolver.getResource("classpath:clickhouse/init.sql"); @@ -50,4 +52,36 @@ public class ClickHouseSchemaInitializer { log.error("ClickHouse schema initialization failed — server will continue but ClickHouse features may not work", e); } } + + /** + * One-time migration: stats tables originally used count()/countIf() which + * double-counted chunk retry duplicates. The new schema uses uniq()/uniqIf(). + * Since AggregateFunction column types cannot be ALTERed, we must drop and + * let init.sql recreate them. MVs that depend on the tables are dropped first. + * Data rebuilds organically from new inserts via the materialized views. + */ + private void migrateStatsTablesIfNeeded() { + String[] statsTables = { + "stats_1m_all", "stats_1m_app", "stats_1m_route", + "stats_1m_processor", "stats_1m_processor_detail" + }; + + for (String table : statsTables) { + try { + Integer oldSchema = clickHouseJdbc.queryForObject( + "SELECT count() FROM system.columns " + + "WHERE database = currentDatabase() AND table = '" + table + "' " + + "AND name = 'total_count' AND type = 'AggregateFunction(count)'", + Integer.class); + + if (oldSchema != null && oldSchema > 0) { + log.info("Migrating stats table '{}': dropping old count()-based schema", table); + clickHouseJdbc.execute("DROP VIEW IF EXISTS " + table + "_mv"); + clickHouseJdbc.execute("DROP TABLE IF EXISTS " + table); + } + } catch (Exception e) { + log.warn("Could not check migration status for '{}': {}", table, e.getMessage()); + } + } + } } diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index 8b980ce3..26ed083b 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -1,5 +1,6 @@ -- ClickHouse schema initialization (single file, idempotent) --- All tables use IF NOT EXISTS for safe re-execution on every startup. +-- All statements use IF NOT EXISTS / IF EXISTS for safe re-execution on every startup. +-- No DROP or INSERT statements -- this file is safe for repeated runs. -- ── Agent Metrics ─────────────────────────────────────────────────────── @@ -124,13 +125,10 @@ TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Stats: Materialized Views + AggregatingMergeTree ──────────────────── +-- Counts use uniq(execution_id) to deduplicate chunk retries. +-- Processor counts use uniq(concat(execution_id, seq)) to also preserve loop iterations. -- stats_1m_all (global) --- All stats tables use uniq(execution_id) to deduplicate chunk retries --- DROP + CREATE + backfill rebuilds from raw data on startup - -DROP VIEW IF EXISTS stats_1m_all_mv; -DROP TABLE IF EXISTS stats_1m_all; CREATE TABLE IF NOT EXISTS stats_1m_all ( tenant_id LowCardinality(String), @@ -148,20 +146,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment) TTL bucket + INTERVAL 365 DAY DELETE; -INSERT INTO stats_1m_all -SELECT - tenant_id, - toStartOfMinute(start_time) AS bucket, - environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, - uniqIfState(execution_id, status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, bucket, environment; - CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS SELECT tenant_id, @@ -178,9 +162,6 @@ GROUP BY tenant_id, bucket, environment; -- stats_1m_app (per-application) -DROP VIEW IF EXISTS stats_1m_app_mv; -DROP TABLE IF EXISTS stats_1m_app; - CREATE TABLE IF NOT EXISTS stats_1m_app ( tenant_id LowCardinality(String), application_id LowCardinality(String), @@ -198,21 +179,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id) TTL bucket + INTERVAL 365 DAY DELETE; -INSERT INTO stats_1m_app -SELECT - tenant_id, - application_id, - toStartOfMinute(start_time) AS bucket, - environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, - uniqIfState(execution_id, status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, application_id, bucket, environment; - CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS SELECT tenant_id, @@ -230,9 +196,6 @@ GROUP BY tenant_id, application_id, bucket, environment; -- stats_1m_route (per-route) -DROP VIEW IF EXISTS stats_1m_route_mv; -DROP TABLE IF EXISTS stats_1m_route; - CREATE TABLE IF NOT EXISTS stats_1m_route ( tenant_id LowCardinality(String), application_id LowCardinality(String), @@ -251,22 +214,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, route_id) TTL bucket + INTERVAL 365 DAY DELETE; -INSERT INTO stats_1m_route -SELECT - tenant_id, - application_id, - route_id, - toStartOfMinute(start_time) AS bucket, - environment, - uniqState(execution_id) AS total_count, - uniqIfState(execution_id, status = 'FAILED') AS failed_count, - uniqIfState(execution_id, status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, application_id, route_id, bucket, environment; - CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS SELECT tenant_id, @@ -284,11 +231,6 @@ FROM executions GROUP BY tenant_id, application_id, route_id, bucket, environment; -- stats_1m_processor (per-processor-type) --- Migration: replaced count() with uniq(execution_id) to deduplicate --- DROP + CREATE + backfill rebuilds from raw data on startup - -DROP VIEW IF EXISTS stats_1m_processor_mv; -DROP TABLE IF EXISTS stats_1m_processor; CREATE TABLE IF NOT EXISTS stats_1m_processor ( tenant_id LowCardinality(String), @@ -307,21 +249,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; -INSERT INTO stats_1m_processor -SELECT - tenant_id, - application_id, - processor_type, - toStartOfMinute(start_time) AS bucket, - environment, - uniqState(concat(execution_id, toString(seq))) AS total_count, - uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM processor_executions -GROUP BY tenant_id, application_id, processor_type, bucket, environment; - CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS SELECT tenant_id, @@ -339,9 +266,6 @@ GROUP BY tenant_id, application_id, processor_type, bucket, environment; -- stats_1m_processor_detail (per-processor-id) -DROP VIEW IF EXISTS stats_1m_processor_detail_mv; -DROP TABLE IF EXISTS stats_1m_processor_detail; - CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( tenant_id LowCardinality(String), application_id LowCardinality(String), @@ -361,23 +285,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; -INSERT INTO stats_1m_processor_detail -SELECT - tenant_id, - application_id, - route_id, - processor_id, - processor_type, - toStartOfMinute(start_time) AS bucket, - environment, - uniqState(concat(execution_id, toString(seq))) AS total_count, - uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM processor_executions -GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment; - CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS SELECT tenant_id, @@ -453,7 +360,6 @@ ORDER BY (tenant_id, timestamp, environment, application, instance_id) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; --- Add source column for log forwarding v2 (app vs agent logs) ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app'; -- ── Usage Events ────────────────────────────────────────────────────────