From d4327af6a42cbbecc8929106864947682ab450ec Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Fri, 3 Apr 2026 15:24:53 +0200 Subject: [PATCH] refactor: consolidate ClickHouse schema into single init.sql, cache diagrams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Merge all V1-V11 migration scripts into one idempotent init.sql - Simplify ClickHouseSchemaInitializer to load single file - Replace route_diagrams projection with in-memory caches: hashCache (routeId+instanceId → contentHash) warm-loaded on startup, graphCache (contentHash → RouteGraph) lazy-populated on access - Eliminates 9M+ row scans on diagram lookups Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 3 +- .../config/ClickHouseSchemaInitializer.java | 45 +-- .../app/storage/ClickHouseDiagramStore.java | 68 +++- .../clickhouse/V10__usage_events.sql | 13 - .../V11__route_diagrams_projection.sql | 11 - .../clickhouse/V1__agent_metrics.sql | 14 - .../resources/clickhouse/V2__executions.sql | 49 --- .../clickhouse/V3__processor_executions.sql | 45 --- .../clickhouse/V4__stats_tables_and_mvs.sql | 167 -------- .../clickhouse/V5__replay_fields.sql | 2 - .../clickhouse/V6__route_diagrams.sql | 12 - .../resources/clickhouse/V7__agent_events.sql | 12 - .../main/resources/clickhouse/V8__logs.sql | 22 -- .../src/main/resources/clickhouse/init.sql | 358 ++++++++++++++++++ 14 files changed, 434 insertions(+), 387 deletions(-) delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V10__usage_events.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V11__route_diagrams_projection.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql delete mode 100644 
cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V6__route_diagrams.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V7__agent_events.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V8__logs.sql create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/init.sql diff --git a/CLAUDE.md b/CLAUDE.md index 68086069..d890a7f4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,7 +38,8 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar - Jackson `JavaTimeModule` for `Instant` deserialization - Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands (config-update, deep-trace, replay, route-control) - Maintains agent instance registry (in-memory) with states: LIVE → STALE → DEAD. Auto-heals from JWT claims on heartbeat/SSE after server restart. Route catalog falls back to ClickHouse stats for route discovery when registry has incomplete data. -- Storage: PostgreSQL for RBAC, config, and audit; ClickHouse for all observability data (executions, search, logs, metrics, stats, diagrams) +- Storage: PostgreSQL for RBAC, config, and audit; ClickHouse for all observability data (executions, search, logs, metrics, stats, diagrams). ClickHouse schema lives in the single idempotent `clickhouse/init.sql`, executed on every startup by `ClickHouseSchemaInitializer`; new DDL must be appended there using `IF NOT EXISTS` so re-execution stays safe. +- Logging: ClickHouse JDBC set to INFO (`com.clickhouse`), HTTP client to WARN (`org.apache.hc.client5`) in application.yml - Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration - OIDC: Optional external identity provider support (token exchange pattern). 
Configured via admin API, stored in database (`server_config` table) - User persistence: PostgreSQL `users` table, admin CRUD at `/api/v1/admin/users` diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java index f8783ee2..20174316 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java @@ -11,10 +11,7 @@ import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Comparator; @Component @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") @@ -33,38 +30,24 @@ public class ClickHouseSchemaInitializer { public void initializeSchema() { try { PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); - Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql"); + Resource script = resolver.getResource("classpath:clickhouse/init.sql"); - // Sort by numeric version prefix (V1, V2, ..., V10, V11) — not alphabetically - Arrays.sort(scripts, Comparator.comparingInt(r -> { - String name = r.getFilename(); - if (name != null && name.startsWith("V")) { - int end = name.indexOf('_'); - if (end > 1) { - try { return Integer.parseInt(name.substring(1, end)); } catch (NumberFormatException ignored) {} - } - } - return Integer.MAX_VALUE; - })); - - for (Resource script : scripts) { - String sql = script.getContentAsString(StandardCharsets.UTF_8); - log.info("Executing ClickHouse schema script: {}", script.getFilename()); - for (String statement : sql.split(";")) { - 
String trimmed = statement.trim(); - // Skip empty segments and comment-only segments - String withoutComments = trimmed.lines() - .filter(line -> !line.stripLeading().startsWith("--")) - .map(String::trim) - .filter(line -> !line.isEmpty()) - .reduce("", (a, b) -> a + b); - if (!withoutComments.isEmpty()) { - clickHouseJdbc.execute(trimmed); - } + String sql = script.getContentAsString(StandardCharsets.UTF_8); + log.info("Executing ClickHouse schema: {}", script.getFilename()); + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + // Skip empty segments and comment-only segments + String withoutComments = trimmed.lines() + .filter(line -> !line.stripLeading().startsWith("--")) + .map(String::trim) + .filter(line -> !line.isEmpty()) + .reduce("", (a, b) -> a + b); + if (!withoutComments.isEmpty()) { + clickHouseJdbc.execute(trimmed); } } - log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length); + log.info("ClickHouse schema initialization complete"); } catch (Exception e) { log.error("ClickHouse schema initialization failed — server will continue but ClickHouse features may not work", e); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java index 2153cc8d..97c47dad 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java @@ -23,6 +23,7 @@ import java.util.HexFormat; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; /** * ClickHouse implementation of {@link DiagramStore}. @@ -30,9 +31,9 @@ import java.util.Optional; * Stores route graphs as JSON with SHA-256 content-hash deduplication. 
* Uses ReplacingMergeTree — duplicate inserts are deduplicated on merge. *

- * {@code findProcessorRouteMapping} fetches all definitions for the application - * and deserializes them in Java because ClickHouse has no equivalent of - * PostgreSQL's {@code jsonb_array_elements()}. + * In-memory caches avoid expensive ClickHouse scans for the hot lookup paths + * (route+instance → hash, hash → graph). Diagrams change rarely (only on + * route topology changes), so the caches are effectively append-only. */ public class ClickHouseDiagramStore implements DiagramStore { @@ -66,10 +67,35 @@ public class ClickHouseDiagramStore implements DiagramStore { private final JdbcTemplate jdbc; private final ObjectMapper objectMapper; + // (routeId + "\0" + instanceId) → contentHash + private final ConcurrentHashMap<String, String> hashCache = new ConcurrentHashMap<>(); + // contentHash → deserialized RouteGraph + private final ConcurrentHashMap<String, RouteGraph> graphCache = new ConcurrentHashMap<>(); + public ClickHouseDiagramStore(JdbcTemplate jdbc) { this.jdbc = jdbc; this.objectMapper = new ObjectMapper(); this.objectMapper.registerModule(new JavaTimeModule()); + warmLoadHashCache(); + } + + private void warmLoadHashCache() { + try { + jdbc.query( + "SELECT route_id, instance_id, content_hash FROM route_diagrams WHERE tenant_id = ?", + rs -> { + String key = rs.getString("route_id") + "\0" + rs.getString("instance_id"); + hashCache.put(key, rs.getString("content_hash")); + }, + TENANT); + log.info("Diagram hash cache warmed: {} entries", hashCache.size()); + } catch (Exception e) { + log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage()); + } + } + + private static String cacheKey(String routeId, String instanceId) { + return routeId + "\0" + instanceId; } @Override @@ -90,6 +116,11 @@ public class ClickHouseDiagramStore implements DiagramStore { applicationId, json, Timestamp.from(Instant.now())); + + // Update caches + hashCache.put(cacheKey(routeId, agentId), contentHash); + graphCache.put(contentHash, graph); + log.debug("Stored 
diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); @@ -98,13 +129,20 @@ public class ClickHouseDiagramStore implements DiagramStore { @Override public Optional<RouteGraph> findByContentHash(String contentHash) { + RouteGraph cached = graphCache.get(contentHash); + if (cached != null) { + return Optional.of(cached); + } + List<Map<String, Object>> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash); if (rows.isEmpty()) { return Optional.empty(); } String json = (String) rows.get(0).get("definition"); try { - return Optional.of(objectMapper.readValue(json, RouteGraph.class)); + RouteGraph graph = objectMapper.readValue(json, RouteGraph.class); + graphCache.put(contentHash, graph); + return Optional.of(graph); } catch (JsonProcessingException e) { log.error("Failed to deserialize RouteGraph from ClickHouse", e); return Optional.empty(); @@ -113,12 +151,19 @@ public class ClickHouseDiagramStore implements DiagramStore { @Override public Optional<String> findContentHashForRoute(String routeId, String agentId) { + String cached = hashCache.get(cacheKey(routeId, agentId)); + if (cached != null) { + return Optional.of(cached); + } + List<Map<String, Object>> rows = jdbc.queryForList( SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId); if (rows.isEmpty()) { return Optional.empty(); } - return Optional.of((String) rows.get(0).get("content_hash")); + String hash = (String) rows.get(0).get("content_hash"); + hashCache.put(cacheKey(routeId, agentId), hash); + return Optional.of(hash); } @Override @@ -126,6 +171,16 @@ public class ClickHouseDiagramStore implements DiagramStore { if (agentIds == null || agentIds.isEmpty()) { return Optional.empty(); } + + // Try cache first — return first hit + for (String agentId : agentIds) { + String cached = hashCache.get(cacheKey(routeId, agentId)); + if (cached != null) { + return Optional.of(cached); + } + } + + // Fall back to ClickHouse String 
placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); String sql = "SELECT content_hash FROM route_diagrams " + "WHERE tenant_id = ? AND route_id = ? AND instance_id IN (" + placeholders + ") " + @@ -162,9 +217,6 @@ public class ClickHouseDiagramStore implements DiagramStore { return mapping; } - /** - * Recursively walks the RouteNode tree and maps each node ID to the given routeId. - */ private void collectNodeIds(RouteNode node, String routeId, Map mapping) { if (node == null) { return; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V10__usage_events.sql b/cameleer3-server-app/src/main/resources/clickhouse/V10__usage_events.sql deleted file mode 100644 index 561f84fb..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V10__usage_events.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE IF NOT EXISTS usage_events ( - timestamp DateTime64(3) DEFAULT now64(3), - username LowCardinality(String), - method LowCardinality(String), - path String, - normalized LowCardinality(String), - status_code UInt16, - duration_ms UInt32, - query_params String DEFAULT '' -) -ENGINE = MergeTree() -ORDER BY (username, timestamp) -TTL toDateTime(timestamp) + INTERVAL 90 DAY; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V11__route_diagrams_projection.sql b/cameleer3-server-app/src/main/resources/clickhouse/V11__route_diagrams_projection.sql deleted file mode 100644 index 18d57a65..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V11__route_diagrams_projection.sql +++ /dev/null @@ -1,11 +0,0 @@ --- Projection for fast route_id + instance_id lookups on route_diagrams. --- The primary key is (tenant_id, content_hash) which serves hash-based lookups. --- Queries filtering by route_id + instance_id were scanning millions of rows. --- ReplacingMergeTree requires deduplicate_merge_projection_mode to be set (ClickHouse 24.12+). 
-ALTER TABLE route_diagrams MODIFY SETTING deduplicate_merge_projection_mode = 'drop'; - -ALTER TABLE route_diagrams - ADD PROJECTION IF NOT EXISTS prj_route_instance - (SELECT content_hash, created_at ORDER BY tenant_id, route_id, instance_id, created_at); - -ALTER TABLE route_diagrams MATERIALIZE PROJECTION IF NOT EXISTS prj_route_instance diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql b/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql deleted file mode 100644 index a809a3ba..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql +++ /dev/null @@ -1,14 +0,0 @@ -CREATE TABLE IF NOT EXISTS agent_metrics ( - tenant_id LowCardinality(String) DEFAULT 'default', - collected_at DateTime64(3), - instance_id LowCardinality(String), - metric_name LowCardinality(String), - metric_value Float64, - tags Map(String, String) DEFAULT map(), - server_received_at DateTime64(3) DEFAULT now64(3) -) -ENGINE = MergeTree() -PARTITION BY (tenant_id, toYYYYMM(collected_at)) -ORDER BY (tenant_id, instance_id, metric_name, collected_at) -TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE -SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql deleted file mode 100644 index d9250c8a..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql +++ /dev/null @@ -1,49 +0,0 @@ -CREATE TABLE IF NOT EXISTS executions ( - tenant_id LowCardinality(String) DEFAULT 'default', - execution_id String, - start_time DateTime64(3), - _version UInt64 DEFAULT 1, - route_id LowCardinality(String), - instance_id LowCardinality(String), - application_id LowCardinality(String), - status LowCardinality(String), - correlation_id String DEFAULT '', - exchange_id String DEFAULT '', - end_time Nullable(DateTime64(3)), - duration_ms Nullable(Int64), - 
error_message String DEFAULT '', - error_stacktrace String DEFAULT '', - error_type LowCardinality(String) DEFAULT '', - error_category LowCardinality(String) DEFAULT '', - root_cause_type String DEFAULT '', - root_cause_message String DEFAULT '', - diagram_content_hash String DEFAULT '', - engine_level LowCardinality(String) DEFAULT '', - input_body String DEFAULT '', - output_body String DEFAULT '', - input_headers String DEFAULT '', - output_headers String DEFAULT '', - attributes String DEFAULT '', - trace_id String DEFAULT '', - span_id String DEFAULT '', - has_trace_data Bool DEFAULT false, - is_replay Bool DEFAULT false, - - _search_text String MATERIALIZED - concat(execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id, - ' ', error_message, ' ', error_stacktrace, ' ', attributes, - ' ', input_body, ' ', output_body, ' ', input_headers, - ' ', output_headers, ' ', root_cause_message), - - INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_status status TYPE set(10) GRANULARITY 1, - INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 -) -ENGINE = ReplacingMergeTree(_version) -PARTITION BY (tenant_id, toYYYYMM(start_time)) -ORDER BY (tenant_id, start_time, application_id, route_id, execution_id) -TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE -SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql deleted file mode 100644 index 3da0d586..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql +++ /dev/null @@ -1,45 +0,0 
@@ -CREATE TABLE IF NOT EXISTS processor_executions ( - tenant_id LowCardinality(String) DEFAULT 'default', - execution_id String, - seq UInt32, - parent_seq Nullable(UInt32), - parent_processor_id String DEFAULT '', - processor_id String, - processor_type LowCardinality(String), - start_time DateTime64(3), - route_id LowCardinality(String), - application_id LowCardinality(String), - iteration Nullable(Int32), - iteration_size Nullable(Int32), - status LowCardinality(String), - end_time Nullable(DateTime64(3)), - duration_ms Nullable(Int64), - error_message String DEFAULT '', - error_stacktrace String DEFAULT '', - error_type LowCardinality(String) DEFAULT '', - error_category LowCardinality(String) DEFAULT '', - root_cause_type String DEFAULT '', - root_cause_message String DEFAULT '', - input_body String DEFAULT '', - output_body String DEFAULT '', - input_headers String DEFAULT '', - output_headers String DEFAULT '', - attributes String DEFAULT '', - resolved_endpoint_uri String DEFAULT '', - circuit_breaker_state LowCardinality(String) DEFAULT '', - fallback_triggered Bool DEFAULT false, - filter_matched Bool DEFAULT false, - duplicate_message Bool DEFAULT false, - - _search_text String MATERIALIZED - concat(error_message, ' ', error_stacktrace, ' ', attributes, - ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), - - INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 -) -ENGINE = MergeTree() -PARTITION BY (tenant_id, toYYYYMM(start_time)) -ORDER BY (tenant_id, start_time, application_id, route_id, execution_id, seq) -TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE -SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql deleted file mode 100644 index 1aa1a6f0..00000000 --- 
a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql +++ /dev/null @@ -1,167 +0,0 @@ --- V4__stats_tables_and_mvs.sql --- Pre-aggregated statistics tables and materialized views. --- Tables use AggregatingMergeTree, MVs use -State combinators. - --- stats_1m_all (global) - -CREATE TABLE IF NOT EXISTS stats_1m_all ( - tenant_id LowCardinality(String), - bucket DateTime, - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, Nullable(Int64)), - duration_max AggregateFunction(max, Nullable(Int64)), - p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) -) -ENGINE = AggregatingMergeTree() -PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, bucket) -TTL bucket + INTERVAL 365 DAY DELETE; - -CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS -SELECT - tenant_id, - toStartOfMinute(start_time) AS bucket, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, bucket; - --- stats_1m_app (per-application) - -CREATE TABLE IF NOT EXISTS stats_1m_app ( - tenant_id LowCardinality(String), - application_id LowCardinality(String), - bucket DateTime, - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, Nullable(Int64)), - duration_max AggregateFunction(max, Nullable(Int64)), - p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) -) -ENGINE = AggregatingMergeTree() -PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, bucket) -TTL bucket + INTERVAL 365 DAY DELETE; - -CREATE 
MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS -SELECT - tenant_id, - application_id, - toStartOfMinute(start_time) AS bucket, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, application_id, bucket; - --- stats_1m_route (per-route) - -CREATE TABLE IF NOT EXISTS stats_1m_route ( - tenant_id LowCardinality(String), - application_id LowCardinality(String), - route_id LowCardinality(String), - bucket DateTime, - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - running_count AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, Nullable(Int64)), - duration_max AggregateFunction(max, Nullable(Int64)), - p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) -) -ENGINE = AggregatingMergeTree() -PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, route_id, bucket) -TTL bucket + INTERVAL 365 DAY DELETE; - -CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS -SELECT - tenant_id, - application_id, - route_id, - toStartOfMinute(start_time) AS bucket, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - countIfState(status = 'RUNNING') AS running_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM executions -GROUP BY tenant_id, application_id, route_id, bucket; - --- stats_1m_processor (per-processor-type) - -CREATE TABLE IF NOT EXISTS stats_1m_processor ( - tenant_id LowCardinality(String), - application_id LowCardinality(String), - processor_type LowCardinality(String), - bucket DateTime, - total_count AggregateFunction(count), - failed_count 
AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, Nullable(Int64)), - duration_max AggregateFunction(max, Nullable(Int64)), - p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) -) -ENGINE = AggregatingMergeTree() -PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, processor_type, bucket) -TTL bucket + INTERVAL 365 DAY DELETE; - -CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS -SELECT - tenant_id, - application_id, - processor_type, - toStartOfMinute(start_time) AS bucket, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - sumState(duration_ms) AS duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM processor_executions -GROUP BY tenant_id, application_id, processor_type, bucket; - --- stats_1m_processor_detail (per-processor-id) - -CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( - tenant_id LowCardinality(String), - application_id LowCardinality(String), - route_id LowCardinality(String), - processor_id String, - processor_type LowCardinality(String), - bucket DateTime, - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, Nullable(Int64)), - duration_max AggregateFunction(max, Nullable(Int64)), - p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) -) -ENGINE = AggregatingMergeTree() -PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, route_id, processor_id, processor_type, bucket) -TTL bucket + INTERVAL 365 DAY DELETE; - -CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS -SELECT - tenant_id, - application_id, - route_id, - processor_id, - processor_type, - toStartOfMinute(start_time) AS bucket, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - sumState(duration_ms) AS 
duration_sum, - maxState(duration_ms) AS duration_max, - quantileState(0.99)(duration_ms) AS p99_duration -FROM processor_executions -GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql b/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql deleted file mode 100644 index 0c1d8b8c..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE executions ADD COLUMN IF NOT EXISTS original_exchange_id String DEFAULT ''; -ALTER TABLE executions ADD COLUMN IF NOT EXISTS replay_exchange_id String DEFAULT ''; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V6__route_diagrams.sql b/cameleer3-server-app/src/main/resources/clickhouse/V6__route_diagrams.sql deleted file mode 100644 index b57df0f2..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V6__route_diagrams.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE IF NOT EXISTS route_diagrams ( - tenant_id LowCardinality(String) DEFAULT 'default', - content_hash String, - route_id LowCardinality(String), - instance_id LowCardinality(String), - application_id LowCardinality(String), - definition String, - created_at DateTime64(3) DEFAULT now64(3) -) -ENGINE = ReplacingMergeTree(created_at) -ORDER BY (tenant_id, content_hash) -SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V7__agent_events.sql b/cameleer3-server-app/src/main/resources/clickhouse/V7__agent_events.sql deleted file mode 100644 index 97278131..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V7__agent_events.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE IF NOT EXISTS agent_events ( - tenant_id LowCardinality(String) DEFAULT 'default', - timestamp DateTime64(3) DEFAULT now64(3), - instance_id LowCardinality(String), - application_id LowCardinality(String), - 
event_type LowCardinality(String), - detail String DEFAULT '' -) -ENGINE = MergeTree() -PARTITION BY (tenant_id, toYYYYMM(timestamp)) -ORDER BY (tenant_id, application_id, instance_id, timestamp) -TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V8__logs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V8__logs.sql deleted file mode 100644 index 02c1bc32..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/V8__logs.sql +++ /dev/null @@ -1,22 +0,0 @@ -CREATE TABLE IF NOT EXISTS logs ( - tenant_id LowCardinality(String) DEFAULT 'default', - timestamp DateTime64(3), - application LowCardinality(String), - instance_id LowCardinality(String), - level LowCardinality(String), - logger_name LowCardinality(String) DEFAULT '', - message String, - thread_name LowCardinality(String) DEFAULT '', - stack_trace String DEFAULT '', - exchange_id String DEFAULT '', - mdc Map(String, String) DEFAULT map(), - - INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, - INDEX idx_level level TYPE set(10) GRANULARITY 1 -) -ENGINE = MergeTree() -PARTITION BY (tenant_id, toYYYYMM(timestamp)) -ORDER BY (tenant_id, application, timestamp) -TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE -SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql new file mode 100644 index 00000000..5b92c4df --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -0,0 +1,358 @@ +-- ClickHouse schema initialization (single file, idempotent) +-- All tables use IF NOT EXISTS for safe re-execution on every startup. 
+
+-- ── Agent Metrics ──────────────────────────────────────────────────────
+
+CREATE TABLE IF NOT EXISTS agent_metrics (
+    tenant_id          LowCardinality(String) DEFAULT 'default',
+    collected_at       DateTime64(3),
+    instance_id        LowCardinality(String),
+    metric_name        LowCardinality(String),
+    metric_value       Float64,
+    tags               Map(String, String) DEFAULT map(),
+    server_received_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = MergeTree()
+PARTITION BY (tenant_id, toYYYYMM(collected_at))
+ORDER BY (tenant_id, instance_id, metric_name, collected_at)
+TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE
+SETTINGS index_granularity = 8192;
+
+-- ── Executions ─────────────────────────────────────────────────────────
+
+-- ReplacingMergeTree(_version): rows sharing the full ORDER BY key are
+-- de-duplicated at merge time, keeping the highest _version. Until parts
+-- merge, queries may observe multiple versions of the same execution.
+CREATE TABLE IF NOT EXISTS executions (
+    tenant_id            LowCardinality(String) DEFAULT 'default',
+    execution_id         String,
+    start_time           DateTime64(3),
+    _version             UInt64 DEFAULT 1,
+    route_id             LowCardinality(String),
+    instance_id          LowCardinality(String),
+    application_id       LowCardinality(String),
+    status               LowCardinality(String),
+    correlation_id       String DEFAULT '',
+    exchange_id          String DEFAULT '',
+    end_time             Nullable(DateTime64(3)),
+    duration_ms          Nullable(Int64),
+    error_message        String DEFAULT '',
+    error_stacktrace     String DEFAULT '',
+    error_type           LowCardinality(String) DEFAULT '',
+    error_category       LowCardinality(String) DEFAULT '',
+    root_cause_type      String DEFAULT '',
+    root_cause_message   String DEFAULT '',
+    diagram_content_hash String DEFAULT '',
+    engine_level         LowCardinality(String) DEFAULT '',
+    input_body           String DEFAULT '',
+    output_body          String DEFAULT '',
+    input_headers        String DEFAULT '',
+    output_headers       String DEFAULT '',
+    attributes           String DEFAULT '',
+    trace_id             String DEFAULT '',
+    span_id              String DEFAULT '',
+    has_trace_data       Bool DEFAULT false,
+    is_replay            Bool DEFAULT false,
+    original_exchange_id String DEFAULT '',
+    replay_exchange_id   String DEFAULT '',
+
+    -- Stored search column feeding the ngram index below; trades storage
+    -- for fast substring search across ids, errors, bodies and headers.
+    _search_text String MATERIALIZED
+        concat(execution_id, ' ', correlation_id, ' ', exchange_id, ' ', route_id,
+               ' ', error_message, ' ', error_stacktrace, ' ', attributes,
+               ' ', input_body, ' ', output_body, ' ', input_headers,
+               ' ', output_headers, ' ', root_cause_message),
+
+    INDEX idx_search  _search_text                            TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_error   error_message                           TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_bodies  concat(input_body, ' ', output_body)    TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_status  status                                  TYPE set(10) GRANULARITY 1,
+    INDEX idx_corr    correlation_id                          TYPE bloom_filter(0.01) GRANULARITY 4
+)
+ENGINE = ReplacingMergeTree(_version)
+PARTITION BY (tenant_id, toYYYYMM(start_time))
+ORDER BY (tenant_id, start_time, application_id, route_id, execution_id)
+TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
+SETTINGS index_granularity = 8192;
+
+-- ── Processor Executions ───────────────────────────────────────────────
+
+CREATE TABLE IF NOT EXISTS processor_executions (
+    tenant_id             LowCardinality(String) DEFAULT 'default',
+    execution_id          String,
+    seq                   UInt32,
+    parent_seq            Nullable(UInt32),
+    parent_processor_id   String DEFAULT '',
+    processor_id          String,
+    processor_type        LowCardinality(String),
+    start_time            DateTime64(3),
+    route_id              LowCardinality(String),
+    application_id        LowCardinality(String),
+    iteration             Nullable(Int32),
+    iteration_size        Nullable(Int32),
+    status                LowCardinality(String),
+    end_time              Nullable(DateTime64(3)),
+    duration_ms           Nullable(Int64),
+    error_message         String DEFAULT '',
+    error_stacktrace      String DEFAULT '',
+    error_type            LowCardinality(String) DEFAULT '',
+    error_category        LowCardinality(String) DEFAULT '',
+    root_cause_type       String DEFAULT '',
+    root_cause_message    String DEFAULT '',
+    input_body            String DEFAULT '',
+    output_body           String DEFAULT '',
+    input_headers         String DEFAULT '',
+    output_headers        String DEFAULT '',
+    attributes            String DEFAULT '',
+    resolved_endpoint_uri String DEFAULT '',
+    circuit_breaker_state LowCardinality(String) DEFAULT '',
+    fallback_triggered    Bool DEFAULT false,
+    filter_matched        Bool DEFAULT false,
+    duplicate_message     Bool DEFAULT false,
+
+    _search_text String MATERIALIZED
+        concat(error_message, ' ', error_stacktrace, ' ', attributes,
+               ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),
+
+    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
+)
+ENGINE = MergeTree()
+PARTITION BY (tenant_id, toYYYYMM(start_time))
+ORDER BY (tenant_id, start_time, application_id, route_id, execution_id, seq)
+TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
+SETTINGS index_granularity = 8192;
+
+-- ── Stats: Materialized Views + AggregatingMergeTree ───────────────────
+
+-- NOTE(review): materialized views fire on every INSERT into the source
+-- table; ReplacingMergeTree de-duplication of `executions` does NOT apply
+-- to MV input. An execution first inserted as RUNNING and later re-inserted
+-- with a terminal status therefore contributes to countState() twice —
+-- presumably compensated in downstream queries (e.g. via running_count);
+-- confirm before relying on total_count as "number of executions".
+
+-- stats_1m_all (global)
+
+CREATE TABLE IF NOT EXISTS stats_1m_all (
+    tenant_id     LowCardinality(String),
+    bucket        DateTime,
+    total_count   AggregateFunction(count),
+    failed_count  AggregateFunction(countIf, UInt8),
+    running_count AggregateFunction(countIf, UInt8),
+    duration_sum  AggregateFunction(sum, Nullable(Int64)),
+    duration_max  AggregateFunction(max, Nullable(Int64)),
+    p99_duration  AggregateFunction(quantile(0.99), Nullable(Int64))
+)
+ENGINE = AggregatingMergeTree()
+PARTITION BY (tenant_id, toYYYYMM(bucket))
+ORDER BY (tenant_id, bucket)
+TTL bucket + INTERVAL 365 DAY DELETE;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
+SELECT
+    tenant_id,
+    toStartOfMinute(start_time) AS bucket,
+    countState() AS total_count,
+    countIfState(status = 'FAILED') AS failed_count,
+    countIfState(status = 'RUNNING') AS running_count,
+    sumState(duration_ms) AS duration_sum,
+    maxState(duration_ms) AS duration_max,
+    quantileState(0.99)(duration_ms) AS p99_duration
+FROM executions
+GROUP BY tenant_id, bucket;
+
+-- stats_1m_app (per-application)
+
+CREATE TABLE IF NOT EXISTS stats_1m_app (
+    tenant_id      LowCardinality(String),
+    application_id LowCardinality(String),
+    bucket         DateTime,
+    total_count    AggregateFunction(count),
+    failed_count   AggregateFunction(countIf, UInt8),
+    running_count  AggregateFunction(countIf, UInt8),
+    duration_sum   AggregateFunction(sum, Nullable(Int64)),
+    duration_max   AggregateFunction(max, Nullable(Int64)),
+    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
+)
+ENGINE = AggregatingMergeTree()
+PARTITION BY (tenant_id, toYYYYMM(bucket))
+ORDER BY (tenant_id, application_id, bucket)
+TTL bucket + INTERVAL 365 DAY DELETE;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
+SELECT
+    tenant_id,
+    application_id,
+    toStartOfMinute(start_time) AS bucket,
+    countState() AS total_count,
+    countIfState(status = 'FAILED') AS failed_count,
+    countIfState(status = 'RUNNING') AS running_count,
+    sumState(duration_ms) AS duration_sum,
+    maxState(duration_ms) AS duration_max,
+    quantileState(0.99)(duration_ms) AS p99_duration
+FROM executions
+GROUP BY tenant_id, application_id, bucket;
+
+-- stats_1m_route (per-route)
+
+CREATE TABLE IF NOT EXISTS stats_1m_route (
+    tenant_id      LowCardinality(String),
+    application_id LowCardinality(String),
+    route_id       LowCardinality(String),
+    bucket         DateTime,
+    total_count    AggregateFunction(count),
+    failed_count   AggregateFunction(countIf, UInt8),
+    running_count  AggregateFunction(countIf, UInt8),
+    duration_sum   AggregateFunction(sum, Nullable(Int64)),
+    duration_max   AggregateFunction(max, Nullable(Int64)),
+    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
+)
+ENGINE = AggregatingMergeTree()
+PARTITION BY (tenant_id, toYYYYMM(bucket))
+ORDER BY (tenant_id, application_id, route_id, bucket)
+TTL bucket + INTERVAL 365 DAY DELETE;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
+SELECT
+    tenant_id,
+    application_id,
+    route_id,
+    toStartOfMinute(start_time) AS bucket,
+    countState() AS total_count,
+    countIfState(status = 'FAILED') AS failed_count,
+    countIfState(status = 'RUNNING') AS running_count,
+    sumState(duration_ms) AS duration_sum,
+    maxState(duration_ms) AS duration_max,
+    quantileState(0.99)(duration_ms) AS p99_duration
+FROM executions
+GROUP BY tenant_id, application_id, route_id, bucket;
+
+-- stats_1m_processor (per-processor-type)
+
+CREATE TABLE IF NOT EXISTS stats_1m_processor (
+    tenant_id      LowCardinality(String),
+    application_id LowCardinality(String),
+    processor_type LowCardinality(String),
+    bucket         DateTime,
+    total_count    AggregateFunction(count),
+    failed_count   AggregateFunction(countIf, UInt8),
+    duration_sum   AggregateFunction(sum, Nullable(Int64)),
+    duration_max   AggregateFunction(max, Nullable(Int64)),
+    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
+)
+ENGINE = AggregatingMergeTree()
+PARTITION BY (tenant_id, toYYYYMM(bucket))
+ORDER BY (tenant_id, application_id, processor_type, bucket)
+TTL bucket + INTERVAL 365 DAY DELETE;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
+SELECT
+    tenant_id,
+    application_id,
+    processor_type,
+    toStartOfMinute(start_time) AS bucket,
+    countState() AS total_count,
+    countIfState(status = 'FAILED') AS failed_count,
+    sumState(duration_ms) AS duration_sum,
+    maxState(duration_ms) AS duration_max,
+    quantileState(0.99)(duration_ms) AS p99_duration
+FROM processor_executions
+GROUP BY tenant_id, application_id, processor_type, bucket;
+
+-- stats_1m_processor_detail (per-processor-id)
+
+CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
+    tenant_id      LowCardinality(String),
+    application_id LowCardinality(String),
+    route_id       LowCardinality(String),
+    processor_id   String,
+    processor_type LowCardinality(String),
+    bucket         DateTime,
+    total_count    AggregateFunction(count),
+    failed_count   AggregateFunction(countIf, UInt8),
+    duration_sum   AggregateFunction(sum, Nullable(Int64)),
+    duration_max   AggregateFunction(max, Nullable(Int64)),
+    p99_duration   AggregateFunction(quantile(0.99), Nullable(Int64))
+)
+ENGINE = AggregatingMergeTree()
+PARTITION BY (tenant_id, toYYYYMM(bucket))
+ORDER BY (tenant_id, application_id, route_id, processor_id, processor_type, bucket)
+TTL bucket + INTERVAL 365 DAY DELETE;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
+SELECT
+    tenant_id,
+    application_id,
+    route_id,
+    processor_id,
+    processor_type,
+    toStartOfMinute(start_time) AS bucket,
+    countState() AS total_count,
+    countIfState(status = 'FAILED') AS failed_count,
+    sumState(duration_ms) AS duration_sum,
+    maxState(duration_ms) AS duration_max,
+    quantileState(0.99)(duration_ms) AS p99_duration
+FROM processor_executions
+GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket;
+
+-- ── Route Diagrams ─────────────────────────────────────────────────────
+
+-- De-duplicated by content hash: ReplacingMergeTree keeps the row with the
+-- newest created_at per (tenant_id, content_hash). No TTL — diagrams are
+-- small and referenced by executions.diagram_content_hash indefinitely.
+CREATE TABLE IF NOT EXISTS route_diagrams (
+    tenant_id      LowCardinality(String) DEFAULT 'default',
+    content_hash   String,
+    route_id       LowCardinality(String),
+    instance_id    LowCardinality(String),
+    application_id LowCardinality(String),
+    definition     String,
+    created_at     DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = ReplacingMergeTree(created_at)
+ORDER BY (tenant_id, content_hash)
+SETTINGS index_granularity = 8192;
+
+-- ── Agent Events ───────────────────────────────────────────────────────
+
+CREATE TABLE IF NOT EXISTS agent_events (
+    tenant_id      LowCardinality(String) DEFAULT 'default',
+    timestamp      DateTime64(3) DEFAULT now64(3),
+    instance_id    LowCardinality(String),
+    application_id LowCardinality(String),
+    event_type     LowCardinality(String),
+    detail         String DEFAULT ''
+)
+ENGINE = MergeTree()
+PARTITION BY (tenant_id, toYYYYMM(timestamp))
+ORDER BY (tenant_id, application_id, instance_id, timestamp)
+TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;
+
+-- ── Logs ───────────────────────────────────────────────────────────────
+
+CREATE TABLE IF NOT EXISTS logs (
+    tenant_id   LowCardinality(String) DEFAULT 'default',
+    timestamp   DateTime64(3),
+    application LowCardinality(String),
+    instance_id LowCardinality(String),
+    level       LowCardinality(String),
+    logger_name LowCardinality(String) DEFAULT '',
+    message     String,
+    thread_name LowCardinality(String) DEFAULT '',
+    stack_trace String DEFAULT '',
+    exchange_id String DEFAULT '',
+    mdc         Map(String, String) DEFAULT map(),
+
+    INDEX idx_msg   message     TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
+    INDEX idx_level level       TYPE set(10) GRANULARITY 1
+)
+ENGINE = MergeTree()
+PARTITION BY (tenant_id, toYYYYMM(timestamp))
+ORDER BY (tenant_id, application, timestamp)
+TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
+SETTINGS index_granularity = 8192;
+
+-- ── Usage Events ───────────────────────────────────────────────────────
+
+-- Server-side API usage audit (no tenant_id: keyed by authenticated user).
+CREATE TABLE IF NOT EXISTS usage_events (
+    timestamp    DateTime64(3) DEFAULT now64(3),
+    username     LowCardinality(String),
+    method       LowCardinality(String),
+    path         String,
+    normalized   LowCardinality(String),
+    status_code  UInt16,
+    duration_ms  UInt32,
+    query_params String DEFAULT ''
+)
+ENGINE = MergeTree()
+ORDER BY (username, timestamp)
+-- Explicit DELETE action for consistency with every other TTL above
+-- (DELETE is ClickHouse's default TTL action, so behavior is unchanged).
+TTL toDateTime(timestamp) + INTERVAL 90 DAY DELETE;