Files
cameleer-server/cameleer3-server-app/src/main/resources/clickhouse/init.sql
hsiegeln ae908fb382
All checks were successful
CI / build (push) Successful in 2m25s
CI / cleanup-branch (push) Has been skipped
CI / docker (push) Successful in 1m20s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 1m3s
SonarQube / sonarqube (push) Successful in 3m49s
fix: deduplicate all stats MVs and preserve loop iterations
Extend uniq-based dedup from processor tables to all stats tables
(stats_1m_all, stats_1m_app, stats_1m_route). Execution-level tables
use uniq(execution_id). Processor-level tables now use
uniq(concat(execution_id, toString(seq))) so loop iterations (same
exchange, different seq) are counted while chunk retry duplicates
(same exchange+seq) are collapsed.

All stats tables are dropped, recreated, and backfilled from raw
data on startup. All Java queries updated: countMerge -> uniqMerge,
countIfMerge -> uniqIfMerge.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 23:48:01 +02:00

476 lines
20 KiB
SQL

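-- ClickHouse schema initialization (single file, safe to re-execute on every startup).
-- Raw data tables are created with IF NOT EXISTS and keep their contents.
-- The stats_1m_* tables and their materialized views are dropped, recreated,
-- and backfilled from the raw tables on each run.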
-- ── Agent Metrics ───────────────────────────────────────────────────────
-- Metric samples (name, value, tags) per instance.
-- Partitioned by tenant and month of collected_at, rows expire 365 days after collected_at.
-- server_received_at defaults to the insert-time clock when the writer omits it.
CREATE TABLE IF NOT EXISTS agent_metrics
(
    tenant_id          LowCardinality(String) DEFAULT 'default',
    collected_at       DateTime64(3),
    environment        LowCardinality(String) DEFAULT 'default',
    instance_id        LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_value       Float64,
    tags               Map(String, String)    DEFAULT map(),
    server_received_at DateTime64(3)          DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, collected_at, environment, instance_id, metric_name)
TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- ── Executions ──────────────────────────────────────────────────────────
-- Execution records.
-- ReplacingMergeTree(_version): among rows sharing the full sorting key, the one
-- with the highest _version survives a merge. Until parts are merged, readers can
-- still see several versions of the same execution.
-- _search_text is a materialized concatenation of the text columns and backs the
-- idx_search n-gram bloom filter. The other skip indexes cover error text, bodies,
-- headers, status, and correlation_id.
-- Partitioned by tenant and month of start_time, rows expire 365 days after start_time.
CREATE TABLE IF NOT EXISTS executions
(
    tenant_id            LowCardinality(String) DEFAULT 'default',
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64                 DEFAULT 1,
    route_id             LowCardinality(String),
    instance_id          LowCardinality(String),
    application_id       LowCardinality(String),
    environment          LowCardinality(String) DEFAULT 'default',
    status               LowCardinality(String),
    correlation_id       String                 DEFAULT '',
    exchange_id          String                 DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String                 DEFAULT '',
    error_stacktrace     String                 DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String                 DEFAULT '',
    root_cause_message   String                 DEFAULT '',
    diagram_content_hash String                 DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String                 DEFAULT '',
    output_body          String                 DEFAULT '',
    input_headers        String                 DEFAULT '',
    output_headers       String                 DEFAULT '',
    attributes           String                 DEFAULT '',
    trace_id             String                 DEFAULT '',
    span_id              String                 DEFAULT '',
    has_trace_data       Bool                   DEFAULT false,
    is_replay            Bool                   DEFAULT false,
    original_exchange_id String                 DEFAULT '',
    replay_exchange_id   String                 DEFAULT '',
    _search_text         String MATERIALIZED
        concat(execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id,
               ' ', error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers,
               ' ', output_headers, ' ', root_cause_message),

    INDEX idx_search  _search_text                              TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error   error_message                             TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies  concat(input_body, ' ', output_body)      TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status  status                                    TYPE set(10) GRANULARITY 1,
    INDEX idx_corr    correlation_id                            TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- ── Processor Executions ────────────────────────────────────────────────
-- Processor-level execution records, identified within an execution by seq.
-- Plain MergeTree: the engine does not collapse rows that repeat the same
-- (execution_id, seq), so any deduplication has to happen at query time.
-- _search_text is a materialized concatenation of the text columns and backs the
-- idx_search n-gram bloom filter. idx_exec_id speeds up lookups by execution_id.
-- Partitioned by tenant and month of start_time, rows expire 365 days after start_time.
CREATE TABLE IF NOT EXISTS processor_executions
(
    tenant_id             LowCardinality(String) DEFAULT 'default',
    execution_id          String,
    seq                   UInt32,
    parent_seq            Nullable(UInt32),
    parent_processor_id   String                 DEFAULT '',
    processor_id          String,
    processor_type        LowCardinality(String),
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_id        LowCardinality(String),
    environment           LowCardinality(String) DEFAULT 'default',
    iteration             Nullable(Int32),
    iteration_size        Nullable(Int32),
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String                 DEFAULT '',
    error_stacktrace      String                 DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String                 DEFAULT '',
    root_cause_message    String                 DEFAULT '',
    input_body            String                 DEFAULT '',
    output_body           String                 DEFAULT '',
    input_headers         String                 DEFAULT '',
    output_headers        String                 DEFAULT '',
    attributes            String                 DEFAULT '',
    resolved_endpoint_uri String                 DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool                   DEFAULT false,
    filter_matched        Bool                   DEFAULT false,
    duplicate_message     Bool                   DEFAULT false,
    _search_text          String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),

    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- ── Stats: Materialized Views + AggregatingMergeTree ────────────────────
-- stats_1m_all (global: per tenant, minute bucket, environment)
-- Counts are stored as uniq(execution_id) states, so repeated rows for the same
-- execution (for example chunk retries) count once. Read with uniqMerge / uniqIfMerge.
-- sum / max / quantile states are NOT deduplicated: every source row contributes.
-- DROP + CREATE + backfill rebuilds this table from executions on every startup.
DROP VIEW IF EXISTS stats_1m_all_mv;
DROP TABLE IF EXISTS stats_1m_all;
CREATE TABLE IF NOT EXISTS stats_1m_all
(
    tenant_id     LowCardinality(String),
    bucket        DateTime,
    environment   LowCardinality(String) DEFAULT 'default',
    total_count   AggregateFunction(uniq, String),
    failed_count  AggregateFunction(uniqIf, String, UInt8),
    running_count AggregateFunction(uniqIf, String, UInt8),
    duration_sum  AggregateFunction(sum, Nullable(Int64)),
    duration_max  AggregateFunction(max, Nullable(Int64)),
    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill from raw data. The target column list is explicit so the SELECT is
-- bound to named columns instead of depending on the table column order.
INSERT INTO stats_1m_all
    (tenant_id, bucket, environment,
     total_count, failed_count, running_count,
     duration_sum, duration_max, p99_duration)
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket, environment;
-- Incremental maintenance: aggregates rows inserted into executions from here on.
-- NOTE(review): rows inserted between the backfill above and the creation of this
-- view are not aggregated - confirm ingestion is quiet while this script runs.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket, environment;
-- stats_1m_app (per-application: tenant, application, minute bucket, environment)
-- Counts are uniq(execution_id) states, so repeated rows for one execution count once.
-- sum / max / quantile states are not deduplicated.
-- Dropped, recreated, and backfilled from executions on every startup.
DROP VIEW IF EXISTS stats_1m_app_mv;
DROP TABLE IF EXISTS stats_1m_app;
CREATE TABLE IF NOT EXISTS stats_1m_app
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill from raw data. The target column list is explicit so the SELECT is
-- bound to named columns instead of depending on the table column order.
INSERT INTO stats_1m_app
    (tenant_id, application_id, bucket, environment,
     total_count, failed_count, running_count,
     duration_sum, duration_max, p99_duration)
SELECT
    tenant_id,
    application_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, bucket, environment;
-- Incremental maintenance: aggregates rows inserted into executions from here on.
-- NOTE(review): rows inserted between the backfill above and the creation of this
-- view are not aggregated - confirm ingestion is quiet while this script runs.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, bucket, environment;
-- stats_1m_route (per-route: tenant, application, route, minute bucket, environment)
-- Counts are uniq(execution_id) states, so repeated rows for one execution count once.
-- sum / max / quantile states are not deduplicated.
-- Dropped, recreated, and backfilled from executions on every startup.
DROP VIEW IF EXISTS stats_1m_route_mv;
DROP TABLE IF EXISTS stats_1m_route;
CREATE TABLE IF NOT EXISTS stats_1m_route
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    running_count  AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill from raw data. The target column list is explicit so the SELECT is
-- bound to named columns instead of depending on the table column order.
INSERT INTO stats_1m_route
    (tenant_id, application_id, route_id, bucket, environment,
     total_count, failed_count, running_count,
     duration_sum, duration_max, p99_duration)
SELECT
    tenant_id,
    application_id,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;
-- Incremental maintenance: aggregates rows inserted into executions from here on.
-- NOTE(review): rows inserted between the backfill above and the creation of this
-- view are not aggregated - confirm ingestion is quiet while this script runs.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_id,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(execution_id) AS total_count,
    uniqIfState(execution_id, status = 'FAILED') AS failed_count,
    uniqIfState(execution_id, status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_id, route_id, bucket, environment;
-- stats_1m_processor (per-processor-type)
-- Counts are uniq states over the composite key execution_id + ":" + seq:
--   * loop iterations (same execution, different seq) are counted separately
--   * retried duplicates (same execution and seq) collapse into one
-- The ":" separator keeps the key unambiguous. Without it, execution a1 with
-- seq 23 and execution a12 with seq 3 would both produce a123 and be counted once.
-- seq is rendered as digits only, so the text after the last ":" is always seq.
-- sum / max / quantile states are not deduplicated.
-- DROP + CREATE + backfill rebuilds from raw data on startup.
DROP VIEW IF EXISTS stats_1m_processor_mv;
DROP TABLE IF EXISTS stats_1m_processor;
CREATE TABLE IF NOT EXISTS stats_1m_processor
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    processor_type LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill from raw data. The target column list is explicit so the SELECT is
-- bound to named columns instead of depending on the table column order.
-- The key expression must stay identical to the one used by the view below.
INSERT INTO stats_1m_processor
    (tenant_id, application_id, processor_type, bucket, environment,
     total_count, failed_count,
     duration_sum, duration_max, p99_duration)
SELECT
    tenant_id,
    application_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq))) AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;
-- Incremental maintenance: aggregates rows inserted into processor_executions from here on.
-- NOTE(review): rows inserted between the backfill above and the creation of this
-- view are not aggregated - confirm ingestion is quiet while this script runs.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq))) AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket, environment;
-- stats_1m_processor_detail (per-processor-id)
-- Counts are uniq states over the composite key execution_id + ":" + seq:
--   * loop iterations (same execution, different seq) are counted separately
--   * retried duplicates (same execution and seq) collapse into one
-- The ":" separator keeps the key unambiguous. Without it, execution a1 with
-- seq 23 and execution a12 with seq 3 would both produce a123 and be counted once.
-- sum / max / quantile states are not deduplicated.
-- Dropped, recreated, and backfilled from processor_executions on every startup.
DROP VIEW IF EXISTS stats_1m_processor_detail_mv;
DROP TABLE IF EXISTS stats_1m_processor_detail;
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail
(
    tenant_id      LowCardinality(String),
    application_id LowCardinality(String),
    route_id       LowCardinality(String),
    processor_id   String,
    processor_type LowCardinality(String),
    bucket         DateTime,
    environment    LowCardinality(String) DEFAULT 'default',
    total_count    AggregateFunction(uniq, String),
    failed_count   AggregateFunction(uniqIf, String, UInt8),
    duration_sum   AggregateFunction(sum, Nullable(Int64)),
    duration_max   AggregateFunction(max, Nullable(Int64)),
    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE;
-- Backfill from raw data. The target column list is explicit so the SELECT is
-- bound to named columns instead of depending on the table column order.
-- The key expression must stay identical to the one used by the view below.
INSERT INTO stats_1m_processor_detail
    (tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment,
     total_count, failed_count,
     duration_sum, duration_max, p99_duration)
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq))) AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;
-- Incremental maintenance: aggregates rows inserted into processor_executions from here on.
-- NOTE(review): rows inserted between the backfill above and the creation of this
-- view are not aggregated - confirm ingestion is quiet while this script runs.
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_id,
    route_id,
    processor_id,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    environment,
    uniqState(concat(execution_id, ':', toString(seq))) AS total_count,
    uniqIfState(concat(execution_id, ':', toString(seq)), status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;
-- ── Route Diagrams ──────────────────────────────────────────────────────
-- Route diagram definitions addressed by content_hash.
-- ReplacingMergeTree(created_at): among rows sharing the full sorting key, the one
-- with the latest created_at survives a merge.
-- No PARTITION BY and no TTL are declared for this table.
CREATE TABLE IF NOT EXISTS route_diagrams
(
    tenant_id      LowCardinality(String) DEFAULT 'default',
    content_hash   String,
    route_id       LowCardinality(String),
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    environment    LowCardinality(String) DEFAULT 'default',
    definition     String,
    created_at     DateTime64(3)          DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, environment, route_id, instance_id, content_hash)
SETTINGS index_granularity = 8192;
-- ── Agent Events ────────────────────────────────────────────────────────
-- Agent events: an event_type plus free-form detail per instance.
-- timestamp defaults to the insert-time clock when the writer omits it.
-- Partitioned by tenant and month of timestamp, rows expire 365 days after timestamp.
CREATE TABLE IF NOT EXISTS agent_events
(
    tenant_id      LowCardinality(String) DEFAULT 'default',
    timestamp      DateTime64(3)          DEFAULT now64(3),
    environment    LowCardinality(String) DEFAULT 'default',
    instance_id    LowCardinality(String),
    application_id LowCardinality(String),
    event_type     LowCardinality(String),
    detail         String                 DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;
-- ── Logs ────────────────────────────────────────────────────────────────
-- Log lines with n-gram bloom filter indexes on message and stack_trace and a
-- set index on level.
-- Partitioned by tenant and month of timestamp, rows expire 365 days after timestamp.
CREATE TABLE IF NOT EXISTS logs
(
    tenant_id   LowCardinality(String) DEFAULT 'default',
    timestamp   DateTime64(3),
    environment LowCardinality(String) DEFAULT 'default',
    application LowCardinality(String),
    instance_id LowCardinality(String),
    level       LowCardinality(String),
    logger_name LowCardinality(String) DEFAULT '',
    message     String,
    thread_name LowCardinality(String) DEFAULT '',
    stack_trace String                 DEFAULT '',
    exchange_id String                 DEFAULT '',
    mdc         Map(String, String)    DEFAULT map(),

    INDEX idx_msg   message     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level       TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
-- Additive migration for log forwarding v2: source distinguishes app logs from
-- agent logs and defaults to app. IF NOT EXISTS makes the statement a no-op
-- once the column is present.
ALTER TABLE logs
    ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';
-- ── Usage Events ────────────────────────────────────────────────────────
-- Usage events: one row per recorded request with user, method, path, status
-- code, and duration.
-- timestamp defaults to the insert-time clock when the writer omits it.
-- No PARTITION BY is declared. Rows expire 90 days after timestamp, a shorter
-- retention than the 365 days used by the other tables in this file.
CREATE TABLE IF NOT EXISTS usage_events
(
    tenant_id    LowCardinality(String) DEFAULT 'default',
    timestamp    DateTime64(3)          DEFAULT now64(3),
    environment  LowCardinality(String) DEFAULT 'default',
    username     LowCardinality(String),
    method       LowCardinality(String),
    path         String,
    normalized   LowCardinality(String),
    status_code  UInt16,
    duration_ms  UInt32,
    query_params String                 DEFAULT ''
)
ENGINE = MergeTree()
ORDER BY (tenant_id, timestamp, environment, username, normalized)
TTL toDateTime(timestamp) + INTERVAL 90 DAY;