-- ClickHouse schema initialization (single file, safe to re-run on every startup).
--
-- Base tables are created with CREATE TABLE IF NOT EXISTS, so re-execution
-- leaves existing data untouched. The stats_1m_* rollup tables and their
-- materialized views are the exception: they are dropped, recreated and
-- backfilled from the raw tables on every run.

-- ── Agent Metrics ───────────────────────────────────────────────────────

-- One row per metric sample reported for an instance.
CREATE TABLE IF NOT EXISTS agent_metrics (
    tenant_id          LowCardinality(String) DEFAULT 'default',
    collected_at       DateTime64(3),
    environment        LowCardinality(String) DEFAULT 'default',
    instance_id        LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_value       Float64,
    tags               Map(String, String) DEFAULT map(),
    server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, collected_at, environment, instance_id, metric_name)
TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Executions ──────────────────────────────────────────────────────────

-- One logical row per execution. ReplacingMergeTree(_version) keeps the row
-- with the highest _version per sorting key once parts are merged.
CREATE TABLE IF NOT EXISTS executions (
    tenant_id            LowCardinality(String) DEFAULT 'default',
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64 DEFAULT 1,
    route_id             LowCardinality(String),
    instance_id          LowCardinality(String),
    application_id       LowCardinality(String),
    environment          LowCardinality(String) DEFAULT 'default',
    status               LowCardinality(String),
    correlation_id       String DEFAULT '',
    exchange_id          String DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String DEFAULT '',
    error_stacktrace     String DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String DEFAULT '',
    root_cause_message   String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String DEFAULT '',
    output_body          String DEFAULT '',
    input_headers        String DEFAULT '',
    output_headers       String DEFAULT '',
    attributes           String DEFAULT '',
    trace_id             String DEFAULT '',
    span_id              String DEFAULT '',
    has_trace_data       Bool DEFAULT false,
    is_replay            Bool DEFAULT false,
    original_exchange_id String DEFAULT '',
    replay_exchange_id   String DEFAULT '',

    -- Space-joined copy of the searchable text columns, backing idx_search.
    _search_text String MATERIALIZED concat(
        execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id, ' ',
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ',
        input_headers, ' ', output_headers, ' ',
        root_cause_message
    ),

    INDEX idx_search  _search_text                             TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error   error_message                            TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies  concat(input_body, ' ', output_body)     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status  status                                   TYPE set(10) GRANULARITY 1,
    INDEX idx_corr    correlation_id                           TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Processor Executions ────────────────────────────────────────────────

-- One row per processor step of an execution, identified by (execution_id, seq).
CREATE TABLE IF NOT EXISTS processor_executions (
    tenant_id             LowCardinality(String) DEFAULT 'default',
    execution_id          String,
    seq                   UInt32,
    parent_seq            Nullable(UInt32),
    parent_processor_id   String DEFAULT '',
    processor_id          String,
    processor_type        LowCardinality(String),
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_id        LowCardinality(String),
    environment           LowCardinality(String) DEFAULT 'default',
    iteration             Nullable(Int32),
    iteration_size        Nullable(Int32),
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String DEFAULT '',
    error_stacktrace      String DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String DEFAULT '',
    root_cause_message    String DEFAULT '',
    input_body            String DEFAULT '',
    output_body           String DEFAULT '',
    input_headers         String DEFAULT '',
    output_headers        String DEFAULT '',
    attributes            String DEFAULT '',
    resolved_endpoint_uri String DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool DEFAULT false,
    filter_matched        Bool DEFAULT false,
    duplicate_message     Bool DEFAULT false,

    -- Space-joined copy of the searchable text columns, backing idx_search.
    _search_text String MATERIALIZED concat(
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ',
        input_headers, ' ', output_headers
    ),

    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Stats: Materialized Views + AggregatingMergeTree ────────────────────
--
-- Every stats table is rebuilt on each run: DROP view + DROP table, CREATE
-- table, backfill from the raw table, then CREATE the materialized view.
-- Counts are stored as uniq states so that re-inserted rows for the same
-- execution (chunk retries) are counted once.
--
-- NOTE(review): each materialized view is created after its backfill, so rows
-- written to the source table between those two statements may be missing
-- from the rollup until the next rebuild. Confirm ingestion is not running
-- while this script executes.

-- stats_1m_all (global, per tenant/environment/minute)
DROP VIEW IF EXISTS stats_1m_all_mv;
DROP TABLE IF EXISTS stats_1m_all;

CREATE TABLE IF NOT EXISTS stats_1m_all (
    tenant_id     LowCardinality(String),
    bucket        DateTime,
    environment   LowCardinality(String) DEFAULT 'default',
    total_count   AggregateFunction(uniq, String),
    failed_count  AggregateFunction(uniqIf, String, UInt8),
    running_count AggregateFunction(uniqIf, String, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Backfill from raw executions; the column list keeps the mapping explicit.
INSERT INTO stats_1m_all (
    tenant_id, bucket, environment,
    total_count, failed_count, running_count,
    duration_sum, duration_max, p99_duration
)
SELECT
    tenant_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, bucket, environment;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, bucket, environment;

-- stats_1m_app (per-application)
DROP VIEW IF EXISTS stats_1m_app_mv;
DROP TABLE IF EXISTS stats_1m_app;

CREATE TABLE IF NOT EXISTS stats_1m_app (
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id)
TTL bucket + INTERVAL 365 DAY DELETE;

INSERT INTO stats_1m_app (
    tenant_id, application_id, bucket, environment,
    total_count, failed_count, running_count,
    duration_sum, duration_max, p99_duration
)
SELECT
    tenant_id,
    application_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, bucket, environment;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, bucket, environment;

-- stats_1m_route (per-route)
DROP VIEW IF EXISTS stats_1m_route_mv;
DROP TABLE IF EXISTS stats_1m_route;

CREATE TABLE IF NOT EXISTS stats_1m_route (
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id)
TTL bucket + INTERVAL 365 DAY DELETE;

INSERT INTO stats_1m_route (
    tenant_id, application_id, route_id, bucket, environment,
    total_count, failed_count, running_count,
    duration_sum, duration_max, p99_duration
)
SELECT
    tenant_id,
    application_id,
    route_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_id,
    route_id,
    toStartOfMinute(start_time)                     AS bucket,
    environment,
    uniqState(execution_id)                         AS total_count,
    uniqIfState(execution_id, status = 'FAILED')    AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')   AS running_count,
    sumState(duration_ms)                           AS duration_sum,
    maxState(duration_ms)                           AS duration_max,
    quantileState(0.99)(duration_ms)                AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;

-- stats_1m_processor (per-processor-type)
-- Counts are uniq over the (execution_id, seq) pair rather than count(), so a
-- processor row that is inserted more than once is counted once. The pair is
-- encoded as execution_id || ':' || seq; the separator is required because a
-- bare concatenation is ambiguous ('X-1' + 23 and 'X-12' + 3 both give 'X-123').
DROP VIEW IF EXISTS stats_1m_processor_mv;
DROP TABLE IF EXISTS stats_1m_processor;

CREATE TABLE IF NOT EXISTS stats_1m_processor (
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    processor_type LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;

INSERT INTO stats_1m_processor (
    tenant_id, application_id, processor_type, bucket, environment,
    total_count, failed_count,
    duration_sum, duration_max, p99_duration
)
SELECT
    tenant_id,
    application_id,
    processor_type,
    toStartOfMinute(start_time)                                                  AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_id,
    processor_type,
    toStartOfMinute(start_time)                                                  AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;

-- stats_1m_processor_detail (per-processor-id)
-- Uses the same ':'-separated (execution_id, seq) dedup key as
-- stats_1m_processor so the two rollups count processor rows identically.
DROP VIEW IF EXISTS stats_1m_processor_detail_mv;
DROP TABLE IF EXISTS stats_1m_processor_detail;

CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    processor_id   String,
    processor_type LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;

INSERT INTO stats_1m_processor_detail (
    tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment,
    total_count, failed_count,
    duration_sum, duration_max, p99_duration
)
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toStartOfMinute(start_time)                                                  AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toStartOfMinute(start_time)                                                  AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;

-- ── Route Diagrams ──────────────────────────────────────────────────────

-- Route definitions keyed by content hash; ReplacingMergeTree(created_at)
-- keeps the most recently created row per sorting key after merges.
-- No TTL is declared on this table.
CREATE TABLE IF NOT EXISTS route_diagrams (
    tenant_id      LowCardinality(String) DEFAULT 'default',
    content_hash   String,
    route_id       LowCardinality(String),
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    environment    LowCardinality(String) DEFAULT 'default',
    definition     String,
    created_at     DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, environment, route_id, instance_id, content_hash)
SETTINGS index_granularity = 8192;

-- ── Agent Events ────────────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS agent_events (
    tenant_id      LowCardinality(String) DEFAULT 'default',
    timestamp      DateTime64(3) DEFAULT now64(3),
    environment    LowCardinality(String) DEFAULT 'default',
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    event_type     LowCardinality(String),
    detail         String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;

-- ── Logs ────────────────────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS logs (
    tenant_id   LowCardinality(String) DEFAULT 'default',
    timestamp   DateTime64(3),
    environment LowCardinality(String) DEFAULT 'default',
    application LowCardinality(String),
    instance_id LowCardinality(String),
    level       LowCardinality(String),
    logger_name LowCardinality(String) DEFAULT '',
    message     String,
    thread_name LowCardinality(String) DEFAULT '',
    stack_trace String DEFAULT '',
    exchange_id String DEFAULT '',
    mdc         Map(String, String) DEFAULT map(),

    INDEX idx_msg   message     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level       TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- Add source column for log forwarding v2 (app vs agent logs).
-- Kept as an ALTER so that tables created before this column existed pick it up.
ALTER TABLE logs
    ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';

-- ── Usage Events ────────────────────────────────────────────────────────

-- One row per recorded request; retained for 90 days (shorter than the
-- 365-day TTL used by the other tables in this file). Not partitioned.
CREATE TABLE IF NOT EXISTS usage_events (
    tenant_id    LowCardinality(String) DEFAULT 'default',
    timestamp    DateTime64(3) DEFAULT now64(3),
    environment  LowCardinality(String) DEFAULT 'default',
    username     LowCardinality(String),
    method       LowCardinality(String),
    path         String,
    normalized   LowCardinality(String),
    status_code  UInt16,
    duration_ms  UInt32,
    query_params String DEFAULT ''
)
ENGINE = MergeTree()
ORDER BY (tenant_id, timestamp, environment, username, normalized)
TTL toDateTime(timestamp) + INTERVAL 90 DAY;