refactor: consolidate ClickHouse init.sql as clean idempotent schema
Some checks failed
CI / build (push) Has been cancelled
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled

Rewrite init.sql as a pure CREATE IF NOT EXISTS file with no DROP or
INSERT statements. Safe for repeated runs on every startup without
corrupting aggregated stats data.

Old deployments with count()-based stats tables are migrated
automatically: ClickHouseSchemaInitializer checks system.columns for
the old AggregateFunction(count) type and drops those tables before
init.sql recreates them with the correct uniq() schema. This runs
once per table and is a no-op on fresh installs or already-migrated
deployments.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-13 11:49:53 +02:00
parent 6b558b649d
commit 3f2392b8f7
2 changed files with 38 additions and 98 deletions

View File

@@ -27,6 +27,8 @@ public class ClickHouseSchemaInitializer {
@EventListener(ApplicationReadyEvent.class)
public void initializeSchema() {
try {
migrateStatsTablesIfNeeded();
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
Resource script = resolver.getResource("classpath:clickhouse/init.sql");
@@ -50,4 +52,36 @@ public class ClickHouseSchemaInitializer {
log.error("ClickHouse schema initialization failed — server will continue but ClickHouse features may not work", e);
}
}
/**
 * One-time migration: stats tables originally used count()/countIf(), which
 * double-counted chunk retry duplicates. The new schema uses uniq()/uniqIf().
 * Since AggregateFunction column types cannot be ALTERed in ClickHouse, we
 * detect the old type in system.columns and drop the affected table so that
 * init.sql recreates it with the correct schema. The dependent materialized
 * view is dropped first so it cannot write into a half-migrated table.
 * Aggregated data rebuilds organically from new inserts via the recreated
 * materialized views; this is a no-op on fresh or already-migrated installs.
 */
private void migrateStatsTablesIfNeeded() {
    // Fixed whitelist of migratable tables — the only identifiers ever
    // interpolated into the DDL statements below.
    String[] statsTables = {
        "stats_1m_all", "stats_1m_app", "stats_1m_route",
        "stats_1m_processor", "stats_1m_processor_detail"
    };
    for (String table : statsTables) {
        try {
            // Bind the table name as a parameter; only the old
            // AggregateFunction(count) column type marks a pre-migration schema.
            Integer oldSchema = clickHouseJdbc.queryForObject(
                "SELECT count() FROM system.columns " +
                "WHERE database = currentDatabase() AND table = ? " +
                "AND name = 'total_count' AND type = 'AggregateFunction(count)'",
                Integer.class, table);
            if (oldSchema != null && oldSchema > 0) {
                log.info("Migrating stats table '{}': dropping old count()-based schema", table);
                // Identifiers cannot be bound as parameters; 'table' comes from
                // the constant whitelist above, never from external input.
                clickHouseJdbc.execute("DROP VIEW IF EXISTS " + table + "_mv");
                clickHouseJdbc.execute("DROP TABLE IF EXISTS " + table);
            }
        } catch (Exception e) {
            // Best-effort per table: a failed check must not abort startup or
            // block migration of the remaining tables.
            log.warn("Could not check migration status for '{}': {}", table, e.getMessage());
        }
    }
}
}

View File

@@ -1,5 +1,6 @@
-- ClickHouse schema initialization (single file, idempotent)
-- All tables use IF NOT EXISTS for safe re-execution on every startup.
-- All statements use IF NOT EXISTS / IF EXISTS for safe re-execution on every startup.
-- No DROP or INSERT statements -- this file is safe for repeated runs.
-- ── Agent Metrics ───────────────────────────────────────────────────────
@@ -124,13 +125,10 @@ TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- ── Stats: Materialized Views + AggregatingMergeTree ────────────────────
-- Counts use uniq(execution_id) to deduplicate chunk retries.
-- Processor counts use uniq(concat(execution_id, seq)) to also preserve loop iterations.
-- stats_1m_all (global)
-- All stats tables use uniq(execution_id) to deduplicate chunk retries
-- DROP + CREATE + backfill rebuilds from raw data on startup
DROP VIEW IF EXISTS stats_1m_all_mv;
DROP TABLE IF EXISTS stats_1m_all;
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
@@ -148,20 +146,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment)
TTL bucket + INTERVAL 365 DAY DELETE;
INSERT INTO stats_1m_all
SELECT
tenant_id,
toStartOfMinute(start_time) AS bucket,
environment,
uniqState(execution_id) AS total_count,
uniqIfState(execution_id, status = 'FAILED') AS failed_count,
uniqIfState(execution_id, status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
tenant_id,
@@ -178,9 +162,6 @@ GROUP BY tenant_id, bucket, environment;
-- stats_1m_app (per-application)
DROP VIEW IF EXISTS stats_1m_app_mv;
DROP TABLE IF EXISTS stats_1m_app;
CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_id LowCardinality(String),
@@ -198,21 +179,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id)
TTL bucket + INTERVAL 365 DAY DELETE;
INSERT INTO stats_1m_app
SELECT
tenant_id,
application_id,
toStartOfMinute(start_time) AS bucket,
environment,
uniqState(execution_id) AS total_count,
uniqIfState(execution_id, status = 'FAILED') AS failed_count,
uniqIfState(execution_id, status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
tenant_id,
@@ -230,9 +196,6 @@ GROUP BY tenant_id, application_id, bucket, environment;
-- stats_1m_route (per-route)
DROP VIEW IF EXISTS stats_1m_route_mv;
DROP TABLE IF EXISTS stats_1m_route;
CREATE TABLE IF NOT EXISTS stats_1m_route (
tenant_id LowCardinality(String),
application_id LowCardinality(String),
@@ -251,22 +214,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id)
TTL bucket + INTERVAL 365 DAY DELETE;
INSERT INTO stats_1m_route
SELECT
tenant_id,
application_id,
route_id,
toStartOfMinute(start_time) AS bucket,
environment,
uniqState(execution_id) AS total_count,
uniqIfState(execution_id, status = 'FAILED') AS failed_count,
uniqIfState(execution_id, status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
tenant_id,
@@ -284,11 +231,6 @@ FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;
-- stats_1m_processor (per-processor-type)
-- Migration: replaced count() with uniq(execution_id) to deduplicate
-- DROP + CREATE + backfill rebuilds from raw data on startup
DROP VIEW IF EXISTS stats_1m_processor_mv;
DROP TABLE IF EXISTS stats_1m_processor;
CREATE TABLE IF NOT EXISTS stats_1m_processor (
tenant_id LowCardinality(String),
@@ -307,21 +249,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
INSERT INTO stats_1m_processor
SELECT
tenant_id,
application_id,
processor_type,
toStartOfMinute(start_time) AS bucket,
environment,
uniqState(concat(execution_id, toString(seq))) AS total_count,
uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
@@ -339,9 +266,6 @@ GROUP BY tenant_id, application_id, processor_type, bucket, environment;
-- stats_1m_processor_detail (per-processor-id)
DROP VIEW IF EXISTS stats_1m_processor_detail_mv;
DROP TABLE IF EXISTS stats_1m_processor_detail;
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_id LowCardinality(String),
@@ -361,23 +285,6 @@ PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
INSERT INTO stats_1m_processor_detail
SELECT
tenant_id,
application_id,
route_id,
processor_id,
processor_type,
toStartOfMinute(start_time) AS bucket,
environment,
uniqState(concat(execution_id, toString(seq))) AS total_count,
uniqIfState(concat(execution_id, toString(seq)), status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
@@ -453,7 +360,6 @@ ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- Add source column for log forwarding v2 (app vs agent logs)
ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';
-- ── Usage Events ────────────────────────────────────────────────────────