Server fully owns ClickHouse schema — create database + tables on startup
ClickHouseConfig.ensureDatabaseExists() connects without the database path to run CREATE DATABASE IF NOT EXISTS before the main DataSource is used. Removes the ConfigMap-based init scripts from the K8s manifest — the server is now the single owner of all ClickHouse schema management. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@ package com.cameleer3.server.app.config;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
@@ -10,6 +11,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
|||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.Statement;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ClickHouse configuration.
|
* ClickHouse configuration.
|
||||||
@@ -25,6 +29,15 @@ public class ClickHouseConfig {
|
|||||||
|
|
||||||
private final DataSource dataSource;
|
private final DataSource dataSource;
|
||||||
|
|
||||||
|
@Value("${spring.datasource.url}")
|
||||||
|
private String datasourceUrl;
|
||||||
|
|
||||||
|
@Value("${spring.datasource.username}")
|
||||||
|
private String datasourceUsername;
|
||||||
|
|
||||||
|
@Value("${spring.datasource.password}")
|
||||||
|
private String datasourcePassword;
|
||||||
|
|
||||||
public ClickHouseConfig(DataSource dataSource) {
|
public ClickHouseConfig(DataSource dataSource) {
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
}
|
}
|
||||||
@@ -36,6 +49,7 @@ public class ClickHouseConfig {
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
void initSchema() {
|
void initSchema() {
|
||||||
|
ensureDatabaseExists();
|
||||||
var jdbc = new JdbcTemplate(dataSource);
|
var jdbc = new JdbcTemplate(dataSource);
|
||||||
for (String schemaFile : SCHEMA_FILES) {
|
for (String schemaFile : SCHEMA_FILES) {
|
||||||
try {
|
try {
|
||||||
@@ -53,4 +67,33 @@ public class ClickHouseConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the ClickHouse database if it doesn't exist.
|
||||||
|
* Uses a separate connection without the database path, since the main
|
||||||
|
* DataSource connection fails if the database doesn't exist yet.
|
||||||
|
*/
|
||||||
|
private void ensureDatabaseExists() {
|
||||||
|
// Extract database name from URL: jdbc:ch://host:port/dbname -> dbname
|
||||||
|
// Strip the database path to connect at root level
|
||||||
|
String rootUrl = datasourceUrl.replaceFirst("/[^/?]+($|\\?)", "$1");
|
||||||
|
String dbName = extractDatabaseName(datasourceUrl);
|
||||||
|
|
||||||
|
try (Connection conn = DriverManager.getConnection(rootUrl, datasourceUsername, datasourcePassword);
|
||||||
|
Statement stmt = conn.createStatement()) {
|
||||||
|
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName);
|
||||||
|
log.info("Ensured database '{}' exists", dbName);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to ensure database exists", e);
|
||||||
|
throw new RuntimeException("Database creation failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static String extractDatabaseName(String jdbcUrl) {
|
||||||
|
// jdbc:ch://host:port/dbname or jdbc:ch://host:port/dbname?params
|
||||||
|
String afterScheme = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3);
|
||||||
|
String afterHost = afterScheme.substring(afterScheme.indexOf('/') + 1);
|
||||||
|
int paramIdx = afterHost.indexOf('?');
|
||||||
|
return paramIdx >= 0 ? afterHost.substring(0, paramIdx) : afterHost;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,86 +1,3 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: ConfigMap
|
|
||||||
metadata:
|
|
||||||
name: clickhouse-init-schema
|
|
||||||
namespace: cameleer
|
|
||||||
data:
|
|
||||||
01-schema.sql: |
|
|
||||||
CREATE DATABASE IF NOT EXISTS cameleer3;
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS cameleer3.route_executions (
|
|
||||||
execution_id String,
|
|
||||||
route_id LowCardinality(String),
|
|
||||||
agent_id LowCardinality(String),
|
|
||||||
status LowCardinality(String),
|
|
||||||
start_time DateTime64(3, 'UTC'),
|
|
||||||
end_time Nullable(DateTime64(3, 'UTC')),
|
|
||||||
duration_ms UInt64,
|
|
||||||
correlation_id String,
|
|
||||||
exchange_id String,
|
|
||||||
error_message String DEFAULT '',
|
|
||||||
error_stacktrace String DEFAULT '',
|
|
||||||
processor_ids Array(String),
|
|
||||||
processor_types Array(LowCardinality(String)),
|
|
||||||
processor_starts Array(DateTime64(3, 'UTC')),
|
|
||||||
processor_ends Array(DateTime64(3, 'UTC')),
|
|
||||||
processor_durations Array(UInt64),
|
|
||||||
processor_statuses Array(LowCardinality(String)),
|
|
||||||
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
|
|
||||||
INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
|
|
||||||
INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
|
|
||||||
)
|
|
||||||
ENGINE = MergeTree()
|
|
||||||
PARTITION BY toYYYYMMDD(start_time)
|
|
||||||
ORDER BY (agent_id, status, start_time, execution_id)
|
|
||||||
TTL toDateTime(start_time) + toIntervalDay(30)
|
|
||||||
SETTINGS ttl_only_drop_parts = 1;
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS cameleer3.route_diagrams (
|
|
||||||
content_hash String,
|
|
||||||
route_id LowCardinality(String),
|
|
||||||
agent_id LowCardinality(String),
|
|
||||||
definition String,
|
|
||||||
created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
|
|
||||||
)
|
|
||||||
ENGINE = ReplacingMergeTree(created_at)
|
|
||||||
ORDER BY (content_hash);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS cameleer3.agent_metrics (
|
|
||||||
agent_id LowCardinality(String),
|
|
||||||
collected_at DateTime64(3, 'UTC'),
|
|
||||||
metric_name LowCardinality(String),
|
|
||||||
metric_value Float64,
|
|
||||||
tags Map(String, String),
|
|
||||||
server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
|
|
||||||
)
|
|
||||||
ENGINE = MergeTree()
|
|
||||||
PARTITION BY toYYYYMMDD(collected_at)
|
|
||||||
ORDER BY (agent_id, metric_name, collected_at)
|
|
||||||
TTL toDateTime(collected_at) + toIntervalDay(30)
|
|
||||||
SETTINGS ttl_only_drop_parts = 1;
|
|
||||||
|
|
||||||
02-search-columns.sql: |
|
|
||||||
ALTER TABLE cameleer3.route_executions
|
|
||||||
ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
|
|
||||||
ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
|
|
||||||
ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
|
|
||||||
|
|
||||||
ALTER TABLE cameleer3.route_executions
|
|
||||||
ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
|
|
||||||
ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
|
|
||||||
|
|
||||||
ALTER TABLE cameleer3.route_executions
|
|
||||||
ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
|
|
||||||
---
|
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: StatefulSet
|
kind: StatefulSet
|
||||||
metadata:
|
metadata:
|
||||||
@@ -115,9 +32,6 @@ spec:
|
|||||||
volumeMounts:
|
volumeMounts:
|
||||||
- name: data
|
- name: data
|
||||||
mountPath: /var/lib/clickhouse
|
mountPath: /var/lib/clickhouse
|
||||||
- name: init-schema
|
|
||||||
mountPath: /docker-entrypoint-initdb.d
|
|
||||||
readOnly: true
|
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
memory: "512Mi"
|
memory: "512Mi"
|
||||||
@@ -125,10 +39,6 @@ spec:
|
|||||||
limits:
|
limits:
|
||||||
memory: "1Gi"
|
memory: "1Gi"
|
||||||
cpu: "1000m"
|
cpu: "1000m"
|
||||||
volumes:
|
|
||||||
- name: init-schema
|
|
||||||
configMap:
|
|
||||||
name: clickhouse-init-schema
|
|
||||||
volumeClaimTemplates:
|
volumeClaimTemplates:
|
||||||
- metadata:
|
- metadata:
|
||||||
name: data
|
name: data
|
||||||
|
|||||||
Reference in New Issue
Block a user