cameleer-server/docs/superpowers/plans/2026-03-31-clickhouse-phase2-executions-search.md
hsiegeln cb3ebfea7c
chore: rename cameleer3 to cameleer
Rename Java packages from com.cameleer3 to com.cameleer, module
directories from cameleer3-* to cameleer-*, and all references
throughout workflows, Dockerfiles, docs, migrations, and pom.xml.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 15:28:42 +02:00


ClickHouse Phase 2: Executions + Search — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ingest chunked execution data into ClickHouse and provide a ClickHouse-backed search implementation behind a feature flag. Replace the old RouteExecution tree ingestion with ExecutionChunk + FlatProcessorRecord flat ingestion.

Architecture: Agents send ExecutionChunk documents containing flat FlatProcessorRecord entries with seq/parentSeq/iteration fields. A new ChunkIngestionController accepts chunks at POST /api/v1/data/chunks. A ChunkAccumulator buffers exchange envelope data in-memory, inserts processor records immediately via WriteBuffer, and writes the execution row when the final chunk arrives. A ClickHouseSearchIndex implements the SearchIndex interface using SQL with ngram bloom filter acceleration. Feature flags control which search backend is active. The old RouteExecution ingestion path is removed (no backward compatibility needed).
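
The chunk flow described above can be sketched roughly as follows. This is a simplified illustration, not the plan's ChunkAccumulator: SketchChunk, SketchAccumulator and the two list-based buffers are hypothetical stand-ins, and the real DTOs and WriteBuffer carry far more state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down chunk: the real ExecutionChunk has many more fields.
record SketchChunk(String exchangeId, String status, int chunkSeq,
                   boolean isFinal, List<String> processorIds) {}

/** Hypothetical accumulator: forwards processors at once, emits the execution row on the final chunk. */
final class SketchAccumulator {
    // Newest non-final envelope per exchange, held only until the final chunk arrives.
    private final Map<String, SketchChunk> pending = new HashMap<>();
    // Stand-ins for the processor and execution WriteBuffers.
    final List<String> processorBuffer = new ArrayList<>();
    final List<String> executionBuffer = new ArrayList<>();

    void onChunk(SketchChunk chunk) {
        // Processor records never wait for later chunks.
        for (String processorId : chunk.processorIds()) {
            processorBuffer.add(chunk.exchangeId() + "/" + processorId);
        }
        if (chunk.isFinal()) {
            // Final chunk: emit exactly one execution row and forget the exchange.
            pending.remove(chunk.exchangeId());
            executionBuffer.add(chunk.exchangeId() + ":" + chunk.status());
        } else {
            // Not final yet: remember the envelope with the highest chunkSeq.
            pending.merge(chunk.exchangeId(), chunk,
                    (old, latest) -> latest.chunkSeq() >= old.chunkSeq() ? latest : old);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

One consequence of this shape is that processor rows for an exchange can be in ClickHouse before its execution row exists, so readers should not assume the two appear together.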

Tech Stack: ClickHouse 24.12, clickhouse-jdbc 0.9.7 (all classifier), Spring JdbcTemplate, Testcontainers

Design Spec: docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md

Note on cameleer-common: The agent team is refactoring cameleer-common to add ExecutionChunk and FlatProcessorRecord. Until that library is published, this plan defines server-side DTOs in cameleer-server-core that mirror the common models. When the common lib is ready, swap the server DTOs for the shared classes (import change only).


File Structure

| File | Responsibility |
|---|---|
| cameleer-server-app/.../resources/clickhouse/V2__executions.sql | DDL for executions table (ReplacingMergeTree) |
| cameleer-server-app/.../resources/clickhouse/V3__processor_executions.sql | DDL for processor_executions table (MergeTree) — uses seq/parentSeq/iteration |
| cameleer-server-core/.../model/ExecutionChunk.java | Server-side DTO mirroring agent's ExecutionChunk (temporary until common lib ready) |
| cameleer-server-core/.../model/FlatProcessorRecord.java | Server-side DTO mirroring agent's FlatProcessorRecord (temporary until common lib ready) |
| cameleer-server-core/.../ingestion/ChunkAccumulator.java | Accumulates exchange envelope across chunks, pushes processor records + final execution row to WriteBuffers |
| cameleer-server-core/.../ingestion/MergedExecution.java | Record holding merged execution envelope + version + tenant |
| cameleer-server-app/.../storage/ClickHouseExecutionStore.java | Batch INSERT for executions + processor_executions to ClickHouse |
| cameleer-server-app/.../search/ClickHouseSearchIndex.java | SearchIndex impl using SQL with ngram indexes |
| cameleer-server-app/.../ingestion/ExecutionFlushScheduler.java | Drains execution + processor WriteBuffers → ClickHouseExecutionStore |
| cameleer-server-app/.../controller/ChunkIngestionController.java | REST endpoint POST /api/v1/data/chunks accepting ExecutionChunk |
| cameleer-server-app/.../config/StorageBeanConfig.java | Modified: add chunk accumulator + CH search beans |
| cameleer-server-app/.../config/IngestionBeanConfig.java | Modified: add execution + processor WriteBuffer beans |
| cameleer-server-app/.../resources/application.yml | Modified: add cameleer.storage.search flag |
| cameleer-server-app/...test.../storage/ClickHouseExecutionStoreIT.java | Integration test for CH execution writes |
| cameleer-server-app/...test.../search/ClickHouseSearchIndexIT.java | Integration test for CH search |
| cameleer-server-core/...test.../ingestion/ChunkAccumulatorTest.java | Unit test for accumulator logic |

Task 1: Server-Side DTOs (ExecutionChunk + FlatProcessorRecord)

Files:

  • Create: cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java
  • Create: cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java

These mirror the agent's models exactly. When cameleer-common is published with these classes, delete these files and update imports.

  • Step 1: Create FlatProcessorRecord
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;

import java.time.Instant;
import java.util.Map;

/**
 * Flat processor execution record with seq/parentSeq for tree reconstruction.
 * Mirrors cameleer-common FlatProcessorRecord — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record FlatProcessorRecord(
        int seq,
        Integer parentSeq,
        String parentProcessorId,
        String processorId,
        String processorType,
        Integer iteration,
        Integer iterationSize,
        String status,
        Instant startTime,
        long durationMs,
        String resolvedEndpointUri,
        String inputBody,
        String outputBody,
        Map<String, String> inputHeaders,
        Map<String, String> outputHeaders,
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        Map<String, String> attributes,
        String circuitBreakerState,
        Boolean fallbackTriggered,
        Boolean filterMatched,
        Boolean duplicateMessage
) {}
  • Step 2: Create ExecutionChunk
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.List;
import java.util.Map;

/**
 * Chunk document: exchange envelope + list of FlatProcessorRecords.
 * Mirrors cameleer-common ExecutionChunk — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ExecutionChunk(
        String exchangeId,
        String applicationName,
        String agentId,
        String routeId,
        String correlationId,
        String status,
        Instant startTime,
        Instant endTime,
        Long durationMs,
        String engineLevel,
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        Map<String, String> attributes,
        String traceId,
        String spanId,
        String originalExchangeId,
        String replayExchangeId,
        int chunkSeq,
        @JsonProperty("final") boolean isFinal,
        List<FlatProcessorRecord> processors
) {}
  • Step 3: Write deserialization test
// cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class ExecutionChunkDeserializationTest {

    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    @Test
    void roundTrip_fullChunk() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-1", "order-service", "pod-1", "order-route",
                "corr-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "REGULAR",
                null, null, null, null, null, null,
                Map.of("orderId", "ORD-1"),
                "trace-1", "span-1", null, null,
                2, true,
                List.of(new FlatProcessorRecord(
                        1, null, null, "log1", "log",
                        null, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                        null, "body", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.exchangeId()).isEqualTo("ex-1");
        assertThat(deserialized.isFinal()).isTrue();
        assertThat(deserialized.chunkSeq()).isEqualTo(2);
        assertThat(deserialized.processors()).hasSize(1);
        assertThat(deserialized.processors().get(0).seq()).isEqualTo(1);
        assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1");
        assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1");
    }

    @Test
    void roundTrip_runningChunkWithIterations() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-2", "app", "agent-1", "route-1",
                "ex-2", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"),
                null, null, "REGULAR",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false,
                List.of(
                        new FlatProcessorRecord(
                                1, null, null, "split1", "split",
                                null, 3, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 100L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(
                                2, 1, "split1", "log1", "log",
                                0, null, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                                null, "item-0", null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(
                                3, 1, "split1", "log1", "log",
                                1, null, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                                null, "item-1", null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.isFinal()).isFalse();
        assertThat(deserialized.processors()).hasSize(3);

        FlatProcessorRecord split = deserialized.processors().get(0);
        assertThat(split.iterationSize()).isEqualTo(3);
        assertThat(split.parentSeq()).isNull();

        FlatProcessorRecord child0 = deserialized.processors().get(1);
        assertThat(child0.parentSeq()).isEqualTo(1);
        assertThat(child0.parentProcessorId()).isEqualTo("split1");
        assertThat(child0.iteration()).isEqualTo(0);
    }

    @Test
    void deserialize_unknownFieldsIgnored() throws Exception {
        String json = """
                {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED",
                 "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true,
                 "futureField":"ignored","processors":[]}
                """;
        ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class);
        assertThat(chunk.exchangeId()).isEqualTo("ex-1");
        assertThat(chunk.isFinal()).isTrue();
    }
}
  • Step 4: Run tests
mvn test -pl cameleer-server-core -Dtest=ExecutionChunkDeserializationTest

Expected: PASS (3 tests).

  • Step 5: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java \
       cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java \
       cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
git commit -m "feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs"
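
The seq/parentSeq fields on FlatProcessorRecord exist so the processor tree can be rebuilt from flat rows. A minimal sketch of that reconstruction, assuming a trimmed-down stand-in record (FlatNode, ProcessorTreeNode and ProcessorTreeBuilder are hypothetical names, not classes from this plan):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FlatProcessorRecord: only the fields needed for the tree.
record FlatNode(int seq, Integer parentSeq, String processorId, Integer iteration) {}

/** Hypothetical tree node wrapping one flat record. */
final class ProcessorTreeNode {
    final FlatNode data;
    final List<ProcessorTreeNode> children = new ArrayList<>();

    ProcessorTreeNode(FlatNode data) {
        this.data = data;
    }
}

final class ProcessorTreeBuilder {
    /** Returns the root nodes; children keep the order of the input list. */
    static List<ProcessorTreeNode> build(List<FlatNode> records) {
        // First pass: index every record by its seq.
        Map<Integer, ProcessorTreeNode> bySeq = new LinkedHashMap<>();
        for (FlatNode r : records) {
            bySeq.put(r.seq(), new ProcessorTreeNode(r));
        }
        // Second pass: attach each node to its parent, or treat it as a root.
        List<ProcessorTreeNode> roots = new ArrayList<>();
        for (ProcessorTreeNode node : bySeq.values()) {
            Integer parentSeq = node.data.parentSeq();
            ProcessorTreeNode parent = parentSeq == null ? null : bySeq.get(parentSeq);
            if (parent == null) {
                // Either a true root or a record whose parent was not supplied.
                roots.add(node);
            } else {
                parent.children.add(node);
            }
        }
        return roots;
    }
}
```

Because iterations of the same processor share a processorId (log1 in the tests above), the lookup has to key on seq; keying on processorId would collapse the iterations into one node.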

Task 2: DDL Scripts for executions and processor_executions

Files:

  • Create: cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql

  • Create: cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql

  • Step 1: Create executions DDL

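The executions table stores one row per exchange (written when the final chunk arrives). It uses ReplacingMergeTree(_version) so that a rare late correction, inserted with a higher _version and the same sorting key, replaces the earlier row when parts are merged.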

-- V2__executions.sql
CREATE TABLE IF NOT EXISTS executions (
    tenant_id            LowCardinality(String) DEFAULT 'default',
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64 DEFAULT 1,
    route_id             LowCardinality(String),
    agent_id             LowCardinality(String),
    application_name     LowCardinality(String),
    status               LowCardinality(String),
    correlation_id       String DEFAULT '',
    exchange_id          String DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String DEFAULT '',
    error_stacktrace     String DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String DEFAULT '',
    root_cause_message   String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String DEFAULT '',
    output_body          String DEFAULT '',
    input_headers        String DEFAULT '',
    output_headers       String DEFAULT '',
    attributes           String DEFAULT '',
    trace_id             String DEFAULT '',
    span_id              String DEFAULT '',
    has_trace_data       Bool DEFAULT false,
    is_replay            Bool DEFAULT false,

    _search_text         String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers,
               ' ', output_headers, ' ', root_cause_message),

    INDEX idx_search   _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error    error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies   concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers  concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status   status TYPE set(10) GRANULARITY 1,
    INDEX idx_corr     correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

Note: Removed processors_json — with flat records in processor_executions, the nested JSON column is no longer needed.
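
ReplacingMergeTree deduplicates only when parts are merged, and it treats two rows as the same only if their full ORDER BY tuple matches. The sketch below models that rule in memory to make the behaviour concrete. It is an illustration under simplifying assumptions, not ClickHouse code: VersionedRow and ReplacingMergeModel are hypothetical names, and the sorting key is flattened into a single string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an executions row; sortingKey represents the whole ORDER BY tuple.
record VersionedRow(String sortingKey, long version, String status) {}

final class ReplacingMergeModel {
    /** Collapses rows with equal sorting keys, keeping the highest version (later row wins ties). */
    static List<VersionedRow> merge(List<VersionedRow> rows) {
        Map<String, VersionedRow> latest = new LinkedHashMap<>();
        for (VersionedRow row : rows) {
            latest.merge(row.sortingKey(), row,
                    (kept, candidate) -> candidate.version() >= kept.version() ? candidate : kept);
        }
        return new ArrayList<>(latest.values());
    }
}
```

Two practical consequences follow. A correction must repeat the original tenant_id, start_time, application_name, route_id and execution_id, otherwise it lands as a separate row. And until a merge has happened both versions are visible, so a reader that needs exactly one row per execution has to ask for it explicitly, for example with FINAL.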

  • Step 2: Create processor_executions DDL

The table uses seq/parentSeq/iteration instead of depth/loopIndex/splitIndex/multicastIndex. Its ORDER BY ends with seq, which is unique per execution, rather than processor_id, which can repeat across iterations.

-- V3__processor_executions.sql
CREATE TABLE IF NOT EXISTS processor_executions (
    tenant_id             LowCardinality(String) DEFAULT 'default',
    execution_id          String,
    seq                   UInt32,
    parent_seq            Nullable(UInt32),
    parent_processor_id   String DEFAULT '',
    processor_id          String,
    processor_type        LowCardinality(String),
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_name      LowCardinality(String),
    iteration             Nullable(Int32),
    iteration_size        Nullable(Int32),
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String DEFAULT '',
    error_stacktrace      String DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String DEFAULT '',
    root_cause_message    String DEFAULT '',
    input_body            String DEFAULT '',
    output_body           String DEFAULT '',
    input_headers         String DEFAULT '',
    output_headers        String DEFAULT '',
    attributes            String DEFAULT '',
    resolved_endpoint_uri String DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool DEFAULT false,
    filter_matched        Bool DEFAULT false,
    duplicate_message     Bool DEFAULT false,

    _search_text          String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),

    INDEX idx_search  _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
  • Step 3: Verify DDL loads
mvn clean compile -pl cameleer-server-app

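ClickHouseSchemaInitializer scans classpath:clickhouse/*.sql automatically, so the new scripts need no registration. Compiling only confirms that they are packaged; the DDL is first executed against a real ClickHouse instance by the integration test in Task 3.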

  • Step 4: Commit
git add cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql \
       cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql
git commit -m "feat(clickhouse): add executions and processor_executions DDL for chunked transport"
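
Both tables index _search_text with ngrambf_v1(3, ...), a bloom filter over 3-character n-grams. A small sketch of what that means for search terms, assuming plain Java char-based splitting (NgramSketch is a hypothetical helper, and how ClickHouse counts characters in non-ASCII text is not modelled here):

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class NgramSketch {
    /** Returns the distinct n-grams of the text, in order of first appearance. */
    static Set<String> ngrams(String text, int n) {
        Set<String> grams = new LinkedHashSet<>();
        for (int i = 0; i + n <= text.length(); i++) {
            grams.add(text.substring(i, i + n));
        }
        return grams;
    }

    /** A term shorter than n has no n-grams, so an n-gram index cannot narrow the scan for it. */
    static boolean indexCanHelp(String searchTerm, int n) {
        return !ngrams(searchTerm, n).isEmpty();
    }
}
```

The index only lets ClickHouse skip granules that cannot contain all n-grams of the term. Bloom filters produce false positives, so the query still has to apply the actual text predicate to the granules that remain, and search terms of one or two characters fall back to a full scan of the selected partitions.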

Task 3: MergedExecution + ClickHouseExecutionStore

Files:

  • Create: cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java
  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java
  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java

The store handles batch INSERT for both executions (from MergedExecution) and processor_executions (from FlatProcessorRecord). It does NOT implement the ExecutionStore interface — it has its own batch API consumed by the flush scheduler.

  • Step 1: Create MergedExecution record
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java
package com.cameleer.server.core.ingestion;

import java.time.Instant;
import java.util.Map;

/**
 * A merged execution envelope ready for ClickHouse insertion.
 * Produced by {@link ChunkAccumulator} after receiving the final chunk.
 */
public record MergedExecution(
        String tenantId,
        long version,
        String executionId,
        String routeId,
        String agentId,
        String applicationName,
        String status,
        String correlationId,
        String exchangeId,
        Instant startTime,
        Instant endTime,
        Long durationMs,
        String errorMessage,
        String errorStacktrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        String diagramContentHash,
        String engineLevel,
        String inputBody,
        String outputBody,
        String inputHeaders,
        String outputHeaders,
        String attributes,
        String traceId,
        String spanId,
        boolean hasTraceData,
        boolean isReplay
) {}
  • Step 2: Write the failing integration test
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseExecutionStoreIT {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore store;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);

        String execDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        String procDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");

        store = new ClickHouseExecutionStore(jdbc);
    }

    @Test
    void insertExecutionBatch_writesToClickHouse() {
        MergedExecution exec = new MergedExecution(
                "default", 1,
                "exec-1", "route-timer", "agent-a", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                null, null, null, null, null, null,
                "hash-abc", "DEEP",
                "{\"key\":\"val\"}", "{\"out\":\"data\"}", "{\"h\":\"1\"}", "{\"h\":\"2\"}",
                "{\"attr\":\"val\"}",
                "trace-1", "span-1", true, false);

        store.insertExecutionBatch(List.of(exec));

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_writesToClickHouse() {
        FlatProcessorRecord proc = new FlatProcessorRecord(
                1, null, null, "proc-1", "to",
                null, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 500L,
                "http://localhost:8080/api",
                "input body", "output body",
                Map.of("Content-Type", "application/json"), null,
                null, null, null, null, null, null,
                null, null, null, null, null);

        store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"), List.of(proc));

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);

        // Verify seq is stored (parent_seq is NULL for this root record and is not asserted here)
        Integer seq = jdbc.queryForObject(
                "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(seq).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_withIterations() {
        List<FlatProcessorRecord> procs = List.of(
                new FlatProcessorRecord(1, null, null, "split1", "split",
                        null, 3, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 100L,
                        null, null, null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(2, 1, "split1", "log1", "log",
                        0, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-0", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(3, 1, "split1", "log1", "log",
                        1, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-1", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(4, 1, "split1", "log1", "log",
                        2, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-2", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null));

        store.insertProcessorBatch("default", "exec-split", "route-1", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"), procs);

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-split'",
                Integer.class);
        assertThat(count).isEqualTo(4);

        // Verify iteration data
        Integer iterSize = jdbc.queryForObject(
                "SELECT iteration_size FROM processor_executions WHERE execution_id = 'exec-split' AND seq = 1",
                Integer.class);
        assertThat(iterSize).isEqualTo(3);
    }

    @Test
    void insertExecutionBatch_emptyList_doesNothing() {
        store.insertExecutionBatch(List.of());
        Integer count = jdbc.queryForObject("SELECT count() FROM executions", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    @Test
    void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
        Instant startTime = Instant.parse("2026-03-31T10:00:00Z");
        MergedExecution v1 = new MergedExecution(
                "default", 1,
                "exec-dup", "route-1", "agent-a", "my-app",
                "RUNNING", null, "exchange-1",
                startTime, null, null,
                null, null, null, null, null, null,
                null, null,
                null, null, null, null, null,
                null, null, false, false);
        MergedExecution v2 = new MergedExecution(
                "default", 2,
                "exec-dup", "route-1", "agent-a", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                startTime, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                null, null, null, null, null, null,
                null, null,
                null, null, null, null, null,
                null, null, false, false);

        store.insertExecutionBatch(List.of(v1, v2));
        jdbc.execute("OPTIMIZE TABLE executions FINAL");

        String status = jdbc.queryForObject(
                "SELECT status FROM executions WHERE execution_id = 'exec-dup'",
                String.class);
        assertThat(status).isEqualTo("COMPLETED");
    }
}
  • Step 3: Run test to verify it fails
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false

Expected: compilation error — ClickHouseExecutionStore does not exist.

  • Step 4: Implement ClickHouseExecutionStore
// cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;

public class ClickHouseExecutionStore {

    private static final ObjectMapper JSON = new ObjectMapper();
    private final JdbcTemplate jdbc;

    public ClickHouseExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public void insertExecutionBatch(List<MergedExecution> executions) {
        if (executions.isEmpty()) return;

        jdbc.batchUpdate("""
                INSERT INTO executions (
                    tenant_id, execution_id, start_time, _version,
                    route_id, agent_id, application_name, status,
                    correlation_id, exchange_id, end_time, duration_ms,
                    error_message, error_stacktrace, error_type, error_category,
                    root_cause_type, root_cause_message,
                    diagram_content_hash, engine_level,
                    input_body, output_body, input_headers, output_headers,
                    attributes, trace_id, span_id,
                    has_trace_data, is_replay
                ) VALUES (
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?,
                    ?, ?
                )
                """,
                executions.stream().map(e -> new Object[]{
                        e.tenantId(),
                        e.executionId(),
                        Timestamp.from(e.startTime()),
                        e.version(),
                        orEmpty(e.routeId()),
                        orEmpty(e.agentId()),
                        orEmpty(e.applicationName()),
                        orEmpty(e.status()),
                        orEmpty(e.correlationId()),
                        orEmpty(e.exchangeId()),
                        e.endTime() != null ? Timestamp.from(e.endTime()) : null,
                        e.durationMs(),
                        orEmpty(e.errorMessage()),
                        orEmpty(e.errorStacktrace()),
                        orEmpty(e.errorType()),
                        orEmpty(e.errorCategory()),
                        orEmpty(e.rootCauseType()),
                        orEmpty(e.rootCauseMessage()),
                        orEmpty(e.diagramContentHash()),
                        orEmpty(e.engineLevel()),
                        orEmpty(e.inputBody()),
                        orEmpty(e.outputBody()),
                        orEmpty(e.inputHeaders()),
                        orEmpty(e.outputHeaders()),
                        orEmpty(e.attributes()),
                        orEmpty(e.traceId()),
                        orEmpty(e.spanId()),
                        e.hasTraceData(),
                        e.isReplay()
                }).toList());
    }

    public void insertProcessorBatch(String tenantId, String executionId,
                                      String routeId, String applicationName,
                                      Instant execStartTime,
                                      List<FlatProcessorRecord> processors) {
        if (processors.isEmpty()) return;

        jdbc.batchUpdate("""
                INSERT INTO processor_executions (
                    tenant_id, execution_id, seq, parent_seq, parent_processor_id,
                    processor_id, processor_type, start_time,
                    route_id, application_name,
                    iteration, iteration_size, status,
                    end_time, duration_ms,
                    error_message, error_stacktrace, error_type, error_category,
                    root_cause_type, root_cause_message,
                    input_body, output_body, input_headers, output_headers,
                    attributes, resolved_endpoint_uri,
                    circuit_breaker_state, fallback_triggered,
                    filter_matched, duplicate_message
                ) VALUES (
                    ?, ?, ?, ?, ?,
                    ?, ?, ?,
                    ?, ?,
                    ?, ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?,
                    ?, ?
                )
                """,
                processors.stream().map(p -> new Object[]{
                        tenantId,
                        executionId,
                        p.seq(),
                        p.parentSeq(),
                        orEmpty(p.parentProcessorId()),
                        p.processorId(),
                        p.processorType(),
                        p.startTime() != null ? Timestamp.from(p.startTime()) : Timestamp.from(execStartTime),
                        routeId,
                        applicationName,
                        p.iteration(),
                        p.iterationSize(),
                        orEmpty(p.status()),
                        p.startTime() != null && p.durationMs() > 0
                                ? Timestamp.from(p.startTime().plusMillis(p.durationMs())) : null,
                        p.durationMs(),
                        orEmpty(p.errorMessage()),
                        orEmpty(p.errorStackTrace()),
                        orEmpty(p.errorType()),
                        orEmpty(p.errorCategory()),
                        orEmpty(p.rootCauseType()),
                        orEmpty(p.rootCauseMessage()),
                        orEmpty(p.inputBody()),
                        orEmpty(p.outputBody()),
                        headersToString(p.inputHeaders()),
                        headersToString(p.outputHeaders()),
                        mapToString(p.attributes()),
                        orEmpty(p.resolvedEndpointUri()),
                        orEmpty(p.circuitBreakerState()),
                        p.fallbackTriggered() != null ? p.fallbackTriggered() : false,
                        p.filterMatched() != null ? p.filterMatched() : false,
                        p.duplicateMessage() != null ? p.duplicateMessage() : false
                }).toList());
    }

    private static String orEmpty(String value) {
        return value != null ? value : "";
    }

    private static String headersToString(Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(headers);
        } catch (JsonProcessingException e) {
            return "";
        }
    }

    private static String mapToString(Map<String, String> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "";
        }
    }
}
  • Step 5: Run test to verify it passes
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire

Expected: all 5 tests PASS.

  • Step 6: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java \
       cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java \
       cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format"

Task 4: ChunkAccumulator

Files:

  • Create: cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
  • Create: cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java

The ChunkAccumulator receives ExecutionChunk documents. For each chunk:

  • Processor records are pushed to a sink immediately (they're append-only)
  • Exchange envelope data is buffered/merged in a ConcurrentHashMap
  • When isFinal=true, the merged envelope is pushed to an execution sink

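A scheduled sweep flushes stale exchanges: those whose first chunk was buffered more than 5 minutes ago and for which no final chunk has arrived. They are written with whatever envelope data has been merged so far.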
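The flow described above can be sketched in isolation before reading the full implementation. This is a simplified illustration, not the real ChunkAccumulator: `AccumulatorSketch`, `MiniChunk`, and `Pending` are hypothetical names, processor records are reduced to plain strings, and `sweepStale` takes the current time as a parameter only to make the sketch easy to exercise.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Simplified illustration of the buffer-until-final flow; not the real ChunkAccumulator. */
class AccumulatorSketch {

    /** Hypothetical stand-in for ExecutionChunk, reduced to the fields this flow needs. */
    record MiniChunk(String exchangeId, String status, boolean isFinal, List<String> processors) {}

    /** Buffered envelope plus the time the first chunk of the exchange was seen. */
    private record Pending(MiniChunk envelope, Instant firstSeen) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final Consumer<List<String>> processorSink;
    private final Consumer<MiniChunk> executionSink;
    private final Duration staleThreshold;

    AccumulatorSketch(Consumer<List<String>> processorSink,
                      Consumer<MiniChunk> executionSink,
                      Duration staleThreshold) {
        this.processorSink = processorSink;
        this.executionSink = executionSink;
        this.staleThreshold = staleThreshold;
    }

    void onChunk(MiniChunk chunk) {
        // Processor records are append-only, so they are forwarded right away.
        if (!chunk.processors().isEmpty()) {
            processorSink.accept(chunk.processors());
        }
        if (chunk.isFinal()) {
            // Final chunk: drop any buffered state and emit the envelope.
            pending.remove(chunk.exchangeId());
            executionSink.accept(chunk);
        } else {
            // Non-final chunk: keep the newest envelope, preserve the first-seen time.
            pending.merge(chunk.exchangeId(), new Pending(chunk, Instant.now()),
                    (old, fresh) -> new Pending(fresh.envelope(), old.firstSeen()));
        }
    }

    /** Emits every exchange whose first chunk is older than the threshold. */
    void sweepStale(Instant now) {
        Instant cutoff = now.minus(staleThreshold);
        pending.forEach((id, p) -> {
            // remove(key, value) only succeeds if the entry was not replaced concurrently.
            if (p.firstSeen().isBefore(cutoff) && pending.remove(id, p)) {
                executionSink.accept(p.envelope());
            }
        });
    }

    int pendingCount() { return pending.size(); }
}
```

The sketch replaces the envelope wholesale on each non-final chunk; the implementation in Step 3 merges field by field instead, so that a null in a later chunk does not erase a value from an earlier one.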

  • Step 1: Write the failing unit test
// cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
package com.cameleer.server.core.ingestion;

import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.assertj.core.api.Assertions.assertThat;

class ChunkAccumulatorTest {

    private List<MergedExecution> executionSink;
    private List<ChunkAccumulator.ProcessorBatch> processorSink;
    private ChunkAccumulator accumulator;

    @BeforeEach
    void setUp() {
        executionSink = new CopyOnWriteArrayList<>();
        processorSink = new CopyOnWriteArrayList<>();
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMinutes(5));
    }

    @Test
    void singleFinalChunk_producesExecutionAndProcessors() {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-1", "my-app", "agent-a", "route-1",
                "corr-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "DEEP",
                null, null, null, null, null, null,
                Map.of("env", "prod"),
                "trace-1", "span-1", null, null,
                0, true,
                List.of(new FlatProcessorRecord(
                        1, null, null, "log1", "log",
                        null, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                        null, "body", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));

        accumulator.onChunk(chunk);

        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.executionId()).isEqualTo("ex-1");
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(1000L);
        assertThat(merged.version()).isEqualTo(1);

        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(1);
        assertThat(processorSink.get(0).executionId()).isEqualTo("ex-1");
    }

    @Test
    void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
        Instant start = Instant.parse("2026-03-31T10:00:00Z");

        // Chunk 0: RUNNING with 2 processors
        ExecutionChunk chunk0 = new ExecutionChunk(
                "ex-multi", "my-app", "agent-a", "route-1",
                "corr-1", "RUNNING",
                start, null, null, "DEEP",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false,
                List.of(
                        new FlatProcessorRecord(1, null, null, "log1", "log",
                                null, null, "COMPLETED", start, 5L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(2, null, null, "to1", "to",
                                null, null, "COMPLETED", start.plusMillis(5), 10L,
                                "http://svc/api", null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null)));
        accumulator.onChunk(chunk0);

        // Processors inserted immediately
        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(2);
        // Execution NOT yet flushed
        assertThat(executionSink).isEmpty();

        // Chunk 1: COMPLETED (final) with 1 more processor
        ExecutionChunk chunk1 = new ExecutionChunk(
                "ex-multi", "my-app", "agent-a", "route-1",
                "corr-1", "COMPLETED",
                start, start.plusMillis(500), 500L, "DEEP",
                null, null, null, null, null, null,
                Map.of("result", "ok"),
                null, null, null, null,
                1, true,
                List.of(new FlatProcessorRecord(3, null, null, "log2", "log",
                        null, null, "COMPLETED", start.plusMillis(100), 2L,
                        null, null, null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));
        accumulator.onChunk(chunk1);

        // Final chunk triggers execution flush
        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(500L);
        assertThat(merged.version()).isEqualTo(1);

        // Second processor batch
        assertThat(processorSink).hasSize(2);
        assertThat(processorSink.get(1).processors()).hasSize(1);
    }

    @Test
    void staleExchange_flushedBySweep() {
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMillis(1));

        ExecutionChunk chunk = new ExecutionChunk(
                "ex-stale", "my-app", "agent-a", "route-1",
                "ex-stale", "RUNNING",
                Instant.parse("2026-03-31T09:50:00Z"),
                null, null, "REGULAR",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false, List.of());
        accumulator.onChunk(chunk);

        try { Thread.sleep(5); } catch (InterruptedException ignored) {}

        accumulator.sweepStale();

        assertThat(executionSink).hasSize(1);
        assertThat(executionSink.get(0).status()).isEqualTo("RUNNING");
        assertThat(executionSink.get(0).version()).isEqualTo(1);
    }

    @Test
    void finalChunkWithErrors_populatesErrorFields() {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-err", "my-app", "agent-a", "route-1",
                "ex-err", "FAILED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:00.200Z"), 200L,
                "REGULAR",
                "Connection refused", "java.net.ConnectException...",
                "java.net.ConnectException", "CONNECTION",
                "java.net.ConnectException", "Connection refused",
                null, null, null, null, null,
                0, true, List.of());
        accumulator.onChunk(chunk);

        MergedExecution merged = executionSink.get(0);
        assertThat(merged.errorMessage()).isEqualTo("Connection refused");
        assertThat(merged.errorType()).isEqualTo("java.net.ConnectException");
        assertThat(merged.errorCategory()).isEqualTo("CONNECTION");
    }

    @Test
    void getPendingCount_tracksBufferedExchanges() {
        Instant t = Instant.parse("2026-03-31T10:00:00Z");
        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "RUNNING",
                t, null, null, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 0, false, List.of()));
        accumulator.onChunk(new ExecutionChunk("e2", "app", "a", "r", "e2", "RUNNING",
                t, null, null, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 0, false, List.of()));
        assertThat(accumulator.getPendingCount()).isEqualTo(2);

        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "COMPLETED",
                t, t.plusMillis(100), 100L, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 1, true, List.of()));
        assertThat(accumulator.getPendingCount()).isEqualTo(1);
    }
}
  • Step 2: Run test to verify it fails
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest -DfailIfNoTests=false

Expected: compilation error — ChunkAccumulator does not exist.

  • Step 3: Implement ChunkAccumulator
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java
package com.cameleer.server.core.ingestion;

import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Accumulates ExecutionChunk documents per exchange.
 * <p>
 * Processor records are pushed to the processor sink immediately (append-only).
 * Exchange envelope data is buffered and merged across chunks.
 * When the final chunk arrives, the merged envelope is pushed to the execution sink.
 */
public class ChunkAccumulator {

    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
    private static final ObjectMapper JSON = new ObjectMapper();
    private static final String DEFAULT_TENANT = "default";

    private final Consumer<MergedExecution> executionSink;
    private final Consumer<ProcessorBatch> processorSink;
    private final Duration staleThreshold;
    private final Map<String, PendingExchange> pending = new ConcurrentHashMap<>();

    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
                             Consumer<ProcessorBatch> processorSink,
                             Duration staleThreshold) {
        this.executionSink = executionSink;
        this.processorSink = processorSink;
        this.staleThreshold = staleThreshold;
    }

    public void onChunk(ExecutionChunk chunk) {
        String exchangeId = chunk.exchangeId();

        // Insert processor records immediately (append-only)
        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
            processorSink.accept(new ProcessorBatch(
                    DEFAULT_TENANT, exchangeId,
                    coalesce(chunk.routeId(), ""),
                    coalesce(chunk.applicationName(), ""),
                    chunk.startTime(),
                    chunk.processors()));
        }

        if (chunk.isFinal()) {
            // Merge with any pending state and flush execution
            PendingExchange pendingExchange = pending.remove(exchangeId);
            MergedExecution merged = buildMergedExecution(chunk, pendingExchange);
            executionSink.accept(merged);
        } else {
            // Buffer/update exchange envelope
            pending.compute(exchangeId, (id, existing) -> {
                if (existing == null) {
                    return new PendingExchange(chunk, Instant.now());
                }
                return existing.mergeWith(chunk);
            });
        }
    }

    public void sweepStale() {
        Instant cutoff = Instant.now().minus(staleThreshold);
        List<String> staleIds = new ArrayList<>();

        pending.forEach((id, pe) -> {
            if (pe.receivedAt().isBefore(cutoff)) {
                staleIds.add(id);
            }
        });

        for (String id : staleIds) {
            PendingExchange stale = pending.remove(id);
            if (stale != null) {
                log.info("Flushing stale exchange {}", id);
                executionSink.accept(buildMergedExecution(stale.envelope(), null));
            }
        }
    }

    public int getPendingCount() {
        return pending.size();
    }

    private MergedExecution buildMergedExecution(ExecutionChunk finalChunk,
                                                   PendingExchange pendingState) {
        // Keep the PendingExchange itself so the base.envelope() accessors below compile
        PendingExchange base = pendingState;

        String attributes = serializeMap(finalChunk.attributes());
        if ((attributes == null || attributes.isEmpty()) && base != null) {
            attributes = serializeMap(base.envelope().attributes());
        }

        boolean hasTraceData = false;
        boolean isReplay = finalChunk.replayExchangeId() != null;

        return new MergedExecution(
                DEFAULT_TENANT, 1,
                finalChunk.exchangeId(),
                coalesce(finalChunk.routeId(), base != null ? base.envelope().routeId() : null),
                coalesce(finalChunk.agentId(), base != null ? base.envelope().agentId() : null),
                coalesce(finalChunk.applicationName(), base != null ? base.envelope().applicationName() : null),
                coalesce(finalChunk.status(), base != null ? base.envelope().status() : "RUNNING"),
                coalesce(finalChunk.correlationId(), base != null ? base.envelope().correlationId() : null),
                finalChunk.exchangeId(),
                coalesce(finalChunk.startTime(), base != null ? base.envelope().startTime() : null),
                coalesce(finalChunk.endTime(), base != null ? base.envelope().endTime() : null),
                coalesce(finalChunk.durationMs(), base != null ? base.envelope().durationMs() : null),
                coalesce(finalChunk.errorMessage(), base != null ? base.envelope().errorMessage() : null),
                coalesce(finalChunk.errorStackTrace(), base != null ? base.envelope().errorStackTrace() : null),
                coalesce(finalChunk.errorType(), base != null ? base.envelope().errorType() : null),
                coalesce(finalChunk.errorCategory(), base != null ? base.envelope().errorCategory() : null),
                coalesce(finalChunk.rootCauseType(), base != null ? base.envelope().rootCauseType() : null),
                coalesce(finalChunk.rootCauseMessage(), base != null ? base.envelope().rootCauseMessage() : null),
                "", // diagramContentHash — server-side lookup, not in chunk
                coalesce(finalChunk.engineLevel(), base != null ? base.envelope().engineLevel() : null),
                "", "", "", "", // input/output body/headers — on processor records now, not envelope
                coalesce(attributes, ""),
                coalesce(finalChunk.traceId(), base != null ? base.envelope().traceId() : null),
                coalesce(finalChunk.spanId(), base != null ? base.envelope().spanId() : null),
                hasTraceData,
                isReplay
        );
    }

    private static String serializeMap(Map<String, String> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "";
        }
    }

    private static <T> T coalesce(T a, T b) {
        return a != null ? a : b;
    }

    /**
     * A batch of processor records for a single exchange, ready for insertion.
     */
    public record ProcessorBatch(
            String tenantId,
            String executionId,
            String routeId,
            String applicationName,
            Instant execStartTime,
            List<FlatProcessorRecord> processors
    ) {}

    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {
        PendingExchange mergeWith(ExecutionChunk newer) {
            // Keep the latest envelope data (later chunkSeq has more complete info)
            ExecutionChunk merged = new ExecutionChunk(
                    envelope.exchangeId(),
                    coalesce(newer.applicationName(), envelope.applicationName()),
                    coalesce(newer.agentId(), envelope.agentId()),
                    coalesce(newer.routeId(), envelope.routeId()),
                    coalesce(newer.correlationId(), envelope.correlationId()),
                    coalesce(newer.status(), envelope.status()),
                    coalesce(envelope.startTime(), newer.startTime()),
                    coalesce(newer.endTime(), envelope.endTime()),
                    coalesce(newer.durationMs(), envelope.durationMs()),
                    coalesce(newer.engineLevel(), envelope.engineLevel()),
                    coalesce(newer.errorMessage(), envelope.errorMessage()),
                    coalesce(newer.errorStackTrace(), envelope.errorStackTrace()),
                    coalesce(newer.errorType(), envelope.errorType()),
                    coalesce(newer.errorCategory(), envelope.errorCategory()),
                    coalesce(newer.rootCauseType(), envelope.rootCauseType()),
                    coalesce(newer.rootCauseMessage(), envelope.rootCauseMessage()),
                    newer.attributes() != null ? newer.attributes() : envelope.attributes(),
                    coalesce(newer.traceId(), envelope.traceId()),
                    coalesce(newer.spanId(), envelope.spanId()),
                    coalesce(newer.originalExchangeId(), envelope.originalExchangeId()),
                    coalesce(newer.replayExchangeId(), envelope.replayExchangeId()),
                    newer.chunkSeq(),
                    newer.isFinal(),
                    List.of());
            return new PendingExchange(merged, receivedAt);
        }
    }
}
  • Step 4: Run test to verify it passes
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest

Expected: all 5 tests PASS.

  • Step 5: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java \
       cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
git commit -m "feat(clickhouse): add ChunkAccumulator for chunked execution ingestion"

Task 5: ExecutionFlushScheduler + ChunkIngestionController

Files:

  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java

  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java

  • Step 1: Implement ExecutionFlushScheduler

Follows the MetricsFlushScheduler pattern: on each flush tick it drains the two WriteBuffers (executions and processor batches) and writes them through ClickHouseExecutionStore. It also runs the ChunkAccumulator stale sweep every 60 seconds.
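WriteBuffer itself is defined elsewhere in cameleer-server-core and is not reproduced in this plan. The scheduler and bean wiring rely on only three things: a capacity constructor, `offer`, and `drain(int)` returning a list. A minimal sketch of a buffer with that shape, assuming it wraps a bounded queue, might look like the following. `WriteBufferSketch` is a hypothetical name, and the boolean return of `offer` is an assumption rather than something the plan confirms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for WriteBuffer; the real class may differ. */
class WriteBufferSketch<T> {

    private final BlockingQueue<T> queue;

    WriteBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking enqueue; returns false when the buffer is full. */
    boolean offer(T item) {
        return queue.offer(item);
    }

    /** Removes and returns up to maxBatch items without blocking. */
    List<T> drain(int maxBatch) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        return batch;
    }

    int size() {
        return queue.size();
    }
}
```

If `offer` does return a boolean as assumed here, passing `buffer::offer` where a `Consumer` is expected discards that result, so a full buffer would drop items silently. Whether that trade-off is acceptable depends on the real WriteBuffer contract.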

// cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java
package com.cameleer.server.app.ingestion;

import com.cameleer.server.app.config.IngestionConfig;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                    WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                    ClickHouseExecutionStore executionStore,
                                    ChunkAccumulator accumulator,
                                    IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        flushExecutions();
        flushProcessors();
    }

    private void flushExecutions() {
        try {
            List<MergedExecution> batch = executionBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                executionStore.insertExecutionBatch(batch);
                log.debug("Flushed {} executions to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush executions to ClickHouse", e);
        }
    }

    private void flushProcessors() {
        try {
            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
            for (ChunkAccumulator.ProcessorBatch batch : batches) {
                if (!batch.processors().isEmpty()) {
                    executionStore.insertProcessorBatch(
                            batch.tenantId(), batch.executionId(),
                            batch.routeId(), batch.applicationName(),
                            batch.execStartTime(), batch.processors());
                }
            }
            if (!batches.isEmpty()) {
                int totalProcs = batches.stream().mapToInt(b -> b.processors().size()).sum();
                log.debug("Flushed {} processor batches ({} records) to ClickHouse",
                        batches.size(), totalProcs);
            }
        } catch (Exception e) {
            log.error("Failed to flush processors to ClickHouse", e);
        }
    }

    @Scheduled(fixedDelay = 60_000)
    public void sweepStale() {
        try {
            accumulator.sweepStale();
        } catch (Exception e) {
            log.error("Failed to sweep stale exchanges", e);
        }
    }

    @Override public void start() { running = true; }

    @Override
    public void stop() {
        flush();
        running = false;
    }

    @Override public boolean isRunning() { return running; }
    @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
  • Step 2: Implement ChunkIngestionController
// cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
package com.cameleer.server.app.controller;

import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/api/v1/data")
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {

    // ObjectMapper is thread-safe once configured, so build it once rather than per request.
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .registerModule(new JavaTimeModule());

    private final ChunkAccumulator accumulator;

    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
    }

    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk (single or array)")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        try {
            String trimmed = body.trim();
            if (trimmed.startsWith("[")) {
                // JSON array: one request carrying several chunks
                List<ExecutionChunk> chunks = MAPPER.readValue(trimmed,
                        MAPPER.getTypeFactory().constructCollectionType(List.class, ExecutionChunk.class));
                for (ExecutionChunk chunk : chunks) {
                    accumulator.onChunk(chunk);
                }
            } else {
                // Single JSON object: exactly one chunk
                ExecutionChunk chunk = MAPPER.readValue(trimmed, ExecutionChunk.class);
                accumulator.onChunk(chunk);
            }
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            // Any failure, typically a malformed payload, is reported as 400 Bad Request
            return ResponseEntity.badRequest().build();
        }
    }
}
  • Step 3: Compile
mvn clean compile -pl cameleer-server-app
  • Step 4: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java \
       cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
git commit -m "feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController"

Task 6: ClickHouseSearchIndex

Files:

  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java
  • Create: cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java

As in the original plan, this task implements SearchIndex using SQL against ClickHouse. The search query patterns are unchanged: _search_text LIKE '%term%' on executions, and a subquery on processor_executions for body/header/error scoped searches.

This task is identical to Task 5 in the original plan. Refer to that task's complete code for the ClickHouseSearchIndex and ClickHouseSearchIndexIT implementations. The only difference is that processor_executions now uses seq/iteration columns instead of depth/loopIndex/etc., but the search queries only use _search_text, execution_id, input_body, output_body, input_headers, output_headers, error_message, and error_stacktrace — none of which changed.
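For orientation, the two query patterns named above can be sketched as a small WHERE-clause builder. This is not the original plan's ClickHouseSearchIndex: `SearchSqlSketch`, `Query`, `build`, and `escapeLike` are hypothetical names, the select list is reduced to `execution_id`, and only the body-scoped subquery is shown. Table and column names are the ones used elsewhere in this plan. The sketch assumes backslash is the LIKE escape character and that the JDBC driver passes bound parameters through unchanged, which is worth verifying against the driver in use.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative query builder; the real ClickHouseSearchIndex is defined in the original plan. */
class SearchSqlSketch {

    /** SQL text plus positional parameters, suitable for JdbcTemplate-style binding. */
    record Query(String sql, List<Object> params) {}

    /** Escapes LIKE metacharacters so that user input is matched literally. */
    static String escapeLike(String term) {
        return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
    }

    /**
     * @param tenantId tenant filter, always applied
     * @param text     term matched against executions._search_text, or null to skip
     * @param bodyText term matched against processor bodies via subquery, or null to skip
     */
    static Query build(String tenantId, String text, String bodyText) {
        StringBuilder sql = new StringBuilder(
                "SELECT execution_id FROM executions WHERE tenant_id = ?");
        List<Object> params = new ArrayList<>();
        params.add(tenantId);

        if (text != null && !text.isBlank()) {
            // Full-text pattern: substring match on the precomputed search column
            sql.append(" AND _search_text LIKE ?");
            params.add("%" + escapeLike(text) + "%");
        }
        if (bodyText != null && !bodyText.isBlank()) {
            // Scoped pattern: restrict to executions that have a matching processor row
            sql.append(" AND execution_id IN (SELECT DISTINCT execution_id"
                    + " FROM processor_executions"
                    + " WHERE tenant_id = ? AND (input_body LIKE ? OR output_body LIKE ?))");
            String pattern = "%" + escapeLike(bodyText) + "%";
            params.add(tenantId);
            params.add(pattern);
            params.add(pattern);
        }
        return new Query(sql.toString(), List.copyOf(params));
    }
}
```

Binding the term as a parameter rather than concatenating it into the SQL string keeps user input out of the statement text. The escaping step only prevents `%` and `_` in the input from acting as wildcards.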

  • Step 1: Write the failing integration test

Use the same test class from the original plan's Task 5, Step 1. The test seeds data via ClickHouseExecutionStore using the new MergedExecution and FlatProcessorRecord types. Refer to the original plan for the complete test code.

  • Step 2: Run test to verify it fails
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
  • Step 3: Implement ClickHouseSearchIndex

Use the same implementation from the original plan's Task 5, Step 3. The SQL queries and WHERE clause building are identical.

  • Step 4: Run test to verify it passes
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire
  • Step 5: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java \
       cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java
git commit -m "feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search"

Task 7: Feature Flag Wiring

Files:

  • Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
  • Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java
  • Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java
  • Modify: cameleer-server-app/src/main/resources/application.yml
  • Modify: deploy/base/server.yaml

Wire up the ChunkAccumulator, WriteBuffers, flush scheduler, and search switching.

  • Step 1: Add execution + processor WriteBuffer beans to IngestionBeanConfig
// Add to IngestionBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
  • Step 2: Add CH beans to StorageBeanConfig
// Add to StorageBeanConfig.java

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseExecutionStore clickHouseExecutionStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseExecutionStore(clickHouseJdbc);
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
    return new ChunkAccumulator(
            executionBuffer::offer,
            processorBatchBuffer::offer,
            java.time.Duration.ofMinutes(5));
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ExecutionFlushScheduler executionFlushScheduler(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
        ClickHouseExecutionStore executionStore,
        ChunkAccumulator accumulator,
        IngestionConfig config) {
    return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
            executionStore, accumulator, config);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
public SearchIndex clickHouseSearchIndex(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseSearchIndex(clickHouseJdbc);
}
  • Step 3: Add ConditionalOnProperty to OpenSearchIndex
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true)
public class OpenSearchIndex implements SearchIndex {
  • Step 4: Update application.yml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
  • Step 5: Update deploy/base/server.yaml

Add the environment variable:

- name: CAMELEER_STORAGE_SEARCH
  value: "opensearch"
  • Step 6: Compile and verify all unit tests pass (integration tests are skipped via -DskipITs)
mvn clean verify -DskipITs
  • Step 7: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java \
       cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java \
       cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java \
       cameleer-server-app/src/main/resources/application.yml \
       deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag"
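
The search switching wired in Steps 2 to 4 can be modeled as a small decision function. The sketch below is a plain-Java simulation, not Spring code: SearchBackendSelectionSketch and its methods are hypothetical names, and the case-insensitive comparison reflects my understanding of how Spring Boot evaluates havingValue, so treat it as an assumption.

```java
import java.util.Optional;

// Hypothetical model of the two @ConditionalOnProperty conditions on cameleer.storage.search.
class SearchBackendSelectionSketch {

    enum Backend { OPENSEARCH, CLICKHOUSE }

    /** Models one condition: a missing property matches only when matchIfMissing is true. */
    static boolean matches(String value, String havingValue, boolean matchIfMissing) {
        if (value == null) {
            return matchIfMissing;
        }
        // Assumption: havingValue is compared case-insensitively.
        return havingValue.equalsIgnoreCase(value);
    }

    /** Returns the backend whose SearchIndex bean would be created, if any. */
    static Optional<Backend> select(String searchProperty) {
        // OpenSearchIndex: havingValue = "opensearch", matchIfMissing = true
        if (matches(searchProperty, "opensearch", true)) {
            return Optional.of(Backend.OPENSEARCH);
        }
        // clickHouseSearchIndex: havingValue = "clickhouse", no matchIfMissing
        if (matches(searchProperty, "clickhouse", false)) {
            return Optional.of(Backend.CLICKHOUSE);
        }
        // Any other value activates neither bean.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(select(null));
        System.out.println(select("clickhouse"));
        System.out.println(select("typo"));
    }
}
```

Two consequences follow from this model. An unrecognized value leaves the context without any SearchIndex bean, so a typo in CAMELEER_STORAGE_SEARCH would surface at startup rather than silently falling back. And since clickHouseSearchIndex injects clickHouseJdbcTemplate, setting cameleer.storage.search=clickhouse presumably also requires clickhouse.enabled=true, assuming that template is only defined when ClickHouse is enabled.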

Task 8: End-to-End Integration Test

Files:

  • Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java

Validates the full pipeline: ChunkAccumulator → WriteBuffers → ClickHouseExecutionStore → ClickHouseSearchIndex.
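
Before reading the test, it may help to see the accumulator contract it exercises in miniature. The sketch below is a simplified model under stated assumptions, not the ChunkAccumulator from the earlier tasks: Chunk, Execution, and Batch are hypothetical stand-ins for ExecutionChunk, MergedExecution, and ProcessorBatch, and only attributes are merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, trimmed-down model of the chunk pipeline's first stage.
class ChunkPipelineSketch {

    record Chunk(String executionId, String status, boolean isFinal,
                 Map<String, String> attributes, List<String> processors) {}
    record Execution(String executionId, String status, Map<String, String> attributes) {}
    record Batch(String executionId, List<String> processors) {}

    static final class Accumulator {
        private final Map<String, Map<String, String>> pendingAttributes = new HashMap<>();
        private final Consumer<Execution> executionSink;
        private final Consumer<Batch> processorSink;

        Accumulator(Consumer<Execution> executionSink, Consumer<Batch> processorSink) {
            this.executionSink = executionSink;
            this.processorSink = processorSink;
        }

        void onChunk(Chunk chunk) {
            // Processor records are append-only, so they go to the sink right away.
            if (!chunk.processors().isEmpty()) {
                processorSink.accept(new Batch(chunk.executionId(), chunk.processors()));
            }
            // Envelope data is merged in memory until the final chunk arrives.
            Map<String, String> merged =
                    pendingAttributes.computeIfAbsent(chunk.executionId(), id -> new HashMap<>());
            if (chunk.attributes() != null) {
                merged.putAll(chunk.attributes());
            }
            if (chunk.isFinal()) {
                pendingAttributes.remove(chunk.executionId());
                executionSink.accept(
                        new Execution(chunk.executionId(), chunk.status(), Map.copyOf(merged)));
            }
        }
    }

    public static void main(String[] args) {
        List<Execution> executions = new ArrayList<>();
        List<Batch> batches = new ArrayList<>();
        Accumulator acc = new Accumulator(executions::add, batches::add);
        acc.onChunk(new Chunk("pipeline-1", "RUNNING", false,
                Map.of("orderId", "ORD-123"), List.of("log1", "split1", "to1")));
        acc.onChunk(new Chunk("pipeline-1", "COMPLETED", true, null, List.of("to1")));
        System.out.println(batches.size() + " processor batches, " + executions);
    }
}
```

This is why the test below can find the execution by ORD-123 even though the final chunk carries no attributes: the value arrived with chunk 0 and is carried into the merged execution row.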

  • Step 1: Write the integration test
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.search.ClickHouseSearchIndex;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseChunkPipelineIT {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore executionStore;
    private ClickHouseSearchIndex searchIndex;
    private ChunkAccumulator accumulator;
    private List<MergedExecution> executionBuffer;
    private List<ChunkAccumulator.ProcessorBatch> processorBuffer;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);

        String execDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        String procDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");

        executionStore = new ClickHouseExecutionStore(jdbc);
        searchIndex = new ClickHouseSearchIndex(jdbc);

        executionBuffer = new ArrayList<>();
        processorBuffer = new ArrayList<>();
        accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5));
    }

    @Test
    void fullPipeline_chunkedIngestion_thenSearch() {
        Instant start = Instant.parse("2026-03-31T12:00:00Z");

        // Chunk 0: RUNNING with initial processors
        accumulator.onChunk(new ExecutionChunk(
                "pipeline-1", "order-service", "pod-1", "order-route",
                "corr-1", "RUNNING",
                start, null, null, "DEEP",
                null, null, null, null, null, null,
                Map.of("orderId", "ORD-123"),
                null, null, null, null,
                0, false,
                List.of(
                        new FlatProcessorRecord(1, null, null, "log1", "log",
                                null, null, "COMPLETED", start, 2L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(2, null, null, "split1", "split",
                                null, 3, "COMPLETED", start.plusMillis(2), 100L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(3, 2, "split1", "to1", "to",
                                0, null, "COMPLETED", start.plusMillis(5), 30L,
                                "http://inventory/api",
                                "order ABC-123 check stock", "stock available",
                                null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null))));

        // Processor records are handed off immediately; the execution row waits for the final chunk
        assertThat(processorBuffer).hasSize(1);
        assertThat(executionBuffer).isEmpty();

        // Chunk 1: COMPLETED (final)
        accumulator.onChunk(new ExecutionChunk(
                "pipeline-1", "order-service", "pod-1", "order-route",
                "corr-1", "COMPLETED",
                start, start.plusMillis(750), 750L, "DEEP",
                null, null, null, null, null, null,
                null, null, null, null, null,
                1, true,
                List.of(
                        new FlatProcessorRecord(4, 2, "split1", "to1", "to",
                                1, null, "COMPLETED", start.plusMillis(40), 25L,
                                "http://inventory/api",
                                "order DEF-456 check stock", "stock available",
                                null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null))));

        assertThat(executionBuffer).hasSize(1);
        assertThat(processorBuffer).hasSize(2);

        // Flush to ClickHouse
        executionStore.insertExecutionBatch(executionBuffer);
        for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) {
            executionStore.insertProcessorBatch(
                    batch.tenantId(), batch.executionId(),
                    batch.routeId(), batch.applicationName(),
                    batch.execStartTime(), batch.processors());
        }

        // Search by order ID in attributes
        SearchResult<ExecutionSummary> result = searchIndex.search(new SearchRequest(
                null, null, null, null, null, null,
                "ORD-123", null, null, null,
                null, null, null, null, null,
                0, 50, null, null));
        assertThat(result.total()).isEqualTo(1);
        assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
        assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
        assertThat(result.data().get(0).durationMs()).isEqualTo(750L);

        // Search in processor body
        SearchResult<ExecutionSummary> bodyResult = searchIndex.search(new SearchRequest(
                null, null, null, null, null, null,
                null, "ABC-123", null, null,
                null, null, null, null, null,
                0, 50, null, null));
        assertThat(bodyResult.total()).isEqualTo(1);

        // Verify iteration data in processor_executions
        Integer iterSize = jdbc.queryForObject(
                "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2",
                Integer.class);
        assertThat(iterSize).isEqualTo(3);

        Integer iter0 = jdbc.queryForObject(
                "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3",
                Integer.class);
        assertThat(iter0).isEqualTo(0);
    }
}
  • Step 2: Run the integration test
mvn test -pl cameleer-server-app -Dtest=ClickHouseChunkPipelineIT -Dfailsafe.provider=surefire

Expected: PASS.

  • Step 3: Commit
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
git commit -m "test(clickhouse): add end-to-end chunk pipeline integration test"

Verification Checklist

After all tasks are complete, verify:

  1. Chunk ingestion: POST /api/v1/data/chunks accepts either a single ExecutionChunk or an array of ExecutionChunks
  2. Processor immediate insert: Processor records are inserted as chunks arrive (append-only)
  3. Envelope accumulation: Multiple non-final chunks merge envelope data correctly
  4. Final flush: Final chunk triggers execution row write with version=1
  5. Stale sweep: Exchanges that have not received a final chunk within 5 minutes are flushed with status RUNNING
  6. Search: All filter types work: status, time range, duration, correlation ID, application, text, textInBody, textInHeaders, textInErrors
  7. Highlighting: Search results include 120-char context snippets
  8. Feature flag: cameleer.storage.search=opensearch uses OpenSearch, =clickhouse uses ClickHouse
  9. Backward compat: With clickhouse.enabled=false, server starts without CH beans (PG + OpenSearch only)
  10. seq/parentSeq: Processor records correctly store seq, parentSeq, iteration, iterationSize
  11. CI: mvn clean verify -DskipITs passes
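
For checklist item 5, the expected stale-sweep behavior can be pinned down with a small model. This is a sketch under assumptions rather than the ChunkAccumulator's actual code: StaleSweepSketch and its methods are hypothetical, and staleness is measured from the most recent chunk of an exchange, which the plan does not state explicitly.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the stale sweep; the caller would flush the returned ids as RUNNING.
class StaleSweepSketch {

    record Pending(String executionId, Instant lastChunkAt) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final Duration staleAfter;

    StaleSweepSketch(Duration staleAfter) {
        this.staleAfter = staleAfter;
    }

    /** Records that a non-final chunk for this exchange arrived at the given time. */
    void touch(String executionId, Instant now) {
        pending.put(executionId, new Pending(executionId, now));
    }

    /** Called when the final chunk arrives; the exchange is no longer a sweep candidate. */
    void complete(String executionId) {
        pending.remove(executionId);
    }

    /** Removes and returns the ids of exchanges whose last chunk is older than the threshold. */
    List<String> sweep(Instant now) {
        Instant cutoff = now.minus(staleAfter);
        List<String> flushed = new ArrayList<>();
        for (Iterator<Pending> it = pending.values().iterator(); it.hasNext(); ) {
            Pending p = it.next();
            if (p.lastChunkAt().isBefore(cutoff)) {
                it.remove();
                flushed.add(p.executionId());
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        StaleSweepSketch sweeper = new StaleSweepSketch(Duration.ofMinutes(5));
        Instant t0 = Instant.parse("2026-03-31T12:00:00Z");
        sweeper.touch("pipeline-1", t0);
        System.out.println(sweeper.sweep(t0.plus(Duration.ofMinutes(6))));
    }
}
```

Passing the clock value in as a parameter keeps the sweep deterministic, so a unit test can verify the 5-minute threshold without sleeping.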