-- ClickHouse schema initialization (single file, idempotent) -- All tables use IF NOT EXISTS for safe re-execution on every startup. -- ── Agent Metrics ─────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS agent_metrics ( tenant_id LowCardinality(String) DEFAULT 'default', collected_at DateTime64(3), instance_id LowCardinality(String), metric_name LowCardinality(String), metric_value Float64, tags Map(String, String) DEFAULT map(), server_received_at DateTime64(3) DEFAULT now64(3) ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(collected_at)) ORDER BY (tenant_id, instance_id, metric_name, collected_at) TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Executions ────────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS executions ( tenant_id LowCardinality(String) DEFAULT 'default', execution_id String, start_time DateTime64(3), _version UInt64 DEFAULT 1, route_id LowCardinality(String), instance_id LowCardinality(String), application_id LowCardinality(String), status LowCardinality(String), correlation_id String DEFAULT '', exchange_id String DEFAULT '', end_time Nullable(DateTime64(3)), duration_ms Nullable(Int64), error_message String DEFAULT '', error_stacktrace String DEFAULT '', error_type LowCardinality(String) DEFAULT '', error_category LowCardinality(String) DEFAULT '', root_cause_type String DEFAULT '', root_cause_message String DEFAULT '', diagram_content_hash String DEFAULT '', engine_level LowCardinality(String) DEFAULT '', input_body String DEFAULT '', output_body String DEFAULT '', input_headers String DEFAULT '', output_headers String DEFAULT '', attributes String DEFAULT '', trace_id String DEFAULT '', span_id String DEFAULT '', has_trace_data Bool DEFAULT false, is_replay Bool DEFAULT false, original_exchange_id String DEFAULT '', replay_exchange_id String DEFAULT '', _search_text String MATERIALIZED concat(execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id, ' ', error_message, ' ', error_stacktrace, ' ', attributes, ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers, ' ', root_cause_message), INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_status status TYPE set(10) GRANULARITY 1, INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 ) ENGINE = ReplacingMergeTree(_version) PARTITION BY (tenant_id, toYYYYMM(start_time)) ORDER BY (tenant_id, start_time, application_id, route_id, execution_id) TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Processor Executions ──────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS processor_executions ( tenant_id LowCardinality(String) DEFAULT 'default', execution_id String, seq UInt32, parent_seq Nullable(UInt32), parent_processor_id String DEFAULT '', processor_id String, processor_type LowCardinality(String), start_time DateTime64(3), route_id LowCardinality(String), application_id LowCardinality(String), iteration Nullable(Int32), iteration_size Nullable(Int32), status LowCardinality(String), end_time Nullable(DateTime64(3)), duration_ms Nullable(Int64), error_message String DEFAULT '', error_stacktrace String DEFAULT '', error_type LowCardinality(String) DEFAULT '', error_category LowCardinality(String) DEFAULT '', root_cause_type String DEFAULT '', root_cause_message String DEFAULT '', input_body String DEFAULT '', output_body String DEFAULT '', input_headers String DEFAULT '', output_headers String DEFAULT '', attributes String DEFAULT '', resolved_endpoint_uri String DEFAULT '', circuit_breaker_state LowCardinality(String) DEFAULT '', fallback_triggered Bool DEFAULT false, filter_matched Bool DEFAULT false, duplicate_message Bool DEFAULT false, _search_text String MATERIALIZED concat(error_message, ' ', error_stacktrace, ' ', attributes, ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(start_time)) ORDER BY (tenant_id, start_time, application_id, route_id, execution_id, seq) TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Stats: Materialized Views + AggregatingMergeTree ──────────────────── -- stats_1m_all (global) CREATE TABLE IF NOT EXISTS stats_1m_all ( tenant_id LowCardinality(String), bucket DateTime, total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, bucket) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS SELECT tenant_id, toStartOfMinute(start_time) AS bucket, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions GROUP BY tenant_id, bucket; -- stats_1m_app (per-application) CREATE TABLE IF NOT EXISTS stats_1m_app ( tenant_id LowCardinality(String), application_id LowCardinality(String), bucket DateTime, total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, application_id, bucket) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS SELECT tenant_id, application_id, toStartOfMinute(start_time) AS bucket, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions GROUP BY tenant_id, application_id, bucket; -- stats_1m_route (per-route) CREATE TABLE IF NOT EXISTS stats_1m_route ( tenant_id LowCardinality(String), application_id LowCardinality(String), route_id LowCardinality(String), bucket DateTime, total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, application_id, route_id, bucket) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS SELECT tenant_id, application_id, route_id, toStartOfMinute(start_time) AS bucket, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions GROUP BY tenant_id, application_id, route_id, bucket; -- stats_1m_processor (per-processor-type) CREATE TABLE IF NOT EXISTS stats_1m_processor ( tenant_id LowCardinality(String), application_id LowCardinality(String), processor_type LowCardinality(String), bucket DateTime, total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, application_id, processor_type, bucket) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS SELECT tenant_id, application_id, processor_type, toStartOfMinute(start_time) AS bucket, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM processor_executions GROUP BY tenant_id, application_id, processor_type, bucket; -- stats_1m_processor_detail (per-processor-id) CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( tenant_id LowCardinality(String), application_id LowCardinality(String), route_id LowCardinality(String), processor_id String, processor_type LowCardinality(String), bucket DateTime, total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), duration_max AggregateFunction(max, Nullable(Int64)), p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) ORDER BY (tenant_id, application_id, route_id, processor_id, processor_type, bucket) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS SELECT tenant_id, application_id, route_id, processor_id, processor_type, toStartOfMinute(start_time) AS bucket, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM processor_executions GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket; -- ── Route Diagrams ────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS route_diagrams ( tenant_id LowCardinality(String) DEFAULT 'default', content_hash String, route_id LowCardinality(String), instance_id LowCardinality(String), application_id LowCardinality(String), definition String, created_at DateTime64(3) DEFAULT now64(3) ) ENGINE = ReplacingMergeTree(created_at) ORDER BY (tenant_id, content_hash) SETTINGS index_granularity = 8192; -- ── Agent Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS agent_events ( tenant_id LowCardinality(String) DEFAULT 'default', timestamp DateTime64(3) DEFAULT now64(3), instance_id LowCardinality(String), application_id LowCardinality(String), event_type LowCardinality(String), detail String DEFAULT '' ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) ORDER BY (tenant_id, application_id, instance_id, timestamp) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE; -- ── Logs ──────────────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS logs ( tenant_id LowCardinality(String) DEFAULT 'default', timestamp DateTime64(3), application LowCardinality(String), instance_id LowCardinality(String), level LowCardinality(String), logger_name LowCardinality(String) DEFAULT '', message String, thread_name LowCardinality(String) DEFAULT '', stack_trace String DEFAULT '', exchange_id String DEFAULT '', mdc Map(String, String) DEFAULT map(), INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_level level TYPE set(10) GRANULARITY 1 ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) ORDER BY (tenant_id, application, timestamp) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Usage Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS usage_events ( timestamp DateTime64(3) DEFAULT now64(3), username LowCardinality(String), method LowCardinality(String), path String, normalized LowCardinality(String), status_code UInt16, duration_ms UInt32, query_params String DEFAULT '' ) ENGINE = MergeTree() ORDER BY (username, timestamp) TTL toDateTime(timestamp) + INTERVAL 90 DAY;