# ClickHouse Phase 2: Executions + Search — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Ingest chunked execution data into ClickHouse and provide a ClickHouse-backed search implementation behind a feature flag. Replace the old `RouteExecution` tree ingestion with `ExecutionChunk` + `FlatProcessorRecord` flat ingestion.

**Architecture:** Agents send `ExecutionChunk` documents containing flat `FlatProcessorRecord` entries with `seq`/`parentSeq`/`iteration` fields. A new `ChunkIngestionController` accepts chunks at `POST /api/v1/data/chunks`. A `ChunkAccumulator` buffers exchange envelope data in memory, inserts processor records immediately via WriteBuffer, and writes the execution row when the final chunk arrives. A `ClickHouseSearchIndex` implements the `SearchIndex` interface using SQL with ngram bloom filter acceleration. Feature flags control which search backend is active. The old `RouteExecution` ingestion path is removed (no backward compatibility needed).

**Tech Stack:** ClickHouse 24.12, clickhouse-jdbc 0.9.7 (all classifier), Spring JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`

**Note on cameleer3-common:** The agent team is refactoring `cameleer3-common` to add `ExecutionChunk` and `FlatProcessorRecord`. Until that library is published, this plan defines server-side DTOs in `cameleer3-server-core` that mirror the common models. When the common lib is ready, swap the server DTOs for the shared classes (import change only).
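To make the flat transport concrete: each `FlatProcessorRecord` carries a `seq` unique within the execution and an optional `parentSeq`, so any consumer can rebuild the processor tree without nested JSON. The sketch below illustrates this with simplified stand-in types (`Rec`, `Node`, `rebuild` are illustrative names, not part of the plan); it assumes a parent always has a lower `seq` than its children, which the agent's sequential numbering guarantees.

```java
import java.util.*;

// Hypothetical minimal record mirroring the seq/parentSeq fields of FlatProcessorRecord.
record Rec(int seq, Integer parentSeq, String processorId) {}

// A tree node holding a record and its children.
record Node(Rec rec, List<Node> children) {}

class TreeRebuild {
    // Rebuild the processor tree: records with parentSeq == null are roots,
    // every other record attaches to the node whose seq equals its parentSeq.
    static List<Node> rebuild(List<Rec> flat) {
        Map<Integer, Node> bySeq = new HashMap<>();
        List<Node> roots = new ArrayList<>();
        // Sorting by seq guarantees a parent node exists before its children attach.
        flat.stream().sorted(Comparator.comparingInt(Rec::seq)).forEach(r -> {
            Node node = new Node(r, new ArrayList<>());
            bySeq.put(r.seq(), node);
            if (r.parentSeq() == null) {
                roots.add(node);
            } else {
                bySeq.get(r.parentSeq()).children().add(node);
            }
        });
        return roots;
    }

    public static void main(String[] args) {
        // A split with two iteration children, as in the tests later in this plan.
        List<Node> roots = rebuild(List.of(
                new Rec(1, null, "split1"),
                new Rec(2, 1, "log1"),
                new Rec(3, 1, "log1")));
        System.out.println(roots.size());                   // one root: split1
        System.out.println(roots.get(0).children().size()); // two iteration children
    }
}
```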
---

## File Structure

| File | Responsibility |
|------|----------------|
| `cameleer3-server-app/.../resources/clickhouse/V2__executions.sql` | DDL for `executions` table (ReplacingMergeTree) |
| `cameleer3-server-app/.../resources/clickhouse/V3__processor_executions.sql` | DDL for `processor_executions` table (MergeTree) — uses seq/parentSeq/iteration |
| `cameleer3-server-core/.../model/ExecutionChunk.java` | Server-side DTO mirroring agent's ExecutionChunk (temporary until common lib ready) |
| `cameleer3-server-core/.../model/FlatProcessorRecord.java` | Server-side DTO mirroring agent's FlatProcessorRecord (temporary until common lib ready) |
| `cameleer3-server-core/.../ingestion/ChunkAccumulator.java` | Accumulates exchange envelope across chunks, pushes processor records + final execution row to WriteBuffers |
| `cameleer3-server-core/.../ingestion/MergedExecution.java` | Record holding merged execution envelope + version + tenant |
| `cameleer3-server-app/.../storage/ClickHouseExecutionStore.java` | Batch INSERT for executions + processor_executions to ClickHouse |
| `cameleer3-server-app/.../search/ClickHouseSearchIndex.java` | SearchIndex impl using SQL with ngram indexes |
| `cameleer3-server-app/.../ingestion/ExecutionFlushScheduler.java` | Drains execution + processor WriteBuffers → ClickHouseExecutionStore |
| `cameleer3-server-app/.../controller/ChunkIngestionController.java` | REST endpoint `POST /api/v1/data/chunks` accepting ExecutionChunk |
| `cameleer3-server-app/.../config/StorageBeanConfig.java` | Modified: add chunk accumulator + CH search beans |
| `cameleer3-server-app/.../config/IngestionBeanConfig.java` | Modified: add execution + processor WriteBuffer beans |
| `cameleer3-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.search` flag |
| `cameleer3-server-app/...test.../storage/ClickHouseExecutionStoreIT.java` | Integration test for CH execution writes |
| `cameleer3-server-app/...test.../search/ClickHouseSearchIndexIT.java` | Integration test for CH search |
| `cameleer3-server-core/...test.../ingestion/ChunkAccumulatorTest.java` | Unit test for accumulator logic |

---

### Task 1: Server-Side DTOs (ExecutionChunk + FlatProcessorRecord)

**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java`

These mirror the agent's models exactly. When `cameleer3-common` is published with these classes, delete these files and update imports.

- [ ] **Step 1: Create FlatProcessorRecord**

```java
// cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java
package com.cameleer3.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;

import java.time.Instant;
import java.util.Map;

/**
 * Flat processor execution record with seq/parentSeq for tree reconstruction.
 * Mirrors cameleer3-common FlatProcessorRecord — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record FlatProcessorRecord(
    int seq,
    Integer parentSeq,
    String parentProcessorId,
    String processorId,
    String processorType,
    Integer iteration,
    Integer iterationSize,
    String status,
    Instant startTime,
    long durationMs,
    String resolvedEndpointUri,
    String inputBody,
    String outputBody,
    Map<String, String> inputHeaders,
    Map<String, String> outputHeaders,
    String errorMessage,
    String errorStackTrace,
    String errorType,
    String errorCategory,
    String rootCauseType,
    String rootCauseMessage,
    Map<String, String> attributes,
    String circuitBreakerState,
    Boolean fallbackTriggered,
    Boolean filterMatched,
    Boolean duplicateMessage
) {}
```

- [ ] **Step 2: Create ExecutionChunk**

```java
// cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java
package com.cameleer3.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.List;
import java.util.Map;

/**
 * Chunk document: exchange envelope + list of FlatProcessorRecords.
 * Mirrors cameleer3-common ExecutionChunk — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ExecutionChunk(
    String exchangeId,
    String applicationName,
    String agentId,
    String routeId,
    String correlationId,
    String status,
    Instant startTime,
    Instant endTime,
    Long durationMs,
    String engineLevel,
    String errorMessage,
    String errorStackTrace,
    String errorType,
    String errorCategory,
    String rootCauseType,
    String rootCauseMessage,
    Map<String, String> attributes,
    String traceId,
    String spanId,
    String originalExchangeId,
    String replayExchangeId,
    int chunkSeq,
    @JsonProperty("final") boolean isFinal,
    List<FlatProcessorRecord> processors
) {}
```

- [ ] **Step 3: Write deserialization test**

```java
// cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java
package com.cameleer3.server.core.storage.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class ExecutionChunkDeserializationTest {

    private static final ObjectMapper MAPPER =
        new ObjectMapper().registerModule(new JavaTimeModule());

    @Test
    void roundTrip_fullChunk() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
            "ex-1", "order-service", "pod-1", "order-route", "corr-1", "COMPLETED",
            Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L,
            "REGULAR", null, null, null, null, null, null,
            Map.of("orderId", "ORD-1"), "trace-1", "span-1", null, null,
            2, true,
            List.of(new FlatProcessorRecord(
                1, null, null, "log1", "log", null, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                null, "body", null, null, null, null, null, null, null, null,
                null, null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.exchangeId()).isEqualTo("ex-1");
        assertThat(deserialized.isFinal()).isTrue();
        assertThat(deserialized.chunkSeq()).isEqualTo(2);
        assertThat(deserialized.processors()).hasSize(1);
        assertThat(deserialized.processors().get(0).seq()).isEqualTo(1);
        assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1");
        assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1");
    }

    @Test
    void roundTrip_runningChunkWithIterations() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
            "ex-2", "app", "agent-1", "route-1", "ex-2", "RUNNING",
            Instant.parse("2026-03-31T10:00:00Z"), null, null, "REGULAR",
            null, null, null, null, null, null, null, null, null, null, null,
            0, false,
            List.of(
                new FlatProcessorRecord(
                    1, null, null, "split1", "split", null, 3, "COMPLETED",
                    Instant.parse("2026-03-31T10:00:00Z"), 100L,
                    null, null, null, null, null, null, null, null, null, null,
                    null, null, null, null, null, null),
                new FlatProcessorRecord(
                    2, 1, "split1", "log1", "log", 0, null, "COMPLETED",
                    Instant.parse("2026-03-31T10:00:00Z"), 5L,
                    null, "item-0", null, null, null, null, null, null, null, null,
                    null, null, null, null, null, null),
                new FlatProcessorRecord(
                    3, 1, "split1", "log1", "log", 1, null, "COMPLETED",
                    Instant.parse("2026-03-31T10:00:00Z"), 5L,
                    null, "item-1", null, null, null, null, null, null, null, null,
                    null, null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.isFinal()).isFalse();
        assertThat(deserialized.processors()).hasSize(3);

        FlatProcessorRecord split = deserialized.processors().get(0);
        assertThat(split.iterationSize()).isEqualTo(3);
        assertThat(split.parentSeq()).isNull();

        FlatProcessorRecord child0 = deserialized.processors().get(1);
        assertThat(child0.parentSeq()).isEqualTo(1);
        assertThat(child0.parentProcessorId()).isEqualTo("split1");
        assertThat(child0.iteration()).isEqualTo(0);
    }

    @Test
    void deserialize_unknownFieldsIgnored() throws Exception {
        String json = """
            {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED",
             "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true,
             "futureField":"ignored","processors":[]}
            """;

        ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(chunk.exchangeId()).isEqualTo("ex-1");
        assertThat(chunk.isFinal()).isTrue();
    }
}
```

- [ ] **Step 4: Run tests**

```bash
mvn test -pl cameleer3-server-core -Dtest=ExecutionChunkDeserializationTest
```

Expected: PASS (3 tests).

- [ ] **Step 5: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java \
        cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java \
        cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java
git commit -m "feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs"
```

---

### Task 2: DDL Scripts for executions and processor_executions

**Files:**
- Create: `cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql`
- Create: `cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql`

- [ ] **Step 1: Create executions DDL**

The executions table stores one row per exchange (written when the final chunk arrives). Uses `ReplacingMergeTree(_version)` for rare late corrections.
```sql
-- V2__executions.sql
CREATE TABLE IF NOT EXISTS executions (
    tenant_id            LowCardinality(String) DEFAULT 'default',
    execution_id         String,
    start_time           DateTime64(3),
    _version             UInt64 DEFAULT 1,
    route_id             LowCardinality(String),
    agent_id             LowCardinality(String),
    application_name     LowCardinality(String),
    status               LowCardinality(String),
    correlation_id       String DEFAULT '',
    exchange_id          String DEFAULT '',
    end_time             Nullable(DateTime64(3)),
    duration_ms          Nullable(Int64),
    error_message        String DEFAULT '',
    error_stacktrace     String DEFAULT '',
    error_type           LowCardinality(String) DEFAULT '',
    error_category       LowCardinality(String) DEFAULT '',
    root_cause_type      String DEFAULT '',
    root_cause_message   String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level         LowCardinality(String) DEFAULT '',
    input_body           String DEFAULT '',
    output_body          String DEFAULT '',
    input_headers        String DEFAULT '',
    output_headers       String DEFAULT '',
    attributes           String DEFAULT '',
    trace_id             String DEFAULT '',
    span_id              String DEFAULT '',
    has_trace_data       Bool DEFAULT false,
    is_replay            Bool DEFAULT false,
    _search_text String MATERIALIZED concat(
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ', input_headers, ' ', output_headers, ' ',
        root_cause_message),
    INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status status TYPE set(10) GRANULARITY 1,
    INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```

Note: Removed `processors_json` — with flat records in `processor_executions`, the nested JSON column is no longer needed.

- [ ] **Step 2: Create processor_executions DDL**

Uses `seq`/`parentSeq`/`iteration` instead of `depth`/`loopIndex`/`splitIndex`/`multicastIndex`. ORDER BY ends with `seq` (unique per execution) instead of `processor_id` (which can repeat across iterations).

```sql
-- V3__processor_executions.sql
CREATE TABLE IF NOT EXISTS processor_executions (
    tenant_id             LowCardinality(String) DEFAULT 'default',
    execution_id          String,
    seq                   UInt32,
    parent_seq            Nullable(UInt32),
    parent_processor_id   String DEFAULT '',
    processor_id          String,
    processor_type        LowCardinality(String),
    start_time            DateTime64(3),
    route_id              LowCardinality(String),
    application_name      LowCardinality(String),
    iteration             Nullable(Int32),
    iteration_size        Nullable(Int32),
    status                LowCardinality(String),
    end_time              Nullable(DateTime64(3)),
    duration_ms           Nullable(Int64),
    error_message         String DEFAULT '',
    error_stacktrace      String DEFAULT '',
    error_type            LowCardinality(String) DEFAULT '',
    error_category        LowCardinality(String) DEFAULT '',
    root_cause_type       String DEFAULT '',
    root_cause_message    String DEFAULT '',
    input_body            String DEFAULT '',
    output_body           String DEFAULT '',
    input_headers         String DEFAULT '',
    output_headers        String DEFAULT '',
    attributes            String DEFAULT '',
    resolved_endpoint_uri String DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered    Bool DEFAULT false,
    filter_matched        Bool DEFAULT false,
    duplicate_message     Bool DEFAULT false,
    _search_text String MATERIALIZED concat(
        error_message, ' ', error_stacktrace, ' ', attributes, ' ',
        input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),
    INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```

- [ ] **Step 3: Verify DDL loads**

```bash
mvn clean compile -pl cameleer3-server-app
```

`ClickHouseSchemaInitializer` scans `classpath:clickhouse/*.sql` automatically.

- [ ] **Step 4: Commit**

```bash
git add cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql \
        cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql
git commit -m "feat(clickhouse): add executions and processor_executions DDL for chunked transport"
```

---

### Task 3: MergedExecution + ClickHouseExecutionStore

**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java`

The store handles batch INSERT for both `executions` (from MergedExecution) and `processor_executions` (from FlatProcessorRecord). It does NOT implement the `ExecutionStore` interface — it has its own batch API consumed by the flush scheduler.

- [ ] **Step 1: Create MergedExecution record**

```java
// cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java
package com.cameleer3.server.core.ingestion;

import java.time.Instant;

/**
 * A merged execution envelope ready for ClickHouse insertion.
 * Produced by {@link ChunkAccumulator} after receiving the final chunk.
 */
public record MergedExecution(
    String tenantId,
    long version,
    String executionId,
    String routeId,
    String agentId,
    String applicationName,
    String status,
    String correlationId,
    String exchangeId,
    Instant startTime,
    Instant endTime,
    Long durationMs,
    String errorMessage,
    String errorStacktrace,
    String errorType,
    String errorCategory,
    String rootCauseType,
    String rootCauseMessage,
    String diagramContentHash,
    String engineLevel,
    String inputBody,
    String outputBody,
    String inputHeaders,
    String outputHeaders,
    String attributes,
    String traceId,
    String spanId,
    boolean hasTraceData,
    boolean isReplay
) {}
```

- [ ] **Step 2: Write the failing integration test**

```java
// cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseExecutionStoreIT {

    @Container
    static final ClickHouseContainer clickhouse =
        new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore store;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);

        String execDdl = new String(getClass().getResourceAsStream(
            "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        String procDdl = new String(getClass().getResourceAsStream(
            "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");

        store = new ClickHouseExecutionStore(jdbc);
    }

    @Test
    void insertExecutionBatch_writesToClickHouse() {
        MergedExecution exec = new MergedExecution(
            "default", 1, "exec-1", "route-timer", "agent-a", "my-app", "COMPLETED",
            "corr-1", "exchange-1",
            Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L,
            null, null, null, null, null, null,
            "hash-abc", "DEEP",
            "{\"key\":\"val\"}", "{\"out\":\"data\"}", "{\"h\":\"1\"}", "{\"h\":\"2\"}",
            "{\"attr\":\"val\"}", "trace-1", "span-1", true, false);

        store.insertExecutionBatch(List.of(exec));

        Integer count = jdbc.queryForObject(
            "SELECT count() FROM executions WHERE execution_id = 'exec-1'", Integer.class);
        assertThat(count).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_writesToClickHouse() {
        FlatProcessorRecord proc = new FlatProcessorRecord(
            1, null, null, "proc-1", "to", null, null, "COMPLETED",
            Instant.parse("2026-03-31T10:00:00Z"), 500L,
            "http://localhost:8080/api", "input body", "output body",
            Map.of("Content-Type", "application/json"), null,
            null, null, null, null, null, null, null, null, null, null, null);

        store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app",
            Instant.parse("2026-03-31T10:00:00Z"), List.of(proc));

        Integer count = jdbc.queryForObject(
            "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'", Integer.class);
        assertThat(count).isEqualTo(1);

        // Verify seq and parent_seq are stored
        Integer seq = jdbc.queryForObject(
            "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'", Integer.class);
        assertThat(seq).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_withIterations() {
        List<FlatProcessorRecord> procs = List.of(
            new FlatProcessorRecord(1, null, null, "split1", "split", null, 3, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 100L,
                null, null, null, null, null, null, null, null, null, null,
                null, null, null, null, null, null),
            new FlatProcessorRecord(2, 1, "split1", "log1", "log", 0, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                null, "item-0", null, null, null, null, null, null, null, null,
                null, null, null, null, null, null),
            new FlatProcessorRecord(3, 1, "split1", "log1", "log", 1, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                null, "item-1", null, null, null, null, null, null, null, null,
                null, null, null, null, null, null),
            new FlatProcessorRecord(4, 1, "split1", "log1", "log", 2, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                null, "item-2", null, null, null, null, null, null, null, null,
                null, null, null, null, null, null));

        store.insertProcessorBatch("default", "exec-split", "route-1", "my-app",
            Instant.parse("2026-03-31T10:00:00Z"), procs);

        Integer count = jdbc.queryForObject(
            "SELECT count() FROM processor_executions WHERE execution_id = 'exec-split'", Integer.class);
        assertThat(count).isEqualTo(4);

        // Verify iteration data
        Integer iterSize = jdbc.queryForObject(
            "SELECT iteration_size FROM processor_executions WHERE execution_id = 'exec-split' AND seq = 1",
            Integer.class);
        assertThat(iterSize).isEqualTo(3);
    }

    @Test
    void insertExecutionBatch_emptyList_doesNothing() {
        store.insertExecutionBatch(List.of());
        Integer count = jdbc.queryForObject("SELECT count() FROM executions", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    @Test
    void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
        Instant startTime = Instant.parse("2026-03-31T10:00:00Z");
        MergedExecution v1 = new MergedExecution(
            "default", 1, "exec-dup", "route-1", "agent-a", "my-app", "RUNNING",
            null, "exchange-1", startTime, null, null,
            null, null, null, null, null, null, null, null,
            null, null, null, null, null, null, null, false, false);
        MergedExecution v2 = new MergedExecution(
            "default", 2, "exec-dup", "route-1", "agent-a", "my-app", "COMPLETED",
            "corr-1", "exchange-1", startTime, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
            null, null, null, null, null, null, null, null,
            null, null, null, null, null, null, null, false, false);

        store.insertExecutionBatch(List.of(v1, v2));
        jdbc.execute("OPTIMIZE TABLE executions FINAL");

        String status = jdbc.queryForObject(
            "SELECT status FROM executions WHERE execution_id = 'exec-dup'", String.class);
        assertThat(status).isEqualTo("COMPLETED");
    }
}
```

- [ ] **Step 3: Run test to verify it fails**

```bash
mvn test -pl cameleer3-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
```

Expected: compilation error — `ClickHouseExecutionStore` does not exist.

- [ ] **Step 4: Implement ClickHouseExecutionStore**

```java
// cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;

public class ClickHouseExecutionStore {

    private static final ObjectMapper JSON = new ObjectMapper();

    private final JdbcTemplate jdbc;

    public ClickHouseExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public void insertExecutionBatch(List<MergedExecution> executions) {
        if (executions.isEmpty()) return;
        jdbc.batchUpdate("""
            INSERT INTO executions (
                tenant_id, execution_id, start_time, _version, route_id, agent_id,
                application_name, status, correlation_id, exchange_id, end_time,
                duration_ms, error_message, error_stacktrace, error_type,
                error_category, root_cause_type, root_cause_message,
                diagram_content_hash, engine_level, input_body, output_body,
                input_headers, output_headers, attributes, trace_id, span_id,
                has_trace_data, is_replay
            ) VALUES (
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
            )
            """,
            executions.stream().map(e -> new Object[]{
                e.tenantId(), e.executionId(), Timestamp.from(e.startTime()), e.version(),
                orEmpty(e.routeId()), orEmpty(e.agentId()), orEmpty(e.applicationName()),
                orEmpty(e.status()), orEmpty(e.correlationId()), orEmpty(e.exchangeId()),
                e.endTime() != null ? Timestamp.from(e.endTime()) : null,
                e.durationMs(),
                orEmpty(e.errorMessage()), orEmpty(e.errorStacktrace()),
                orEmpty(e.errorType()), orEmpty(e.errorCategory()),
                orEmpty(e.rootCauseType()), orEmpty(e.rootCauseMessage()),
                orEmpty(e.diagramContentHash()), orEmpty(e.engineLevel()),
                orEmpty(e.inputBody()), orEmpty(e.outputBody()),
                orEmpty(e.inputHeaders()), orEmpty(e.outputHeaders()),
                orEmpty(e.attributes()), orEmpty(e.traceId()), orEmpty(e.spanId()),
                e.hasTraceData(), e.isReplay()
            }).toList());
    }

    public void insertProcessorBatch(String tenantId, String executionId, String routeId,
                                     String applicationName, Instant execStartTime,
                                     List<FlatProcessorRecord> processors) {
        if (processors.isEmpty()) return;
        jdbc.batchUpdate("""
            INSERT INTO processor_executions (
                tenant_id, execution_id, seq, parent_seq, parent_processor_id,
                processor_id, processor_type, start_time, route_id, application_name,
                iteration, iteration_size, status, end_time, duration_ms,
                error_message, error_stacktrace, error_type, error_category,
                root_cause_type, root_cause_message, input_body, output_body,
                input_headers, output_headers, attributes, resolved_endpoint_uri,
                circuit_breaker_state, fallback_triggered, filter_matched,
                duplicate_message
            ) VALUES (
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
            )
            """,
            processors.stream().map(p -> new Object[]{
                tenantId, executionId, p.seq(), p.parentSeq(), orEmpty(p.parentProcessorId()),
                p.processorId(), p.processorType(),
                p.startTime() != null ? Timestamp.from(p.startTime()) : Timestamp.from(execStartTime),
                routeId, applicationName,
                p.iteration(), p.iterationSize(), orEmpty(p.status()),
                p.startTime() != null && p.durationMs() > 0
                    ? Timestamp.from(p.startTime().plusMillis(p.durationMs())) : null,
                p.durationMs(),
                orEmpty(p.errorMessage()), orEmpty(p.errorStackTrace()),
                orEmpty(p.errorType()), orEmpty(p.errorCategory()),
                orEmpty(p.rootCauseType()), orEmpty(p.rootCauseMessage()),
                orEmpty(p.inputBody()), orEmpty(p.outputBody()),
                headersToString(p.inputHeaders()), headersToString(p.outputHeaders()),
                mapToString(p.attributes()),
                orEmpty(p.resolvedEndpointUri()), orEmpty(p.circuitBreakerState()),
                p.fallbackTriggered() != null ? p.fallbackTriggered() : false,
                p.filterMatched() != null ? p.filterMatched() : false,
                p.duplicateMessage() != null ? p.duplicateMessage() : false
            }).toList());
    }

    private static String orEmpty(String value) {
        return value != null ? value : "";
    }

    private static String headersToString(Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(headers);
        } catch (JsonProcessingException e) {
            return "";
        }
    }

    private static String mapToString(Map<String, String> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "";
        }
    }
}
```

- [ ] **Step 5: Run test to verify it passes**

```bash
mvn test -pl cameleer3-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire
```

Expected: all 5 tests PASS.

- [ ] **Step 6: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java \
        cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java \
        cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format"
```

---

### Task 4: ChunkAccumulator

**Files:**
- Create: `cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java`

The ChunkAccumulator receives `ExecutionChunk` documents. For each chunk:
- Processor records are pushed to a sink immediately (they're append-only)
- Exchange envelope data is buffered/merged in a ConcurrentHashMap
- When `isFinal=true`, the merged envelope is pushed to an execution sink

A scheduled sweep flushes stale exchanges (no final chunk received within 5 minutes).
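The buffering contract above can be sketched in a few lines. This is a simplified illustration of the semantics only: the stand-in types (`Chunk`, `Merged`, `AccumulatorSketch`) are hypothetical, envelope-field merging is reduced to "keep the latest chunk", and the immediate processor-batch forwarding is omitted for brevity; the real class defined in this task does all of that.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-ins for ExecutionChunk / MergedExecution.
record Chunk(String exchangeId, String status, boolean isFinal) {}
record Merged(String exchangeId, String status) {}

// Minimal sketch of the accumulator's buffering contract: envelopes are kept
// per exchangeId; the final chunk (or a stale sweep) emits the merged row.
class AccumulatorSketch {
    private record Pending(Chunk latest, Instant lastSeen) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final Consumer<Merged> executionSink;
    private final Duration staleAfter;

    AccumulatorSketch(Consumer<Merged> executionSink, Duration staleAfter) {
        this.executionSink = executionSink;
        this.staleAfter = staleAfter;
    }

    void onChunk(Chunk chunk) {
        if (chunk.isFinal()) {
            // Final chunk: drop the buffered envelope and emit the execution row.
            pending.remove(chunk.exchangeId());
            executionSink.accept(new Merged(chunk.exchangeId(), chunk.status()));
        } else {
            // Non-final chunk: keep the latest envelope and remember when we saw it.
            pending.put(chunk.exchangeId(), new Pending(chunk, Instant.now()));
        }
    }

    // Periodic sweep: exchanges that never received a final chunk are flushed as-is.
    void sweepStale() {
        Instant cutoff = Instant.now().minus(staleAfter);
        pending.entrySet().removeIf(e -> {
            if (e.getValue().lastSeen().isBefore(cutoff)) {
                executionSink.accept(new Merged(e.getKey(), e.getValue().latest().status()));
                return true;
            }
            return false;
        });
    }

    int getPendingCount() {
        return pending.size();
    }
}
```

The key design point the sketch shows: processor data never waits in memory, only the small per-exchange envelope does, so a crashed agent leaks at most one buffered envelope per open exchange until the sweep fires.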
- [ ] **Step 1: Write the failing unit test**

```java
// cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
package com.cameleer3.server.core.ingestion;

import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.assertj.core.api.Assertions.assertThat;

class ChunkAccumulatorTest {

    private List<MergedExecution> executionSink;
    private List processorSink;
    private ChunkAccumulator accumulator;

    @BeforeEach
    void setUp() {
        executionSink = new CopyOnWriteArrayList<>();
        processorSink = new CopyOnWriteArrayList<>();
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add,
            Duration.ofMinutes(5));
    }

    @Test
    void singleFinalChunk_producesExecutionAndProcessors() {
        ExecutionChunk chunk = new ExecutionChunk(
            "ex-1", "my-app", "agent-a", "route-1", "corr-1", "COMPLETED",
            Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L,
            "DEEP", null, null, null, null, null, null,
            Map.of("env", "prod"), "trace-1", "span-1", null, null,
            0, true,
            List.of(new FlatProcessorRecord(
                1, null, null, "log1", "log", null, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                null, "body", null, null, null, null, null, null, null, null,
                null, null, null, null, null, null)));

        accumulator.onChunk(chunk);

        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.executionId()).isEqualTo("ex-1");
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(1000L);
        assertThat(merged.version()).isEqualTo(1);

        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(1);
        assertThat(processorSink.get(0).executionId()).isEqualTo("ex-1");
    }

    @Test
    void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
        Instant start = Instant.parse("2026-03-31T10:00:00Z");

        // Chunk 0: RUNNING with 2 processors
        ExecutionChunk chunk0 = new ExecutionChunk(
            "ex-multi", "my-app", "agent-a", "route-1", "corr-1", "RUNNING",
            start, null, null, "DEEP",
            null, null, null, null, null, null, null, null, null, null, null,
            0, false,
            List.of(
                new FlatProcessorRecord(1, null, null, "log1", "log", null, null, "COMPLETED",
                    start, 5L, null, null, null, null, null, null, null, null, null, null,
                    null, null, null, null, null, null),
                new FlatProcessorRecord(2, null, null, "to1", "to", null, null, "COMPLETED",
                    start.plusMillis(5), 10L, "http://svc/api", null, null, null, null, null,
                    null, null, null, null, null, null, null, null, null, null)));
        accumulator.onChunk(chunk0);

        // Processors inserted immediately
        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(2);
        // Execution NOT yet flushed
        assertThat(executionSink).isEmpty();

        // Chunk 1: COMPLETED (final) with 1 more processor
        ExecutionChunk chunk1 = new ExecutionChunk(
            "ex-multi", "my-app", "agent-a", "route-1", "corr-1", "COMPLETED",
            start, start.plusMillis(500), 500L, "DEEP",
            null, null, null, null, null, null,
            Map.of("result", "ok"), null, null, null, null,
            1, true,
            List.of(new FlatProcessorRecord(3, null, null, "log2", "log", null, null, "COMPLETED",
                start.plusMillis(100), 2L, null, null, null, null, null, null, null, null,
                null, null, null, null, null, null, null, null)));
        accumulator.onChunk(chunk1);

        // Final chunk triggers execution flush
        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(500L);
        assertThat(merged.version()).isEqualTo(1);

        // Second processor batch
        assertThat(processorSink).hasSize(2);
        assertThat(processorSink.get(1).processors()).hasSize(1);
    }

    @Test
    void staleExchange_flushedBySweep() {
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add,
            Duration.ofMillis(1));

        ExecutionChunk chunk = new ExecutionChunk(
            "ex-stale", "my-app", "agent-a", "route-1", "ex-stale", "RUNNING",
            Instant.parse("2026-03-31T09:50:00Z"), null, null, "REGULAR",
            null, null, null, null, null, null, null, null, null, null, null,
            0, false, List.of());
        accumulator.onChunk(chunk);

        try { Thread.sleep(5); } catch (InterruptedException ignored) {}
        accumulator.sweepStale();

        assertThat(executionSink).hasSize(1);
        assertThat(executionSink.get(0).status()).isEqualTo("RUNNING");
        assertThat(executionSink.get(0).version()).isEqualTo(1);
    }

    @Test
    void finalChunkWithErrors_populatesErrorFields() {
        ExecutionChunk chunk = new ExecutionChunk(
            "ex-err", "my-app", "agent-a", "route-1", "ex-err", "FAILED",
            Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00.200Z"), 200L,
            "REGULAR", "Connection refused", "java.net.ConnectException...",
            "java.net.ConnectException", "CONNECTION",
            "java.net.ConnectException", "Connection refused",
            null, null, null, null, null,
            0, true, List.of());

        accumulator.onChunk(chunk);

        MergedExecution merged = executionSink.get(0);
        assertThat(merged.errorMessage()).isEqualTo("Connection refused");
        assertThat(merged.errorType()).isEqualTo("java.net.ConnectException");
        assertThat(merged.errorCategory()).isEqualTo("CONNECTION");
    }

    @Test
    void getPendingCount_tracksBufferedExchanges() {
        Instant t = Instant.parse("2026-03-31T10:00:00Z");
        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "RUNNING",
            t, null, null, "REGULAR", null, null, null, null, null, null, null, null,
            null, null, null, 0, false, List.of()));
        accumulator.onChunk(new ExecutionChunk("e2", "app", "a", "r", "e2", "RUNNING",
            t, null, null, "REGULAR", null, null, null, null, null, null, null, null,
            null, null, null, 0, false, List.of()));

        assertThat(accumulator.getPendingCount()).isEqualTo(2);

        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "COMPLETED",
            t, t.plusMillis(100), 100L, "REGULAR", null, null, null, null, null, null,
            null, null, null, null, null, 1, true, List.of()));

        assertThat(accumulator.getPendingCount()).isEqualTo(1);
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

```bash
mvn test -pl cameleer3-server-core -Dtest=ChunkAccumulatorTest -DfailIfNoTests=false
```

Expected: compilation error — `ChunkAccumulator` does not exist.

- [ ] **Step 3: Implement ChunkAccumulator**

```java
// cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
package com.cameleer3.server.core.ingestion;

import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Accumulates ExecutionChunk documents per exchange.
 *

 * Processor records are pushed to the processor sink immediately (append-only).
 * Exchange envelope data is buffered and merged across chunks.
 * When the final chunk arrives, the merged envelope is pushed to the execution sink.
 */
public class ChunkAccumulator {

    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
    private static final ObjectMapper JSON = new ObjectMapper();
    private static final String DEFAULT_TENANT = "default";

    private final Consumer<MergedExecution> executionSink;
    private final Consumer<ProcessorBatch> processorSink;
    private final Duration staleThreshold;
    private final Map<String, PendingExchange> pending = new ConcurrentHashMap<>();

    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
                            Consumer<ProcessorBatch> processorSink,
                            Duration staleThreshold) {
        this.executionSink = executionSink;
        this.processorSink = processorSink;
        this.staleThreshold = staleThreshold;
    }

    public void onChunk(ExecutionChunk chunk) {
        String exchangeId = chunk.exchangeId();
        // Insert processor records immediately (append-only)
        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
            processorSink.accept(new ProcessorBatch(
                    DEFAULT_TENANT, exchangeId,
                    coalesce(chunk.routeId(), ""),
                    coalesce(chunk.applicationName(), ""),
                    chunk.startTime(), chunk.processors()));
        }
        if (chunk.isFinal()) {
            // Merge with any pending state and flush execution
            PendingExchange pendingExchange = pending.remove(exchangeId);
            MergedExecution merged = buildMergedExecution(chunk, pendingExchange);
            executionSink.accept(merged);
        } else {
            // Buffer/update exchange envelope
            pending.compute(exchangeId, (id, existing) -> {
                if (existing == null) {
                    return new PendingExchange(chunk, Instant.now());
                }
                return existing.mergeWith(chunk);
            });
        }
    }

    public void sweepStale() {
        Instant cutoff = Instant.now().minus(staleThreshold);
        List<String> staleIds = new ArrayList<>();
        pending.forEach((id, pe) -> {
            if (pe.receivedAt().isBefore(cutoff)) {
                staleIds.add(id);
            }
        });
        for (String id : staleIds) {
            PendingExchange stale = pending.remove(id);
            if (stale != null) {
                log.info("Flushing stale exchange {}", id);
                executionSink.accept(buildMergedExecution(stale.envelope(), null));
            }
        }
    }

    public int getPendingCount() {
        return pending.size();
    }

    private MergedExecution buildMergedExecution(ExecutionChunk finalChunk, PendingExchange pendingState) {
        // base is already the buffered envelope (an ExecutionChunk), so its accessors are called directly
        ExecutionChunk base = pendingState != null ? pendingState.envelope() : null;
        String attributes = serializeMap(finalChunk.attributes());
        if ((attributes == null || attributes.isEmpty()) && base != null) {
            attributes = serializeMap(base.attributes());
        }
        boolean hasTraceData = false;
        boolean isReplay = finalChunk.replayExchangeId() != null;
        return new MergedExecution(
                DEFAULT_TENANT,
                1,
                finalChunk.exchangeId(),
                coalesce(finalChunk.routeId(), base != null ? base.routeId() : null),
                coalesce(finalChunk.agentId(), base != null ? base.agentId() : null),
                coalesce(finalChunk.applicationName(), base != null ? base.applicationName() : null),
                coalesce(finalChunk.status(), base != null ? base.status() : "RUNNING"),
                coalesce(finalChunk.correlationId(), base != null ? base.correlationId() : null),
                finalChunk.exchangeId(),
                coalesce(finalChunk.startTime(), base != null ? base.startTime() : null),
                coalesce(finalChunk.endTime(), base != null ? base.endTime() : null),
                coalesce(finalChunk.durationMs(), base != null ? base.durationMs() : null),
                coalesce(finalChunk.errorMessage(), base != null ? base.errorMessage() : null),
                coalesce(finalChunk.errorStackTrace(), base != null ? base.errorStackTrace() : null),
                coalesce(finalChunk.errorType(), base != null ? base.errorType() : null),
                coalesce(finalChunk.errorCategory(), base != null ? base.errorCategory() : null),
                coalesce(finalChunk.rootCauseType(), base != null ? base.rootCauseType() : null),
                coalesce(finalChunk.rootCauseMessage(), base != null ? base.rootCauseMessage() : null),
                "", // diagramContentHash — server-side lookup, not in chunk
                coalesce(finalChunk.engineLevel(), base != null ? base.engineLevel() : null),
                "", "", "", "", // input/output body/headers — on processor records now, not envelope
                coalesce(attributes, ""),
                coalesce(finalChunk.traceId(), base != null ? base.traceId() : null),
                coalesce(finalChunk.spanId(), base != null ? base.spanId() : null),
                hasTraceData,
                isReplay
        );
    }

    private static String serializeMap(Map<String, ?> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            log.warn("Failed to serialize attributes map", e);
            return "";
        }
    }

    private static <T> T coalesce(T a, T b) {
        return a != null ? a : b;
    }

    /**
     * A batch of processor records for a single exchange, ready for insertion.
     */
    public record ProcessorBatch(
            String tenantId,
            String executionId,
            String routeId,
            String applicationName,
            Instant execStartTime,
            List<FlatProcessorRecord> processors
    ) {}

    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {

        PendingExchange mergeWith(ExecutionChunk newer) {
            // Keep the latest envelope data (later chunkSeq has more complete info)
            ExecutionChunk merged = new ExecutionChunk(
                    envelope.exchangeId(),
                    coalesce(newer.applicationName(), envelope.applicationName()),
                    coalesce(newer.agentId(), envelope.agentId()),
                    coalesce(newer.routeId(), envelope.routeId()),
                    coalesce(newer.correlationId(), envelope.correlationId()),
                    coalesce(newer.status(), envelope.status()),
                    coalesce(envelope.startTime(), newer.startTime()),
                    coalesce(newer.endTime(), envelope.endTime()),
                    coalesce(newer.durationMs(), envelope.durationMs()),
                    coalesce(newer.engineLevel(), envelope.engineLevel()),
                    coalesce(newer.errorMessage(), envelope.errorMessage()),
                    coalesce(newer.errorStackTrace(), envelope.errorStackTrace()),
                    coalesce(newer.errorType(), envelope.errorType()),
                    coalesce(newer.errorCategory(), envelope.errorCategory()),
                    coalesce(newer.rootCauseType(),
envelope.rootCauseType()), coalesce(newer.rootCauseMessage(), envelope.rootCauseMessage()), newer.attributes() != null ? newer.attributes() : envelope.attributes(), coalesce(newer.traceId(), envelope.traceId()), coalesce(newer.spanId(), envelope.spanId()), coalesce(newer.originalExchangeId(), envelope.originalExchangeId()), coalesce(newer.replayExchangeId(), envelope.replayExchangeId()), newer.chunkSeq(), newer.isFinal(), List.of()); return new PendingExchange(merged, receivedAt); } } } ``` - [ ] **Step 4: Run test to verify it passes** ```bash mvn test -pl cameleer3-server-core -Dtest=ChunkAccumulatorTest ``` Expected: all 5 tests PASS. - [ ] **Step 5: Commit** ```bash git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java \ cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java git commit -m "feat(clickhouse): add ChunkAccumulator for chunked execution ingestion" ``` --- ### Task 5: ExecutionFlushScheduler + ChunkIngestionController **Files:** - Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java` - Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java` - [ ] **Step 1: Implement ExecutionFlushScheduler** Follows `MetricsFlushScheduler` pattern. Drains two WriteBuffers (executions + processor batches) and calls ClickHouseExecutionStore. Also runs the stale sweep. 
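The scheduler drains data through the core `WriteBuffer`'s `offer`/`drain` pair, which is not shown in this plan. As a reference point, here is a minimal sketch of the contract the scheduler assumes — the name `WriteBufferSketch` and its internals are illustrative, not the actual `cameleer3-server-core` class:

```java
// Sketch of the assumed WriteBuffer contract (hypothetical; the real class may differ).
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WriteBufferSketch<T> {

    private final BlockingQueue<T> queue;

    public WriteBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking append; returns false (item dropped) when the buffer is full. */
    public boolean offer(T item) {
        return queue.offer(item);
    }

    /** Removes and returns up to maxItems buffered entries for one flush cycle. */
    public List<T> drain(int maxItems) {
        List<T> batch = new ArrayList<>(Math.min(maxItems, queue.size() + 1));
        queue.drainTo(batch, maxItems);
        return batch;
    }

    public static void main(String[] args) {
        WriteBufferSketch<String> buf = new WriteBufferSketch<>(4);
        buf.offer("a");
        buf.offer("b");
        buf.offer("c");
        System.out.println(buf.drain(2).size());  // 2 — one flush batch
        System.out.println(buf.drain(10).size()); // 1 — the remainder
    }
}
```

The key property the scheduler relies on is that `drain` is destructive and bounded, so a slow ClickHouse insert can never grow a single batch past `batchSize`.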
```java
// cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java
package com.cameleer3.server.app.ingestion;

import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                   ClickHouseExecutionStore executionStore,
                                   ChunkAccumulator accumulator,
                                   IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        flushExecutions();
        flushProcessors();
    }

    private void flushExecutions() {
        try {
            List<MergedExecution> batch = executionBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                executionStore.insertExecutionBatch(batch);
                log.debug("Flushed {} executions to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush executions to ClickHouse", e);
        }
    }

    private void flushProcessors() {
        try {
            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
            for (ChunkAccumulator.ProcessorBatch
batch : batches) { if (!batch.processors().isEmpty()) { executionStore.insertProcessorBatch( batch.tenantId(), batch.executionId(), batch.routeId(), batch.applicationName(), batch.execStartTime(), batch.processors()); } } if (!batches.isEmpty()) { int totalProcs = batches.stream().mapToInt(b -> b.processors().size()).sum(); log.debug("Flushed {} processor batches ({} records) to ClickHouse", batches.size(), totalProcs); } } catch (Exception e) { log.error("Failed to flush processors to ClickHouse", e); } } @Scheduled(fixedDelay = 60_000) public void sweepStale() { try { accumulator.sweepStale(); } catch (Exception e) { log.error("Failed to sweep stale exchanges", e); } } @Override public void start() { running = true; } @Override public void stop() { flush(); running = false; } @Override public boolean isRunning() { return running; } @Override public int getPhase() { return Integer.MAX_VALUE - 1; } } ``` - [ ] **Step 2: Implement ChunkIngestionController** ```java // cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java package com.cameleer3.server.app.controller; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.storage.model.ExecutionChunk; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController @RequestMapping("/api/v1/data") @Tag(name = "Ingestion", description = "Data ingestion endpoints") public class ChunkIngestionController { private final ChunkAccumulator accumulator; public ChunkIngestionController(ChunkAccumulator accumulator) { this.accumulator = accumulator; } @PostMapping("/chunks") 
    @Operation(summary = "Ingest execution chunk (single or array)")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        try {
            // Note: a shared, pre-configured ObjectMapper bean would avoid per-request construction
            com.fasterxml.jackson.databind.ObjectMapper mapper =
                    new com.fasterxml.jackson.databind.ObjectMapper()
                            .registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());
            String trimmed = body.trim();
            if (trimmed.startsWith("[")) {
                List<ExecutionChunk> chunks = mapper.readValue(trimmed,
                        mapper.getTypeFactory().constructCollectionType(List.class, ExecutionChunk.class));
                for (ExecutionChunk chunk : chunks) {
                    accumulator.onChunk(chunk);
                }
            } else {
                ExecutionChunk chunk = mapper.readValue(trimmed, ExecutionChunk.class);
                accumulator.onChunk(chunk);
            }
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            return ResponseEntity.badRequest().build();
        }
    }
}
```

- [ ] **Step 3: Compile**

```bash
mvn clean compile -pl cameleer3-server-app
```

- [ ] **Step 4: Commit**

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java \
    cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java
git commit -m "feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController"
```

---

### Task 6: ClickHouseSearchIndex

**Files:**
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java`

This task is identical to Task 5 in the original plan — it implements `SearchIndex` using SQL against ClickHouse, and the search query patterns are unchanged: `_search_text LIKE '%term%'` on `executions`, and a subquery join on `processor_executions` for body/header/error scoped searches. Refer to that task's complete code for the `ClickHouseSearchIndex` and `ClickHouseSearchIndexIT` implementations.
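To illustrate the two query shapes just described — envelope-level text search versus processor-scoped search — here is a hypothetical WHERE-clause builder. The helper name and structure are illustrative only, not the plan's actual `ClickHouseSearchIndex` code:

```java
// Hypothetical sketch of the two search query shapes (names are illustrative).
import java.util.ArrayList;
import java.util.List;

public class SearchSqlSketch {

    /**
     * Builds a WHERE fragment for a free-text term. When inBody is true the term is
     * scoped to processor bodies via a subquery join on processor_executions;
     * otherwise it hits the envelope's _search_text column, whose LIKE scan the
     * ngram bloom filter index is meant to accelerate.
     */
    static String buildWhere(String text, boolean inBody, List<Object> params) {
        List<String> clauses = new ArrayList<>();
        if (text != null && !text.isBlank()) {
            if (inBody) {
                clauses.add("execution_id IN (SELECT execution_id FROM processor_executions "
                        + "WHERE input_body LIKE ? OR output_body LIKE ?)");
                params.add("%" + text + "%");
                params.add("%" + text + "%");
            } else {
                clauses.add("_search_text LIKE ?");
                params.add("%" + text + "%");
            }
        }
        return clauses.isEmpty() ? "1 = 1" : String.join(" AND ", clauses);
    }

    public static void main(String[] args) {
        List<Object> params = new ArrayList<>();
        System.out.println(buildWhere("ORD-123", false, params)); // _search_text LIKE ?
        System.out.println(params);                               // [%ORD-123%]
    }
}
```

The scoped variant deliberately stays a subquery rather than a JOIN so the outer query still returns one row per execution without a GROUP BY.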
The only difference is that `processor_executions` now uses `seq`/`iteration` columns instead of `depth`/`loopIndex`/etc., but the search queries only use `_search_text`, `execution_id`, `input_body`, `output_body`, `input_headers`, `output_headers`, `error_message`, and `error_stacktrace` — none of which changed. - [ ] **Step 1: Write the failing integration test** Use the same test class from the original plan's Task 5, Step 1. The test seeds data via `ClickHouseExecutionStore` using the new `MergedExecution` and `FlatProcessorRecord` types. Refer to the original plan for the complete test code. - [ ] **Step 2: Run test to verify it fails** ```bash mvn test -pl cameleer3-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire -DfailIfNoTests=false ``` - [ ] **Step 3: Implement ClickHouseSearchIndex** Use the same implementation from the original plan's Task 5, Step 3. The SQL queries and WHERE clause building are identical. - [ ] **Step 4: Run test to verify it passes** ```bash mvn test -pl cameleer3-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire ``` - [ ] **Step 5: Commit** ```bash git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java \ cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java git commit -m "feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search" ``` --- ### Task 7: Feature Flag Wiring **Files:** - Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` - Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java` - Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java` - Modify: `cameleer3-server-app/src/main/resources/application.yml` - Modify: `deploy/base/server.yaml` Wire up the ChunkAccumulator, WriteBuffers, flush scheduler, and search switching. 
- [ ] **Step 1: Add execution + processor WriteBuffer beans to IngestionBeanConfig**

```java
// Add to IngestionBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
```

- [ ] **Step 2: Add CH beans to StorageBeanConfig**

```java
// Add to StorageBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseExecutionStore clickHouseExecutionStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseExecutionStore(clickHouseJdbc);
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
    return new ChunkAccumulator(
            executionBuffer::offer, processorBatchBuffer::offer, java.time.Duration.ofMinutes(5));
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ExecutionFlushScheduler executionFlushScheduler(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
        ClickHouseExecutionStore executionStore,
        ChunkAccumulator accumulator,
        IngestionConfig config) {
    return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
            executionStore, accumulator, config);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
public SearchIndex clickHouseSearchIndex(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseSearchIndex(clickHouseJdbc);
}
```

- [ ] **Step 3: Add ConditionalOnProperty to OpenSearchIndex**

```java
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search",
havingValue = "opensearch", matchIfMissing = true) public class OpenSearchIndex implements SearchIndex { ``` - [ ] **Step 4: Update application.yml** ```yaml cameleer: storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} search: ${CAMELEER_STORAGE_SEARCH:opensearch} ``` - [ ] **Step 5: Update deploy/base/server.yaml** Add env var: ```yaml - name: CAMELEER_STORAGE_SEARCH value: "opensearch" ``` - [ ] **Step 6: Compile and verify all tests pass** ```bash mvn clean verify -DskipITs ``` - [ ] **Step 7: Commit** ```bash git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java \ cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java \ cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java \ cameleer3-server-app/src/main/resources/application.yml \ deploy/base/server.yaml git commit -m "feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag" ``` --- ### Task 8: End-to-End Integration Test **Files:** - Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java` Validates the full pipeline: ChunkAccumulator → WriteBuffers → ClickHouseExecutionStore → ClickHouseSearchIndex. 
- [ ] **Step 1: Write the integration test**

```java
// cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.search.ClickHouseSearchIndex;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseChunkPipelineIT {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore executionStore;
    private ClickHouseSearchIndex searchIndex;
    private ChunkAccumulator accumulator;
    private List<MergedExecution> executionBuffer;
    private List<ChunkAccumulator.ProcessorBatch> processorBuffer;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);
        String execDdl = new String(getClass().getResourceAsStream(
"/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); String procDdl = new String(getClass().getResourceAsStream( "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); jdbc.execute(execDdl); jdbc.execute(procDdl); jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE processor_executions"); executionStore = new ClickHouseExecutionStore(jdbc); searchIndex = new ClickHouseSearchIndex(jdbc); executionBuffer = new ArrayList<>(); processorBuffer = new ArrayList<>(); accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5)); } @Test void fullPipeline_chunkedIngestion_thenSearch() { Instant start = Instant.parse("2026-03-31T12:00:00Z"); // Chunk 0: RUNNING with initial processors accumulator.onChunk(new ExecutionChunk( "pipeline-1", "order-service", "pod-1", "order-route", "corr-1", "RUNNING", start, null, null, "DEEP", null, null, null, null, null, null, Map.of("orderId", "ORD-123"), null, null, null, null, 0, false, List.of( new FlatProcessorRecord(1, null, null, "log1", "log", null, null, "COMPLETED", start, 2L, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null), new FlatProcessorRecord(2, null, null, "split1", "split", null, 3, "COMPLETED", start.plusMillis(2), 100L, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null), new FlatProcessorRecord(3, 2, "split1", "to1", "to", 0, null, "COMPLETED", start.plusMillis(5), 30L, "http://inventory/api", "order ABC-123 check stock", "stock available", null, null, null, null, null, null, null, null, null, null, null, null, null)))); // Processors should be flushed immediately assertThat(processorBuffer).hasSize(1); assertThat(executionBuffer).isEmpty(); // Chunk 1: COMPLETED (final) accumulator.onChunk(new ExecutionChunk( "pipeline-1", "order-service", "pod-1", "order-route", "corr-1", "COMPLETED", start, start.plusMillis(750), 
750L, "DEEP", null, null, null, null, null, null, null, null, null, null, null, 1, true, List.of( new FlatProcessorRecord(4, 2, "split1", "to1", "to", 1, null, "COMPLETED", start.plusMillis(40), 25L, "http://inventory/api", "order DEF-456 check stock", "stock available", null, null, null, null, null, null, null, null, null, null, null, null, null)))); assertThat(executionBuffer).hasSize(1); assertThat(processorBuffer).hasSize(2); // Flush to ClickHouse executionStore.insertExecutionBatch(executionBuffer); for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) { executionStore.insertProcessorBatch( batch.tenantId(), batch.executionId(), batch.routeId(), batch.applicationName(), batch.execStartTime(), batch.processors()); } // Search by order ID in attributes SearchResult result = searchIndex.search(new SearchRequest( null, null, null, null, null, null, "ORD-123", null, null, null, null, null, null, null, null, 0, 50, null, null)); assertThat(result.total()).isEqualTo(1); assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); assertThat(result.data().get(0).durationMs()).isEqualTo(750L); // Search in processor body SearchResult bodyResult = searchIndex.search(new SearchRequest( null, null, null, null, null, null, null, "ABC-123", null, null, null, null, null, null, null, 0, 50, null, null)); assertThat(bodyResult.total()).isEqualTo(1); // Verify iteration data in processor_executions Integer iterSize = jdbc.queryForObject( "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2", Integer.class); assertThat(iterSize).isEqualTo(3); Integer iter0 = jdbc.queryForObject( "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3", Integer.class); assertThat(iter0).isEqualTo(0); } } ``` - [ ] **Step 2: Run the integration test** ```bash mvn test -pl cameleer3-server-app -Dtest=ClickHouseChunkPipelineIT 
-Dfailsafe.provider=surefire ``` Expected: PASS. - [ ] **Step 3: Commit** ```bash git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java git commit -m "test(clickhouse): add end-to-end chunk pipeline integration test" ``` --- ## Verification Checklist After all tasks are complete, verify: 1. **Chunk ingestion**: `POST /api/v1/data/chunks` accepts single and array ExecutionChunks 2. **Processor immediate insert**: Processor records are inserted as chunks arrive (append-only) 3. **Envelope accumulation**: Multiple non-final chunks merge envelope data correctly 4. **Final flush**: Final chunk triggers execution row write with version=1 5. **Stale sweep**: Exchanges without final chunk for 5 minutes are flushed as RUNNING 6. **Search**: All filter types work: status, time range, duration, correlation ID, application, text, textInBody, textInHeaders, textInErrors 7. **Highlighting**: Search results include 120-char context snippets 8. **Feature flag**: `cameleer.storage.search=opensearch` uses OpenSearch, `=clickhouse` uses ClickHouse 9. **Backward compat**: With `clickhouse.enabled=false`, server starts without CH beans (PG + OpenSearch only) 10. **seq/parentSeq**: Processor records correctly store seq, parentSeq, iteration, iterationSize 11. **CI**: `mvn clean verify -DskipITs` passes
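For checklist item 1, the single-vs-array branch in `ChunkIngestionController` can be exercised without a running server. A minimal sketch — the class name is hypothetical and the JSON shows only a field subset from the chunk examples above:

```java
// Hypothetical smoke check for single-vs-array chunk payloads (checklist item 1).
public class ChunkPayloadShapes {

    /** Mirrors the controller's branch: array payloads start with '[' after trimming. */
    static boolean isArrayPayload(String body) {
        return body.trim().startsWith("[");
    }

    public static void main(String[] args) {
        String single = "{\"exchangeId\":\"ex-1\",\"status\":\"COMPLETED\","
                + "\"chunkSeq\":0,\"isFinal\":true,\"processors\":[]}";
        String array = "[" + single + "," + single + "]";
        System.out.println(isArrayPayload(single)); // false -> parsed as one ExecutionChunk
        System.out.println(isArrayPayload(array));  // true  -> parsed as List<ExecutionChunk>
    }
}
```

A real verification would POST both shapes to `/api/v1/data/chunks` and expect HTTP 202 for each.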