# ClickHouse Migration Design Replace PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS for all observability data. PostgreSQL retained only for RBAC, config, and audit log. ## Context Cameleer3-server currently uses three storage systems: - **PostgreSQL/TimescaleDB**: executions, processor_executions, agent_metrics (hypertables), agent_events, route_diagrams, plus RBAC/config/audit tables. Continuous aggregates for dashboard statistics. - **OpenSearch**: executions-YYYY-MM-DD indices (full-text search on bodies/headers/errors), logs-YYYY-MM-DD indices (application log storage with 7-day retention). - **Dual-write pattern**: PG is source of truth, OpenSearch is async-indexed via debounced `SearchIndexer`. This architecture has scaling limits: three systems to operate, data duplication between PG and OpenSearch, TimescaleDB continuous aggregates with limited flexibility, and no multitenancy support. **Goal**: Consolidate to ClickHouse OSS (self-hosted) for all observability data. Add multitenancy with custom per-tenant, per-document-type retention. Support billions of documents, terabytes of data, sub-second wildcard search. ## Decisions | Decision | Choice | Rationale | |----------|--------|-----------| | Deployment | Self-hosted ClickHouse OSS on k3s | All needed features available in OSS. Fits existing infra. | | Execution lifecycle | Approach B: Application-side accumulator | Merges RUNNING+COMPLETED in-memory, writes one row. Avoids upsert problem. | | Table engine (executions) | ReplacingMergeTree | Handles rare late corrections via version column. Normal flow writes once. | | Table engine (all others) | MergeTree | Append-only data, no dedup needed. | | Client | JDBC + JdbcTemplate | Familiar pattern, matches current PG code. Async inserts via JDBC URL settings. | | Multitenancy | Shared tables + tenant_id column | Row policies for defense-in-depth. Application-layer WHERE for primary enforcement. 
| | Retention | Application-driven scheduler | Per-tenant, per-document-type. Config in PG, execution via ALTER TABLE DELETE. | | Search | Ngram bloom filter indexes | Sub-second wildcard search. Materialized `_search_text` column for cross-field search. | | Highlighting | Application-side in Java | Extract 120-char fragment around match from returned fields. | | Storage tiering | Local SSD only (initially) | S3/MinIO tiering can be added later via TTL MOVE rules. | ## ClickHouse OSS Constraints These are features NOT available in the open-source version: | Constraint | Impact on Cameleer3 | |------------|---------------------| | No SharedMergeTree | No elastic compute scaling; must size nodes up-front. Acceptable for self-hosted. | | No BM25 relevance scoring | Search returns matches without ranking. Acceptable for observability (want all matches, not ranked). | | No search highlighting | Replaced by application-side highlighting in Java. | | No fuzzy/typo-tolerant search | Must match exact tokens or use ngram index for substring match. Acceptable. | | No ClickPipes | Must build own ingestion pipeline. Already exists (agents push via HTTP POST). | | No managed backups | Must configure `clickhouse-backup` (Altinity, open-source) or built-in BACKUP SQL. | | No auto-scaling | Manual capacity planning. Single node handles 14+ TiB, sufficient for initial scale. | General ClickHouse constraints (apply to both OSS and Cloud): | Constraint | Mitigation | |------------|------------| | ORDER BY is immutable | Careful upfront schema design. Documented below. | | No transactions | Single-table INSERT atomic per block. No cross-table atomicity needed. | | Mutations are expensive | Avoid ALTER UPDATE/DELETE. Use ReplacingMergeTree for corrections, append-only for everything else. | | Row policies skip mutations | Application-layer WHERE on mutations. Mutations are rare (retention scheduler only). | | No JPA/Hibernate | Use JdbcTemplate (already the pattern for PG). 
| | JSON max_dynamic_paths | Store attributes as flattened String, not JSON type. Use ngram index for search. | | Text indexes can't index JSON subcolumns | Extract searchable text into materialized String columns. | | MVs only process new inserts | Historical data backfill writes through MV pipeline. | | MV errors block source inserts | Careful MV design. Test thoroughly before production. | | ReplacingMergeTree eventual consistency | Use FINAL on queries that need latest version. | ## What Stays in PostgreSQL | Table | Reason | |-------|--------| | `users`, `roles`, `groups`, `user_groups`, `user_roles`, `group_roles` | RBAC with relational joins, foreign keys, transactions | | `server_config` | Global config, low volume, needs transactions | | `application_config` | Per-app observability settings | | `app_settings` | Per-app SLA thresholds | | `audit_log` | Security compliance, needs transactions, joins with RBAC tables | | OIDC config | Auth provider config | | `tenant_retention_config` (new) | Per-tenant retention settings, referenced by scheduler | ## What Moves to ClickHouse | Data | Current Location | ClickHouse Table | |------|-----------------|------------------| | Route executions | PG `executions` hypertable + OpenSearch `executions-*` | `executions` | | Processor executions | PG `processor_executions` hypertable | `processor_executions` | | Agent metrics | PG `agent_metrics` hypertable | `agent_metrics` | | Agent events | PG `agent_events` | `agent_events` | | Route diagrams | PG `route_diagrams` | `route_diagrams` | | Application logs | OpenSearch `logs-*` | `logs` | | Dashboard statistics | PG continuous aggregates (`stats_1m_*`) | ClickHouse materialized views (`stats_1m_*`) | ## Table Schemas ### executions ```sql CREATE TABLE executions ( tenant_id LowCardinality(String), execution_id String, start_time DateTime64(3), _version UInt64 DEFAULT 1, route_id LowCardinality(String), agent_id LowCardinality(String), application_name 
LowCardinality(String), status LowCardinality(String), correlation_id String DEFAULT '', exchange_id String DEFAULT '', end_time Nullable(DateTime64(3)), duration_ms Nullable(Int64), error_message String DEFAULT '', error_stacktrace String DEFAULT '', error_type LowCardinality(String) DEFAULT '', error_category LowCardinality(String) DEFAULT '', root_cause_type String DEFAULT '', root_cause_message String DEFAULT '', diagram_content_hash String DEFAULT '', engine_level LowCardinality(String) DEFAULT '', input_body String DEFAULT '', output_body String DEFAULT '', input_headers String DEFAULT '', output_headers String DEFAULT '', attributes String DEFAULT '', trace_id String DEFAULT '', span_id String DEFAULT '', processors_json String DEFAULT '', has_trace_data Bool DEFAULT false, is_replay Bool DEFAULT false, _search_text String MATERIALIZED concat(error_message, ' ', error_stacktrace, ' ', attributes, ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers, ' ', root_cause_message), INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_status status TYPE set(10) GRANULARITY 1, INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 ) ENGINE = ReplacingMergeTree(_version) PARTITION BY (tenant_id, toYYYYMM(start_time)) ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) TTL start_time + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; ``` Design rationale: - **ORDER BY** `(tenant_id, start_time, application_name, route_id, execution_id)`: Matches UI query pattern (tenant -> time range -> app -> route). Time before application because observability queries almost always include a time range. 
- **PARTITION BY** `(tenant_id, toYYYYMM(start_time))`: Enables per-tenant partition drops for retention. Monthly granularity balances partition count vs drop efficiency. - **ReplacingMergeTree(_version)**: Normal flow writes once (version 1). Late corrections write version 2+. Background merges keep latest version. - **`_search_text` materialized column**: Computed at insert time. Concatenates all searchable fields for cross-field wildcard search. - **`ngrambf_v1(3, 256, 2, 0)`**: 3-char ngrams in a 256-byte bloom filter with 2 hash functions. Prunes most granules for `LIKE '%term%'` queries. The bloom filter size (256 bytes) is a starting point — increase to 4096-8192 if false positive rates are too high for long text fields. Tune after benchmarking with real data. - **`LowCardinality(String)`**: Dictionary encoding for columns with few distinct values. Major compression improvement. - **TTL 365 days**: Safety net. Application-driven scheduler handles per-tenant retention at finer granularity. 
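The ngram index accelerates raw `LIKE '%term%'` scans, but a user-supplied term containing `%` or `_` would be interpreted as extra wildcards. A minimal escaping helper, sketched here (class and method names are illustrative, not existing code; ClickHouse uses backslash as the default LIKE escape character):

```java
final class LikePatterns {

    // Escape LIKE metacharacters so user input matches literally.
    static String escapeLike(String term) {
        return term.replace("\\", "\\\\")
                   .replace("%", "\\%")
                   .replace("_", "\\_");
    }

    // Build a '%term%' substring pattern suitable for a bound parameter.
    static String containsPattern(String term) {
        return "%" + escapeLike(term) + "%";
    }
}
```

The resulting pattern is then passed as a query parameter, never concatenated into the SQL text.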
### processor_executions ```sql CREATE TABLE processor_executions ( tenant_id LowCardinality(String), execution_id String, processor_id String, start_time DateTime64(3), route_id LowCardinality(String), application_name LowCardinality(String), processor_type LowCardinality(String), parent_processor_id String DEFAULT '', depth UInt16 DEFAULT 0, status LowCardinality(String), end_time Nullable(DateTime64(3)), duration_ms Nullable(Int64), error_message String DEFAULT '', error_stacktrace String DEFAULT '', error_type LowCardinality(String) DEFAULT '', error_category LowCardinality(String) DEFAULT '', root_cause_type String DEFAULT '', root_cause_message String DEFAULT '', input_body String DEFAULT '', output_body String DEFAULT '', input_headers String DEFAULT '', output_headers String DEFAULT '', attributes String DEFAULT '', loop_index Nullable(Int32), loop_size Nullable(Int32), split_index Nullable(Int32), split_size Nullable(Int32), multicast_index Nullable(Int32), resolved_endpoint_uri String DEFAULT '', error_handler_type LowCardinality(String) DEFAULT '', circuit_breaker_state LowCardinality(String) DEFAULT '', fallback_triggered Bool DEFAULT false, _search_text String MATERIALIZED concat(error_message, ' ', error_stacktrace, ' ', attributes, ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(start_time)) ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, processor_id) TTL start_time + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; ``` ### logs ```sql CREATE TABLE logs ( tenant_id LowCardinality(String), timestamp DateTime64(3), application LowCardinality(String), agent_id LowCardinality(String), level LowCardinality(String), logger_name LowCardinality(String) DEFAULT '', message String, thread_name 
LowCardinality(String) DEFAULT '', stack_trace String DEFAULT '', exchange_id String DEFAULT '', mdc Map(String, String) DEFAULT map(), INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_level level TYPE set(10) GRANULARITY 1 ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) ORDER BY (tenant_id, application, timestamp) TTL timestamp + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; ``` ### agent_metrics ```sql CREATE TABLE agent_metrics ( tenant_id LowCardinality(String), collected_at DateTime64(3), agent_id LowCardinality(String), metric_name LowCardinality(String), metric_value Float64, tags Map(String, String) DEFAULT map(), server_received_at DateTime64(3) DEFAULT now64(3) ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(collected_at)) ORDER BY (tenant_id, agent_id, metric_name, collected_at) TTL collected_at + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; ``` ### agent_events ```sql CREATE TABLE agent_events ( tenant_id LowCardinality(String), timestamp DateTime64(3) DEFAULT now64(3), agent_id LowCardinality(String), app_id LowCardinality(String), event_type LowCardinality(String), detail String DEFAULT '' ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) ORDER BY (tenant_id, app_id, agent_id, timestamp) TTL timestamp + INTERVAL 365 DAY DELETE; ``` ### route_diagrams ```sql CREATE TABLE route_diagrams ( tenant_id LowCardinality(String), content_hash String, route_id LowCardinality(String), agent_id LowCardinality(String), application_name LowCardinality(String), definition String, created_at DateTime64(3) DEFAULT now64(3) ) ENGINE = ReplacingMergeTree(created_at) ORDER BY (tenant_id, content_hash) SETTINGS index_granularity = 8192; ``` ## Materialized Views (Stats) Replace TimescaleDB continuous aggregates. ClickHouse MVs trigger on INSERT and store aggregate states in target tables. 
### stats_1m_all (global)

```sql
CREATE TABLE stats_1m_all (
    tenant_id LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
) ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;
```

### stats_1m_app (per-application)

```sql
CREATE TABLE stats_1m_app (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
) ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;
```

### stats_1m_route (per-route)

```sql
CREATE TABLE stats_1m_route (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    route_id LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
) ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;
```

### stats_1m_processor (per-processor-type)

```sql
CREATE TABLE stats_1m_processor (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    processor_type LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
) ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;
```

### stats_1m_processor_detail (per-processor-id)

```sql
CREATE TABLE stats_1m_processor_detail (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    route_id LowCardinality(String),
    processor_id String,
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
) ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
```

## Ingestion Pipeline

### Current Flow (replaced)

```
Agent POST -> IngestionService -> PostgresExecutionStore.upsert() -> PG
  -> SearchIndexer (debounced 2s) -> reads from PG -> OpenSearch
```

### New Flow

```
Agent POST -> IngestionService -> ExecutionAccumulator
  |-- RUNNING: ConcurrentHashMap (no DB write)
  |-- COMPLETED/FAILED: merge with pending -> WriteBuffer
  '-- Timeout sweep (60s): flush stale -> WriteBuffer
                                             |
                     ClickHouseExecutionStore.insertBatch()
                     ClickHouseProcessorStore.insertBatch()
```

### ExecutionAccumulator

New component replacing `SearchIndexer`. Core responsibilities:

1. **On RUNNING POST**: Store a `PendingExecution` in a `ConcurrentHashMap` keyed by `execution_id`. Return 200 OK immediately. No database write.
2. **On COMPLETED/FAILED POST**: Look up the pending RUNNING entry by `execution_id`. If found, merge fields using the same COALESCE logic currently in `PostgresExecutionStore.upsert()`, produce a complete `MergedExecution`, and push it to the `WriteBuffer`. If not found (race condition, or the RUNNING entry was already flushed by the timeout sweep), write the COMPLETED row directly with `_version = 2`.
3. **Timeout sweep** (scheduled every 60s): Scan for RUNNING entries older than 5 minutes and flush them to ClickHouse as-is with `status = RUNNING`, making them visible in the UI. When the COMPLETED event eventually arrives, it is written with `_version = 2` and ReplacingMergeTree deduplicates.
4. **Late corrections**: If a correction arrives for an already-written execution, insert it with `_version` incremented. ReplacingMergeTree handles deduplication.
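The accumulator's merge step can be sketched as below. `Pending` and its fields are simplified stand-ins for the real execution DTO, and the coalescing covers only one field for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the RUNNING/COMPLETED merge. Class and field names
// are illustrative; the real accumulator carries the full execution DTO.
final class ExecutionAccumulator {

    record Pending(String executionId, String status, String inputBody, long version) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    void onRunning(Pending p) {
        pending.put(p.executionId(), p); // no DB write
    }

    // Merge COMPLETED with the buffered RUNNING row, COALESCE-style:
    // completed fields win, fields only present on RUNNING are kept.
    Pending onCompleted(Pending completed) {
        Pending running = pending.remove(completed.executionId());
        if (running == null) {
            // RUNNING was already flushed by the timeout sweep: write version 2.
            return new Pending(completed.executionId(), completed.status(),
                               completed.inputBody(), 2);
        }
        String inputBody = completed.inputBody() != null
                ? completed.inputBody() : running.inputBody();
        return new Pending(completed.executionId(), completed.status(), inputBody, 1);
    }
}
```

The returned row goes to the `WriteBuffer`; the version-2 path relies on ReplacingMergeTree to supersede the earlier RUNNING row.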
### WriteBuffer Reuse the existing `WriteBuffer` pattern (bounded queue, configurable batch size, scheduled drain): - Buffer capacity: 50,000 items - Batch size: 5,000 per flush - Flush interval: 1 second - Separate buffers for executions and processor_executions (independent batch inserts) - Drain calls `ClickHouseExecutionStore.insertBatch()` using JDBC batch update ### Logs Ingestion Direct batch INSERT, bypasses accumulator (logs are single-phase): ``` Agent POST /api/v1/data/logs -> LogIngestionController -> ClickHouseLogStore.insertBatch() ``` ### Metrics Ingestion Existing `MetricsWriteBuffer` targets ClickHouse instead of PG: ``` Agent POST /api/v1/data/metrics -> MetricsController -> WriteBuffer -> ClickHouseMetricsStore.insertBatch() ``` ### JDBC Batch Insert Pattern ```java jdbcTemplate.batchUpdate( "INSERT INTO executions (tenant_id, execution_id, start_time, ...) VALUES (?, ?, ?, ...)", batchArgs ); ``` JDBC URL includes `async_insert=1&wait_for_async_insert=0` for server-side buffering, preventing "too many parts" errors under high load. 
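The buffer itself can be sketched as a small generic class. Capacity and batch size mirror the defaults above; the scheduled 1-second flush and the JDBC `insertBatch()` call are supplied by the caller, and the class shape is illustrative rather than the existing implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Minimal sketch of the bounded WriteBuffer: producers offer items,
// a scheduled task calls drain(), and the sink performs the batch insert.
final class WriteBuffer<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final Consumer<List<T>> sink;

    WriteBuffer(int capacity, int batchSize, Consumer<List<T>> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.sink = sink;
    }

    boolean offer(T item) {
        return queue.offer(item); // false when full: caller applies backpressure
    }

    // Called by the flush scheduler; empties the queue in batchSize chunks.
    void drain() {
        List<T> batch = new ArrayList<>(batchSize);
        while (queue.drainTo(batch, batchSize) > 0) {
            sink.accept(List.copyOf(batch));
            batch.clear();
        }
    }
}
```

In production the sink would be `batch -> clickHouseExecutionStore.insertBatch(batch)`, with separate buffer instances for executions and processor_executions.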
## Search Implementation

### Query Translation

Current OpenSearch bool queries map to ClickHouse SQL. The search term is bound as a query parameter and wrapped in wildcards with `concat`, never interpolated into the string literal:

```sql
-- Full-text wildcard search with time range, status filter, and pagination
SELECT *
FROM executions FINAL
WHERE tenant_id = {tenant_id:String}
  AND start_time >= {time_from:DateTime64(3)}
  AND start_time < {time_to:DateTime64(3)}
  AND status IN {statuses:Array(String)}
  AND (
    _search_text LIKE concat('%', {search_term:String}, '%')
    OR execution_id IN (
      SELECT DISTINCT execution_id
      FROM processor_executions
      WHERE tenant_id = {tenant_id:String}
        AND start_time >= {time_from:DateTime64(3)}
        AND start_time < {time_to:DateTime64(3)}
        AND _search_text LIKE concat('%', {search_term:String}, '%')
    )
  )
ORDER BY start_time DESC
LIMIT {limit:UInt32} OFFSET {offset:UInt32}
```

### Scoped Searches

| Scope | ClickHouse WHERE clause |
|-------|------------------------|
| textInBody | `input_body LIKE '%term%' OR output_body LIKE '%term%'` |
| textInHeaders | `input_headers LIKE '%term%' OR output_headers LIKE '%term%'` |
| textInErrors | `error_message LIKE '%term%' OR error_stacktrace LIKE '%term%'` |
| global text | `_search_text LIKE '%term%'` (covers all fields) |

All are accelerated by the `ngrambf_v1` indexes, which prune 95%+ of data granules before scanning.

### Application-Side Highlighting

```java
// Extract a context fragment around the first case-insensitive match.
public String extractHighlight(String text, String searchTerm, int contextChars) {
    if (text == null || searchTerm == null || searchTerm.isEmpty()) {
        return null;
    }
    int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
    if (idx < 0) {
        return null;
    }
    int start = Math.max(0, idx - contextChars / 2);
    int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
    return (start > 0 ? "..." : "")
            + text.substring(start, end)
            + (end < text.length() ? "..." : "");
}
```

Returns the same `highlight` map structure the UI currently expects.

### Nested Processor Search

OpenSearch nested queries become a subquery on the `processor_executions` table:

```sql
execution_id IN (
    SELECT DISTINCT execution_id
    FROM processor_executions
    WHERE tenant_id = ?
      AND start_time >= ?
      AND start_time < ?
      AND _search_text LIKE '%term%'
)
```

The subquery is evaluated once with ngram index acceleration, then joined via IN.

## Stats Query Translation

### TimescaleDB -> ClickHouse Query Patterns

| TimescaleDB | ClickHouse |
|-------------|------------|
| `time_bucket('1 minute', bucket)` | `toStartOfInterval(bucket, INTERVAL 1 MINUTE)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `approx_percentile(0.99, rollup(p99_duration))` | `quantileMerge(0.99)(p99_duration)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |

### Example: Timeseries Query

```sql
SELECT
    toStartOfInterval(bucket, INTERVAL {interval:UInt32} SECOND) AS period,
    countMerge(total_count) AS total_count,
    countIfMerge(failed_count) AS failed_count,
    sumMerge(duration_sum) / countMerge(total_count) AS avg_duration,
    quantileMerge(0.99)(p99_duration) AS p99_duration
FROM stats_1m_app
WHERE tenant_id = {tenant_id:String}
  AND application_name = {app:String}
  AND bucket >= {from:DateTime}
  AND bucket < {to:DateTime}
GROUP BY period
ORDER BY period
```

### SLA and Top Errors

SLA queries hit the raw `executions` table (they need per-row duration filtering):

```sql
SELECT countIf(duration_ms <= {threshold:Int64} AND status != 'RUNNING') * 100.0 / count() AS sla_pct
FROM executions FINAL
WHERE tenant_id = ? AND application_name = ?
  AND start_time >= ? AND start_time < ?
```

Top errors query:

```sql
SELECT error_message, count() AS error_count, max(start_time) AS last_seen
FROM executions FINAL
WHERE tenant_id = ? AND status = 'FAILED'
  AND start_time >= now() - INTERVAL 1 HOUR
GROUP BY error_message
ORDER BY error_count DESC
LIMIT 10
```

## Multitenancy

### Data Isolation

**Primary**: Application-layer WHERE clause injection. Every ClickHouse query gets `WHERE tenant_id = ?` derived from the authenticated user's JWT claims.
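One way to make the WHERE injection hard to forget is to funnel every observability query through a small builder that takes the tenant id first. A hypothetical sketch (not existing code; the table name must come from an application-controlled constant, never user input):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of centralized tenant scoping: a query cannot be
// constructed without supplying the tenant_id predicate up front.
final class TenantScopedQuery {
    private final StringBuilder sql = new StringBuilder();
    private final List<Object> args = new ArrayList<>();

    TenantScopedQuery(String table, String tenantId) {
        sql.append("SELECT * FROM ").append(table).append(" WHERE tenant_id = ?");
        args.add(tenantId);
    }

    TenantScopedQuery and(String predicate, Object arg) {
        sql.append(" AND ").append(predicate);
        args.add(arg);
        return this;
    }

    String sql() { return sql.toString(); }
    Object[] args() { return args.toArray(); }
}
```

A caller would then run `jdbcTemplate.query(q.sql(), rowMapper, q.args())`, with the tenant id taken from the JWT.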
**Defense-in-depth**: ClickHouse row policies:

```sql
-- Create a ClickHouse user per tenant
CREATE USER tenant_acme IDENTIFIED BY '...';

-- Row policy ensures the tenant's user can only see their own data
CREATE ROW POLICY tenant_acme_executions ON executions
FOR SELECT USING tenant_id = 'acme' TO tenant_acme;

-- Repeat for all tables
```

### Tenant ID in Schema

`tenant_id` is the first column in every table's ORDER BY and PARTITION BY. This ensures:

- Data for the same tenant is physically co-located on disk
- Queries filtering by tenant_id use the sparse index efficiently
- Partition drops for retention are scoped to individual tenants

### Resource Quotas

```sql
CREATE SETTINGS PROFILE tenant_limits SETTINGS
    max_execution_time = 30,
    max_rows_to_read = 100000000,
    max_memory_usage = 4000000000;  -- ~4 GB (bytes; SQL settings take numeric values)

ALTER USER tenant_acme SETTINGS PROFILE tenant_limits;
```

Prevents noisy-neighbor problems where one tenant's expensive query affects others.

## Retention

### Strategy: Application-Driven Scheduler

Per-tenant, per-document-type retention is too dynamic for static ClickHouse TTL rules. Instead:

1. **Config table** in PostgreSQL:

```sql
CREATE TABLE tenant_retention_config (
    tenant_id VARCHAR(255) NOT NULL,
    document_type VARCHAR(50) NOT NULL, -- executions, logs, metrics, etc.
    retention_days INT NOT NULL,
    PRIMARY KEY (tenant_id, document_type)
);
```

2. **RetentionScheduler** (Spring `@Scheduled`, runs daily at 03:00 UTC). Because the table name comes from config, it is validated against an allow-list that also maps each table to its time column (`timestamp` for logs and events, `collected_at` for metrics):

```java
private static final Map<String, String> TIME_COLUMNS = Map.of(
    "executions", "start_time",
    "processor_executions", "start_time",
    "logs", "timestamp",
    "agent_metrics", "collected_at",
    "agent_events", "timestamp");

@Scheduled(cron = "0 0 3 * * *")
public void enforceRetention() {
    for (TenantRetention config : retentionConfigRepo.findAll()) {
        String table = config.documentType();
        String timeColumn = TIME_COLUMNS.get(table);
        if (timeColumn == null) {
            continue; // unknown type: never concatenate unvalidated names into SQL
        }
        clickHouseJdbc.update(
            "ALTER TABLE " + table + " DELETE WHERE tenant_id = ? AND "
                + timeColumn + " < now() - INTERVAL ? DAY",
            config.tenantId(), config.retentionDays());
    }
}
```

3. **Safety-net TTL**: Each table has a generous default TTL (365 days) as a backstop in case the scheduler fails. The scheduler handles the per-tenant granularity.

4. **Partition-aligned drops**: Because tables are partitioned by `(tenant_id, toYYYYMM(...))`, the DELETE mutation only rewrites parts that contain matching rows, and months that are entirely past retention can instead be removed with `ALTER TABLE ... DROP PARTITION`, which discards the partition without a rewrite. Set `ttl_only_drop_parts = 1` so the safety-net TTL likewise drops whole parts rather than rewriting them.

## Java/Spring Integration

### Dependencies

```xml
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.7.x</version>
    <classifier>all</classifier> <!-- shaded jar with transitive dependencies -->
</dependency>
```

### Configuration

```yaml
clickhouse:
  url: jdbc:clickhouse://cameleer-clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0
  username: cameleer_app
  password: ${CLICKHOUSE_PASSWORD}
```

### DataSource Bean

```java
@Configuration
public class ClickHouseConfig {

    @Bean
    public DataSource clickHouseDataSource(ClickHouseProperties props) {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(props.getUrl());
        ds.setUsername(props.getUsername());
        ds.setPassword(props.getPassword());
        ds.setMaximumPoolSize(10);
        return ds;
    }

    @Bean
    public JdbcTemplate clickHouseJdbcTemplate(
            @Qualifier("clickHouseDataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }
}
```

### Interface Implementations

Existing interfaces remain unchanged.
New implementations: | Interface | Current Impl | New Impl | |-----------|-------------|----------| | `ExecutionStore` | `PostgresExecutionStore` | `ClickHouseExecutionStore` | | `SearchIndex` | `OpenSearchIndex` | `ClickHouseSearchIndex` | | `StatsStore` | `PostgresStatsStore` | `ClickHouseStatsStore` | | `DiagramStore` | `PostgresDiagramStore` | `ClickHouseDiagramStore` | | `MetricsStore` | `PostgresMetricsStore` | `ClickHouseMetricsStore` | | (log search) | `OpenSearchLogIndex` | `ClickHouseLogStore` | | (new) | `SearchIndexer` | `ExecutionAccumulator` | ## Kubernetes Deployment ### ClickHouse StatefulSet ```yaml apiVersion: apps/v1 kind: StatefulSet metadata: name: clickhouse spec: serviceName: clickhouse replicas: 1 # single node initially template: spec: containers: - name: clickhouse image: clickhouse/clickhouse-server:26.2 ports: - containerPort: 8123 # HTTP - containerPort: 9000 # Native volumeMounts: - name: data mountPath: /var/lib/clickhouse - name: config mountPath: /etc/clickhouse-server/config.d resources: requests: memory: "4Gi" cpu: "2" limits: memory: "8Gi" cpu: "4" volumeClaimTemplates: - metadata: name: data spec: accessModes: ["ReadWriteOnce"] resources: requests: storage: 100Gi # NVMe/SSD ``` ### Health Check ```yaml livenessProbe: httpGet: path: /ping port: 8123 readinessProbe: httpGet: path: /ping port: 8123 ``` ## Migration Path ### Phase 1: Foundation - Add `clickhouse-jdbc` dependency - Create `ClickHouseConfig` (DataSource, JdbcTemplate) - Schema initialization (idempotent DDL scripts, not Flyway -- ClickHouse DDL is different enough) - Implement `ClickHouseMetricsStore` (simplest table, validates pipeline) - Deploy ClickHouse to k8s alongside existing PG+OpenSearch ### Phase 2: Executions + Search - Build `ExecutionAccumulator` (replaces SearchIndexer) - Implement `ClickHouseExecutionStore` and `ClickHouseProcessorStore` - Implement `ClickHouseSearchIndex` (ngram-based SQL queries) - Feature flag: dual-write to both PG and CH, read from 
PG ### Phase 3: Stats & Analytics - Create MV definitions (all 5 stats views) - Implement `ClickHouseStatsStore` - Validate stats accuracy: compare CH vs PG continuous aggregates ### Phase 4: Remaining Tables - `ClickHouseDiagramStore` (ReplacingMergeTree) - `ClickHouseAgentEventStore` - `ClickHouseLogStore` (replaces OpenSearchLogIndex) - Application-side highlighting ### Phase 5: Multitenancy - Tables already include `tenant_id` from Phase 1 (schema is forward-looking). This phase activates multitenancy. - Wire `tenant_id` from JWT claims into all ClickHouse queries (application-layer WHERE injection) - Add `tenant_id` to PostgreSQL RBAC/config tables - Create ClickHouse row policies per tenant (defense-in-depth) - Create `tenant_retention_config` table in PG and `RetentionScheduler` component - Tenant user management and resource quotas in ClickHouse ### Phase 6: Cutover - Backfill historical data from PG/OpenSearch to ClickHouse - Switch read path to ClickHouse (feature flag) - Validate end-to-end - Remove OpenSearch dependency (POM, config, k8s manifests) - Remove TimescaleDB extensions and hypertable-specific code - Keep PostgreSQL for RBAC/config/audit only ## Verification ### Functional Verification 1. **Ingestion**: Send executions via agent, verify they appear in ClickHouse with correct fields 2. **Two-phase lifecycle**: Send RUNNING, then COMPLETED. Verify single merged row in CH 3. **Search**: Wildcard search across bodies, headers, errors. Verify sub-second response 4. **Stats**: Dashboard statistics match expected values. Compare with PG aggregates during dual-write 5. **Logs**: Ingest log batches, query by app/level/time/text. Verify correctness 6. **Retention**: Configure per-tenant retention, run scheduler, verify expired data is deleted 7. **Multitenancy**: Two tenants, verify data isolation (one tenant cannot see another's data) ### Performance Verification 1. **Insert throughput**: 5K executions/batch at 1 flush/sec sustained 2. 
**Search latency**: Sub-second for `LIKE '%term%'` across 1M+ rows 3. **Stats query latency**: Dashboard stats in <100ms from materialized views 4. **Log search**: <1s for text search across 7 days of logs ### Data Integrity 1. During dual-write phase: compare row counts between PG and CH 2. After cutover: spot-check execution details, processor trees, search results
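The dual-write row-count comparison can be sketched as a pure function over per-day counts fetched from each store; the `GROUP BY` queries that produce the maps are assumed, and class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the dual-write integrity check: compare per-day row counts
// from PG and ClickHouse and report days that diverge beyond a tolerance
// (a small tolerance absorbs in-flight writes at the bucket boundary).
final class CountReconciler {

    static List<String> mismatches(Map<String, Long> pg, Map<String, Long> ch, long tolerance) {
        List<String> bad = new ArrayList<>();
        for (var e : pg.entrySet()) {
            long chCount = ch.getOrDefault(e.getKey(), 0L);
            if (Math.abs(e.getValue() - chCount) > tolerance) {
                bad.add(e.getKey() + ": pg=" + e.getValue() + " ch=" + chCount);
            }
        }
        return bad;
    }
}
```

Running this daily during the dual-write phase gives an early signal before the Phase 6 cutover.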