diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
index 46406d95..a8881789 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
@@ -1,22 +1,69 @@
 package com.cameleer3.server.app.config;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
 import org.springframework.jdbc.core.JdbcTemplate;
 
+import jakarta.annotation.PostConstruct;
 import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
 
 /**
  * ClickHouse configuration.
  * <p>
  * Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties.
- * This class exposes a JdbcTemplate bean for repository implementations.
+ * This class exposes a JdbcTemplate bean and initializes the schema on startup.
  */
 @Configuration
 public class ClickHouseConfig {
 
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseConfig.class);
+    // Applied in array order: 02-search-columns.sql alters the table created by 01-schema.sql.
+    private static final String[] SCHEMA_FILES = {"clickhouse/01-schema.sql", "clickhouse/02-search-columns.sql"};
+
+    private final DataSource dataSource;
+
+    public ClickHouseConfig(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    /** Exposes a JdbcTemplate backed by the injected DataSource. */
     @Bean
-    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+    public JdbcTemplate jdbcTemplate() {
         return new JdbcTemplate(dataSource);
     }
+
+    /**
+     * Applies the bundled schema scripts in order, one statement at a time.
+     * <p>
+     * Comment-only lines ({@code --}) are removed before the script is split on {@code ;}.
+     * A statement that follows a comment header must still be executed, so comments
+     * are stripped from the text instead of being used to skip whole statements.
+     *
+     * @throws RuntimeException if a script cannot be read or a statement fails
+     */
+    @PostConstruct
+    void initSchema() {
+        var jdbc = new JdbcTemplate(dataSource);
+        for (String schemaFile : SCHEMA_FILES) {
+            try {
+                String sql = new ClassPathResource(schemaFile).getContentAsString(StandardCharsets.UTF_8);
+                // (?m) makes ^ and $ match per line, so only lines that start with "--" are removed.
+                String withoutComments = sql.replaceAll("(?m)^[ \\t]*--.*$", "");
+                for (String statement : withoutComments.split(";")) {
+                    String trimmed = statement.trim();
+                    if (!trimmed.isEmpty()) {
+                        jdbc.execute(trimmed);
+                    }
+                }
+                log.info("Applied schema: {}", schemaFile);
+            } catch (Exception e) {
+                log.error("Failed to apply schema: {}", schemaFile, e);
+                throw new RuntimeException("Schema initialization failed: " + schemaFile, e);
+            }
+        }
+    }
 }
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql b/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql
new file mode 100644
index 00000000..ab56da70
--- /dev/null
+++ b/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql
@@ -0,0 +1,57 @@
+-- Cameleer3 ClickHouse Schema
+-- Tables for route executions, route diagrams, and agent metrics.
+
+CREATE TABLE IF NOT EXISTS route_executions (
+    execution_id String,
+    route_id LowCardinality(String),
+    agent_id LowCardinality(String),
+    status LowCardinality(String),
+    start_time DateTime64(3, 'UTC'),
+    end_time Nullable(DateTime64(3, 'UTC')),
+    duration_ms UInt64,
+    correlation_id String,
+    exchange_id String,
+    error_message String DEFAULT '',
+    error_stacktrace String DEFAULT '',
+    -- Nested processor executions stored as parallel arrays
+    processor_ids Array(String),
+    processor_types Array(LowCardinality(String)),
+    processor_starts Array(DateTime64(3, 'UTC')),
+    processor_ends Array(DateTime64(3, 'UTC')),
+    processor_durations Array(UInt64),
+    processor_statuses Array(LowCardinality(String)),
+    -- Metadata
+    server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
+    -- Skip indexes
+    INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
+    INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
+)
+ENGINE = MergeTree()
+PARTITION BY toYYYYMMDD(start_time)
+ORDER BY (agent_id, status, start_time, execution_id)
+TTL toDateTime(start_time) + toIntervalDay(30)
+SETTINGS ttl_only_drop_parts = 1;
+
+CREATE TABLE IF NOT EXISTS route_diagrams (
+    content_hash String,
+    route_id LowCardinality(String),
+    agent_id LowCardinality(String),
+    definition String,
+    created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
+)
+ENGINE = ReplacingMergeTree(created_at)
+ORDER BY (content_hash);
+
+CREATE TABLE IF NOT EXISTS agent_metrics (
+    agent_id LowCardinality(String),
+    collected_at DateTime64(3, 'UTC'),
+    metric_name LowCardinality(String),
+    metric_value Float64,
+    tags Map(String, String),
+    server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
+)
+ENGINE = MergeTree()
+PARTITION BY toYYYYMMDD(collected_at)
+ORDER BY (agent_id, metric_name, collected_at)
+TTL toDateTime(collected_at) + toIntervalDay(30)
+SETTINGS ttl_only_drop_parts = 1;
diff --git
a/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql b/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql
new file mode 100644
index 00000000..2b11b435
--- /dev/null
+++ b/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql
@@ -0,0 +1,25 @@
+-- Phase 2: Schema extension for search, detail, and diagram linking columns.
+-- Adds exchange snapshot data, processor tree metadata, and diagram content hash.
+
+ALTER TABLE route_executions
+    ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
+    ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
+    ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
+    ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
+
+-- Skip indexes for full-text search on new text columns
+ALTER TABLE route_executions
+    ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
+    ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
+
+-- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05)
+ALTER TABLE route_executions
+    ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml
index
1a889b60..771bde62 100644
--- a/deploy/clickhouse.yaml
+++ b/deploy/clickhouse.yaml
@@ -1,84 +1,3 @@
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: clickhouse-init
-  namespace: cameleer
-data:
-  01-schema.sql: |
-    CREATE TABLE IF NOT EXISTS cameleer3.route_executions (
-        execution_id String,
-        route_id LowCardinality(String),
-        agent_id LowCardinality(String),
-        status LowCardinality(String),
-        start_time DateTime64(3, 'UTC'),
-        end_time Nullable(DateTime64(3, 'UTC')),
-        duration_ms UInt64,
-        correlation_id String,
-        exchange_id String,
-        error_message String DEFAULT '',
-        error_stacktrace String DEFAULT '',
-        processor_ids Array(String),
-        processor_types Array(LowCardinality(String)),
-        processor_starts Array(DateTime64(3, 'UTC')),
-        processor_ends Array(DateTime64(3, 'UTC')),
-        processor_durations Array(UInt64),
-        processor_statuses Array(LowCardinality(String)),
-        server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
-        INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
-        INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
-    )
-    ENGINE = MergeTree()
-    PARTITION BY toYYYYMMDD(start_time)
-    ORDER BY (agent_id, status, start_time, execution_id)
-    TTL toDateTime(start_time) + toIntervalDay(30)
-    SETTINGS ttl_only_drop_parts = 1;
-
-    CREATE TABLE IF NOT EXISTS cameleer3.route_diagrams (
-        content_hash String,
-        route_id LowCardinality(String),
-        agent_id LowCardinality(String),
-        definition String,
-        created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-    )
-    ENGINE = ReplacingMergeTree(created_at)
-    ORDER BY (content_hash);
-
-    CREATE TABLE IF NOT EXISTS cameleer3.agent_metrics (
-        agent_id LowCardinality(String),
-        collected_at DateTime64(3, 'UTC'),
-        metric_name LowCardinality(String),
-        metric_value Float64,
-        tags Map(String, String),
-        server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-    )
-    ENGINE = MergeTree()
-    PARTITION BY toYYYYMMDD(collected_at)
-    ORDER BY (agent_id, metric_name, collected_at)
-    TTL toDateTime(collected_at) + toIntervalDay(30)
-    SETTINGS ttl_only_drop_parts = 1;
-
-  02-search-columns.sql: |
-    ALTER TABLE cameleer3.route_executions
-        ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
-        ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
-        ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
-        ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
-
-    ALTER TABLE cameleer3.route_executions
-        ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
-        ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
-
-    ALTER TABLE cameleer3.route_executions
-        ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
----
 apiVersion: apps/v1
 kind: StatefulSet
 metadata:
@@ -113,8 +32,6 @@ spec:
           volumeMounts:
             - name: data
               mountPath: /var/lib/clickhouse
-            - name: init-scripts
-              mountPath: /docker-entrypoint-initdb.d
           resources:
             requests:
               memory: "512Mi"
@@ -122,10 +39,6 @@ spec:
             limits:
               memory: "1Gi"
               cpu: "1000m"
-      volumes:
-        - name: init-scripts
-          configMap:
-            name: clickhouse-init
   volumeClaimTemplates:
     - metadata:
         name: data