-- ClickHouse schema initialization (single file, re-executed on every startup).
-- CREATE and ADD COLUMN statements are guarded with IF NOT EXISTS.
-- The file contains no DROP or INSERT statements.

-- ── Agent Metrics ───────────────────────────────────────────────────────
-- One row per metric sample reported by an agent instance.
-- Rows expire 365 days after collected_at.
CREATE TABLE IF NOT EXISTS agent_metrics
(
    tenant_id          LowCardinality(String) DEFAULT 'default',
    collected_at       DateTime64(3),
    environment        LowCardinality(String) DEFAULT 'default',
    instance_id        LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_value       Float64,
    tags               Map(String, String) DEFAULT map(),
    server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, collected_at, environment, instance_id, metric_name)
TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Executions ──────────────────────────────────────────────────────────
-- One logical row per execution. ReplacingMergeTree(_version) keeps the row
-- with the highest _version per sorting key once parts are merged.
-- _search_text is a materialized concatenation of the searchable text
-- columns and backs the idx_search n-gram bloom filter index.
CREATE TABLE IF NOT EXISTS executions
(
    tenant_id            LowCardinality(String) DEFAULT 'default',
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64 DEFAULT 1,
    route_id             LowCardinality(String),
    instance_id          LowCardinality(String),
    application_id       LowCardinality(String),
    environment          LowCardinality(String) DEFAULT 'default',
    status               LowCardinality(String),
    correlation_id       String DEFAULT '',
    exchange_id          String DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String DEFAULT '',
    error_stacktrace     String DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String DEFAULT '',
    root_cause_message   String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String DEFAULT '',
    output_body          String DEFAULT '',
    input_headers        String DEFAULT '',
    output_headers       String DEFAULT '',
    attributes           String DEFAULT '',
    trace_id             String DEFAULT '',
    span_id              String DEFAULT '',
    has_trace_data       Bool DEFAULT false,
    is_replay            Bool DEFAULT false,
    original_exchange_id String DEFAULT '',
    replay_exchange_id   String DEFAULT '',
    _search_text         String MATERIALIZED concat(
        execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id, ' ',
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ', input_headers, ' ', output_headers, ' ',
        root_cause_message
    ),
    INDEX idx_search  _search_text                             TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error   error_message                            TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies  concat(input_body, ' ', output_body)     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status  status                                   TYPE set(10) GRANULARITY 1,
    INDEX idx_corr    correlation_id                           TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Processor Executions ────────────────────────────────────────────────
-- One row per processor step of an execution, identified by (execution_id, seq).
-- parent_seq / parent_processor_id link a step to its parent step.
-- Plain MergeTree: rows are not deduplicated by the engine.
CREATE TABLE IF NOT EXISTS processor_executions
(
    tenant_id             LowCardinality(String) DEFAULT 'default',
    execution_id          String,
    seq                   UInt32,
    parent_seq            Nullable(UInt32),
    parent_processor_id   String DEFAULT '',
    processor_id          String,
    processor_type        LowCardinality(String),
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_id        LowCardinality(String),
    environment           LowCardinality(String) DEFAULT 'default',
    iteration             Nullable(Int32),
    iteration_size        Nullable(Int32),
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String DEFAULT '',
    error_stacktrace      String DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String DEFAULT '',
    root_cause_message    String DEFAULT '',
    input_body            String DEFAULT '',
    output_body           String DEFAULT '',
    input_headers         String DEFAULT '',
    output_headers        String DEFAULT '',
    attributes            String DEFAULT '',
    resolved_endpoint_uri String DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool DEFAULT false,
    filter_matched        Bool DEFAULT false,
    duplicate_message     Bool DEFAULT false,
    _search_text          String MATERIALIZED concat(
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ', input_headers, ' ', output_headers
    ),
    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- ── Stats: Materialized Views + AggregatingMergeTree ────────────────────
-- Execution counts are uniq(execution_id) so that retried chunks are not
-- counted twice.
-- Processor counts are uniq over execution_id combined with seq, which also
-- keeps loop iterations distinct.

-- stats_1m_all (global)
-- One-minute aggregate states over executions, per tenant and environment.
-- Columns hold partial aggregate states: read them with the matching -Merge
-- combinator (uniqMerge, uniqIfMerge, sumMerge, maxMerge, quantileMerge(0.99)).
CREATE TABLE IF NOT EXISTS stats_1m_all
(
    tenant_id     LowCardinality(String),
    bucket        DateTime('UTC'),
    environment   LowCardinality(String) DEFAULT 'default',
    total_count   AggregateFunction(uniq, String),
    failed_count  AggregateFunction(uniqIf, String, UInt8),
    running_count AggregateFunction(uniqIf, String, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Feeds stats_1m_all from every block inserted into executions.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toDateTime(toStartOfMinute(start_time), 'UTC') AS bucket,
    environment,
    uniqState(execution_id)                        AS total_count,
    uniqIfState(execution_id, status = 'FAILED')   AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')  AS running_count,
    sumState(duration_ms)                          AS duration_sum,
    maxState(duration_ms)                          AS duration_max,
    quantileState(0.99)(duration_ms)               AS p99_duration
FROM executions
GROUP BY
    tenant_id,
    bucket,
    environment;

-- stats_1m_app (per-application)
-- Same aggregates as stats_1m_all, additionally keyed by application_id.
CREATE TABLE IF NOT EXISTS stats_1m_app
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    bucket         DateTime('UTC'),
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Feeds stats_1m_app from every block inserted into executions.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_id,
    toDateTime(toStartOfMinute(start_time), 'UTC') AS bucket,
    environment,
    uniqState(execution_id)                        AS total_count,
    uniqIfState(execution_id, status = 'FAILED')   AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')  AS running_count,
    sumState(duration_ms)                          AS duration_sum,
    maxState(duration_ms)                          AS duration_max,
    quantileState(0.99)(duration_ms)               AS p99_duration
FROM executions
GROUP BY
    tenant_id,
    application_id,
    bucket,
    environment;

-- stats_1m_route (per-route)
-- Same aggregates as stats_1m_app, additionally keyed by route_id.
CREATE TABLE IF NOT EXISTS stats_1m_route
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    bucket         DateTime('UTC'),
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Feeds stats_1m_route from every block inserted into executions.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_id,
    route_id,
    toDateTime(toStartOfMinute(start_time), 'UTC') AS bucket,
    environment,
    uniqState(execution_id)                        AS total_count,
    uniqIfState(execution_id, status = 'FAILED')   AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING')  AS running_count,
    sumState(duration_ms)                          AS duration_sum,
    maxState(duration_ms)                          AS duration_max,
    quantileState(0.99)(duration_ms)               AS p99_duration
FROM executions
GROUP BY
    tenant_id,
    application_id,
    route_id,
    bucket,
    environment;

-- stats_1m_processor (per-processor-type)
-- One-minute aggregate states over processor_executions. There is no
-- running_count column in the processor-level tables.
CREATE TABLE IF NOT EXISTS stats_1m_processor
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    processor_type LowCardinality(String),
    bucket         DateTime('UTC'),
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Feeds stats_1m_processor from every block inserted into processor_executions.
-- The uniq key joins execution_id and seq with a ':' separator. Without a
-- separator different pairs can render to the same string (for example
-- 'a1' with seq 2 and 'a' with seq 12 both give 'a12') and are undercounted.
-- seq renders as digits only, so the text after the last ':' is always seq.
-- IF NOT EXISTS leaves an already-created view untouched: an existing
-- deployment keeps its old key until the view is recreated.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_id,
    processor_type,
    toDateTime(toStartOfMinute(start_time), 'UTC')                               AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY
    tenant_id,
    application_id,
    processor_type,
    bucket,
    environment;

-- stats_1m_processor_detail (per-processor-id)
-- Same aggregates as stats_1m_processor, additionally keyed by route_id and
-- processor_id.
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    processor_id   String,
    processor_type LowCardinality(String),
    bucket         DateTime('UTC'),
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;

-- Feeds stats_1m_processor_detail from every block inserted into
-- processor_executions. The uniq key uses the same ':' separator between
-- execution_id and seq as stats_1m_processor_mv, for the same reason:
-- plain concatenation lets different (execution_id, seq) pairs collide.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toDateTime(toStartOfMinute(start_time), 'UTC')                               AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq)))                          AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED')     AS failed_count,
    sumState(duration_ms)                                                        AS duration_sum,
    maxState(duration_ms)                                                        AS duration_max,
    quantileState(0.99)(duration_ms)                                             AS p99_duration
FROM processor_executions
GROUP BY
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    bucket,
    environment;

-- ── Route Diagrams ──────────────────────────────────────────────────────
-- Route definitions keyed by content hash. ReplacingMergeTree(created_at)
-- keeps the row with the latest created_at per sorting key once parts are
-- merged. No partitioning and no TTL are declared for this table.
CREATE TABLE IF NOT EXISTS route_diagrams
(
    tenant_id      LowCardinality(String) DEFAULT 'default',
    content_hash   String,
    route_id       LowCardinality(String),
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    environment    LowCardinality(String) DEFAULT 'default',
    definition     String,
    created_at     DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, environment, route_id, instance_id, content_hash)
SETTINGS index_granularity = 8192;

-- ── Agent Events ────────────────────────────────────────────────────────
-- Events reported per agent instance. insert_id is a random UUID assigned
-- at insert time. Rows expire 365 days after timestamp.
CREATE TABLE IF NOT EXISTS agent_events
(
    tenant_id      LowCardinality(String) DEFAULT 'default',
    timestamp      DateTime64(3) DEFAULT now64(3),
    environment    LowCardinality(String) DEFAULT 'default',
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    event_type     LowCardinality(String),
    detail         String DEFAULT '',
    insert_id      UUID DEFAULT generateUUIDv4()
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;

-- ── Logs ────────────────────────────────────────────────────────────────
-- Application log lines. message and stack_trace carry n-gram bloom filter
-- indexes, level carries a small set index. Rows expire 365 days after
-- timestamp.
CREATE TABLE IF NOT EXISTS logs
(
    tenant_id   LowCardinality(String) DEFAULT 'default',
    timestamp   DateTime64(3),
    environment LowCardinality(String) DEFAULT 'default',
    application LowCardinality(String),
    instance_id LowCardinality(String),
    level       LowCardinality(String),
    logger_name LowCardinality(String) DEFAULT '',
    message     String,
    thread_name LowCardinality(String) DEFAULT '',
    stack_trace String DEFAULT '',
    exchange_id String DEFAULT '',
    mdc         Map(String, String) DEFAULT map(),
    insert_id   UUID DEFAULT generateUUIDv4(),
    INDEX idx_msg   message     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level       TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

-- Additive migration: origin of the log line, defaulting to 'app'.
ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';

-- ── Exchange Properties (added for agent protocol v2) ──────────────────
-- Additive migrations. The new columns are not part of the _search_text
-- expressions declared on these tables.
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT '';
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT '';
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT '';
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT '';

-- ── Usage Events ────────────────────────────────────────────────────────
-- One row per recorded request (username, method, path, status, duration).
-- Unpartitioned. Rows expire 90 days after timestamp. DELETE is the default
-- TTL action and is spelled out here to match the other tables.
CREATE TABLE IF NOT EXISTS usage_events
(
    tenant_id    LowCardinality(String) DEFAULT 'default',
    timestamp    DateTime64(3) DEFAULT now64(3),
    environment  LowCardinality(String) DEFAULT 'default',
    username     LowCardinality(String),
    method       LowCardinality(String),
    path         String,
    normalized   LowCardinality(String),
    status_code  UInt16,
    duration_ms  UInt32,
    query_params String DEFAULT ''
)
ENGINE = MergeTree()
ORDER BY (tenant_id, timestamp, environment, username, normalized)
TTL toDateTime(timestamp) + INTERVAL 90 DAY DELETE;

-- ── Route Catalog ──────────────────────────────────────────────────────
-- One logical row per (tenant, environment, application, route).
-- ReplacingMergeTree(last_seen) keeps the row with the greatest last_seen
-- per sorting key once parts are merged.
CREATE TABLE IF NOT EXISTS route_catalog
(
    tenant_id      LowCardinality(String) DEFAULT 'default',
    environment    LowCardinality(String) DEFAULT 'default',
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    first_seen     DateTime64(3),
    last_seen      DateTime64(3)
)
ENGINE = ReplacingMergeTree(last_seen)
ORDER BY (tenant_id, environment, application_id, route_id);

-- ── Server Self-Metrics ────────────────────────────────────────────────
-- Periodic snapshot of the Micrometer registry of the server itself (written
-- by ServerMetricsSnapshotScheduler). No `environment` column — the server
-- straddles environments. `statistic` distinguishes Timer/DistributionSummary
-- sub-measurements (count, total_time, max, mean) from plain counter/gauge
-- values. Rows expire 90 days after collected_at.
CREATE TABLE IF NOT EXISTS server_metrics
(
    tenant_id          LowCardinality(String) DEFAULT 'default',
    collected_at       DateTime64(3),
    server_instance_id LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_type        LowCardinality(String),
    statistic          LowCardinality(String) DEFAULT 'value',
    metric_value       Float64,
    tags               Map(String, String) DEFAULT map(),
    server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, collected_at, server_instance_id, metric_name, statistic)
TTL toDateTime(collected_at) + INTERVAL 90 DAY DELETE
SETTINGS index_granularity = 8192;

-- insert_id tiebreak for keyset pagination (fixes same-millisecond cursor
-- collision). ADD COLUMN IF NOT EXISTS is a no-op where the column already
-- exists, which includes tables created by the CREATE statements in this file.
-- NOTE(review): MATERIALIZE COLUMN is not guarded and is submitted as a
-- mutation on every run of this script. Confirm on the deployed ClickHouse
-- version whether it rewrites insert_id in parts that already store it, which
-- would regenerate the UUIDs and invalidate outstanding cursors. If it does,
-- gate these two statements in the caller instead of running them on every
-- startup.
ALTER TABLE logs ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4();
ALTER TABLE logs MATERIALIZE COLUMN insert_id;
ALTER TABLE agent_events ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4();
ALTER TABLE agent_events MATERIALIZE COLUMN insert_id;