From a7d256b38a30732cdaec473ff1763edf4f3a2596 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 21:04:34 +0200 Subject: [PATCH] fix: compute hasTraceData from processor records in chunk accumulator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunked ingestion path hardcoded hasTraceData=false because the execution envelope doesn't carry processor bodies. But the processor records DO have inputBody/outputBody — we just need to check them. Track hasTraceData across chunks in PendingExchange and pass it to MergedExecution when the final chunk arrives or on stale sweep. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../core/ingestion/ChunkAccumulator.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index a36c5127..6134a1de 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -50,6 +50,7 @@ public class ChunkAccumulator { */ public void onChunk(ExecutionChunk chunk) { // 1. Push processor records immediately (append-only) + boolean chunkHasTrace = false; if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { processorSink.accept(new ProcessorBatch( DEFAULT_TENANT, @@ -58,6 +59,8 @@ public class ChunkAccumulator { chunk.getApplicationId(), chunk.getStartTime(), chunk.getProcessors())); + chunkHasTrace = chunk.getProcessors().stream() + .anyMatch(p -> isNonEmpty(p.getInputBody()) || isNonEmpty(p.getOutputBody())); } // 2. Buffer/merge the exchange envelope @@ -67,17 +70,24 @@ public class ChunkAccumulator { ExecutionChunk merged = existing != null ? mergeEnvelopes(existing.envelope(), chunk) : chunk; - executionSink.accept(toMergedExecution(merged)); + boolean hasTrace = chunkHasTrace || (existing != null && existing.hasTraceData()); + executionSink.accept(toMergedExecution(merged, hasTrace)); } else { // Buffer the envelope for later merging + boolean trace = chunkHasTrace; pending.merge(chunk.getExchangeId(), - new PendingExchange(chunk, Instant.now()), + new PendingExchange(chunk, Instant.now(), trace), (old, incoming) -> new PendingExchange( mergeEnvelopes(old.envelope(), incoming.envelope()), - old.receivedAt())); + old.receivedAt(), + old.hasTraceData() || incoming.hasTraceData())); } } + private static boolean isNonEmpty(String s) { + return s != null && !s.isEmpty(); + } + /** * Flush exchanges that have been pending longer than the stale threshold. * Called periodically by a scheduled task. @@ -90,7 +100,7 @@ public class ChunkAccumulator { if (removed != null) { log.info("Flushing stale exchange {} (pending since {})", exchangeId, removed.receivedAt()); - executionSink.accept(toMergedExecution(removed.envelope())); + executionSink.accept(toMergedExecution(removed.envelope(), removed.hasTraceData())); } } }); @@ -142,7 +152,7 @@ public class ChunkAccumulator { // ---- Conversion to MergedExecution ---- - private MergedExecution toMergedExecution(ExecutionChunk envelope) { + private MergedExecution toMergedExecution(ExecutionChunk envelope, boolean hasTraceData) { String diagramHash = ""; try { diagramHash = diagramStore @@ -179,7 +189,7 @@ public class ChunkAccumulator { serializeAttributes(envelope.getAttributes()), envelope.getTraceId(), envelope.getSpanId(), - false, // hasTraceData — not tracked at envelope level + hasTraceData, envelope.getReplayExchangeId() != null, // isReplay envelope.getOriginalExchangeId(), envelope.getReplayExchangeId() @@ -215,5 +225,5 @@ public class ChunkAccumulator { /** * Envelope buffered while waiting for the final chunk. */ - private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {} + private record PendingExchange(ExecutionChunk envelope, Instant receivedAt, boolean hasTraceData) {} }