ClickHouse Phase 2: Executions + Search — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Ingest chunked execution data into ClickHouse and provide a ClickHouse-backed search implementation behind a feature flag. Replace the old RouteExecution tree ingestion with ExecutionChunk + FlatProcessorRecord flat ingestion.
Architecture: Agents send ExecutionChunk documents containing flat FlatProcessorRecord entries with seq/parentSeq/iteration fields. A new ChunkIngestionController accepts chunks at POST /api/v1/data/chunks. A ChunkAccumulator buffers exchange envelope data in-memory, inserts processor records immediately via WriteBuffer, and writes the execution row when the final chunk arrives. A ClickHouseSearchIndex implements the SearchIndex interface using SQL with ngram bloom filter acceleration. Feature flags control which search backend is active. The old RouteExecution ingestion path is removed (no backward compatibility needed).
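The flat seq/parentSeq encoding carries enough information to rebuild the processor tree on the read side: records arrive ordered by seq, and a parent's seq is always lower than its children's. A minimal reconstruction sketch (the `Flat`/`Node` types are a hypothetical field subset for illustration, not the plan's actual reader code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TreeRebuild {
    // Stand-in for FlatProcessorRecord: only the fields tree
    // reconstruction needs (hypothetical subset, for illustration).
    record Flat(int seq, Integer parentSeq, String processorId) {}

    record Node(Flat record, List<Node> children) {}

    // One pass is enough: since parents precede children in seq order,
    // the parent node already exists when a child record is processed.
    static List<Node> rebuild(List<Flat> flat) {
        Map<Integer, Node> bySeq = new LinkedHashMap<>();
        List<Node> roots = new ArrayList<>();
        for (Flat f : flat) {
            Node node = new Node(f, new ArrayList<>());
            bySeq.put(f.seq(), node);
            if (f.parentSeq() == null) {
                roots.add(node);
            } else {
                bySeq.get(f.parentSeq()).children().add(node);
            }
        }
        return roots;
    }

    public static void main(String[] args) {
        // split1 (seq 1) with two per-iteration children sharing processorId
        List<Node> roots = rebuild(List.of(
                new Flat(1, null, "split1"),
                new Flat(2, 1, "log1"),
                new Flat(3, 1, "log1")));
        System.out.println(roots.size());                   // 1
        System.out.println(roots.get(0).children().size()); // 2
    }
}
```

Note that the same processorId ("log1") appears twice: seq disambiguates per-iteration executions, which is why the processor_executions ORDER BY ends with seq rather than processor_id.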
Tech Stack: ClickHouse 24.12, clickhouse-jdbc 0.9.7 (all classifier), Spring JdbcTemplate, Testcontainers
Design Spec: docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md
Note on cameleer-common: The agent team is refactoring cameleer-common to add ExecutionChunk and FlatProcessorRecord. Until that library is published, this plan defines server-side DTOs in cameleer-server-core that mirror the common models. When the common lib is ready, swap the server DTOs for the shared classes (import change only).
File Structure
| File | Responsibility |
|---|---|
| cameleer-server-app/.../resources/clickhouse/V2__executions.sql | DDL for executions table (ReplacingMergeTree) |
| cameleer-server-app/.../resources/clickhouse/V3__processor_executions.sql | DDL for processor_executions table (MergeTree) — uses seq/parentSeq/iteration |
| cameleer-server-core/.../model/ExecutionChunk.java | Server-side DTO mirroring agent's ExecutionChunk (temporary until common lib ready) |
| cameleer-server-core/.../model/FlatProcessorRecord.java | Server-side DTO mirroring agent's FlatProcessorRecord (temporary until common lib ready) |
| cameleer-server-core/.../ingestion/ChunkAccumulator.java | Accumulates exchange envelope across chunks, pushes processor records + final execution row to WriteBuffers |
| cameleer-server-core/.../ingestion/MergedExecution.java | Record holding merged execution envelope + version + tenant |
| cameleer-server-app/.../storage/ClickHouseExecutionStore.java | Batch INSERT for executions + processor_executions to ClickHouse |
| cameleer-server-app/.../search/ClickHouseSearchIndex.java | SearchIndex impl using SQL with ngram indexes |
| cameleer-server-app/.../ingestion/ExecutionFlushScheduler.java | Drains execution + processor WriteBuffers → ClickHouseExecutionStore |
| cameleer-server-app/.../controller/ChunkIngestionController.java | REST endpoint POST /api/v1/data/chunks accepting ExecutionChunk |
| cameleer-server-app/.../config/StorageBeanConfig.java | Modified: add chunk accumulator + CH search beans |
| cameleer-server-app/.../config/IngestionBeanConfig.java | Modified: add execution + processor WriteBuffer beans |
| cameleer-server-app/.../resources/application.yml | Modified: add cameleer.storage.search flag |
| cameleer-server-app/...test.../storage/ClickHouseExecutionStoreIT.java | Integration test for CH execution writes |
| cameleer-server-app/...test.../search/ClickHouseSearchIndexIT.java | Integration test for CH search |
| cameleer-server-core/...test.../ingestion/ChunkAccumulatorTest.java | Unit test for accumulator logic |
Task 1: Server-Side DTOs (ExecutionChunk + FlatProcessorRecord)
Files:
- Create: cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java
- Create: cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java
These mirror the agent's models exactly. When cameleer-common is published with these classes, delete these files and update imports.
- Step 1: Create FlatProcessorRecord
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java
package com.cameleer.server.core.storage.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.Instant;
import java.util.Map;
/**
* Flat processor execution record with seq/parentSeq for tree reconstruction.
* Mirrors cameleer-common FlatProcessorRecord — replace with common lib import when available.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record FlatProcessorRecord(
int seq,
Integer parentSeq,
String parentProcessorId,
String processorId,
String processorType,
Integer iteration,
Integer iterationSize,
String status,
Instant startTime,
long durationMs,
String resolvedEndpointUri,
String inputBody,
String outputBody,
Map<String, String> inputHeaders,
Map<String, String> outputHeaders,
String errorMessage,
String errorStackTrace,
String errorType,
String errorCategory,
String rootCauseType,
String rootCauseMessage,
Map<String, String> attributes,
String circuitBreakerState,
Boolean fallbackTriggered,
Boolean filterMatched,
Boolean duplicateMessage
) {}
- Step 2: Create ExecutionChunk
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java
package com.cameleer.server.core.storage.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.List;
import java.util.Map;
/**
* Chunk document: exchange envelope + list of FlatProcessorRecords.
* Mirrors cameleer-common ExecutionChunk — replace with common lib import when available.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ExecutionChunk(
String exchangeId,
String applicationName,
String agentId,
String routeId,
String correlationId,
String status,
Instant startTime,
Instant endTime,
Long durationMs,
String engineLevel,
String errorMessage,
String errorStackTrace,
String errorType,
String errorCategory,
String rootCauseType,
String rootCauseMessage,
Map<String, String> attributes,
String traceId,
String spanId,
String originalExchangeId,
String replayExchangeId,
int chunkSeq,
@JsonProperty("final") boolean isFinal,
List<FlatProcessorRecord> processors
) {}
- Step 3: Write deserialization test
// cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
package com.cameleer.server.core.storage.model;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class ExecutionChunkDeserializationTest {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
@Test
void roundTrip_fullChunk() throws Exception {
ExecutionChunk chunk = new ExecutionChunk(
"ex-1", "order-service", "pod-1", "order-route",
"corr-1", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"REGULAR",
null, null, null, null, null, null,
Map.of("orderId", "ORD-1"),
"trace-1", "span-1", null, null,
2, true,
List.of(new FlatProcessorRecord(
1, null, null, "log1", "log",
null, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
null, "body", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
String json = MAPPER.writeValueAsString(chunk);
ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(deserialized.exchangeId()).isEqualTo("ex-1");
assertThat(deserialized.isFinal()).isTrue();
assertThat(deserialized.chunkSeq()).isEqualTo(2);
assertThat(deserialized.processors()).hasSize(1);
assertThat(deserialized.processors().get(0).seq()).isEqualTo(1);
assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1");
assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1");
}
@Test
void roundTrip_runningChunkWithIterations() throws Exception {
ExecutionChunk chunk = new ExecutionChunk(
"ex-2", "app", "agent-1", "route-1",
"ex-2", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR",
null, null, null, null, null, null,
null, null, null, null, null,
0, false,
List.of(
new FlatProcessorRecord(
1, null, null, "split1", "split",
null, 3, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 100L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(
2, 1, "split1", "log1", "log",
0, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-0", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(
3, 1, "split1", "log1", "log",
1, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-1", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
String json = MAPPER.writeValueAsString(chunk);
ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(deserialized.isFinal()).isFalse();
assertThat(deserialized.processors()).hasSize(3);
FlatProcessorRecord split = deserialized.processors().get(0);
assertThat(split.iterationSize()).isEqualTo(3);
assertThat(split.parentSeq()).isNull();
FlatProcessorRecord child0 = deserialized.processors().get(1);
assertThat(child0.parentSeq()).isEqualTo(1);
assertThat(child0.parentProcessorId()).isEqualTo("split1");
assertThat(child0.iteration()).isEqualTo(0);
}
@Test
void deserialize_unknownFieldsIgnored() throws Exception {
String json = """
{"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED",
"startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true,
"futureField":"ignored","processors":[]}
""";
ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(chunk.exchangeId()).isEqualTo("ex-1");
assertThat(chunk.isFinal()).isTrue();
}
}
- Step 4: Run tests
mvn test -pl cameleer-server-core -Dtest=ExecutionChunkDeserializationTest
Expected: PASS (3 tests).
- Step 5: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java \
cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java \
cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
git commit -m "feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs"
Task 2: DDL Scripts for executions and processor_executions
Files:
- Create: cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql
- Create: cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql
- Step 1: Create executions DDL
The executions table stores one row per exchange (written when the final chunk arrives). Uses ReplacingMergeTree(_version) for rare late corrections.
-- V2__executions.sql
CREATE TABLE IF NOT EXISTS executions (
tenant_id LowCardinality(String) DEFAULT 'default',
execution_id String,
start_time DateTime64(3),
_version UInt64 DEFAULT 1,
route_id LowCardinality(String),
agent_id LowCardinality(String),
application_name LowCardinality(String),
status LowCardinality(String),
correlation_id String DEFAULT '',
exchange_id String DEFAULT '',
end_time Nullable(DateTime64(3)),
duration_ms Nullable(Int64),
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
error_type LowCardinality(String) DEFAULT '',
error_category LowCardinality(String) DEFAULT '',
root_cause_type String DEFAULT '',
root_cause_message String DEFAULT '',
diagram_content_hash String DEFAULT '',
engine_level LowCardinality(String) DEFAULT '',
input_body String DEFAULT '',
output_body String DEFAULT '',
input_headers String DEFAULT '',
output_headers String DEFAULT '',
attributes String DEFAULT '',
trace_id String DEFAULT '',
span_id String DEFAULT '',
has_trace_data Bool DEFAULT false,
is_replay Bool DEFAULT false,
_search_text String MATERIALIZED
concat(error_message, ' ', error_stacktrace, ' ', attributes,
' ', input_body, ' ', output_body, ' ', input_headers,
' ', output_headers, ' ', root_cause_message),
INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_status status TYPE set(10) GRANULARITY 1,
INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
Note: Removed processors_json — with flat records in processor_executions, the nested JSON column is no longer needed.
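ReplacingMergeTree(_version) deduplication is asynchronous: rows sharing the same ORDER BY key collapse to the one with the highest _version, but only at merge time (or under `SELECT ... FINAL` / `OPTIMIZE ... FINAL`). A stdlib sketch of the keep-max-version semantics, for intuition only (the engine applies this per sorting key during background merges):

```java
import java.util.HashMap;
import java.util.Map;

public class ReplacingMergeSketch {
    record Row(String key, long version, String status) {}

    // Simulate what a ReplacingMergeTree merge does: for each sorting
    // key, keep only the row carrying the highest _version.
    static Map<String, Row> merge(Row... rows) {
        Map<String, Row> kept = new HashMap<>();
        for (Row r : rows) {
            kept.merge(r.key(), r, (a, b) -> a.version() >= b.version() ? a : b);
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Row> kept = merge(
                new Row("exec-dup", 1, "RUNNING"),
                new Row("exec-dup", 2, "COMPLETED"));
        System.out.println(kept.get("exec-dup").status()); // COMPLETED
    }
}
```

Because the collapse is deferred, reads that must see exactly one row per execution need FINAL; this is why the Task 3 integration test runs OPTIMIZE TABLE executions FINAL before asserting on status.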
- Step 2: Create processor_executions DDL
Uses seq/parentSeq/iteration instead of depth/loopIndex/splitIndex/multicastIndex. ORDER BY ends with seq (unique per execution) instead of processor_id (can repeat across iterations).
-- V3__processor_executions.sql
CREATE TABLE IF NOT EXISTS processor_executions (
tenant_id LowCardinality(String) DEFAULT 'default',
execution_id String,
seq UInt32,
parent_seq Nullable(UInt32),
parent_processor_id String DEFAULT '',
processor_id String,
processor_type LowCardinality(String),
start_time DateTime64(3),
route_id LowCardinality(String),
application_name LowCardinality(String),
iteration Nullable(Int32),
iteration_size Nullable(Int32),
status LowCardinality(String),
end_time Nullable(DateTime64(3)),
duration_ms Nullable(Int64),
error_message String DEFAULT '',
error_stacktrace String DEFAULT '',
error_type LowCardinality(String) DEFAULT '',
error_category LowCardinality(String) DEFAULT '',
root_cause_type String DEFAULT '',
root_cause_message String DEFAULT '',
input_body String DEFAULT '',
output_body String DEFAULT '',
input_headers String DEFAULT '',
output_headers String DEFAULT '',
attributes String DEFAULT '',
resolved_endpoint_uri String DEFAULT '',
circuit_breaker_state LowCardinality(String) DEFAULT '',
fallback_triggered Bool DEFAULT false,
filter_matched Bool DEFAULT false,
duplicate_message Bool DEFAULT false,
_search_text String MATERIALIZED
concat(error_message, ' ', error_stacktrace, ' ', attributes,
' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),
INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
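The ngrambf_v1(3, 256, 2, 0) indexes in both tables store the 3-grams of the indexed expression in a per-granule bloom filter, letting ClickHouse skip granules that cannot contain a `LIKE '%term%'` substring (search terms shorter than 3 characters get no acceleration). A stdlib sketch of the 3-gram check, for intuition; the real index hashes grams into a bloom filter rather than keeping an exact set:

```java
import java.util.HashSet;
import java.util.Set;

public class NgramSketch {
    // Extract all 3-grams of a string, as ngrambf_v1(3, ...) does
    // before hashing them into the per-granule bloom filter.
    static Set<String> trigrams(String s) {
        Set<String> grams = new HashSet<>();
        for (int i = 0; i + 3 <= s.length(); i++) {
            grams.add(s.substring(i, i + 3));
        }
        return grams;
    }

    // A granule is skipped when some 3-gram of the search term is
    // absent from the granule's gram set. A bloom filter can produce
    // false positives (granule scanned needlessly) but never false
    // negatives (a matching granule is never skipped).
    static boolean granuleMightMatch(Set<String> granuleGrams, String term) {
        return granuleGrams.containsAll(trigrams(term));
    }

    public static void main(String[] args) {
        Set<String> granule = trigrams("connection refused by upstream");
        System.out.println(granuleMightMatch(granule, "refused")); // true
        System.out.println(granuleMightMatch(granule, "timeout")); // false
    }
}
```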
- Step 3: Verify DDL loads
mvn clean compile -pl cameleer-server-app
ClickHouseSchemaInitializer scans classpath:clickhouse/*.sql automatically.
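The V2__/V3__ prefixes follow a Flyway-style naming convention, which suggests the initializer applies scripts in ascending version order; the sketch below shows that ordering rule (this is an assumption about ClickHouseSchemaInitializer's behavior, not confirmed by this plan — verify against its implementation):

```java
import java.util.Comparator;
import java.util.List;

public class ScriptOrderSketch {
    // Parse the numeric version out of a "V<N>__name.sql" filename
    // (assumed convention, mirroring the script names in this plan).
    static int version(String filename) {
        return Integer.parseInt(filename.substring(1, filename.indexOf("__")));
    }

    public static void main(String[] args) {
        List<String> ordered = List.of(
                        "V3__processor_executions.sql",
                        "V2__executions.sql")
                .stream()
                .sorted(Comparator.comparingInt(ScriptOrderSketch::version))
                .toList();
        System.out.println(ordered);
    }
}
```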
- Step 4: Commit
git add cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql \
cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql
git commit -m "feat(clickhouse): add executions and processor_executions DDL for chunked transport"
Task 3: MergedExecution + ClickHouseExecutionStore
Files:
- Create: cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java
- Create: cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java
- Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
The store handles batch INSERT for both executions (from MergedExecution) and processor_executions (from FlatProcessorRecord). It does NOT implement the ExecutionStore interface — it has its own batch API consumed by the flush scheduler.
- Step 1: Create MergedExecution record
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java
package com.cameleer.server.core.ingestion;
import java.time.Instant;
import java.util.Map;
/**
* A merged execution envelope ready for ClickHouse insertion.
* Produced by {@link ChunkAccumulator} after receiving the final chunk.
*/
public record MergedExecution(
String tenantId,
long version,
String executionId,
String routeId,
String agentId,
String applicationName,
String status,
String correlationId,
String exchangeId,
Instant startTime,
Instant endTime,
Long durationMs,
String errorMessage,
String errorStacktrace,
String errorType,
String errorCategory,
String rootCauseType,
String rootCauseMessage,
String diagramContentHash,
String engineLevel,
String inputBody,
String outputBody,
String inputHeaders,
String outputHeaders,
String attributes,
String traceId,
String spanId,
boolean hasTraceData,
boolean isReplay
) {}
- Step 2: Write the failing integration test
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
package com.cameleer.server.app.storage;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseExecutionStoreIT {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseExecutionStore store;
@BeforeEach
void setUp() throws IOException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
String execDdl = new String(getClass().getResourceAsStream(
"/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
String procDdl = new String(getClass().getResourceAsStream(
"/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
jdbc.execute(execDdl);
jdbc.execute(procDdl);
jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions");
store = new ClickHouseExecutionStore(jdbc);
}
@Test
void insertExecutionBatch_writesToClickHouse() {
MergedExecution exec = new MergedExecution(
"default", 1,
"exec-1", "route-timer", "agent-a", "my-app",
"COMPLETED", "corr-1", "exchange-1",
Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L,
null, null, null, null, null, null,
"hash-abc", "DEEP",
"{\"key\":\"val\"}", "{\"out\":\"data\"}", "{\"h\":\"1\"}", "{\"h\":\"2\"}",
"{\"attr\":\"val\"}",
"trace-1", "span-1", true, false);
store.insertExecutionBatch(List.of(exec));
Integer count = jdbc.queryForObject(
"SELECT count() FROM executions WHERE execution_id = 'exec-1'",
Integer.class);
assertThat(count).isEqualTo(1);
}
@Test
void insertProcessorBatch_writesToClickHouse() {
FlatProcessorRecord proc = new FlatProcessorRecord(
1, null, null, "proc-1", "to",
null, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 500L,
"http://localhost:8080/api",
"input body", "output body",
Map.of("Content-Type", "application/json"), null,
null, null, null, null, null, null,
null, null, null, null, null);
store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app",
Instant.parse("2026-03-31T10:00:00Z"), List.of(proc));
Integer count = jdbc.queryForObject(
"SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'",
Integer.class);
assertThat(count).isEqualTo(1);
// Verify seq and parent_seq are stored
Integer seq = jdbc.queryForObject(
"SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'",
Integer.class);
assertThat(seq).isEqualTo(1);
}
@Test
void insertProcessorBatch_withIterations() {
List<FlatProcessorRecord> procs = List.of(
new FlatProcessorRecord(1, null, null, "split1", "split",
null, 3, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 100L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(2, 1, "split1", "log1", "log",
0, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-0", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(3, 1, "split1", "log1", "log",
1, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-1", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(4, 1, "split1", "log1", "log",
2, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-2", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null));
store.insertProcessorBatch("default", "exec-split", "route-1", "my-app",
Instant.parse("2026-03-31T10:00:00Z"), procs);
Integer count = jdbc.queryForObject(
"SELECT count() FROM processor_executions WHERE execution_id = 'exec-split'",
Integer.class);
assertThat(count).isEqualTo(4);
// Verify iteration data
Integer iterSize = jdbc.queryForObject(
"SELECT iteration_size FROM processor_executions WHERE execution_id = 'exec-split' AND seq = 1",
Integer.class);
assertThat(iterSize).isEqualTo(3);
}
@Test
void insertExecutionBatch_emptyList_doesNothing() {
store.insertExecutionBatch(List.of());
Integer count = jdbc.queryForObject("SELECT count() FROM executions", Integer.class);
assertThat(count).isEqualTo(0);
}
@Test
void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
Instant startTime = Instant.parse("2026-03-31T10:00:00Z");
MergedExecution v1 = new MergedExecution(
"default", 1,
"exec-dup", "route-1", "agent-a", "my-app",
"RUNNING", null, "exchange-1",
startTime, null, null,
null, null, null, null, null, null,
null, null,
null, null, null, null, null,
null, null, false, false);
MergedExecution v2 = new MergedExecution(
"default", 2,
"exec-dup", "route-1", "agent-a", "my-app",
"COMPLETED", "corr-1", "exchange-1",
startTime, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
null, null, null, null, null, null,
null, null,
null, null, null, null, null,
null, null, false, false);
store.insertExecutionBatch(List.of(v1, v2));
jdbc.execute("OPTIMIZE TABLE executions FINAL");
String status = jdbc.queryForObject(
"SELECT status FROM executions WHERE execution_id = 'exec-dup'",
String.class);
assertThat(status).isEqualTo("COMPLETED");
}
}
- Step 3: Run test to verify it fails
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
Expected: compilation error — ClickHouseExecutionStore does not exist.
- Step 4: Implement ClickHouseExecutionStore
// cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java
package com.cameleer.server.app.storage;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
public class ClickHouseExecutionStore {
private static final ObjectMapper JSON = new ObjectMapper();
private final JdbcTemplate jdbc;
public ClickHouseExecutionStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
public void insertExecutionBatch(List<MergedExecution> executions) {
if (executions.isEmpty()) return;
jdbc.batchUpdate("""
INSERT INTO executions (
tenant_id, execution_id, start_time, _version,
route_id, agent_id, application_name, status,
correlation_id, exchange_id, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers,
attributes, trace_id, span_id,
has_trace_data, is_replay
) VALUES (
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?, ?,
?, ?,
?, ?, ?, ?,
?, ?, ?,
?, ?
)
""",
executions.stream().map(e -> new Object[]{
e.tenantId(),
e.executionId(),
Timestamp.from(e.startTime()),
e.version(),
orEmpty(e.routeId()),
orEmpty(e.agentId()),
orEmpty(e.applicationName()),
orEmpty(e.status()),
orEmpty(e.correlationId()),
orEmpty(e.exchangeId()),
e.endTime() != null ? Timestamp.from(e.endTime()) : null,
e.durationMs(),
orEmpty(e.errorMessage()),
orEmpty(e.errorStacktrace()),
orEmpty(e.errorType()),
orEmpty(e.errorCategory()),
orEmpty(e.rootCauseType()),
orEmpty(e.rootCauseMessage()),
orEmpty(e.diagramContentHash()),
orEmpty(e.engineLevel()),
orEmpty(e.inputBody()),
orEmpty(e.outputBody()),
orEmpty(e.inputHeaders()),
orEmpty(e.outputHeaders()),
orEmpty(e.attributes()),
orEmpty(e.traceId()),
orEmpty(e.spanId()),
e.hasTraceData(),
e.isReplay()
}).toList());
}
public void insertProcessorBatch(String tenantId, String executionId,
String routeId, String applicationName,
Instant execStartTime,
List<FlatProcessorRecord> processors) {
if (processors.isEmpty()) return;
jdbc.batchUpdate("""
INSERT INTO processor_executions (
tenant_id, execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time,
route_id, application_name,
iteration, iteration_size, status,
end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers,
attributes, resolved_endpoint_uri,
circuit_breaker_state, fallback_triggered,
filter_matched, duplicate_message
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?,
?, ?,
?, ?, ?,
?, ?,
?, ?, ?, ?,
?, ?,
?, ?, ?, ?,
?, ?,
?, ?,
?, ?
)
""",
processors.stream().map(p -> new Object[]{
tenantId,
executionId,
p.seq(),
p.parentSeq(),
orEmpty(p.parentProcessorId()),
p.processorId(),
p.processorType(),
p.startTime() != null ? Timestamp.from(p.startTime()) : Timestamp.from(execStartTime),
routeId,
applicationName,
p.iteration(),
p.iterationSize(),
orEmpty(p.status()),
p.startTime() != null && p.durationMs() > 0
? Timestamp.from(p.startTime().plusMillis(p.durationMs())) : null,
p.durationMs(),
orEmpty(p.errorMessage()),
orEmpty(p.errorStackTrace()),
orEmpty(p.errorType()),
orEmpty(p.errorCategory()),
orEmpty(p.rootCauseType()),
orEmpty(p.rootCauseMessage()),
orEmpty(p.inputBody()),
orEmpty(p.outputBody()),
headersToString(p.inputHeaders()),
headersToString(p.outputHeaders()),
mapToString(p.attributes()),
orEmpty(p.resolvedEndpointUri()),
orEmpty(p.circuitBreakerState()),
p.fallbackTriggered() != null ? p.fallbackTriggered() : false,
p.filterMatched() != null ? p.filterMatched() : false,
p.duplicateMessage() != null ? p.duplicateMessage() : false
}).toList());
}
private static String orEmpty(String value) {
return value != null ? value : "";
}
private static String headersToString(Map<String, String> headers) {
if (headers == null || headers.isEmpty()) return "";
try {
return JSON.writeValueAsString(headers);
} catch (JsonProcessingException e) {
return "";
}
}
private static String mapToString(Map<String, String> map) {
if (map == null || map.isEmpty()) return "";
try {
return JSON.writeValueAsString(map);
} catch (JsonProcessingException e) {
return "";
}
}
}
- Step 5: Run test to verify it passes
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire
Expected: all 5 tests PASS.
- Step 6: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java \
cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format"
Task 4: ChunkAccumulator
Files:
- Create: cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
- Create: cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java
The ChunkAccumulator receives ExecutionChunk documents. For each chunk:
- Processor records are pushed to a sink immediately (they're append-only)
- Exchange envelope data is buffered/merged in a ConcurrentHashMap
- When isFinal=true, the merged envelope is pushed to an execution sink
A scheduled sweep flushes stale exchanges (no final chunk received within 5 minutes).
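The flow above can be sketched with stdlib types only (field subset and names are illustrative; Task 4's real class merges the full envelope across chunks and adds versioning and tenancy):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class AccumulatorSketch {
    record Chunk(String exchangeId, String status, boolean isFinal, int processorCount) {}
    record Pending(Chunk latest, Instant lastSeen) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final Consumer<Chunk> executionSink;
    private final Consumer<Integer> processorSink;
    private final Duration staleAfter;

    AccumulatorSketch(Consumer<Chunk> executionSink, Consumer<Integer> processorSink,
                      Duration staleAfter) {
        this.executionSink = executionSink;
        this.processorSink = processorSink;
        this.staleAfter = staleAfter;
    }

    void onChunk(Chunk chunk) {
        // Processor records are append-only: forward them immediately.
        processorSink.accept(chunk.processorCount());
        if (chunk.isFinal()) {
            // The final chunk carries the authoritative envelope: flush it.
            pending.remove(chunk.exchangeId());
            executionSink.accept(chunk);
        } else {
            // Buffer the latest envelope until the final chunk arrives.
            pending.put(chunk.exchangeId(), new Pending(chunk, Instant.now()));
        }
    }

    // Called on a schedule: flush exchanges whose final chunk never arrived.
    void sweep(Instant now) {
        pending.forEach((id, p) -> {
            if (Duration.between(p.lastSeen(), now).compareTo(staleAfter) > 0
                    && pending.remove(id, p)) {
                executionSink.accept(p.latest());
            }
        });
    }

    public static void main(String[] args) {
        var sketch = new AccumulatorSketch(
                c -> System.out.println("exec:" + c.exchangeId() + ":" + c.status()),
                n -> System.out.println("procs:" + n),
                Duration.ofMinutes(5));
        sketch.onChunk(new Chunk("ex-1", "RUNNING", false, 2));
        sketch.onChunk(new Chunk("ex-1", "COMPLETED", true, 1));
    }
}
```

The two-argument `remove(id, p)` in the sweep is what keeps the race benign: if a final chunk lands between the staleness check and the flush, the remove fails and the sweep yields to the real flush.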
- Step 1: Write the failing unit test
// cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
package com.cameleer.server.core.ingestion;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.assertj.core.api.Assertions.assertThat;
class ChunkAccumulatorTest {
private List<MergedExecution> executionSink;
private List<ChunkAccumulator.ProcessorBatch> processorSink;
private ChunkAccumulator accumulator;
@BeforeEach
void setUp() {
executionSink = new CopyOnWriteArrayList<>();
processorSink = new CopyOnWriteArrayList<>();
accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMinutes(5));
}
@Test
void singleFinalChunk_producesExecutionAndProcessors() {
ExecutionChunk chunk = new ExecutionChunk(
"ex-1", "my-app", "agent-a", "route-1",
"corr-1", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"DEEP",
null, null, null, null, null, null,
Map.of("env", "prod"),
"trace-1", "span-1", null, null,
0, true,
List.of(new FlatProcessorRecord(
1, null, null, "log1", "log",
null, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
null, "body", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
accumulator.onChunk(chunk);
assertThat(executionSink).hasSize(1);
MergedExecution merged = executionSink.get(0);
assertThat(merged.executionId()).isEqualTo("ex-1");
assertThat(merged.status()).isEqualTo("COMPLETED");
assertThat(merged.durationMs()).isEqualTo(1000L);
assertThat(merged.version()).isEqualTo(1);
assertThat(processorSink).hasSize(1);
assertThat(processorSink.get(0).processors()).hasSize(1);
assertThat(processorSink.get(0).executionId()).isEqualTo("ex-1");
}
@Test
void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
Instant start = Instant.parse("2026-03-31T10:00:00Z");
// Chunk 0: RUNNING with 2 processors
ExecutionChunk chunk0 = new ExecutionChunk(
"ex-multi", "my-app", "agent-a", "route-1",
"corr-1", "RUNNING",
start, null, null, "DEEP",
null, null, null, null, null, null,
null, null, null, null, null,
0, false,
List.of(
new FlatProcessorRecord(1, null, null, "log1", "log",
null, null, "COMPLETED", start, 5L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(2, null, null, "to1", "to",
null, null, "COMPLETED", start.plusMillis(5), 10L,
"http://svc/api", null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
accumulator.onChunk(chunk0);
// Processors inserted immediately
assertThat(processorSink).hasSize(1);
assertThat(processorSink.get(0).processors()).hasSize(2);
// Execution NOT yet flushed
assertThat(executionSink).isEmpty();
// Chunk 1: COMPLETED (final) with 1 more processor
ExecutionChunk chunk1 = new ExecutionChunk(
"ex-multi", "my-app", "agent-a", "route-1",
"corr-1", "COMPLETED",
start, start.plusMillis(500), 500L, "DEEP",
null, null, null, null, null, null,
Map.of("result", "ok"),
null, null, null, null,
1, true,
List.of(new FlatProcessorRecord(3, null, null, "log2", "log",
null, null, "COMPLETED", start.plusMillis(100), 2L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
accumulator.onChunk(chunk1);
// Final chunk triggers execution flush
assertThat(executionSink).hasSize(1);
MergedExecution merged = executionSink.get(0);
assertThat(merged.status()).isEqualTo("COMPLETED");
assertThat(merged.durationMs()).isEqualTo(500L);
assertThat(merged.version()).isEqualTo(1);
// Second processor batch
assertThat(processorSink).hasSize(2);
assertThat(processorSink.get(1).processors()).hasSize(1);
}
@Test
void staleExchange_flushedBySweep() {
accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMillis(1));
ExecutionChunk chunk = new ExecutionChunk(
"ex-stale", "my-app", "agent-a", "route-1",
"ex-stale", "RUNNING",
Instant.parse("2026-03-31T09:50:00Z"),
null, null, "REGULAR",
null, null, null, null, null, null,
null, null, null, null, null,
0, false, List.of());
accumulator.onChunk(chunk);
try { Thread.sleep(5); } catch (InterruptedException ignored) {}
accumulator.sweepStale();
assertThat(executionSink).hasSize(1);
assertThat(executionSink.get(0).status()).isEqualTo("RUNNING");
assertThat(executionSink.get(0).version()).isEqualTo(1);
}
@Test
void finalChunkWithErrors_populatesErrorFields() {
ExecutionChunk chunk = new ExecutionChunk(
"ex-err", "my-app", "agent-a", "route-1",
"ex-err", "FAILED",
Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:00.200Z"), 200L,
"REGULAR",
"Connection refused", "java.net.ConnectException...",
"java.net.ConnectException", "CONNECTION",
"java.net.ConnectException", "Connection refused",
null, null, null, null, null,
0, true, List.of());
accumulator.onChunk(chunk);
MergedExecution merged = executionSink.get(0);
assertThat(merged.errorMessage()).isEqualTo("Connection refused");
assertThat(merged.errorType()).isEqualTo("java.net.ConnectException");
assertThat(merged.errorCategory()).isEqualTo("CONNECTION");
}
@Test
void getPendingCount_tracksBufferedExchanges() {
Instant t = Instant.parse("2026-03-31T10:00:00Z");
accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "RUNNING",
t, null, null, "REGULAR", null, null, null, null, null, null,
null, null, null, null, null, 0, false, List.of()));
accumulator.onChunk(new ExecutionChunk("e2", "app", "a", "r", "e2", "RUNNING",
t, null, null, "REGULAR", null, null, null, null, null, null,
null, null, null, null, null, 0, false, List.of()));
assertThat(accumulator.getPendingCount()).isEqualTo(2);
accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "COMPLETED",
t, t.plusMillis(100), 100L, "REGULAR", null, null, null, null, null, null,
null, null, null, null, null, 1, true, List.of()));
assertThat(accumulator.getPendingCount()).isEqualTo(1);
}
}
- Step 2: Run test to verify it fails
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest -DfailIfNoTests=false
Expected: compilation error — ChunkAccumulator does not exist.
- Step 3: Implement ChunkAccumulator
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java
package com.cameleer.server.core.ingestion;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* Accumulates ExecutionChunk documents per exchange.
* <p>
* Processor records are pushed to the processor sink immediately (append-only).
* Exchange envelope data is buffered and merged across chunks.
* When the final chunk arrives, the merged envelope is pushed to the execution sink.
*/
public class ChunkAccumulator {
private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
private static final ObjectMapper JSON = new ObjectMapper();
private static final String DEFAULT_TENANT = "default";
private final Consumer<MergedExecution> executionSink;
private final Consumer<ProcessorBatch> processorSink;
private final Duration staleThreshold;
private final Map<String, PendingExchange> pending = new ConcurrentHashMap<>();
public ChunkAccumulator(Consumer<MergedExecution> executionSink,
Consumer<ProcessorBatch> processorSink,
Duration staleThreshold) {
this.executionSink = executionSink;
this.processorSink = processorSink;
this.staleThreshold = staleThreshold;
}
public void onChunk(ExecutionChunk chunk) {
String exchangeId = chunk.exchangeId();
// Insert processor records immediately (append-only)
if (chunk.processors() != null && !chunk.processors().isEmpty()) {
processorSink.accept(new ProcessorBatch(
DEFAULT_TENANT, exchangeId,
coalesce(chunk.routeId(), ""),
coalesce(chunk.applicationName(), ""),
chunk.startTime(),
chunk.processors()));
}
if (chunk.isFinal()) {
// Merge with any pending state and flush execution
PendingExchange pendingExchange = pending.remove(exchangeId);
MergedExecution merged = buildMergedExecution(chunk, pendingExchange);
executionSink.accept(merged);
} else {
// Buffer/update exchange envelope
pending.compute(exchangeId, (id, existing) -> {
if (existing == null) {
return new PendingExchange(chunk, Instant.now());
}
return existing.mergeWith(chunk);
});
}
}
public void sweepStale() {
Instant cutoff = Instant.now().minus(staleThreshold);
List<String> staleIds = new ArrayList<>();
pending.forEach((id, pe) -> {
if (pe.receivedAt().isBefore(cutoff)) {
staleIds.add(id);
}
});
for (String id : staleIds) {
PendingExchange stale = pending.remove(id);
if (stale != null) {
log.info("Flushing stale exchange {}", id);
executionSink.accept(buildMergedExecution(stale.envelope(), null));
}
}
}
public int getPendingCount() {
return pending.size();
}
private MergedExecution buildMergedExecution(ExecutionChunk finalChunk,
                                             PendingExchange pendingState) {
    // pendingState.envelope() already yields the buffered ExecutionChunk,
    // so read fields directly off `base`
    ExecutionChunk base = pendingState != null ? pendingState.envelope() : null;
    String attributes = serializeMap(finalChunk.attributes());
    if ((attributes == null || attributes.isEmpty()) && base != null) {
        attributes = serializeMap(base.attributes());
    }
    boolean hasTraceData = false;
    boolean isReplay = finalChunk.replayExchangeId() != null;
    return new MergedExecution(
        DEFAULT_TENANT, 1,
        finalChunk.exchangeId(),
        coalesce(finalChunk.routeId(), base != null ? base.routeId() : null),
        coalesce(finalChunk.agentId(), base != null ? base.agentId() : null),
        coalesce(finalChunk.applicationName(), base != null ? base.applicationName() : null),
        coalesce(finalChunk.status(), base != null ? base.status() : "RUNNING"),
        coalesce(finalChunk.correlationId(), base != null ? base.correlationId() : null),
        finalChunk.exchangeId(),
        coalesce(finalChunk.startTime(), base != null ? base.startTime() : null),
        coalesce(finalChunk.endTime(), base != null ? base.endTime() : null),
        coalesce(finalChunk.durationMs(), base != null ? base.durationMs() : null),
        coalesce(finalChunk.errorMessage(), base != null ? base.errorMessage() : null),
        coalesce(finalChunk.errorStackTrace(), base != null ? base.errorStackTrace() : null),
        coalesce(finalChunk.errorType(), base != null ? base.errorType() : null),
        coalesce(finalChunk.errorCategory(), base != null ? base.errorCategory() : null),
        coalesce(finalChunk.rootCauseType(), base != null ? base.rootCauseType() : null),
        coalesce(finalChunk.rootCauseMessage(), base != null ? base.rootCauseMessage() : null),
        "", // diagramContentHash — server-side lookup, not in chunk
        coalesce(finalChunk.engineLevel(), base != null ? base.engineLevel() : null),
        "", "", "", "", // input/output body/headers — on processor records now, not envelope
        coalesce(attributes, ""),
        coalesce(finalChunk.traceId(), base != null ? base.traceId() : null),
        coalesce(finalChunk.spanId(), base != null ? base.spanId() : null),
        hasTraceData,
        isReplay
    );
}
private static String serializeMap(Map<String, String> map) {
if (map == null || map.isEmpty()) return "";
try {
return JSON.writeValueAsString(map);
} catch (JsonProcessingException e) {
return "";
}
}
private static <T> T coalesce(T a, T b) {
return a != null ? a : b;
}
/**
* A batch of processor records for a single exchange, ready for insertion.
*/
public record ProcessorBatch(
String tenantId,
String executionId,
String routeId,
String applicationName,
Instant execStartTime,
List<FlatProcessorRecord> processors
) {}
private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {
PendingExchange mergeWith(ExecutionChunk newer) {
// Keep the latest envelope data (later chunkSeq has more complete info)
ExecutionChunk merged = new ExecutionChunk(
envelope.exchangeId(),
coalesce(newer.applicationName(), envelope.applicationName()),
coalesce(newer.agentId(), envelope.agentId()),
coalesce(newer.routeId(), envelope.routeId()),
coalesce(newer.correlationId(), envelope.correlationId()),
coalesce(newer.status(), envelope.status()),
coalesce(envelope.startTime(), newer.startTime()),
coalesce(newer.endTime(), envelope.endTime()),
coalesce(newer.durationMs(), envelope.durationMs()),
coalesce(newer.engineLevel(), envelope.engineLevel()),
coalesce(newer.errorMessage(), envelope.errorMessage()),
coalesce(newer.errorStackTrace(), envelope.errorStackTrace()),
coalesce(newer.errorType(), envelope.errorType()),
coalesce(newer.errorCategory(), envelope.errorCategory()),
coalesce(newer.rootCauseType(), envelope.rootCauseType()),
coalesce(newer.rootCauseMessage(), envelope.rootCauseMessage()),
newer.attributes() != null ? newer.attributes() : envelope.attributes(),
coalesce(newer.traceId(), envelope.traceId()),
coalesce(newer.spanId(), envelope.spanId()),
coalesce(newer.originalExchangeId(), envelope.originalExchangeId()),
coalesce(newer.replayExchangeId(), envelope.replayExchangeId()),
newer.chunkSeq(),
newer.isFinal(),
List.of());
return new PendingExchange(merged, receivedAt);
}
}
}
- Step 4: Run test to verify it passes
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest
Expected: all 5 tests PASS.
- Step 5: Commit
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java \
cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
git commit -m "feat(clickhouse): add ChunkAccumulator for chunked execution ingestion"
Task 5: ExecutionFlushScheduler + ChunkIngestionController
Files:
- Create: cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java
- Create: cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
- Step 1: Implement ExecutionFlushScheduler
Follows MetricsFlushScheduler pattern. Drains two WriteBuffers (executions + processor batches) and calls ClickHouseExecutionStore. Also runs the stale sweep.
// cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java
package com.cameleer.server.app.ingestion;
import com.cameleer.server.app.config.IngestionConfig;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
public class ExecutionFlushScheduler implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);
private final WriteBuffer<MergedExecution> executionBuffer;
private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
private final ClickHouseExecutionStore executionStore;
private final ChunkAccumulator accumulator;
private final int batchSize;
private volatile boolean running = false;
public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
ClickHouseExecutionStore executionStore,
ChunkAccumulator accumulator,
IngestionConfig config) {
this.executionBuffer = executionBuffer;
this.processorBuffer = processorBuffer;
this.executionStore = executionStore;
this.accumulator = accumulator;
this.batchSize = config.getBatchSize();
}
@Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
public void flush() {
flushExecutions();
flushProcessors();
}
private void flushExecutions() {
try {
List<MergedExecution> batch = executionBuffer.drain(batchSize);
if (!batch.isEmpty()) {
executionStore.insertExecutionBatch(batch);
log.debug("Flushed {} executions to ClickHouse", batch.size());
}
} catch (Exception e) {
log.error("Failed to flush executions to ClickHouse", e);
}
}
private void flushProcessors() {
try {
List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
for (ChunkAccumulator.ProcessorBatch batch : batches) {
if (!batch.processors().isEmpty()) {
executionStore.insertProcessorBatch(
batch.tenantId(), batch.executionId(),
batch.routeId(), batch.applicationName(),
batch.execStartTime(), batch.processors());
}
}
if (!batches.isEmpty()) {
int totalProcs = batches.stream().mapToInt(b -> b.processors().size()).sum();
log.debug("Flushed {} processor batches ({} records) to ClickHouse",
batches.size(), totalProcs);
}
} catch (Exception e) {
log.error("Failed to flush processors to ClickHouse", e);
}
}
@Scheduled(fixedDelay = 60_000)
public void sweepStale() {
try {
accumulator.sweepStale();
} catch (Exception e) {
log.error("Failed to sweep stale exchanges", e);
}
}
@Override public void start() { running = true; }
@Override
public void stop() {
flush();
running = false;
}
@Override public boolean isRunning() { return running; }
@Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
- Step 2: Implement ChunkIngestionController
// cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
package com.cameleer.server.app.controller;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/api/v1/data")
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {
    // One shared, thread-safe mapper — building an ObjectMapper per request is needlessly expensive
    private static final ObjectMapper MAPPER =
        new ObjectMapper().registerModule(new JavaTimeModule());
    private final ChunkAccumulator accumulator;
    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
    }
    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk (single or array)")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        try {
            String trimmed = body.trim();
            if (trimmed.startsWith("[")) {
                List<ExecutionChunk> chunks = MAPPER.readValue(trimmed,
                    MAPPER.getTypeFactory().constructCollectionType(List.class, ExecutionChunk.class));
                for (ExecutionChunk chunk : chunks) {
                    accumulator.onChunk(chunk);
                }
            } else {
                accumulator.onChunk(MAPPER.readValue(trimmed, ExecutionChunk.class));
            }
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            return ResponseEntity.badRequest().build();
        }
    }
}
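For reference, a single-chunk request body for POST /api/v1/data/chunks looks roughly like the sketch below. The field names mirror the ExecutionChunk and FlatProcessorRecord record components used throughout this plan (seq, parentSeq, iteration, isFinal, etc. are confirmed by the tests above); the exact JSON shape ultimately depends on how Jackson serializes the records, and the processor-record field names beyond seq/parentSeq/iteration are illustrative assumptions.

```json
{
  "exchangeId": "ex-1",
  "applicationName": "order-service",
  "agentId": "pod-1",
  "routeId": "order-route",
  "correlationId": "corr-1",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00Z",
  "endTime": "2026-03-31T10:00:01Z",
  "durationMs": 1000,
  "engineLevel": "DEEP",
  "attributes": { "env": "prod" },
  "chunkSeq": 0,
  "isFinal": true,
  "processors": [
    {
      "seq": 1,
      "parentSeq": null,
      "iteration": null,
      "name": "log1",
      "type": "log",
      "status": "COMPLETED",
      "startTime": "2026-03-31T10:00:00.100Z",
      "durationMs": 5
    }
  ]
}
```

An array of such objects is also accepted; the controller branches on a leading `[`.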
- Step 3: Compile
mvn clean compile -pl cameleer-server-app
- Step 4: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
git commit -m "feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController"
Task 6: ClickHouseSearchIndex
Files:
- Create: cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java
- Create: cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java
Same as the original plan — implements SearchIndex using SQL against ClickHouse. The search query patterns are unchanged: _search_text LIKE '%term%' on executions, subquery join on processor_executions for body/header/error scoped searches.
This task is identical to Task 5 in the original plan. Refer to that task's complete code for the ClickHouseSearchIndex and ClickHouseSearchIndexIT implementations. The only difference is that processor_executions now uses seq/iteration columns instead of depth/loopIndex/etc., but the search queries only use _search_text, execution_id, input_body, output_body, input_headers, output_headers, error_message, and error_stacktrace — none of which changed.
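As a reminder of the query shape, a body-scoped search (textInBody) takes roughly the following form. This is a sketch only — the authoritative SQL is in the original plan's Task 5; execution_id, input_body, output_body, and _search_text appear in this plan, while start_time and duration_ms are assumed column names from the Phase 1 schema.

```sql
-- textInBody: filter executions via a scoped subquery against processor_executions.
-- The ngram bloom filter index on the text columns lets ClickHouse skip granules
-- that cannot contain the search term before running the LIKE scan.
SELECT execution_id, status, duration_ms, start_time
FROM executions
WHERE execution_id IN (
    SELECT execution_id
    FROM processor_executions
    WHERE input_body LIKE '%ABC-123%' OR output_body LIKE '%ABC-123%'
)
ORDER BY start_time DESC
LIMIT 50;
```

Top-level free-text search skips the subquery and filters directly on `_search_text LIKE '%term%'`.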
- Step 1: Write the failing integration test
Use the same test class from the original plan's Task 5, Step 1. The test seeds data via ClickHouseExecutionStore using the new MergedExecution and FlatProcessorRecord types. Refer to the original plan for the complete test code.
- Step 2: Run test to verify it fails
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
- Step 3: Implement ClickHouseSearchIndex
Use the same implementation from the original plan's Task 5, Step 3. The SQL queries and WHERE clause building are identical.
- Step 4: Run test to verify it passes
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire
- Step 5: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java \
cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java
git commit -m "feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search"
Task 7: Feature Flag Wiring
Files:
- Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
- Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java
- Modify: cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java
- Modify: cameleer-server-app/src/main/resources/application.yml
- Modify: deploy/base/server.yaml
Wire up the ChunkAccumulator, WriteBuffers, flush scheduler, and search switching.
- Step 1: Add execution + processor WriteBuffer beans to IngestionBeanConfig
// Add to IngestionBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
- Step 2: Add CH beans to StorageBeanConfig
// Add to StorageBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseExecutionStore clickHouseExecutionStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseExecutionStore(clickHouseJdbc);
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
return new ChunkAccumulator(
executionBuffer::offer,
processorBatchBuffer::offer,
java.time.Duration.ofMinutes(5));
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ExecutionFlushScheduler executionFlushScheduler(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
ClickHouseExecutionStore executionStore,
ChunkAccumulator accumulator,
IngestionConfig config) {
return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
executionStore, accumulator, config);
}
@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
public SearchIndex clickHouseSearchIndex(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseSearchIndex(clickHouseJdbc);
}
- Step 3: Add ConditionalOnProperty to OpenSearchIndex
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true)
public class OpenSearchIndex implements SearchIndex {
- Step 4: Update application.yml
cameleer:
storage:
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
- Step 5: Update deploy/base/server.yaml
Add env var:
- name: CAMELEER_STORAGE_SEARCH
value: "opensearch"
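To flip a deployment onto the ClickHouse backend, the two flags combine as sketched below. The env var CAMELEER_STORAGE_SEARCH is the one introduced in Steps 4–5; CLICKHOUSE_ENABLED is an assumed relaxed-binding name for the clickhouse.enabled property that gates the beans in Steps 1–2.

```yaml
# Overlay sketch: enable ClickHouse ingestion beans and switch search backend
- name: CAMELEER_STORAGE_SEARCH
  value: "clickhouse"
- name: CLICKHOUSE_ENABLED   # assumed env binding for clickhouse.enabled
  value: "true"
```

With both left at defaults, the server keeps the PG + OpenSearch stack and no CH beans are created.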
- Step 6: Compile and verify all tests pass
mvn clean verify -DskipITs
- Step 7: Commit
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java \
cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java \
cameleer-server-app/src/main/resources/application.yml \
deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag"
Task 8: End-to-End Integration Test
Files:
- Create: cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
Validates the full pipeline: ChunkAccumulator → WriteBuffers → ClickHouseExecutionStore → ClickHouseSearchIndex.
- Step 1: Write the integration test
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
package com.cameleer.server.app.storage;
import com.cameleer.server.app.search.ClickHouseSearchIndex;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseChunkPipelineIT {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseExecutionStore executionStore;
private ClickHouseSearchIndex searchIndex;
private ChunkAccumulator accumulator;
private List<MergedExecution> executionBuffer;
private List<ChunkAccumulator.ProcessorBatch> processorBuffer;
@BeforeEach
void setUp() throws IOException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
String execDdl = new String(getClass().getResourceAsStream(
"/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
String procDdl = new String(getClass().getResourceAsStream(
"/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
jdbc.execute(execDdl);
jdbc.execute(procDdl);
jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions");
executionStore = new ClickHouseExecutionStore(jdbc);
searchIndex = new ClickHouseSearchIndex(jdbc);
executionBuffer = new ArrayList<>();
processorBuffer = new ArrayList<>();
accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5));
}
@Test
void fullPipeline_chunkedIngestion_thenSearch() {
Instant start = Instant.parse("2026-03-31T12:00:00Z");
// Chunk 0: RUNNING with initial processors
accumulator.onChunk(new ExecutionChunk(
"pipeline-1", "order-service", "pod-1", "order-route",
"corr-1", "RUNNING",
start, null, null, "DEEP",
null, null, null, null, null, null,
Map.of("orderId", "ORD-123"),
null, null, null, null,
0, false,
List.of(
new FlatProcessorRecord(1, null, null, "log1", "log",
null, null, "COMPLETED", start, 2L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(2, null, null, "split1", "split",
null, 3, "COMPLETED", start.plusMillis(2), 100L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(3, 2, "split1", "to1", "to",
0, null, "COMPLETED", start.plusMillis(5), 30L,
"http://inventory/api",
"order ABC-123 check stock", "stock available",
null, null,
null, null, null, null, null, null,
null, null, null, null, null))));
// Processors should be flushed immediately
assertThat(processorBuffer).hasSize(1);
assertThat(executionBuffer).isEmpty();
// Chunk 1: COMPLETED (final)
accumulator.onChunk(new ExecutionChunk(
"pipeline-1", "order-service", "pod-1", "order-route",
"corr-1", "COMPLETED",
start, start.plusMillis(750), 750L, "DEEP",
null, null, null, null, null, null,
null, null, null, null, null,
1, true,
List.of(
new FlatProcessorRecord(4, 2, "split1", "to1", "to",
1, null, "COMPLETED", start.plusMillis(40), 25L,
"http://inventory/api",
"order DEF-456 check stock", "stock available",
null, null,
null, null, null, null, null, null,
null, null, null, null, null))));
assertThat(executionBuffer).hasSize(1);
assertThat(processorBuffer).hasSize(2);
// Flush to ClickHouse
executionStore.insertExecutionBatch(executionBuffer);
for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) {
executionStore.insertProcessorBatch(
batch.tenantId(), batch.executionId(),
batch.routeId(), batch.applicationName(),
batch.execStartTime(), batch.processors());
}
// Search by order ID in attributes
SearchResult<ExecutionSummary> result = searchIndex.search(new SearchRequest(
null, null, null, null, null, null,
"ORD-123", null, null, null,
null, null, null, null, null,
0, 50, null, null));
assertThat(result.total()).isEqualTo(1);
assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
assertThat(result.data().get(0).durationMs()).isEqualTo(750L);
// Search in processor body
SearchResult<ExecutionSummary> bodyResult = searchIndex.search(new SearchRequest(
null, null, null, null, null, null,
null, "ABC-123", null, null,
null, null, null, null, null,
0, 50, null, null));
assertThat(bodyResult.total()).isEqualTo(1);
// Verify iteration data in processor_executions
Integer iterSize = jdbc.queryForObject(
"SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2",
Integer.class);
assertThat(iterSize).isEqualTo(3);
Integer iter0 = jdbc.queryForObject(
"SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3",
Integer.class);
assertThat(iter0).isEqualTo(0);
}
}
- Step 2: Run the integration test
mvn test -pl cameleer-server-app -Dtest=ClickHouseChunkPipelineIT -Dfailsafe.provider=surefire
Expected: PASS.
- Step 3: Commit
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
git commit -m "test(clickhouse): add end-to-end chunk pipeline integration test"
Verification Checklist
After all tasks are complete, verify:
- Chunk ingestion: POST /api/v1/data/chunks accepts single and array ExecutionChunks
- Processor immediate insert: Processor records are inserted as chunks arrive (append-only)
- Envelope accumulation: Multiple non-final chunks merge envelope data correctly
- Final flush: Final chunk triggers execution row write with version=1
- Stale sweep: Exchanges without a final chunk for 5 minutes are flushed as RUNNING
- Search: All filter types work: status, time range, duration, correlation ID, application, text, textInBody, textInHeaders, textInErrors
- Highlighting: Search results include 120-char context snippets
- Feature flag: cameleer.storage.search=opensearch uses OpenSearch, =clickhouse uses ClickHouse
- Backward compat: With clickhouse.enabled=false, the server starts without CH beans (PG + OpenSearch only)
- seq/parentSeq: Processor records correctly store seq, parentSeq, iteration, iterationSize
- CI: mvn clean verify -DskipITs passes