Move schema initialization from ClickHouse init scripts to server startup
All checks were successful
CI / build (push) Successful in 49s
CI / docker (push) Successful in 43s
CI / deploy (push) Successful in 15s

Server now applies schema via @PostConstruct using classpath SQL files.
All statements use CREATE TABLE IF NOT EXISTS / ADD COLUMN|INDEX IF NOT EXISTS, so it's idempotent and
safe to run on every startup. Removes ConfigMap and init script mount
from K8s manifest since ClickHouse no longer needs to manage the schema.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-12 19:59:33 +01:00
parent 129b97183a
commit 9dffa9ea81
4 changed files with 118 additions and 89 deletions

View File

@@ -1,22 +1,56 @@
package com.cameleer3.server.app.config; package com.cameleer3.server.app.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import jakarta.annotation.PostConstruct;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.nio.charset.StandardCharsets;
/** /**
* ClickHouse configuration. * ClickHouse configuration.
* <p> * <p>
* Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties. * Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties.
* This class exposes a JdbcTemplate bean for repository implementations. * This class exposes a JdbcTemplate bean and initializes the schema on startup.
*/ */
@Configuration @Configuration
public class ClickHouseConfig { public class ClickHouseConfig {
private static final Logger log = LoggerFactory.getLogger(ClickHouseConfig.class);
private static final String[] SCHEMA_FILES = {"clickhouse/01-schema.sql", "clickhouse/02-search-columns.sql"};
private final DataSource dataSource;
/**
 * Creates the configuration and keeps the injected {@link DataSource} so that both
 * the {@code JdbcTemplate} bean and the startup schema initialization use the same one.
 *
 * @param dataSource the DataSource used for the JdbcTemplate and for applying the schema
 */
public ClickHouseConfig(DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean @Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) { public JdbcTemplate jdbcTemplate() {
return new JdbcTemplate(dataSource); return new JdbcTemplate(dataSource);
} }
/**
 * Applies every classpath SQL file listed in {@code SCHEMA_FILES}, in order, once this
 * configuration bean has been constructed.
 * <p>
 * Each file is read as UTF-8, full-line {@code --} comments are removed, and the remaining
 * text is split on {@code ;} into individual statements that are executed one by one.
 * Comments are stripped <em>before</em> splitting because a statement that is merely
 * preceded by a comment line (as in the bundled schema files) must still be executed;
 * skipping every chunk that starts with {@code --} would silently drop those statements.
 * <p>
 * Limitation: the splitter is not SQL-aware. A {@code ;} inside a string literal, or a
 * trailing {@code --} comment containing {@code ;} on a code line, would still split a
 * statement. The schema files must avoid both.
 *
 * @throws IllegalStateException if a schema file cannot be read or a statement fails;
 *                               the original exception is preserved as the cause
 */
@PostConstruct
void initSchema() {
    var jdbc = new JdbcTemplate(dataSource);
    for (String schemaFile : SCHEMA_FILES) {
        try {
            String sql = new ClassPathResource(schemaFile).getContentAsString(StandardCharsets.UTF_8);
            for (String statement : stripLineComments(sql).split(";")) {
                String trimmed = statement.trim();
                // After comment removal only genuinely empty chunks remain to be skipped
                // (e.g. the text after the final ';').
                if (!trimmed.isEmpty()) {
                    jdbc.execute(trimmed);
                }
            }
            log.info("Applied schema: {}", schemaFile);
        } catch (Exception e) {
            log.error("Failed to apply schema: {}", schemaFile, e);
            throw new IllegalStateException("Schema initialization failed: " + schemaFile, e);
        }
    }
}

/**
 * Removes every line that consists solely of a {@code --} comment (optionally indented).
 * Line terminators are kept, so the surrounding statements stay intact.
 */
private static String stripLineComments(String sql) {
    // (?m): ^ and $ match at line boundaries; '.' never crosses a line terminator.
    return sql.replaceAll("(?m)^[ \\t]*--.*$", "");
}
} }

View File

@@ -0,0 +1,57 @@
-- Cameleer3 ClickHouse Schema
-- Tables for route executions, route diagrams, and agent metrics.
CREATE TABLE IF NOT EXISTS route_executions (
execution_id String,
route_id LowCardinality(String),
agent_id LowCardinality(String),
status LowCardinality(String),
start_time DateTime64(3, 'UTC'),
end_time Nullable(DateTime64(3, 'UTC')),
duration_ms UInt64,
correlation_id String,
exchange_id String,
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
-- Nested processor executions stored as parallel arrays
processor_ids Array(String),
processor_types Array(LowCardinality(String)),
processor_starts Array(DateTime64(3, 'UTC')),
processor_ends Array(DateTime64(3, 'UTC')),
processor_durations Array(UInt64),
processor_statuses Array(LowCardinality(String)),
-- Metadata
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
-- Skip indexes
INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(start_time)
ORDER BY (agent_id, status, start_time, execution_id)
TTL toDateTime(start_time) + toIntervalDay(30)
SETTINGS ttl_only_drop_parts = 1;
CREATE TABLE IF NOT EXISTS route_diagrams (
content_hash String,
route_id LowCardinality(String),
agent_id LowCardinality(String),
definition String,
created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (content_hash);
CREATE TABLE IF NOT EXISTS agent_metrics (
agent_id LowCardinality(String),
collected_at DateTime64(3, 'UTC'),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String),
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(collected_at)
ORDER BY (agent_id, metric_name, collected_at)
TTL toDateTime(collected_at) + toIntervalDay(30)
SETTINGS ttl_only_drop_parts = 1;

View File

@@ -0,0 +1,25 @@
-- Phase 2: Schema extension for search, detail, and diagram linking columns.
-- Adds exchange snapshot data, processor tree metadata, and diagram content hash.
ALTER TABLE route_executions
ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
-- Skip indexes for full-text search on new text columns
ALTER TABLE route_executions
ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
-- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05)
ALTER TABLE route_executions
ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;

View File

@@ -1,84 +1,3 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: clickhouse-init
namespace: cameleer
data:
01-schema.sql: |
CREATE TABLE IF NOT EXISTS cameleer3.route_executions (
execution_id String,
route_id LowCardinality(String),
agent_id LowCardinality(String),
status LowCardinality(String),
start_time DateTime64(3, 'UTC'),
end_time Nullable(DateTime64(3, 'UTC')),
duration_ms UInt64,
correlation_id String,
exchange_id String,
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
processor_ids Array(String),
processor_types Array(LowCardinality(String)),
processor_starts Array(DateTime64(3, 'UTC')),
processor_ends Array(DateTime64(3, 'UTC')),
processor_durations Array(UInt64),
processor_statuses Array(LowCardinality(String)),
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(start_time)
ORDER BY (agent_id, status, start_time, execution_id)
TTL toDateTime(start_time) + toIntervalDay(30)
SETTINGS ttl_only_drop_parts = 1;
CREATE TABLE IF NOT EXISTS cameleer3.route_diagrams (
content_hash String,
route_id LowCardinality(String),
agent_id LowCardinality(String),
definition String,
created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (content_hash);
CREATE TABLE IF NOT EXISTS cameleer3.agent_metrics (
agent_id LowCardinality(String),
collected_at DateTime64(3, 'UTC'),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String),
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(collected_at)
ORDER BY (agent_id, metric_name, collected_at)
TTL toDateTime(collected_at) + toIntervalDay(30)
SETTINGS ttl_only_drop_parts = 1;
02-search-columns.sql: |
ALTER TABLE cameleer3.route_executions
ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
ALTER TABLE cameleer3.route_executions
ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
ALTER TABLE cameleer3.route_executions
ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: StatefulSet kind: StatefulSet
metadata: metadata:
@@ -113,8 +32,6 @@ spec:
volumeMounts: volumeMounts:
- name: data - name: data
mountPath: /var/lib/clickhouse mountPath: /var/lib/clickhouse
- name: init-scripts
mountPath: /docker-entrypoint-initdb.d
resources: resources:
requests: requests:
memory: "512Mi" memory: "512Mi"
@@ -122,10 +39,6 @@ spec:
limits: limits:
memory: "1Gi" memory: "1Gi"
cpu: "1000m" cpu: "1000m"
volumes:
- name: init-scripts
configMap:
name: clickhouse-init
volumeClaimTemplates: volumeClaimTemplates:
- metadata: - metadata:
name: data name: data