From 62420cf0c21197c9952f9d800cfb8b92e8092b75 Mon Sep 17 00:00:00 2001
From: hsiegeln <37154749+hsiegeln@users.noreply.github.com>
Date: Tue, 31 Mar 2026 19:10:21 +0200
Subject: [PATCH] feat(clickhouse): add ChunkAccumulator for chunked execution
 ingestion

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../core/ingestion/ChunkAccumulator.java      | 205 ++++++++++++++++
 .../core/ingestion/ChunkAccumulatorTest.java  | 226 ++++++++++++++++++
 2 files changed, 431 insertions(+)
 create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
 create mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java

diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
new file mode 100644
index 00000000..35eccbb9
--- /dev/null
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java
@@ -0,0 +1,205 @@
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/**
+ * Accumulates {@link ExecutionChunk} documents and produces:
+ * <ul><li>{@link MergedExecution} envelopes, emitted on the final chunk or a stale sweep</li>
+ * <li>{@link ProcessorBatch} records, pushed to the processor sink immediately</li></ul>
+ */
+public class ChunkAccumulator {
+
+    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
+    private static final String DEFAULT_TENANT = "default";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final Consumer<MergedExecution> executionSink;
+    private final Consumer<ProcessorBatch> processorSink;
+    private final Duration staleThreshold;
+    private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
+
+    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
+                            Consumer<ProcessorBatch> processorSink,
+                            Duration staleThreshold) {
+        this.executionSink = executionSink;
+        this.processorSink = processorSink;
+        this.staleThreshold = staleThreshold;
+    }
+
+    /**
+     * Process an incoming chunk: push processors immediately,
+     * buffer/merge the envelope, and emit when final.
+     */
+    public void onChunk(ExecutionChunk chunk) {
+        // 1. Push processor records immediately (append-only)
+        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
+            processorSink.accept(new ProcessorBatch(
+                    DEFAULT_TENANT,
+                    chunk.exchangeId(),
+                    chunk.routeId(),
+                    chunk.applicationName(),
+                    chunk.startTime(),
+                    chunk.processors()));
+        }
+
+        // 2. Buffer/merge the exchange envelope
+        if (chunk.isFinal()) {
+            // Merge with any pending envelope, then emit
+            PendingExchange existing = pending.remove(chunk.exchangeId());
+            ExecutionChunk merged = existing != null
+                    ? mergeEnvelopes(existing.envelope(), chunk)
+                    : chunk;
+            executionSink.accept(toMergedExecution(merged));
+        } else {
+            // Buffer the envelope for later merging
+            pending.merge(chunk.exchangeId(),
+                    new PendingExchange(chunk, Instant.now()),
+                    (old, incoming) -> new PendingExchange(
+                            mergeEnvelopes(old.envelope(), incoming.envelope()),
+                            old.receivedAt()));
+        }
+    }
+
+    /**
+     * Flush exchanges that have been pending longer than the stale threshold.
+     * Called periodically by a scheduled task.
+     */
+    public void sweepStale() {
+        Instant cutoff = Instant.now().minus(staleThreshold);
+        pending.forEach((exchangeId, pe) -> {
+            if (pe.receivedAt().isBefore(cutoff)) {
+                PendingExchange removed = pending.remove(exchangeId);
+                if (removed != null) {
+                    log.info("Flushing stale exchange {} (pending since {})",
+                            exchangeId, removed.receivedAt());
+                    executionSink.accept(toMergedExecution(removed.envelope()));
+                }
+            }
+        });
+    }
+
+    /** Number of exchanges awaiting a final chunk. */
+    public int getPendingCount() {
+        return pending.size();
+    }
+
+    // ---- Merge logic ----
+
+    /**
+     * COALESCE merge: for each field, prefer the newer value if non-null, else keep older.
+     * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs.
+     */
+    private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) {
+        return new ExecutionChunk(
+                coalesce(newer.exchangeId(), older.exchangeId()),
+                coalesce(newer.applicationName(), older.applicationName()),
+                coalesce(newer.agentId(), older.agentId()),
+                coalesce(newer.routeId(), older.routeId()),
+                coalesce(newer.correlationId(), older.correlationId()),
+                coalesce(newer.status(), older.status()),
+                coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime
+                coalesce(newer.endTime(), older.endTime()),
+                coalesce(newer.durationMs(), older.durationMs()),
+                coalesce(newer.engineLevel(), older.engineLevel()),
+                coalesce(newer.errorMessage(), older.errorMessage()),
+                coalesce(newer.errorStackTrace(), older.errorStackTrace()),
+                coalesce(newer.errorType(), older.errorType()),
+                coalesce(newer.errorCategory(), older.errorCategory()),
+                coalesce(newer.rootCauseType(), older.rootCauseType()),
+                coalesce(newer.rootCauseMessage(), older.rootCauseMessage()),
+                coalesce(newer.attributes(), older.attributes()),
+                coalesce(newer.traceId(), older.traceId()),
+                coalesce(newer.spanId(), older.spanId()),
+                coalesce(newer.originalExchangeId(), older.originalExchangeId()),
+                coalesce(newer.replayExchangeId(), older.replayExchangeId()),
+                Math.max(newer.chunkSeq(), older.chunkSeq()),
+                newer.isFinal() || older.isFinal(),
+                List.of() // processors are handled separately
+        );
+    }
+
+    private static <T> T coalesce(T a, T b) {
+        return a != null ? a : b;
+    }
+
+    // ---- Conversion to MergedExecution ----
+
+    private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
+        return new MergedExecution(
+                DEFAULT_TENANT,
+                1L,
+                envelope.exchangeId(),
+                envelope.routeId(),
+                envelope.agentId(),
+                envelope.applicationName(),
+                envelope.status(),
+                envelope.correlationId(),
+                envelope.exchangeId(),
+                envelope.startTime(),
+                envelope.endTime(),
+                envelope.durationMs(),
+                envelope.errorMessage(),
+                envelope.errorStackTrace(),
+                envelope.errorType(),
+                envelope.errorCategory(),
+                envelope.rootCauseType(),
+                envelope.rootCauseMessage(),
+                "", // diagramContentHash — server-side lookup, not in chunk
+                envelope.engineLevel(),
+                "", // inputBody — on processor records now
+                "", // outputBody
+                "", // inputHeaders
+                "", // outputHeaders
+                serializeAttributes(envelope.attributes()),
+                envelope.traceId(),
+                envelope.spanId(),
+                false, // hasTraceData — not tracked at envelope level
+                envelope.replayExchangeId() != null // isReplay
+        );
+    }
+
+    private static String serializeAttributes(Map<String, String> attributes) {
+        if (attributes == null || attributes.isEmpty()) {
+            return "{}";
+        }
+        try {
+            return MAPPER.writeValueAsString(attributes);
+        } catch (JsonProcessingException e) {
+            log.warn("Failed to serialize attributes, falling back to empty object", e);
+            return "{}";
+        }
+    }
+
+    // ---- Inner types ----
+
+    /**
+     * A batch of processor records from a single chunk, ready for ClickHouse insertion.
+     */
+    public record ProcessorBatch(
+            String tenantId,
+            String executionId,
+            String routeId,
+            String applicationName,
+            Instant execStartTime,
+            List<FlatProcessorRecord> processors
+    ) {}
+
+    /**
+     * Envelope buffered while waiting for the final chunk.
+     */
+    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {}
+}
diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
new file mode 100644
index 00000000..fd9cdca8
--- /dev/null
+++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
@@ -0,0 +1,226 @@
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ChunkAccumulatorTest {
+
+    private CopyOnWriteArrayList<MergedExecution> executionSink;
+    private CopyOnWriteArrayList<ChunkAccumulator.ProcessorBatch> processorSink;
+    private ChunkAccumulator accumulator;
+
+    @BeforeEach
+    void setUp() {
+        executionSink = new CopyOnWriteArrayList<>();
+        processorSink = new CopyOnWriteArrayList<>();
+        accumulator = new ChunkAccumulator(
+                executionSink::add, processorSink::add, Duration.ofMinutes(5));
+    }
+
+    @Test
+    void singleFinalChunk_producesExecutionAndProcessors() {
+        ExecutionChunk chunk = new ExecutionChunk(
+                "ex-1", "order-service", "agent-1", "route-1",
+                "corr-1", "COMPLETED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+                "REGULAR",
+                null, null, null, null, null, null,
+                Map.of("orderId", "ORD-1"),
+                "trace-1", "span-1", null, null,
+                0, true,
+                List.of(proc(1, null, "log1", "log", "COMPLETED", 5L)));
+
+        accumulator.onChunk(chunk);
+
+        // Processor sink should receive 1 batch with 1 record
+        assertThat(processorSink).hasSize(1);
+        ChunkAccumulator.ProcessorBatch batch = processorSink.get(0);
+        assertThat(batch.tenantId()).isEqualTo("default");
+        assertThat(batch.executionId()).isEqualTo("ex-1");
+        assertThat(batch.routeId()).isEqualTo("route-1");
+        assertThat(batch.applicationName()).isEqualTo("order-service");
+        assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z"));
+        assertThat(batch.processors()).hasSize(1);
+
+        // Execution sink should receive 1 merged execution
+        assertThat(executionSink).hasSize(1);
+        MergedExecution exec = executionSink.get(0);
+        assertThat(exec.tenantId()).isEqualTo("default");
+        assertThat(exec.version()).isEqualTo(1L);
+        assertThat(exec.executionId()).isEqualTo("ex-1");
+        assertThat(exec.routeId()).isEqualTo("route-1");
+        assertThat(exec.status()).isEqualTo("COMPLETED");
+        assertThat(exec.durationMs()).isEqualTo(1000L);
+        assertThat(exec.traceId()).isEqualTo("trace-1");
+        assertThat(exec.spanId()).isEqualTo("span-1");
+        assertThat(exec.attributes()).contains("orderId");
+    }
+
+    @Test
+    void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
+        ExecutionChunk chunk0 = new ExecutionChunk(
+                "ex-2", "app", "agent-1", "route-1",
+                "ex-2", "RUNNING",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                null, null, "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                0, false,
+                List.of(
+                        proc(1, null, "log1", "log", "COMPLETED", 5L),
+                        proc(2, null, "log2", "log", "COMPLETED", 3L)));
+
+        accumulator.onChunk(chunk0);
+
+        // Processors pushed immediately on chunk 0
+        assertThat(processorSink).hasSize(1);
+        assertThat(processorSink.get(0).processors()).hasSize(2);
+
+        // No execution yet (not final)
+        assertThat(executionSink).isEmpty();
+
+        ExecutionChunk chunk1 = new ExecutionChunk(
+                "ex-2", "app", "agent-1", "route-1",
+                "ex-2", "COMPLETED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:02Z"), 2000L,
+                "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                1, true,
+                List.of(proc(3, null, "log3", "log", "COMPLETED", 7L)));
+
+        accumulator.onChunk(chunk1);
+
+        // Processors from chunk 1 also pushed
+        assertThat(processorSink).hasSize(2);
+        assertThat(processorSink.get(1).processors()).hasSize(1);
+
+        // Now execution is emitted
+        assertThat(executionSink).hasSize(1);
+        MergedExecution exec = executionSink.get(0);
+        assertThat(exec.status()).isEqualTo("COMPLETED");
+        assertThat(exec.durationMs()).isEqualTo(2000L);
+    }
+
+    @Test
+    void staleExchange_flushedBySweep() throws Exception {
+        ChunkAccumulator staleAccumulator = new ChunkAccumulator(
+                executionSink::add, processorSink::add, Duration.ofMillis(1));
+
+        ExecutionChunk chunk = new ExecutionChunk(
+                "ex-3", "app", "agent-1", "route-1",
+                "ex-3", "RUNNING",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                null, null, "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                0, false,
+                List.of());
+
+        staleAccumulator.onChunk(chunk);
+        assertThat(executionSink).isEmpty();
+
+        Thread.sleep(5);
+        staleAccumulator.sweepStale();
+
+        assertThat(executionSink).hasSize(1);
+        MergedExecution exec = executionSink.get(0);
+        assertThat(exec.status()).isEqualTo("RUNNING");
+        assertThat(exec.executionId()).isEqualTo("ex-3");
+    }
+
+    @Test
+    void finalChunkWithErrors_populatesErrorFields() {
+        ExecutionChunk chunk = new ExecutionChunk(
+                "ex-4", "app", "agent-1", "route-1",
+                "ex-4", "FAILED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+                "REGULAR",
+                "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)",
+                "NullPointerException", "RUNTIME",
+                "NullPointerException", "null value at index 0",
+                null, null, null, null, null,
+                0, true,
+                List.of());
+
+        accumulator.onChunk(chunk);
+
+        assertThat(executionSink).hasSize(1);
+        MergedExecution exec = executionSink.get(0);
+        assertThat(exec.status()).isEqualTo("FAILED");
+        assertThat(exec.errorMessage()).isEqualTo("NullPointerException");
+        assertThat(exec.errorStacktrace()).isEqualTo("at com.foo.Bar.baz(Bar.java:42)");
+        assertThat(exec.errorType()).isEqualTo("NullPointerException");
+        assertThat(exec.errorCategory()).isEqualTo("RUNTIME");
+        assertThat(exec.rootCauseType()).isEqualTo("NullPointerException");
+        assertThat(exec.rootCauseMessage()).isEqualTo("null value at index 0");
+    }
+
+    @Test
+    void getPendingCount_tracksBufferedExchanges() {
+        ExecutionChunk running1 = new ExecutionChunk(
+                "ex-5", "app", "agent-1", "route-1",
+                "ex-5", "RUNNING",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                null, null, "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                0, false,
+                List.of());
+
+        ExecutionChunk running2 = new ExecutionChunk(
+                "ex-6", "app", "agent-1", "route-2",
+                "ex-6", "RUNNING",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                null, null, "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                0, false,
+                List.of());
+
+        accumulator.onChunk(running1);
+        accumulator.onChunk(running2);
+        assertThat(accumulator.getPendingCount()).isEqualTo(2);
+
+        // Send final for ex-5
+        ExecutionChunk final5 = new ExecutionChunk(
+                "ex-5", "app", "agent-1", "route-1",
+                "ex-5", "COMPLETED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+                "REGULAR",
+                null, null, null, null, null, null,
+                null, null, null, null, null,
+                1, true,
+                List.of());
+
+        accumulator.onChunk(final5);
+        assertThat(accumulator.getPendingCount()).isEqualTo(1);
+    }
+
+    /** Helper to create a FlatProcessorRecord with minimal fields. */
+    private static FlatProcessorRecord proc(int seq, Integer parentSeq,
+                                            String processorId, String processorType,
+                                            String status, long durationMs) {
+        return new FlatProcessorRecord(
+                seq, parentSeq, null, processorId, processorType,
+                null, null, status,
+                Instant.parse("2026-03-31T10:00:00.100Z"), durationMs,
+                null, null, null, null, null,
+                null, null, null, null, null, null,
+                null, null, null, null, null);
+    }
+}