
ClickHouse Migration Design

Replace PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS for all observability data. PostgreSQL retained only for RBAC, config, and audit log.

Context

Cameleer-server currently uses three storage systems:

  • PostgreSQL/TimescaleDB: executions, processor_executions, agent_metrics (hypertables), agent_events, route_diagrams, plus RBAC/config/audit tables. Continuous aggregates for dashboard statistics.
  • OpenSearch: executions-YYYY-MM-DD indices (full-text search on bodies/headers/errors), logs-YYYY-MM-DD indices (application log storage with 7-day retention).
  • Dual-write pattern: PG is source of truth, OpenSearch is async-indexed via debounced SearchIndexer.

This architecture has scaling limits: three systems to operate, data duplication between PG and OpenSearch, TimescaleDB continuous aggregates with limited flexibility, and no multitenancy support.

Goal: Consolidate to ClickHouse OSS (self-hosted) for all observability data. Add multitenancy with custom per-tenant, per-document-type retention. Support billions of documents, terabytes of data, sub-second wildcard search.

Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Deployment | Self-hosted ClickHouse OSS on k3s | All needed features available in OSS. Fits existing infra. |
| Execution lifecycle | Approach B: Application-side accumulator | Merges RUNNING+COMPLETED in-memory, writes one row. Avoids upsert problem. |
| Table engine (executions) | ReplacingMergeTree | Handles rare late corrections via version column. Normal flow writes once. |
| Table engine (all others) | MergeTree | Append-only data, no dedup needed. |
| Client | JDBC + JdbcTemplate | Familiar pattern, matches current PG code. Async inserts via JDBC URL settings. |
| Multitenancy | Shared tables + tenant_id column | Row policies for defense-in-depth. Application-layer WHERE for primary enforcement. |
| Retention | Application-driven scheduler | Per-tenant, per-document-type. Config in PG, execution via ALTER TABLE DELETE. |
| Search | Ngram bloom filter indexes | Sub-second wildcard search. Materialized _search_text column for cross-field search. |
| Highlighting | Application-side in Java | Extract 120-char fragment around match from returned fields. |
| Storage tiering | Local SSD only (initially) | S3/MinIO tiering can be added later via TTL MOVE rules. |

ClickHouse OSS Constraints

These are features NOT available in the open-source version:

| Constraint | Impact on Cameleer |
|---|---|
| No SharedMergeTree | No elastic compute scaling; must size nodes up-front. Acceptable for self-hosted. |
| No BM25 relevance scoring | Search returns matches without ranking. Acceptable for observability (want all matches, not ranked). |
| No search highlighting | Replaced by application-side highlighting in Java. |
| No fuzzy/typo-tolerant search | Must match exact tokens or use ngram index for substring match. Acceptable. |
| No ClickPipes | Must build own ingestion pipeline. Already exists (agents push via HTTP POST). |
| No managed backups | Must configure clickhouse-backup (Altinity, open-source) or built-in BACKUP SQL. |
| No auto-scaling | Manual capacity planning. Single node handles 14+ TiB, sufficient for initial scale. |

General ClickHouse constraints (apply to both OSS and Cloud):

| Constraint | Mitigation |
|---|---|
| ORDER BY is immutable | Careful upfront schema design. Documented below. |
| No transactions | Single-table INSERT atomic per block. No cross-table atomicity needed. |
| Mutations are expensive | Avoid ALTER UPDATE/DELETE. Use ReplacingMergeTree for corrections, append-only for everything else. |
| Row policies skip mutations | Application-layer WHERE on mutations. Mutations are rare (retention scheduler only). |
| No JPA/Hibernate | Use JdbcTemplate (already the pattern for PG). |
| JSON max_dynamic_paths | Store attributes as flattened String, not JSON type. Use ngram index for search. |
| Text indexes can't index JSON subcolumns | Extract searchable text into materialized String columns. |
| MVs only process new inserts | Historical data backfill writes through MV pipeline. |
| MV errors block source inserts | Careful MV design. Test thoroughly before production. |
| ReplacingMergeTree eventual consistency | Use FINAL on queries that need latest version. |

What Stays in PostgreSQL

| Table | Reason |
|---|---|
| users, roles, groups, user_groups, user_roles, group_roles | RBAC with relational joins, foreign keys, transactions |
| server_config | Global config, low volume, needs transactions |
| application_config | Per-app observability settings |
| app_settings | Per-app SLA thresholds |
| audit_log | Security compliance, needs transactions, joins with RBAC tables |
| OIDC config | Auth provider config |
| tenant_retention_config (new) | Per-tenant retention settings, referenced by scheduler |

What Moves to ClickHouse

| Data | Current Location | ClickHouse Table |
|---|---|---|
| Route executions | PG executions hypertable + OpenSearch executions-* | executions |
| Processor executions | PG processor_executions hypertable | processor_executions |
| Agent metrics | PG agent_metrics hypertable | agent_metrics |
| Agent events | PG agent_events | agent_events |
| Route diagrams | PG route_diagrams | route_diagrams |
| Application logs | OpenSearch logs-* | logs |
| Dashboard statistics | PG continuous aggregates (stats_1m_*) | ClickHouse materialized views (stats_1m_*) |

Table Schemas

executions

CREATE TABLE executions (
    tenant_id            LowCardinality(String),
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64 DEFAULT 1,
    route_id             LowCardinality(String),
    agent_id             LowCardinality(String),
    application_name     LowCardinality(String),
    status               LowCardinality(String),
    correlation_id       String DEFAULT '',
    exchange_id          String DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String DEFAULT '',
    error_stacktrace     String DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String DEFAULT '',
    root_cause_message   String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String DEFAULT '',
    output_body          String DEFAULT '',
    input_headers        String DEFAULT '',
    output_headers       String DEFAULT '',
    attributes           String DEFAULT '',
    trace_id             String DEFAULT '',
    span_id              String DEFAULT '',
    processors_json      String DEFAULT '',
    has_trace_data       Bool DEFAULT false,
    is_replay            Bool DEFAULT false,

    _search_text         String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers,
               ' ', output_headers, ' ', root_cause_message),

    INDEX idx_search   _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error    error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies   concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers  concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status   status TYPE set(10) GRANULARITY 1,
    INDEX idx_corr     correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL start_time + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

Design rationale:

  • ORDER BY (tenant_id, start_time, application_name, route_id, execution_id): Matches UI query pattern (tenant -> time range -> app -> route). Time before application because observability queries almost always include a time range.
  • PARTITION BY (tenant_id, toYYYYMM(start_time)): Enables per-tenant partition drops for retention. Monthly granularity balances partition count vs drop efficiency.
  • ReplacingMergeTree(_version): Normal flow writes once (version 1). Late corrections write version 2+. Background merges keep latest version.
  • _search_text materialized column: Computed at insert time. Concatenates all searchable fields for cross-field wildcard search.
  • ngrambf_v1(3, 256, 2, 0): 3-char ngrams in a 256-byte bloom filter with 2 hash functions. Prunes most granules for LIKE '%term%' queries. The bloom filter size (256 bytes) is a starting point — increase to 4096-8192 if false positive rates are too high for long text fields. Tune after benchmarking with real data.
  • LowCardinality(String): Dictionary encoding for columns with few distinct values. Major compression improvement.
  • TTL 365 days: Safety net. Application-driven scheduler handles per-tenant retention at finer granularity.

processor_executions

CREATE TABLE processor_executions (
    tenant_id             LowCardinality(String),
    execution_id          String,
    processor_id          String,
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_name      LowCardinality(String),
    processor_type        LowCardinality(String),
    parent_processor_id   String DEFAULT '',
    depth                 UInt16 DEFAULT 0,
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String DEFAULT '',
    error_stacktrace      String DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String DEFAULT '',
    root_cause_message    String DEFAULT '',
    input_body            String DEFAULT '',
    output_body           String DEFAULT '',
    input_headers         String DEFAULT '',
    output_headers        String DEFAULT '',
    attributes            String DEFAULT '',
    loop_index            Nullable(Int32),
    loop_size             Nullable(Int32),
    split_index           Nullable(Int32),
    split_size            Nullable(Int32),
    multicast_index       Nullable(Int32),
    resolved_endpoint_uri String DEFAULT '',
    error_handler_type    LowCardinality(String) DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool DEFAULT false,

    _search_text          String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),

    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, processor_id)
TTL start_time + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

logs

CREATE TABLE logs (
    tenant_id    LowCardinality(String),
    timestamp    DateTime64(3),
    application  LowCardinality(String),
    agent_id     LowCardinality(String),
    level        LowCardinality(String),
    logger_name  LowCardinality(String) DEFAULT '',
    message      String,
    thread_name  LowCardinality(String) DEFAULT '',
    stack_trace  String DEFAULT '',
    exchange_id  String DEFAULT '',
    mdc          Map(String, String) DEFAULT map(),

    INDEX idx_msg   message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_level level TYPE set(10) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp)
TTL timestamp + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

agent_metrics

CREATE TABLE agent_metrics (
    tenant_id          LowCardinality(String),
    collected_at       DateTime64(3),
    agent_id           LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_value       Float64,
    tags               Map(String, String) DEFAULT map(),
    server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
TTL collected_at + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

agent_events

CREATE TABLE agent_events (
    tenant_id   LowCardinality(String),
    timestamp   DateTime64(3) DEFAULT now64(3),
    agent_id    LowCardinality(String),
    app_id      LowCardinality(String),
    event_type  LowCardinality(String),
    detail      String DEFAULT ''
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, app_id, agent_id, timestamp)
TTL timestamp + INTERVAL 365 DAY DELETE;

route_diagrams

CREATE TABLE route_diagrams (
    tenant_id        LowCardinality(String),
    content_hash     String,
    route_id         LowCardinality(String),
    agent_id         LowCardinality(String),
    application_name LowCardinality(String),
    definition       String,
    created_at       DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash)
SETTINGS index_granularity = 8192;

Materialized Views (Stats)

Replace TimescaleDB continuous aggregates. ClickHouse MVs trigger on INSERT and store aggregate states in target tables.

stats_1m_all (global)

CREATE TABLE stats_1m_all (
    tenant_id    LowCardinality(String),
    bucket       DateTime,
    total_count  AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    countState()                        AS total_count,
    countIfState(status = 'FAILED')     AS failed_count,
    countIfState(status = 'RUNNING')    AS running_count,
    sumState(duration_ms)               AS duration_sum,
    maxState(duration_ms)               AS duration_max,
    quantileState(0.99)(duration_ms)    AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;

stats_1m_app (per-application)

CREATE TABLE stats_1m_app (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time) AS bucket,
    countState()                        AS total_count,
    countIfState(status = 'FAILED')     AS failed_count,
    countIfState(status = 'RUNNING')    AS running_count,
    sumState(duration_ms)               AS duration_sum,
    maxState(duration_ms)               AS duration_max,
    quantileState(0.99)(duration_ms)    AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;

stats_1m_route (per-route)

CREATE TABLE stats_1m_route (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    running_count    AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    countState()                        AS total_count,
    countIfState(status = 'FAILED')     AS failed_count,
    countIfState(status = 'RUNNING')    AS running_count,
    sumState(duration_ms)               AS duration_sum,
    maxState(duration_ms)               AS duration_max,
    quantileState(0.99)(duration_ms)    AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;

stats_1m_processor (per-processor-type)

CREATE TABLE stats_1m_processor (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    processor_type   LowCardinality(String),
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    countState()                        AS total_count,
    countIfState(status = 'FAILED')     AS failed_count,
    sumState(duration_ms)               AS duration_sum,
    maxState(duration_ms)               AS duration_max,
    quantileState(0.99)(duration_ms)    AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;

stats_1m_processor_detail (per-processor-id)

CREATE TABLE stats_1m_processor_detail (
    tenant_id        LowCardinality(String),
    application_name LowCardinality(String),
    route_id         LowCardinality(String),
    processor_id     String,
    bucket           DateTime,
    total_count      AggregateFunction(count),
    failed_count     AggregateFunction(countIf, UInt8),
    duration_sum     AggregateFunction(sum, Nullable(Int64)),
    duration_max     AggregateFunction(max, Nullable(Int64)),
    p99_duration     AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time) AS bucket,
    countState()                        AS total_count,
    countIfState(status = 'FAILED')     AS failed_count,
    sumState(duration_ms)               AS duration_sum,
    maxState(duration_ms)               AS duration_max,
    quantileState(0.99)(duration_ms)    AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;

Ingestion Pipeline

Current Flow (replaced)

Agent POST -> IngestionService -> PostgresExecutionStore.upsert() -> PG
                               -> SearchIndexer (debounced 2s) -> reads from PG -> OpenSearch

New Flow

Agent POST -> IngestionService -> ExecutionAccumulator
                                     |-- RUNNING: ConcurrentHashMap (no DB write)
                                     |-- COMPLETED/FAILED: merge with pending -> WriteBuffer
                                     '-- Timeout sweep (60s): flush stale -> WriteBuffer
                                                                    |
                                                    ClickHouseExecutionStore.insertBatch()
                                                    ClickHouseProcessorStore.insertBatch()

ExecutionAccumulator

New component replacing SearchIndexer. Core responsibilities:

  1. On RUNNING POST: Store PendingExecution in ConcurrentHashMap<String, PendingExecution> keyed by execution_id. Return 200 OK immediately. No database write.

  2. On COMPLETED/FAILED POST: Look up pending RUNNING by execution_id. If found, merge fields using the same COALESCE logic currently in PostgresExecutionStore.upsert(). Produce a complete MergedExecution and push to WriteBuffer. If not found (race condition or RUNNING already flushed by timeout), write COMPLETED directly with _version=2.

  3. Timeout sweep (scheduled every 60s): Scan for RUNNING entries older than 5 minutes. Flush them to ClickHouse as-is with status=RUNNING, making them visible in the UI. When COMPLETED eventually arrives, it writes with _version=2 (ReplacingMergeTree deduplicates).

  4. Late corrections: If a correction arrives for an already-written execution, insert with _version incremented. ReplacingMergeTree handles deduplication.
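A minimal sketch of responsibilities 1, 2, and 4 (class, record, and method names are illustrative, not the actual Cameleer types; the real merge applies the COALESCE logic to every field, not just the error message):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the accumulator's merge logic (names are hypothetical).
public class AccumulatorSketch {
    // Minimal stand-in for a pending execution: only the fields needed to show the merge.
    public record Execution(String id, String status, long version, String errorMessage) {}

    private final Map<String, Execution> pending = new ConcurrentHashMap<>();

    // RUNNING phase: buffer in memory, no database write.
    public void onRunning(Execution running) {
        pending.put(running.id(), running);
    }

    // COMPLETED/FAILED phase: merge with the pending RUNNING row if present.
    // Returns the row that would be pushed to the WriteBuffer.
    public Execution onTerminal(Execution terminal) {
        Execution running = pending.remove(terminal.id());
        if (running == null) {
            // RUNNING already flushed by the timeout sweep (or never seen):
            // write the terminal row with a higher version so ReplacingMergeTree keeps it.
            return new Execution(terminal.id(), terminal.status(), 2, terminal.errorMessage());
        }
        // COALESCE-style merge: prefer the terminal value, fall back to the pending one.
        String error = !terminal.errorMessage().isEmpty()
                ? terminal.errorMessage() : running.errorMessage();
        return new Execution(terminal.id(), terminal.status(), 1, error);
    }

    public int pendingCount() { return pending.size(); }
}
```

In the normal flow the merged row is written exactly once with version 1; only the timeout and late-correction paths ever produce a second version for ReplacingMergeTree to collapse.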

WriteBuffer

Reuse the existing WriteBuffer pattern (bounded queue, configurable batch size, scheduled drain):

  • Buffer capacity: 50,000 items
  • Batch size: 5,000 per flush
  • Flush interval: 1 second
  • Separate buffers for executions and processor_executions (independent batch inserts)
  • Drain calls ClickHouseExecutionStore.insertBatch() using JDBC batch update
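The same pattern in miniature — a sketch assuming a bounded `ArrayBlockingQueue` (the real component adds the scheduled drain thread and the JDBC call; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of the bounded write buffer (capacity/batch sizes come from config).
public class WriteBufferSketch<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;

    public WriteBufferSketch(int capacity, int batchSize) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
    }

    // Returns false when the buffer is full (caller decides whether to drop or block).
    public boolean offer(T item) {
        return queue.offer(item);
    }

    // Called by the scheduled drain (every ~1s); hands at most one batch to the store.
    public List<T> drainBatch() {
        List<T> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        return batch;
    }
}
```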

Logs Ingestion

Direct batch INSERT, bypasses accumulator (logs are single-phase):

Agent POST /api/v1/data/logs -> LogIngestionController -> ClickHouseLogStore.insertBatch()

Metrics Ingestion

Existing MetricsWriteBuffer targets ClickHouse instead of PG:

Agent POST /api/v1/data/metrics -> MetricsController -> WriteBuffer -> ClickHouseMetricsStore.insertBatch()

JDBC Batch Insert Pattern

jdbcTemplate.batchUpdate(
    "INSERT INTO executions (tenant_id, execution_id, start_time, ...) VALUES (?, ?, ?, ...)",
    batchArgs
);

JDBC URL includes async_insert=1&wait_for_async_insert=0 for server-side buffering, preventing "too many parts" errors under high load.
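The construction of `batchArgs` might look like the following sketch (the `Row` record and three-column insert are illustrative stand-ins for the full column list):

```java
import java.sql.Timestamp;
import java.util.List;

// Sketch: mapping merged executions to JDBC batch arguments. The store would then call
//   jdbcTemplate.batchUpdate(
//       "INSERT INTO executions (tenant_id, execution_id, start_time) VALUES (?, ?, ?)",
//       BatchArgsSketch.toBatchArgs(rows));
public class BatchArgsSketch {
    public record Row(String tenantId, String executionId, Timestamp startTime) {}

    // One Object[] per row, in the same order as the INSERT column list.
    public static List<Object[]> toBatchArgs(List<Row> rows) {
        return rows.stream()
                .map(r -> new Object[] { r.tenantId(), r.executionId(), r.startTime() })
                .toList();
    }
}
```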

Search Implementation

Query Translation

Current OpenSearch bool queries map to ClickHouse SQL:

-- Full-text wildcard search with time range, status filter, and pagination
SELECT *
FROM executions FINAL
WHERE tenant_id = {tenant_id:String}
  AND start_time >= {time_from:DateTime64(3)}
  AND start_time < {time_to:DateTime64(3)}
  AND status IN ({statuses:Array(String)})
  AND (
    _search_text LIKE concat('%', {search_term:String}, '%')
    OR execution_id IN (
      SELECT DISTINCT execution_id
      FROM processor_executions
      WHERE tenant_id = {tenant_id:String}
        AND start_time >= {time_from:DateTime64(3)}
        AND start_time < {time_to:DateTime64(3)}
        AND _search_text LIKE concat('%', {search_term:String}, '%')
    )
  )
ORDER BY start_time DESC
LIMIT {limit:UInt32} OFFSET {offset:UInt32}

Scoped Searches

| Scope | ClickHouse WHERE clause |
|---|---|
| textInBody | input_body LIKE '%term%' OR output_body LIKE '%term%' |
| textInHeaders | input_headers LIKE '%term%' OR output_headers LIKE '%term%' |
| textInErrors | error_message LIKE '%term%' OR error_stacktrace LIKE '%term%' |
| global text | _search_text LIKE '%term%' (covers all fields) |

All are accelerated by the ngrambf_v1 indexes, which prune non-matching granules before scanning; for selective terms this skips the large majority of data, though the exact ratio depends on term length and bloom filter sizing.

Application-Side Highlighting

public String extractHighlight(String text, String searchTerm, int contextChars) {
    if (text == null || searchTerm == null || searchTerm.isEmpty()) return null;
    int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
    if (idx < 0) return null;
    int start = Math.max(0, idx - contextChars / 2);
    int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
    return (start > 0 ? "..." : "")
         + text.substring(start, end)
         + (end < text.length() ? "..." : "");
}

Returns the same highlight map structure the UI currently expects.
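For self-containment this sketch repeats the extract method (with a null guard) and shows how a per-field highlight map could be assembled; field names are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: per-field highlight map assembly from columns returned by the search query.
public class HighlightSketch {
    public static String extractHighlight(String text, String searchTerm, int contextChars) {
        if (text == null || searchTerm == null || searchTerm.isEmpty()) return null;
        int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
        if (idx < 0) return null;
        int start = Math.max(0, idx - contextChars / 2);
        int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
        return (start > 0 ? "..." : "")
             + text.substring(start, end)
             + (end < text.length() ? "..." : "");
    }

    // fields: column name -> returned value; only matching fields appear in the result.
    public static Map<String, String> highlights(Map<String, String> fields, String term) {
        Map<String, String> out = new LinkedHashMap<>();
        fields.forEach((name, value) -> {
            String fragment = extractHighlight(value, term, 120);
            if (fragment != null) out.put(name, fragment);
        });
        return out;
    }
}
```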

OpenSearch nested queries become a subquery on the processor_executions table:

execution_id IN (
    SELECT DISTINCT execution_id
    FROM processor_executions
    WHERE tenant_id = ? AND start_time >= ? AND start_time < ?
      AND _search_text LIKE '%term%'
)

This is evaluated once with ngram index acceleration, then joined via IN.

Stats Query Translation

TimescaleDB -> ClickHouse Query Patterns

| TimescaleDB | ClickHouse |
|---|---|
| time_bucket('1 minute', bucket) | toStartOfInterval(bucket, INTERVAL 1 MINUTE) |
| SUM(total_count) | countMerge(total_count) |
| SUM(failed_count) | countIfMerge(failed_count) |
| approx_percentile(0.99, rollup(p99_duration)) | quantileMerge(0.99)(p99_duration) |
| SUM(duration_sum) / SUM(total_count) | sumMerge(duration_sum) / countMerge(total_count) |
| MAX(duration_max) | maxMerge(duration_max) |

Example: Timeseries Query

SELECT
    toStartOfInterval(bucket, INTERVAL {interval:UInt32} SECOND) AS period,
    countMerge(total_count)    AS total_count,
    countIfMerge(failed_count) AS failed_count,
    sumMerge(duration_sum) / countMerge(total_count) AS avg_duration,
    quantileMerge(0.99)(p99_duration) AS p99_duration
FROM stats_1m_app
WHERE tenant_id = {tenant_id:String}
  AND application_name = {app:String}
  AND bucket >= {from:DateTime}
  AND bucket < {to:DateTime}
GROUP BY period
ORDER BY period

SLA and Top Errors

SLA queries hit the raw executions table (need per-row duration filtering):

SELECT
    countIf(duration_ms <= {threshold:Int64} AND status != 'RUNNING') * 100.0 / count() AS sla_pct
FROM executions FINAL
WHERE tenant_id = ? AND application_name = ? AND start_time >= ? AND start_time < ?

Top errors query:

SELECT
    error_message,
    count() AS error_count,
    max(start_time) AS last_seen
FROM executions FINAL
WHERE tenant_id = ? AND status = 'FAILED'
  AND start_time >= now() - INTERVAL 1 HOUR
GROUP BY error_message
ORDER BY error_count DESC
LIMIT 10

Multitenancy

Data Isolation

Primary: Application-layer WHERE clause injection. Every ClickHouse query gets WHERE tenant_id = ? from the authenticated user's JWT claims.
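One way to make the injection hard to forget is to assemble every query through a helper that always emits the tenant predicate first — a sketch (class and method names are hypothetical):

```java
import java.util.List;

// Sketch: forces WHERE tenant_id = ? as the first condition of every ClickHouse query,
// so a query without the tenant filter cannot be built through this path.
public class TenantScopedQuery {

    // selectFrom: "SELECT ... FROM table"; predicates: additional WHERE conditions.
    public static String build(String selectFrom, List<String> predicates) {
        StringBuilder sql = new StringBuilder(selectFrom).append(" WHERE tenant_id = ?");
        for (String p : predicates) {
            sql.append(" AND ").append(p);
        }
        return sql.toString();
    }
}
```

The tenant_id bind value is then taken from the authenticated principal's JWT claims, never from request parameters.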

Defense-in-depth: ClickHouse row policies:

-- Create a ClickHouse user per tenant
CREATE USER tenant_acme IDENTIFIED BY '...';

-- Row policy ensures tenant can only see their data
CREATE ROW POLICY tenant_acme_executions ON executions
    FOR SELECT USING tenant_id = 'acme';

-- Repeat for all tables

Tenant ID in Schema

tenant_id is the first column in every table's ORDER BY and PARTITION BY. This ensures:

  • Data for the same tenant is physically co-located on disk
  • Queries filtering by tenant_id use the sparse index efficiently
  • Partition drops for retention are scoped to individual tenants

Resource Quotas

CREATE SETTINGS PROFILE tenant_limits
    SETTINGS max_execution_time = 30,
             max_rows_to_read = 100000000,
             max_memory_usage = '4G';

ALTER USER tenant_acme SETTINGS PROFILE tenant_limits;

Prevents noisy neighbor problems where one tenant's expensive query affects others.

Retention

Strategy: Application-Driven Scheduler

Per-tenant, per-document-type retention is too dynamic for static ClickHouse TTL rules. Instead:

  1. Config table in PostgreSQL:
CREATE TABLE tenant_retention_config (
    tenant_id       VARCHAR(255) NOT NULL,
    document_type   VARCHAR(50) NOT NULL,   -- executions, logs, metrics, etc.
    retention_days  INT NOT NULL,
    PRIMARY KEY (tenant_id, document_type)
);
  2. RetentionScheduler (Spring @Scheduled, runs daily at 03:00 UTC). RetentionTarget below is an illustrative helper that maps a document type to its table and time column via a fixed whitelist (executions -> start_time, logs -> timestamp, etc.), so SQL is never built from raw config strings:
@Scheduled(cron = "0 0 3 * * *")
public void enforceRetention() {
    List<TenantRetention> configs = retentionConfigRepo.findAll();
    for (TenantRetention config : configs) {
        // Whitelist lookup: document type -> (table, time column).
        RetentionTarget target = RetentionTarget.forDocumentType(config.documentType());
        clickHouseJdbc.update(
            "ALTER TABLE " + target.table() + " DELETE WHERE tenant_id = ? AND "
                + target.timeColumn() + " < now() - toIntervalDay(?)",
            config.tenantId(), config.retentionDays()
        );
    }
}
  3. Safety-net TTL: Each table has a generous default TTL (365 days) as a backstop in case the scheduler fails. The scheduler handles the per-tenant granularity.

  4. Partition-aligned drops: Since tables are partitioned by (tenant_id, toYYYYMM(start_time)), months that lie entirely past a tenant's retention window can be removed with ALTER TABLE ... DROP PARTITION -- a fast metadata operation with no part rewrite -- leaving the mutation-based DELETE for only the partial month at the boundary. Enabling ttl_only_drop_parts=1 likewise lets the safety-net TTL drop whole parts instead of rewriting them.
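A sketch of how the scheduler could compute a tenant's fully-expired monthly partitions for DROP PARTITION (hypothetical helper; the YYYYMM strings match the toYYYYMM partition expression):

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

// Sketch: which monthly partitions are entirely older than the retention cutoff and can
// therefore be dropped outright instead of deleted via a mutation.
public class ExpiredPartitions {
    public static List<String> expiredMonths(LocalDate today, int retentionDays, int lookbackMonths) {
        // A month is fully expired only if its LAST day is older than the cutoff.
        LocalDate cutoff = today.minusDays(retentionDays);
        List<String> months = new ArrayList<>();
        YearMonth m = YearMonth.from(today).minusMonths(lookbackMonths);
        for (int i = 0; i < lookbackMonths; i++, m = m.plusMonths(1)) {
            if (m.atEndOfMonth().isBefore(cutoff)) {
                months.add(String.format("%d%02d", m.getYear(), m.getMonthValue()));
            }
        }
        return months;
    }
}
```

Each returned value becomes the month component of an `ALTER TABLE ... DROP PARTITION (tenant, yyyymm)` statement; the boundary month still goes through the DELETE path.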

Java/Spring Integration

Dependencies

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.7.x</version>  <!-- latest stable -->
    <classifier>all</classifier>
</dependency>

Configuration

clickhouse:
  url: jdbc:clickhouse://cameleer-clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0
  username: cameleer_app
  password: ${CLICKHOUSE_PASSWORD}

DataSource Bean

@Configuration
public class ClickHouseConfig {
    @Bean
    public DataSource clickHouseDataSource(ClickHouseProperties props) {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(props.getUrl());
        ds.setUsername(props.getUsername());
        ds.setPassword(props.getPassword());
        ds.setMaximumPoolSize(10);
        return ds;
    }

    @Bean
    public JdbcTemplate clickHouseJdbcTemplate(
            @Qualifier("clickHouseDataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }
}

Interface Implementations

Existing interfaces remain unchanged. New implementations:

| Interface | Current Impl | New Impl |
|---|---|---|
| ExecutionStore | PostgresExecutionStore | ClickHouseExecutionStore |
| SearchIndex | OpenSearchIndex | ClickHouseSearchIndex |
| StatsStore | PostgresStatsStore | ClickHouseStatsStore |
| DiagramStore | PostgresDiagramStore | ClickHouseDiagramStore |
| MetricsStore | PostgresMetricsStore | ClickHouseMetricsStore |
| (log search) | OpenSearchLogIndex | ClickHouseLogStore |
| (new) | SearchIndexer | ExecutionAccumulator |

Kubernetes Deployment

ClickHouse StatefulSet

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: clickhouse
spec:
  serviceName: clickhouse
  replicas: 1  # single node initially
  template:
    spec:
      containers:
      - name: clickhouse
        image: clickhouse/clickhouse-server:26.2
        ports:
        - containerPort: 8123  # HTTP
        - containerPort: 9000  # Native
        volumeMounts:
        - name: data
          mountPath: /var/lib/clickhouse
        - name: config
          mountPath: /etc/clickhouse-server/config.d
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi  # NVMe/SSD

Health Check

livenessProbe:
  httpGet:
    path: /ping
    port: 8123
readinessProbe:
  httpGet:
    path: /ping
    port: 8123

Migration Path

Phase 1: Foundation

  • Add clickhouse-jdbc dependency
  • Create ClickHouseConfig (DataSource, JdbcTemplate)
  • Schema initialization (idempotent DDL scripts, not Flyway -- ClickHouse DDL is different enough)
  • Implement ClickHouseMetricsStore (simplest table, validates pipeline)
  • Deploy ClickHouse to k8s alongside existing PG+OpenSearch

Phase 2: Executions & Search

  • Build ExecutionAccumulator (replaces SearchIndexer)
  • Implement ClickHouseExecutionStore and ClickHouseProcessorStore
  • Implement ClickHouseSearchIndex (ngram-based SQL queries)
  • Feature flag: dual-write to both PG and CH, read from PG

Phase 3: Stats & Analytics

  • Create MV definitions (all 5 stats views)
  • Implement ClickHouseStatsStore
  • Validate stats accuracy: compare CH vs PG continuous aggregates

Phase 4: Remaining Tables

  • ClickHouseDiagramStore (ReplacingMergeTree)
  • ClickHouseAgentEventStore
  • ClickHouseLogStore (replaces OpenSearchLogIndex)
  • Application-side highlighting

Phase 5: Multitenancy

  • Tables already include tenant_id from Phase 1 (schema is forward-looking). This phase activates multitenancy.
  • Wire tenant_id from JWT claims into all ClickHouse queries (application-layer WHERE injection)
  • Add tenant_id to PostgreSQL RBAC/config tables
  • Create ClickHouse row policies per tenant (defense-in-depth)
  • Create tenant_retention_config table in PG and RetentionScheduler component
  • Tenant user management and resource quotas in ClickHouse

Phase 6: Cutover

  • Backfill historical data from PG/OpenSearch to ClickHouse
  • Switch read path to ClickHouse (feature flag)
  • Validate end-to-end
  • Remove OpenSearch dependency (POM, config, k8s manifests)
  • Remove TimescaleDB extensions and hypertable-specific code
  • Keep PostgreSQL for RBAC/config/audit only

Verification

Functional Verification

  1. Ingestion: Send executions via agent, verify they appear in ClickHouse with correct fields
  2. Two-phase lifecycle: Send RUNNING, then COMPLETED. Verify single merged row in CH
  3. Search: Wildcard search across bodies, headers, errors. Verify sub-second response
  4. Stats: Dashboard statistics match expected values. Compare with PG aggregates during dual-write
  5. Logs: Ingest log batches, query by app/level/time/text. Verify correctness
  6. Retention: Configure per-tenant retention, run scheduler, verify expired data is deleted
  7. Multitenancy: Two tenants, verify data isolation (one tenant cannot see another's data)

Performance Verification

  1. Insert throughput: 5K executions/batch at 1 flush/sec sustained
  2. Search latency: Sub-second for LIKE '%term%' across 1M+ rows
  3. Stats query latency: Dashboard stats in <100ms from materialized views
  4. Log search: <1s for text search across 7 days of logs

Data Integrity

  1. During dual-write phase: compare row counts between PG and CH
  2. After cutover: spot-check execution details, processor trees, search results