docs: add ClickHouse migration design and append-only protocol spec

Design for replacing PostgreSQL/TimescaleDB + OpenSearch with ClickHouse
OSS. Covers table schemas, ingestion pipeline (ExecutionAccumulator),
ngram search indexes, materialized views, multitenancy, and retention.

Companion doc proposes append-only execution protocol for the agent repo.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: hsiegeln
Date: 2026-03-31 16:36:22 +02:00
Parent: ebe768711b
Commit: e7eda7a7b3
2 changed files with 1062 additions and 0 deletions

# Append-Only Execution Data Protocol
A reference document for redesigning the Cameleer agent's data reporting to be append-only,
eliminating the need for upserts in the storage layer.
## Problem
The current protocol sends execution data in two phases:
1. **RUNNING phase**: Agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info.
2. **COMPLETED/FAILED phase**: Agent sends an enriched record when execution finishes (duration, output body, headers, errors, processor tree).
The server uses `INSERT ... ON CONFLICT DO UPDATE SET col = COALESCE(EXCLUDED.col, col)` to merge these into a single row. This works in PostgreSQL but creates problems for append-only stores such as ClickHouse, Kafka topics, or any event-sourced architecture.
### Why This Matters
- **ClickHouse**: No native upsert. Must use ReplacingMergeTree (eventual consistency, FINAL overhead) or application-side buffering.
- **Event streaming**: Kafka/Pulsar topics are append-only. Two-phase lifecycle requires a stateful stream processor to merge.
- **Data lakes**: Parquet files are immutable. Updates require read-modify-write of entire files.
- **Materialized views**: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both RUNNING and COMPLETED inserts for the same execution.
## Proposed Protocol Change
### Option A: Single-Phase Reporting (Recommended)
The agent buffers the execution locally and sends a **single, complete record** only when the execution reaches a terminal state (COMPLETED or FAILED).
```
Current: Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert)
Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append)
```
**What changes in the agent:**
- `RouteExecutionTracker` holds in-flight executions in a local `ConcurrentHashMap`
- On route start: create tracker entry with start_time, route_id, etc.
- On route complete: enrich tracker entry with duration, bodies, errors, processor tree
- On report: send the complete record in one HTTP POST
- On timeout (configurable, e.g., 5 minutes): flush as RUNNING (for visibility of stuck routes)
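The agent-side buffering above can be sketched as follows. This is a minimal illustration, not the actual `RouteExecutionTracker` API; class and method names are invented for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of single-phase buffering: hold the execution in a
// local map on start, enrich and remove it on completion, then send one POST.
class InFlightBuffer {
    static class Execution {
        final String routeId;
        final long startMillis;
        long durationMs = -1;
        String status = "RUNNING";
        Execution(String routeId, long startMillis) {
            this.routeId = routeId;
            this.startMillis = startMillis;
        }
    }

    private final Map<String, Execution> inFlight = new ConcurrentHashMap<>();

    // On route start: create the tracker entry; nothing is sent yet.
    void onStart(String executionId, String routeId, long startMillis) {
        inFlight.put(executionId, new Execution(routeId, startMillis));
    }

    // On terminal state: enrich, remove from the map, and return the
    // complete record for a single HTTP POST.
    Execution onComplete(String executionId, long endMillis, String status) {
        Execution e = inFlight.remove(executionId);
        if (e == null) return null;  // unknown id: already flushed or never started
        e.durationMs = endMillis - e.startMillis;
        e.status = status;
        return e;
    }

    int inFlightCount() { return inFlight.size(); }
}
```

A timeout sweep would iterate `inFlight` and flush entries older than the configured threshold with status RUNNING, as described above.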
**What changes in the server:**
- Storage becomes pure append: `INSERT INTO executions VALUES (...)` — no upsert, no COALESCE
- No `SearchIndexer` / `ExecutionAccumulator` needed — the server just writes what it receives
- Materialized views count correctly (one insert = one execution)
- Works with any append-only store (ClickHouse, Kafka, S3/Parquet)
**Trade-offs:**
- RUNNING executions are not visible on the server until they complete (or timeout-flush)
- "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows
- If the agent crashes, in-flight executions are lost (same as current behavior — RUNNING rows become orphans anyway)
### Option B: Event Log with Reconstruction
Send both phases as separate **events** (not records), and let the server reconstruct the current state.
```
Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...}
Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]}
```
**Server-side:**
- Store raw events in an append-only log table
- Reconstruct current state via `SELECT argMax(field, event_time) FROM events WHERE execution_id = ? GROUP BY execution_id`
- Or: use a materialized view with `AggregatingMergeTree` + `argMaxState` to maintain a "latest state" table
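The event-log reconstruction above can be sketched in SQL as follows (table and column names are illustrative, trimmed to a few fields):

```sql
-- Append-only event log for Option B
CREATE TABLE execution_events (
    execution_id String,
    event_type LowCardinality(String),   -- EXECUTION_STARTED / EXECUTION_COMPLETED
    event_time DateTime64(3),
    status LowCardinality(String),
    duration_ms Nullable(Int64)
)
ENGINE = MergeTree()
ORDER BY (execution_id, event_time);

-- Query-time reconstruction of the latest state per execution
SELECT
    execution_id,
    argMax(status, event_time) AS status,
    argMax(duration_ms, event_time) AS duration_ms
FROM execution_events
GROUP BY execution_id;
```

The `argMaxState` materialized-view variant would precompute the same `argMax` aggregation into an `AggregatingMergeTree` target table.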
**Trade-offs:**
- More complex server-side reconstruction
- Higher storage (two rows per execution instead of one)
- More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED)
- Natural fit for event sourcing architectures
### Option C: Hybrid (Current Cameleer3-Server Approach)
Keep the two-phase protocol but handle merging at the server application layer. This is what cameleer3-server implements today with the `ExecutionAccumulator`:
- RUNNING POST -> hold in `ConcurrentHashMap` (no DB write)
- COMPLETED POST -> merge with RUNNING in-memory -> single INSERT to DB
- Timeout sweep -> flush stale RUNNING entries for visibility
**Trade-offs:**
- No agent changes required
- Server must be stateful (in-memory accumulator)
- Crash window: active executions lost if server restarts
- Adds complexity to the server that wouldn't exist with Option A
## Recommendation
**Option A (single-phase reporting)** is the strongest choice for a new protocol version:
1. **Simplest server implementation**: Pure append, no state, no merging
2. **Works everywhere**: ClickHouse, Kafka, S3, any append-only store
3. **Correct by construction**: MVs, aggregations, and stream processing all see one event per execution
4. **Agent is the natural place to buffer**: The agent already tracks in-flight executions for instrumentation — it just needs to hold the report until completion
5. **Minimal data loss risk**: Agent crash loses in-flight data regardless of protocol — this doesn't make it worse
### Migration Strategy
1. Add `protocol_version` field to agent registration
2. v1 agents: server uses `ExecutionAccumulator` (current behavior)
3. v2 agents: server does pure append (no accumulator needed for v2 data)
4. Both can coexist — the server checks protocol version per agent
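As a sketch, the per-agent dispatch could look like this (class and method names are illustrative, not the actual server code):

```java
// Illustrative protocol-version dispatch: v1 data goes through the
// accumulator, v2 records are complete and appended directly.
class IngestionDispatcher {
    static final int V2_SINGLE_PHASE = 2;

    String routeFor(int protocolVersion) {
        return protocolVersion >= V2_SINGLE_PHASE ? "append" : "accumulate";
    }
}
```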
### Fields for Single-Phase Record
The complete record sent by a v2 agent:
```json
{
  "executionId": "uuid",
  "routeId": "myRoute",
  "agentId": "agent-1",
  "applicationName": "my-app",
  "correlationId": "corr-123",
  "exchangeId": "exchange-456",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00.000Z",
  "endTime": "2026-03-31T10:00:00.250Z",
  "durationMs": 250,
  "errorMessage": null,
  "errorStackTrace": null,
  "errorType": null,
  "errorCategory": null,
  "rootCauseType": null,
  "rootCauseMessage": null,
  "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}},
  "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}},
  "attributes": {"key": "value"},
  "traceId": "otel-trace-id",
  "spanId": "otel-span-id",
  "replayExchangeId": null,
  "processors": [
    {
      "processorId": "proc-1",
      "processorType": "to",
      "status": "COMPLETED",
      "startTime": "...",
      "endTime": "...",
      "durationMs": 120,
      "inputBody": "...",
      "outputBody": "...",
      "children": []
    }
  ]
}
```
All fields populated. No second POST needed. Server does a single INSERT.

# ClickHouse Migration Design
Replace PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS for all observability data.
PostgreSQL retained only for RBAC, config, and audit log.
## Context
Cameleer3-server currently uses three storage systems:
- **PostgreSQL/TimescaleDB**: executions, processor_executions, agent_metrics (hypertables), agent_events, route_diagrams, plus RBAC/config/audit tables. Continuous aggregates for dashboard statistics.
- **OpenSearch**: executions-YYYY-MM-DD indices (full-text search on bodies/headers/errors), logs-YYYY-MM-DD indices (application log storage with 7-day retention).
- **Dual-write pattern**: PG is source of truth, OpenSearch is async-indexed via debounced `SearchIndexer`.
This architecture has scaling limits: three systems to operate, data duplication between PG and OpenSearch, TimescaleDB continuous aggregates with limited flexibility, and no multitenancy support.
**Goal**: Consolidate to ClickHouse OSS (self-hosted) for all observability data. Add multitenancy with custom per-tenant, per-document-type retention. Support billions of documents, terabytes of data, sub-second wildcard search.
## Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Deployment | Self-hosted ClickHouse OSS on k3s | All needed features available in OSS. Fits existing infra. |
| Execution lifecycle | Approach B: Application-side accumulator | Merges RUNNING+COMPLETED in-memory, writes one row. Avoids upsert problem. |
| Table engine (executions) | ReplacingMergeTree | Handles rare late corrections via version column. Normal flow writes once. |
| Table engine (all others) | MergeTree | Append-only data, no dedup needed. |
| Client | JDBC + JdbcTemplate | Familiar pattern, matches current PG code. Async inserts via JDBC URL settings. |
| Multitenancy | Shared tables + tenant_id column | Row policies for defense-in-depth. Application-layer WHERE for primary enforcement. |
| Retention | Application-driven scheduler | Per-tenant, per-document-type. Config in PG, execution via ALTER TABLE DELETE. |
| Search | Ngram bloom filter indexes | Sub-second wildcard search. Materialized `_search_text` column for cross-field search. |
| Highlighting | Application-side in Java | Extract 120-char fragment around match from returned fields. |
| Storage tiering | Local SSD only (initially) | S3/MinIO tiering can be added later via TTL MOVE rules. |
## ClickHouse OSS Constraints
These are features NOT available in the open-source version:
| Constraint | Impact on Cameleer3 |
|------------|---------------------|
| No SharedMergeTree | No elastic compute scaling; must size nodes up-front. Acceptable for self-hosted. |
| No BM25 relevance scoring | Search returns matches without ranking. Acceptable for observability (want all matches, not ranked). |
| No search highlighting | Replaced by application-side highlighting in Java. |
| No fuzzy/typo-tolerant search | Must match exact tokens or use ngram index for substring match. Acceptable. |
| No ClickPipes | Must build own ingestion pipeline. Already exists (agents push via HTTP POST). |
| No managed backups | Must configure `clickhouse-backup` (Altinity, open-source) or built-in BACKUP SQL. |
| No auto-scaling | Manual capacity planning. Single node handles 14+ TiB, sufficient for initial scale. |
General ClickHouse constraints (apply to both OSS and Cloud):
| Constraint | Mitigation |
|------------|------------|
| ORDER BY is immutable | Careful upfront schema design. Documented below. |
| No transactions | Single-table INSERT atomic per block. No cross-table atomicity needed. |
| Mutations are expensive | Avoid ALTER UPDATE/DELETE. Use ReplacingMergeTree for corrections, append-only for everything else. |
| Row policies skip mutations | Application-layer WHERE on mutations. Mutations are rare (retention scheduler only). |
| No JPA/Hibernate | Use JdbcTemplate (already the pattern for PG). |
| JSON max_dynamic_paths | Store attributes as flattened String, not JSON type. Use ngram index for search. |
| Text indexes can't index JSON subcolumns | Extract searchable text into materialized String columns. |
| MVs only process new inserts | Historical data backfill writes through MV pipeline. |
| MV errors block source inserts | Careful MV design. Test thoroughly before production. |
| ReplacingMergeTree eventual consistency | Use FINAL on queries that need latest version. |
## What Stays in PostgreSQL
| Table | Reason |
|-------|--------|
| `users`, `roles`, `groups`, `user_groups`, `user_roles`, `group_roles` | RBAC with relational joins, foreign keys, transactions |
| `server_config` | Global config, low volume, needs transactions |
| `application_config` | Per-app observability settings |
| `app_settings` | Per-app SLA thresholds |
| `audit_log` | Security compliance, needs transactions, joins with RBAC tables |
| OIDC config | Auth provider config |
| `tenant_retention_config` (new) | Per-tenant retention settings, referenced by scheduler |
## What Moves to ClickHouse
| Data | Current Location | ClickHouse Table |
|------|-----------------|------------------|
| Route executions | PG `executions` hypertable + OpenSearch `executions-*` | `executions` |
| Processor executions | PG `processor_executions` hypertable | `processor_executions` |
| Agent metrics | PG `agent_metrics` hypertable | `agent_metrics` |
| Agent events | PG `agent_events` | `agent_events` |
| Route diagrams | PG `route_diagrams` | `route_diagrams` |
| Application logs | OpenSearch `logs-*` | `logs` |
| Dashboard statistics | PG continuous aggregates (`stats_1m_*`) | ClickHouse materialized views (`stats_1m_*`) |
## Table Schemas
### executions
```sql
CREATE TABLE executions (
tenant_id LowCardinality(String),
execution_id String,
start_time DateTime64(3),
_version UInt64 DEFAULT 1,
route_id LowCardinality(String),
agent_id LowCardinality(String),
application_name LowCardinality(String),
status LowCardinality(String),
correlation_id String DEFAULT '',
exchange_id String DEFAULT '',
end_time Nullable(DateTime64(3)),
duration_ms Nullable(Int64),
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
error_type LowCardinality(String) DEFAULT '',
error_category LowCardinality(String) DEFAULT '',
root_cause_type String DEFAULT '',
root_cause_message String DEFAULT '',
diagram_content_hash String DEFAULT '',
engine_level LowCardinality(String) DEFAULT '',
input_body String DEFAULT '',
output_body String DEFAULT '',
input_headers String DEFAULT '',
output_headers String DEFAULT '',
attributes String DEFAULT '',
trace_id String DEFAULT '',
span_id String DEFAULT '',
processors_json String DEFAULT '',
has_trace_data Bool DEFAULT false,
is_replay Bool DEFAULT false,
_search_text String MATERIALIZED
concat(error_message, ' ', error_stacktrace, ' ', attributes,
' ', input_body, ' ', output_body, ' ', input_headers,
' ', output_headers, ' ', root_cause_message),
INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_status status TYPE set(10) GRANULARITY 1,
INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL start_time + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
Design rationale:
- **ORDER BY** `(tenant_id, start_time, application_name, route_id, execution_id)`: Matches UI query pattern (tenant -> time range -> app -> route). Time before application because observability queries almost always include a time range.
- **PARTITION BY** `(tenant_id, toYYYYMM(start_time))`: Enables per-tenant partition drops for retention. Monthly granularity balances partition count vs drop efficiency.
- **ReplacingMergeTree(_version)**: Normal flow writes once (version 1). Late corrections write version 2+. Background merges keep latest version.
- **`_search_text` materialized column**: Computed at insert time. Concatenates all searchable fields for cross-field wildcard search.
- **`ngrambf_v1(3, 256, 2, 0)`**: 3-char ngrams in a 256-byte bloom filter with 2 hash functions. Prunes most granules for `LIKE '%term%'` queries. The bloom filter size (256 bytes) is a starting point — increase to 4096-8192 if false positive rates are too high for long text fields. Tune after benchmarking with real data.
- **`LowCardinality(String)`**: Dictionary encoding for columns with few distinct values. Major compression improvement.
- **TTL 365 days**: Safety net. Application-driven scheduler handles per-tenant retention at finer granularity.
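As a sketch, a typical UI query whose filters follow the ORDER BY prefix (tenant and application names are illustrative):

```sql
-- Filters follow the sort-key prefix (tenant -> time range -> app), so the
-- sparse primary index prunes most granules before any column is read.
SELECT execution_id, route_id, status, duration_ms
FROM executions FINAL
WHERE tenant_id = 'acme'
  AND start_time >= now() - INTERVAL 1 HOUR
  AND application_name = 'my-app'
ORDER BY start_time DESC
LIMIT 50;
```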
### processor_executions
```sql
CREATE TABLE processor_executions (
tenant_id LowCardinality(String),
execution_id String,
processor_id String,
start_time DateTime64(3),
route_id LowCardinality(String),
application_name LowCardinality(String),
processor_type LowCardinality(String),
parent_processor_id String DEFAULT '',
depth UInt16 DEFAULT 0,
status LowCardinality(String),
end_time Nullable(DateTime64(3)),
duration_ms Nullable(Int64),
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
error_type LowCardinality(String) DEFAULT '',
error_category LowCardinality(String) DEFAULT '',
root_cause_type String DEFAULT '',
root_cause_message String DEFAULT '',
input_body String DEFAULT '',
output_body String DEFAULT '',
input_headers String DEFAULT '',
output_headers String DEFAULT '',
attributes String DEFAULT '',
loop_index Nullable(Int32),
loop_size Nullable(Int32),
split_index Nullable(Int32),
split_size Nullable(Int32),
multicast_index Nullable(Int32),
resolved_endpoint_uri String DEFAULT '',
error_handler_type LowCardinality(String) DEFAULT '',
circuit_breaker_state LowCardinality(String) DEFAULT '',
fallback_triggered Bool DEFAULT false,
_search_text String MATERIALIZED
concat(error_message, ' ', error_stacktrace, ' ', attributes,
' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),
INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, processor_id)
TTL start_time + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
### logs
```sql
CREATE TABLE logs (
tenant_id LowCardinality(String),
timestamp DateTime64(3),
application LowCardinality(String),
agent_id LowCardinality(String),
level LowCardinality(String),
logger_name LowCardinality(String) DEFAULT '',
message String,
thread_name LowCardinality(String) DEFAULT '',
stack_trace String DEFAULT '',
exchange_id String DEFAULT '',
mdc Map(String, String) DEFAULT map(),
INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_level level TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL timestamp + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
### agent_metrics
```sql
CREATE TABLE agent_metrics (
tenant_id LowCardinality(String),
collected_at DateTime64(3),
agent_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
TTL collected_at + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
### agent_events
```sql
CREATE TABLE agent_events (
tenant_id LowCardinality(String),
timestamp DateTime64(3) DEFAULT now64(3),
agent_id LowCardinality(String),
app_id LowCardinality(String),
event_type LowCardinality(String),
detail String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL timestamp + INTERVAL 365 DAY DELETE;
```
### route_diagrams
```sql
CREATE TABLE route_diagrams (
tenant_id LowCardinality(String),
content_hash String,
route_id LowCardinality(String),
agent_id LowCardinality(String),
application_name LowCardinality(String),
definition String,
created_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192;
```
## Materialized Views (Stats)
Replace TimescaleDB continuous aggregates. ClickHouse MVs trigger on INSERT and store aggregate states in target tables.
### stats_1m_all (global)
```sql
CREATE TABLE stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW stats_1m_all_mv TO stats_1m_all AS
SELECT
tenant_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;
```
### stats_1m_app (per-application)
```sql
CREATE TABLE stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW stats_1m_app_mv TO stats_1m_app AS
SELECT
tenant_id,
application_name,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;
```
### stats_1m_route (per-route)
```sql
CREATE TABLE stats_1m_route (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW stats_1m_route_mv TO stats_1m_route AS
SELECT
tenant_id,
application_name,
route_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;
```
### stats_1m_processor (per-processor-type)
```sql
CREATE TABLE stats_1m_processor (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
application_name,
processor_type,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;
```
### stats_1m_processor_detail (per-processor-id)
```sql
CREATE TABLE stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
application_name,
route_id,
processor_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
```
## Ingestion Pipeline
### Current Flow (replaced)
```
Agent POST -> IngestionService -> PostgresExecutionStore.upsert() -> PG
-> SearchIndexer (debounced 2s) -> reads from PG -> OpenSearch
```
### New Flow
```
Agent POST -> IngestionService -> ExecutionAccumulator
|-- RUNNING: ConcurrentHashMap (no DB write)
|-- COMPLETED/FAILED: merge with pending -> WriteBuffer
'-- Timeout sweep (60s): flush stale -> WriteBuffer
|
ClickHouseExecutionStore.insertBatch()
ClickHouseProcessorStore.insertBatch()
```
### ExecutionAccumulator
New component replacing `SearchIndexer`. Core responsibilities:
1. **On RUNNING POST**: Store `PendingExecution` in `ConcurrentHashMap<String, PendingExecution>` keyed by `execution_id`. Return 200 OK immediately. No database write.
2. **On COMPLETED/FAILED POST**: Look up pending RUNNING by `execution_id`. If found, merge fields using the same COALESCE logic currently in `PostgresExecutionStore.upsert()`. Produce a complete `MergedExecution` and push to `WriteBuffer`. If not found (race condition or RUNNING already flushed by timeout), write COMPLETED directly with `_version=2`.
3. **Timeout sweep** (scheduled every 60s): Scan for RUNNING entries older than 5 minutes. Flush them to ClickHouse as-is with status=RUNNING, making them visible in the UI. When COMPLETED eventually arrives, it writes with `_version=2` (ReplacingMergeTree deduplicates).
4. **Late corrections**: If a correction arrives for an already-written execution, insert with `_version` incremented. ReplacingMergeTree handles deduplication.
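A minimal sketch of the accumulator's merge step, assuming a trimmed-down field set (class and record names are illustrative, not the actual `ExecutionAccumulator`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative merge of a pending RUNNING record with the terminal POST.
class Accumulator {
    record Pending(String executionId, long startMillis, String routeId) {}
    record Merged(String executionId, long startMillis, String routeId,
                  long durationMs, String status, int version) {}

    private final ConcurrentMap<String, Pending> pending = new ConcurrentHashMap<>();

    // RUNNING POST: hold in memory, no DB write.
    void onRunning(Pending p) { pending.put(p.executionId(), p); }

    // COMPLETED/FAILED POST: COALESCE-style merge. Prefer the pending
    // RUNNING fields when present; write _version=2 if the RUNNING row was
    // already flushed by the timeout sweep (ReplacingMergeTree dedups).
    Merged onCompleted(String executionId, long startMillis, String routeId,
                       long durationMs, String status) {
        Pending p = pending.remove(executionId);
        int version = (p == null) ? 2 : 1;
        long start = (p != null) ? p.startMillis() : startMillis;
        String route = (p != null) ? p.routeId() : routeId;
        return new Merged(executionId, start, route, durationMs, status, version);
    }
}
```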
### WriteBuffer
Reuse the existing `WriteBuffer` pattern (bounded queue, configurable batch size, scheduled drain):
- Buffer capacity: 50,000 items
- Batch size: 5,000 per flush
- Flush interval: 1 second
- Separate buffers for executions and processor_executions (independent batch inserts)
- Drain calls `ClickHouseExecutionStore.insertBatch()` using JDBC batch update
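A sketch of that buffer shape, using the capacity and batch-size figures above (the class here is illustrative, not the existing `WriteBuffer` implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative bounded write buffer: producers offer items, a scheduled
// drain flushes them in batches to the sink (e.g. store::insertBatch).
class WriteBuffer<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final Consumer<List<T>> sink;

    WriteBuffer(int capacity, int batchSize, Consumer<List<T>> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Returns false when the buffer is full (caller decides: drop or block).
    boolean offer(T item) { return queue.offer(item); }

    // Called by the scheduled drain (e.g. every second): flush at most one
    // batch per call; returns the number of items flushed.
    int drainOnce() {
        List<T> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        if (!batch.isEmpty()) sink.accept(batch);
        return batch.size();
    }
}
```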
### Logs Ingestion
Direct batch INSERT, bypasses accumulator (logs are single-phase):
```
Agent POST /api/v1/data/logs -> LogIngestionController -> ClickHouseLogStore.insertBatch()
```
### Metrics Ingestion
Existing `MetricsWriteBuffer` targets ClickHouse instead of PG:
```
Agent POST /api/v1/data/metrics -> MetricsController -> WriteBuffer -> ClickHouseMetricsStore.insertBatch()
```
### JDBC Batch Insert Pattern
```java
jdbcTemplate.batchUpdate(
"INSERT INTO executions (tenant_id, execution_id, start_time, ...) VALUES (?, ?, ?, ...)",
batchArgs
);
```
JDBC URL includes `async_insert=1&wait_for_async_insert=0` for server-side buffering, preventing "too many parts" errors under high load.
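For example, a JDBC URL carrying those settings might look like this (host, port, and database name are illustrative):

```
jdbc:clickhouse://clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0
```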
## Search Implementation
### Query Translation
Current OpenSearch bool queries map to ClickHouse SQL:
```sql
-- Full-text wildcard search with time range, status filter, and pagination
SELECT *
FROM executions FINAL
WHERE tenant_id = {tenant_id:String}
AND start_time >= {time_from:DateTime64(3)}
AND start_time < {time_to:DateTime64(3)}
AND status IN ({statuses:Array(String)})
AND (
_search_text LIKE concat('%', {search_term:String}, '%')
OR execution_id IN (
SELECT DISTINCT execution_id
FROM processor_executions
WHERE tenant_id = {tenant_id:String}
AND start_time >= {time_from:DateTime64(3)}
AND start_time < {time_to:DateTime64(3)}
AND _search_text LIKE concat('%', {search_term:String}, '%')
)
)
ORDER BY start_time DESC
LIMIT {limit:UInt32} OFFSET {offset:UInt32}
```
### Scoped Searches
| Scope | ClickHouse WHERE clause |
|-------|------------------------|
| textInBody | `input_body LIKE '%term%' OR output_body LIKE '%term%'` |
| textInHeaders | `input_headers LIKE '%term%' OR output_headers LIKE '%term%'` |
| textInErrors | `error_message LIKE '%term%' OR error_stacktrace LIKE '%term%'` |
| global text | `_search_text LIKE '%term%'` (covers all fields) |
All are accelerated by the `ngrambf_v1` indexes, which can prune 95%+ of data granules before scanning.
### Application-Side Highlighting
```java
public String extractHighlight(String text, String searchTerm, int contextChars) {
    if (text == null || searchTerm == null || searchTerm.isEmpty()) return null;
    int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
    if (idx < 0) return null;
    int start = Math.max(0, idx - contextChars / 2);
    int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
    return (start > 0 ? "..." : "")
        + text.substring(start, end)
        + (end < text.length() ? "..." : "");
}
```
The service assembles these fragments into the same `highlight` map structure the UI currently expects.
### Nested Processor Search
OpenSearch nested queries become a subquery on the `processor_executions` table:
```sql
execution_id IN (
SELECT DISTINCT execution_id
FROM processor_executions
WHERE tenant_id = ? AND start_time >= ? AND start_time < ?
AND _search_text LIKE '%term%'
)
```
This is evaluated once with ngram index acceleration, then joined via IN.
## Stats Query Translation
### TimescaleDB -> ClickHouse Query Patterns
| TimescaleDB | ClickHouse |
|-------------|------------|
| `time_bucket('1 minute', bucket)` | `toStartOfInterval(bucket, INTERVAL 1 MINUTE)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `approx_percentile(0.99, rollup(p99_duration))` | `quantileMerge(0.99)(p99_duration)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
### Example: Timeseries Query
```sql
SELECT
toStartOfInterval(bucket, INTERVAL {interval:UInt32} SECOND) AS period,
countMerge(total_count) AS total_count,
countIfMerge(failed_count) AS failed_count,
sumMerge(duration_sum) / countMerge(total_count) AS avg_duration,
quantileMerge(0.99)(p99_duration) AS p99_duration
FROM stats_1m_app
WHERE tenant_id = {tenant_id:String}
AND application_name = {app:String}
AND bucket >= {from:DateTime}
AND bucket < {to:DateTime}
GROUP BY period
ORDER BY period
```
### SLA and Top Errors
SLA queries hit the raw `executions` table (need per-row duration filtering):
```sql
SELECT
countIf(duration_ms <= {threshold:Int64} AND status != 'RUNNING') * 100.0 / count() AS sla_pct
FROM executions FINAL
WHERE tenant_id = ? AND application_name = ? AND start_time >= ? AND start_time < ?
```
Top errors query:
```sql
SELECT
error_message,
count() AS error_count,
max(start_time) AS last_seen
FROM executions FINAL
WHERE tenant_id = ? AND status = 'FAILED'
AND start_time >= now() - INTERVAL 1 HOUR
GROUP BY error_message
ORDER BY error_count DESC
LIMIT 10
```
## Multitenancy
### Data Isolation
**Primary**: Application-layer WHERE clause injection. Every ClickHouse query gets `WHERE tenant_id = ?` from the authenticated user's JWT claims.
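A sketch of that primary enforcement (names are illustrative, not the actual repository API): the tenant id comes from the JWT claim, is validated, and is always bound as a JDBC parameter rather than concatenated as a literal.

```java
// Illustrative application-layer tenant scoping.
class TenantScopedQuery {
    // sqlWithWhere must already contain a WHERE clause; the tenant predicate
    // is always ANDed in, and the value itself is bound via JDBC.
    static String withTenantFilter(String sqlWithWhere, String tenantClaim) {
        if (tenantClaim == null || tenantClaim.isBlank()) {
            throw new IllegalStateException("missing tenant claim in JWT");
        }
        return sqlWithWhere + " AND tenant_id = ?";
    }
}
```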
**Defense-in-depth**: ClickHouse row policies:
```sql
-- Create a ClickHouse user per tenant
CREATE USER tenant_acme IDENTIFIED BY '...';
-- Row policy ensures tenant can only see their data
CREATE ROW POLICY tenant_acme_executions ON executions
FOR SELECT USING tenant_id = 'acme';
-- Repeat for all tables
```
### Tenant ID in Schema
`tenant_id` is the first column in every table's ORDER BY and PARTITION BY. This ensures:
- Data for the same tenant is physically co-located on disk
- Queries filtering by tenant_id use the sparse index efficiently
- Partition drops for retention are scoped to individual tenants
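As an illustration of those clauses (engine choice and column list are assumed from the schema section; only the tenant-first layout matters here):

```sql
-- Sketch: tenant_id leads both the partition key and the sorting key.
CREATE TABLE executions (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    execution_id     String,
    start_time       DateTime64(3)
    -- ... remaining columns elided ...
)
ENGINE = ReplacingMergeTree
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, application_name, start_time, execution_id);
```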
### Resource Quotas
```sql
CREATE SETTINGS PROFILE tenant_limits
SETTINGS max_execution_time = 30,
max_rows_to_read = 100000000,
max_memory_usage = '4G';
ALTER USER tenant_acme SETTINGS PROFILE tenant_limits;
```
This prevents noisy-neighbor problems, where one tenant's expensive query degrades performance for everyone else.
## Retention
### Strategy: Application-Driven Scheduler
Per-tenant, per-document-type retention is too dynamic for static ClickHouse TTL rules. Instead:
1. **Config table** in PostgreSQL:
```sql
CREATE TABLE tenant_retention_config (
tenant_id VARCHAR(255) NOT NULL,
document_type VARCHAR(50) NOT NULL, -- executions, logs, metrics, etc.
retention_days INT NOT NULL,
PRIMARY KEY (tenant_id, document_type)
);
```
2. **RetentionScheduler** (Spring `@Scheduled`, runs daily at 03:00 UTC):
```java
@Scheduled(cron = "0 0 3 * * *")
public void enforceRetention() {
    List<TenantRetention> configs = retentionConfigRepo.findAll();
    for (TenantRetention config : configs) {
        // Table name comes from the trusted config table (executions, logs,
        // metrics, etc.), never from user input. Identifiers cannot be bound
        // as JDBC parameters, so validate against an allowlist before use.
        String table = config.documentType();
        clickHouseJdbc.update(
            "ALTER TABLE " + table
                + " DELETE WHERE tenant_id = ? AND start_time < now() - toIntervalDay(?)",
            config.tenantId(), config.retentionDays()
        );
    }
}
```
3. **Safety-net TTL**: Each table has a generous default TTL (365 days) as a backstop in case the scheduler fails. The scheduler handles the per-tenant granularity.
4. **Part-aligned drops**: Since `PARTITION BY (tenant_id, toYYYYMM(start_time))`, a tenant's expired data is confined to its own partitions, and when a DELETE mutation matches every row in a data part, ClickHouse drops that part outright instead of rewriting it. For the safety-net TTL, enable `ttl_only_drop_parts=1` on tables so TTL expiry also drops whole parts.
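A sketch of the safety-net TTL and the part-drop setting from points 3 and 4 (table name and `start_time` type assumed from the schema section):

```sql
-- Safety-net: drop anything older than 365 days even if the scheduler fails
ALTER TABLE executions
    MODIFY TTL start_time + INTERVAL 365 DAY;

-- Drop whole parts on TTL expiry instead of rewriting them
ALTER TABLE executions
    MODIFY SETTING ttl_only_drop_parts = 1;
```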
## Java/Spring Integration
### Dependencies
```xml
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.7.x</version> <!-- latest stable -->
<classifier>all</classifier>
</dependency>
```
### Configuration
```yaml
clickhouse:
url: jdbc:clickhouse://clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0
username: cameleer_app
password: ${CLICKHOUSE_PASSWORD}
```
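The `ClickHouseProperties` holder referenced by the DataSource bean below is not shown elsewhere in this document; a minimal sketch, assuming standard Spring Boot binding of the `clickhouse:` YAML section (in the real code it would carry `@ConfigurationProperties(prefix = "clickhouse")` and be registered via `@EnableConfigurationProperties` -- shown here as a plain POJO):

```java
// Minimal sketch of the properties holder for the clickhouse: YAML section.
public class ClickHouseProperties {
    private String url;
    private String username;
    private String password;

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}
```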
### DataSource Bean
```java
@Configuration
public class ClickHouseConfig {
@Bean
public DataSource clickHouseDataSource(ClickHouseProperties props) {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(props.getUrl());
ds.setUsername(props.getUsername());
ds.setPassword(props.getPassword());
ds.setMaximumPoolSize(10);
return ds;
}
@Bean
public JdbcTemplate clickHouseJdbcTemplate(
@Qualifier("clickHouseDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
}
```
### Interface Implementations
Existing interfaces remain unchanged. New implementations:
| Interface | Current Impl | New Impl |
|-----------|-------------|----------|
| `ExecutionStore` | `PostgresExecutionStore` | `ClickHouseExecutionStore` |
| `SearchIndex` | `OpenSearchIndex` | `ClickHouseSearchIndex` |
| `StatsStore` | `PostgresStatsStore` | `ClickHouseStatsStore` |
| `DiagramStore` | `PostgresDiagramStore` | `ClickHouseDiagramStore` |
| `MetricsStore` | `PostgresMetricsStore` | `ClickHouseMetricsStore` |
| (log search) | `OpenSearchLogIndex` | `ClickHouseLogStore` |
| (ingestion pipeline) | `SearchIndexer` | `ExecutionAccumulator` |
## Kubernetes Deployment
### ClickHouse StatefulSet
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: clickhouse
spec:
serviceName: clickhouse
replicas: 1 # single node initially
template:
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:26.2
ports:
- containerPort: 8123 # HTTP
- containerPort: 9000 # Native
volumeMounts:
- name: data
mountPath: /var/lib/clickhouse
- name: config
mountPath: /etc/clickhouse-server/config.d
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi # NVMe/SSD
```
### Health Check
```yaml
livenessProbe:
httpGet:
path: /ping
port: 8123
readinessProbe:
httpGet:
path: /ping
port: 8123
```
## Migration Path
### Phase 1: Foundation
- Add `clickhouse-jdbc` dependency
- Create `ClickHouseConfig` (DataSource, JdbcTemplate)
- Schema initialization (idempotent DDL scripts, not Flyway -- ClickHouse DDL is different enough)
- Implement `ClickHouseMetricsStore` (simplest table, validates pipeline)
- Deploy ClickHouse to k8s alongside existing PG+OpenSearch
### Phase 2: Executions + Search
- Build `ExecutionAccumulator` (replaces SearchIndexer)
- Implement `ClickHouseExecutionStore` and `ClickHouseProcessorStore`
- Implement `ClickHouseSearchIndex` (ngram-based SQL queries)
- Feature flag: dual-write to both PG and CH, read from PG
### Phase 3: Stats & Analytics
- Create MV definitions (all 5 stats views)
- Implement `ClickHouseStatsStore`
- Validate stats accuracy: compare CH vs PG continuous aggregates
### Phase 4: Remaining Tables
- `ClickHouseDiagramStore` (ReplacingMergeTree)
- `ClickHouseAgentEventStore`
- `ClickHouseLogStore` (replaces OpenSearchLogIndex)
- Application-side highlighting
### Phase 5: Multitenancy
- Tables already include `tenant_id` from Phase 1 (schema is forward-looking). This phase activates multitenancy.
- Wire `tenant_id` from JWT claims into all ClickHouse queries (application-layer WHERE injection)
- Add `tenant_id` to PostgreSQL RBAC/config tables
- Create ClickHouse row policies per tenant (defense-in-depth)
- Create `tenant_retention_config` table in PG and `RetentionScheduler` component
- Tenant user management and resource quotas in ClickHouse
### Phase 6: Cutover
- Backfill historical data from PG/OpenSearch to ClickHouse
- Switch read path to ClickHouse (feature flag)
- Validate end-to-end
- Remove OpenSearch dependency (POM, config, k8s manifests)
- Remove TimescaleDB extensions and hypertable-specific code
- Keep PostgreSQL for RBAC/config/audit only
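The feature-flagged read-path switch can be sketched as follows (a hypothetical helper; the store type parameter stands in for interfaces like `ExecutionStore`, and the flag supplier for whatever feature-flag mechanism the project uses):

```java
import java.util.function.Supplier;

// Hypothetical Phase 6 read-path switch: during dual-write both stores
// receive data, and the feature flag decides which one serves reads.
public final class DualStoreRouter<T> {
    private final T primary;                 // PostgreSQL-backed store
    private final T candidate;               // ClickHouse-backed store
    private final Supplier<Boolean> readFromCandidate; // feature-flag lookup

    public DualStoreRouter(T primary, T candidate, Supplier<Boolean> readFromCandidate) {
        this.primary = primary;
        this.candidate = candidate;
        this.readFromCandidate = readFromCandidate;
    }

    /** The store the read path should use right now. */
    public T readStore() {
        return readFromCandidate.get() ? candidate : primary;
    }
}
```

Because the flag is consulted per call, flipping it back rolls reads over to PostgreSQL without a redeploy.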
## Verification
### Functional Verification
1. **Ingestion**: Send executions via agent, verify they appear in ClickHouse with correct fields
2. **Two-phase lifecycle**: Send RUNNING, then COMPLETED. Verify single merged row in CH
3. **Search**: Wildcard search across bodies, headers, errors. Verify sub-second response
4. **Stats**: Dashboard statistics match expected values. Compare with PG aggregates during dual-write
5. **Logs**: Ingest log batches, query by app/level/time/text. Verify correctness
6. **Retention**: Configure per-tenant retention, run scheduler, verify expired data is deleted
7. **Multitenancy**: Two tenants, verify data isolation (one tenant cannot see another's data)
### Performance Verification
1. **Insert throughput**: 5K executions/batch at 1 flush/sec sustained
2. **Search latency**: Sub-second for `LIKE '%term%'` across 1M+ rows
3. **Stats query latency**: Dashboard stats in <100ms from materialized views
4. **Log search**: <1s for text search across 7 days of logs
### Data Integrity
1. During dual-write phase: compare row counts between PG and CH
2. After cutover: spot-check execution details, processor trees, search results
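For the dual-write row-count comparison, a sketch of per-day counts to diff between the two systems (table and column names assumed to match on both sides; the results are compared offline):

```sql
-- ClickHouse side
SELECT toDate(start_time) AS day, count() AS ch_count
FROM executions FINAL
WHERE tenant_id = ? AND start_time >= ? AND start_time < ?
GROUP BY day
ORDER BY day;

-- PostgreSQL side (same shape)
SELECT date_trunc('day', start_time)::date AS day, count(*) AS pg_count
FROM executions
WHERE tenant_id = ? AND start_time >= ? AND start_time < ?
GROUP BY day
ORDER BY day;
```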