diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
new file mode 100644
index 00000000..35eccbb9
--- /dev/null
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
@@ -0,0 +1,205 @@
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/**
+ * Accumulates {@link ExecutionChunk} documents and produces:
+ *
+ * - {@link ProcessorBatch} — pushed immediately for each chunk (append-only)
+ * - {@link MergedExecution} — pushed when the final chunk arrives or on stale sweep
+ *
+ */
+public class ChunkAccumulator {
+
+ private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
+ private static final String DEFAULT_TENANT = "default";
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final Consumer executionSink;
+ private final Consumer processorSink;
+ private final Duration staleThreshold;
+ private final ConcurrentHashMap pending = new ConcurrentHashMap<>();
+
+ public ChunkAccumulator(Consumer executionSink,
+ Consumer processorSink,
+ Duration staleThreshold) {
+ this.executionSink = executionSink;
+ this.processorSink = processorSink;
+ this.staleThreshold = staleThreshold;
+ }
+
+ /**
+ * Process an incoming chunk: push processors immediately,
+ * buffer/merge the envelope, and emit when final.
+ */
+ public void onChunk(ExecutionChunk chunk) {
+ // 1. Push processor records immediately (append-only)
+ if (chunk.processors() != null && !chunk.processors().isEmpty()) {
+ processorSink.accept(new ProcessorBatch(
+ DEFAULT_TENANT,
+ chunk.exchangeId(),
+ chunk.routeId(),
+ chunk.applicationName(),
+ chunk.startTime(),
+ chunk.processors()));
+ }
+
+ // 2. Buffer/merge the exchange envelope
+ if (chunk.isFinal()) {
+ // Merge with any pending envelope, then emit
+ PendingExchange existing = pending.remove(chunk.exchangeId());
+ ExecutionChunk merged = existing != null
+ ? mergeEnvelopes(existing.envelope(), chunk)
+ : chunk;
+ executionSink.accept(toMergedExecution(merged));
+ } else {
+ // Buffer the envelope for later merging
+ pending.merge(chunk.exchangeId(),
+ new PendingExchange(chunk, Instant.now()),
+ (old, incoming) -> new PendingExchange(
+ mergeEnvelopes(old.envelope(), incoming.envelope()),
+ old.receivedAt()));
+ }
+ }
+
+ /**
+ * Flush exchanges that have been pending longer than the stale threshold.
+ * Called periodically by a scheduled task.
+ */
+ public void sweepStale() {
+ Instant cutoff = Instant.now().minus(staleThreshold);
+ pending.forEach((exchangeId, pe) -> {
+ if (pe.receivedAt().isBefore(cutoff)) {
+ PendingExchange removed = pending.remove(exchangeId);
+ if (removed != null) {
+ log.info("Flushing stale exchange {} (pending since {})",
+ exchangeId, removed.receivedAt());
+ executionSink.accept(toMergedExecution(removed.envelope()));
+ }
+ }
+ });
+ }
+
+ /** Number of exchanges awaiting a final chunk. */
+ public int getPendingCount() {
+ return pending.size();
+ }
+
+ // ---- Merge logic ----
+
+ /**
+ * COALESCE merge: for each field, prefer the newer value if non-null, else keep older.
+ * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs.
+ */
+ private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) {
+ return new ExecutionChunk(
+ coalesce(newer.exchangeId(), older.exchangeId()),
+ coalesce(newer.applicationName(), older.applicationName()),
+ coalesce(newer.agentId(), older.agentId()),
+ coalesce(newer.routeId(), older.routeId()),
+ coalesce(newer.correlationId(), older.correlationId()),
+ coalesce(newer.status(), older.status()),
+ coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime
+ coalesce(newer.endTime(), older.endTime()),
+ coalesce(newer.durationMs(), older.durationMs()),
+ coalesce(newer.engineLevel(), older.engineLevel()),
+ coalesce(newer.errorMessage(), older.errorMessage()),
+ coalesce(newer.errorStackTrace(), older.errorStackTrace()),
+ coalesce(newer.errorType(), older.errorType()),
+ coalesce(newer.errorCategory(), older.errorCategory()),
+ coalesce(newer.rootCauseType(), older.rootCauseType()),
+ coalesce(newer.rootCauseMessage(), older.rootCauseMessage()),
+ coalesce(newer.attributes(), older.attributes()),
+ coalesce(newer.traceId(), older.traceId()),
+ coalesce(newer.spanId(), older.spanId()),
+ coalesce(newer.originalExchangeId(), older.originalExchangeId()),
+ coalesce(newer.replayExchangeId(), older.replayExchangeId()),
+ Math.max(newer.chunkSeq(), older.chunkSeq()),
+ newer.isFinal() || older.isFinal(),
+ List.of() // processors are handled separately
+ );
+ }
+
+ private static T coalesce(T a, T b) {
+ return a != null ? a : b;
+ }
+
+ // ---- Conversion to MergedExecution ----
+
+ private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
+ return new MergedExecution(
+ DEFAULT_TENANT,
+ 1L,
+ envelope.exchangeId(),
+ envelope.routeId(),
+ envelope.agentId(),
+ envelope.applicationName(),
+ envelope.status(),
+ envelope.correlationId(),
+ envelope.exchangeId(),
+ envelope.startTime(),
+ envelope.endTime(),
+ envelope.durationMs(),
+ envelope.errorMessage(),
+ envelope.errorStackTrace(),
+ envelope.errorType(),
+ envelope.errorCategory(),
+ envelope.rootCauseType(),
+ envelope.rootCauseMessage(),
+ "", // diagramContentHash — server-side lookup, not in chunk
+ envelope.engineLevel(),
+ "", // inputBody — on processor records now
+ "", // outputBody
+ "", // inputHeaders
+ "", // outputHeaders
+ serializeAttributes(envelope.attributes()),
+ envelope.traceId(),
+ envelope.spanId(),
+ false, // hasTraceData — not tracked at envelope level
+ envelope.replayExchangeId() != null // isReplay
+ );
+ }
+
+ private static String serializeAttributes(Map attributes) {
+ if (attributes == null || attributes.isEmpty()) {
+ return "{}";
+ }
+ try {
+ return MAPPER.writeValueAsString(attributes);
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to serialize attributes, falling back to empty object", e);
+ return "{}";
+ }
+ }
+
+ // ---- Inner types ----
+
+ /**
+ * A batch of processor records from a single chunk, ready for ClickHouse insertion.
+ */
+ public record ProcessorBatch(
+ String tenantId,
+ String executionId,
+ String routeId,
+ String applicationName,
+ Instant execStartTime,
+ List processors
+ ) {}
+
+ /**
+ * Envelope buffered while waiting for the final chunk.
+ */
+ private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {}
+}
diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
new file mode 100644
index 00000000..fd9cdca8
--- /dev/null
+++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
@@ -0,0 +1,226 @@
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ChunkAccumulatorTest {
+
+ private CopyOnWriteArrayList executionSink;
+ private CopyOnWriteArrayList processorSink;
+ private ChunkAccumulator accumulator;
+
+ @BeforeEach
+ void setUp() {
+ executionSink = new CopyOnWriteArrayList<>();
+ processorSink = new CopyOnWriteArrayList<>();
+ accumulator = new ChunkAccumulator(
+ executionSink::add, processorSink::add, Duration.ofMinutes(5));
+ }
+
+ @Test
+ void singleFinalChunk_producesExecutionAndProcessors() {
+ ExecutionChunk chunk = new ExecutionChunk(
+ "ex-1", "order-service", "agent-1", "route-1",
+ "corr-1", "COMPLETED",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+ "REGULAR",
+ null, null, null, null, null, null,
+ Map.of("orderId", "ORD-1"),
+ "trace-1", "span-1", null, null,
+ 0, true,
+ List.of(proc(1, null, "log1", "log", "COMPLETED", 5L)));
+
+ accumulator.onChunk(chunk);
+
+ // Processor sink should receive 1 batch with 1 record
+ assertThat(processorSink).hasSize(1);
+ ChunkAccumulator.ProcessorBatch batch = processorSink.get(0);
+ assertThat(batch.tenantId()).isEqualTo("default");
+ assertThat(batch.executionId()).isEqualTo("ex-1");
+ assertThat(batch.routeId()).isEqualTo("route-1");
+ assertThat(batch.applicationName()).isEqualTo("order-service");
+ assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z"));
+ assertThat(batch.processors()).hasSize(1);
+
+ // Execution sink should receive 1 merged execution
+ assertThat(executionSink).hasSize(1);
+ MergedExecution exec = executionSink.get(0);
+ assertThat(exec.tenantId()).isEqualTo("default");
+ assertThat(exec.version()).isEqualTo(1L);
+ assertThat(exec.executionId()).isEqualTo("ex-1");
+ assertThat(exec.routeId()).isEqualTo("route-1");
+ assertThat(exec.status()).isEqualTo("COMPLETED");
+ assertThat(exec.durationMs()).isEqualTo(1000L);
+ assertThat(exec.traceId()).isEqualTo("trace-1");
+ assertThat(exec.spanId()).isEqualTo("span-1");
+ assertThat(exec.attributes()).contains("orderId");
+ }
+
+ @Test
+ void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
+ ExecutionChunk chunk0 = new ExecutionChunk(
+ "ex-2", "app", "agent-1", "route-1",
+ "ex-2", "RUNNING",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ null, null, "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 0, false,
+ List.of(
+ proc(1, null, "log1", "log", "COMPLETED", 5L),
+ proc(2, null, "log2", "log", "COMPLETED", 3L)));
+
+ accumulator.onChunk(chunk0);
+
+ // Processors pushed immediately on chunk 0
+ assertThat(processorSink).hasSize(1);
+ assertThat(processorSink.get(0).processors()).hasSize(2);
+
+ // No execution yet (not final)
+ assertThat(executionSink).isEmpty();
+
+ ExecutionChunk chunk1 = new ExecutionChunk(
+ "ex-2", "app", "agent-1", "route-1",
+ "ex-2", "COMPLETED",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ Instant.parse("2026-03-31T10:00:02Z"), 2000L,
+ "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 1, true,
+ List.of(proc(3, null, "log3", "log", "COMPLETED", 7L)));
+
+ accumulator.onChunk(chunk1);
+
+ // Processors from chunk 1 also pushed
+ assertThat(processorSink).hasSize(2);
+ assertThat(processorSink.get(1).processors()).hasSize(1);
+
+ // Now execution is emitted
+ assertThat(executionSink).hasSize(1);
+ MergedExecution exec = executionSink.get(0);
+ assertThat(exec.status()).isEqualTo("COMPLETED");
+ assertThat(exec.durationMs()).isEqualTo(2000L);
+ }
+
+ @Test
+ void staleExchange_flushedBySweep() throws Exception {
+ ChunkAccumulator staleAccumulator = new ChunkAccumulator(
+ executionSink::add, processorSink::add, Duration.ofMillis(1));
+
+ ExecutionChunk chunk = new ExecutionChunk(
+ "ex-3", "app", "agent-1", "route-1",
+ "ex-3", "RUNNING",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ null, null, "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 0, false,
+ List.of());
+
+ staleAccumulator.onChunk(chunk);
+ assertThat(executionSink).isEmpty();
+
+ Thread.sleep(5);
+ staleAccumulator.sweepStale();
+
+ assertThat(executionSink).hasSize(1);
+ MergedExecution exec = executionSink.get(0);
+ assertThat(exec.status()).isEqualTo("RUNNING");
+ assertThat(exec.executionId()).isEqualTo("ex-3");
+ }
+
+ @Test
+ void finalChunkWithErrors_populatesErrorFields() {
+ ExecutionChunk chunk = new ExecutionChunk(
+ "ex-4", "app", "agent-1", "route-1",
+ "ex-4", "FAILED",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+ "REGULAR",
+ "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)",
+ "NullPointerException", "RUNTIME",
+ "NullPointerException", "null value at index 0",
+ null, null, null, null, null,
+ 0, true,
+ List.of());
+
+ accumulator.onChunk(chunk);
+
+ assertThat(executionSink).hasSize(1);
+ MergedExecution exec = executionSink.get(0);
+ assertThat(exec.status()).isEqualTo("FAILED");
+ assertThat(exec.errorMessage()).isEqualTo("NullPointerException");
+ assertThat(exec.errorStacktrace()).isEqualTo("at com.foo.Bar.baz(Bar.java:42)");
+ assertThat(exec.errorType()).isEqualTo("NullPointerException");
+ assertThat(exec.errorCategory()).isEqualTo("RUNTIME");
+ assertThat(exec.rootCauseType()).isEqualTo("NullPointerException");
+ assertThat(exec.rootCauseMessage()).isEqualTo("null value at index 0");
+ }
+
+ @Test
+ void getPendingCount_tracksBufferedExchanges() {
+ ExecutionChunk running1 = new ExecutionChunk(
+ "ex-5", "app", "agent-1", "route-1",
+ "ex-5", "RUNNING",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ null, null, "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 0, false,
+ List.of());
+
+ ExecutionChunk running2 = new ExecutionChunk(
+ "ex-6", "app", "agent-1", "route-2",
+ "ex-6", "RUNNING",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ null, null, "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 0, false,
+ List.of());
+
+ accumulator.onChunk(running1);
+ accumulator.onChunk(running2);
+ assertThat(accumulator.getPendingCount()).isEqualTo(2);
+
+ // Send final for ex-5
+ ExecutionChunk final5 = new ExecutionChunk(
+ "ex-5", "app", "agent-1", "route-1",
+ "ex-5", "COMPLETED",
+ Instant.parse("2026-03-31T10:00:00Z"),
+ Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+ "REGULAR",
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ 1, true,
+ List.of());
+
+ accumulator.onChunk(final5);
+ assertThat(accumulator.getPendingCount()).isEqualTo(1);
+ }
+
+ /** Helper to create a FlatProcessorRecord with minimal fields. */
+ private static FlatProcessorRecord proc(int seq, Integer parentSeq,
+ String processorId, String processorType,
+ String status, long durationMs) {
+ return new FlatProcessorRecord(
+ seq, parentSeq, null, processorId, processorType,
+ null, null, status,
+ Instant.parse("2026-03-31T10:00:00.100Z"), durationMs,
+ null, null, null, null, null,
+ null, null, null, null, null, null,
+ null, null, null, null, null);
+ }
+}