fix: compute hasTraceData from processor records in chunk accumulator
The chunked ingestion path hardcoded hasTraceData=false because the execution envelope doesn't carry processor bodies. But the processor records DO have inputBody/outputBody — we just need to check them. Track hasTraceData across chunks in PendingExchange and pass it to MergedExecution when the final chunk arrives or on stale sweep. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -50,6 +50,7 @@ public class ChunkAccumulator {
|
|||||||
*/
|
*/
|
||||||
public void onChunk(ExecutionChunk chunk) {
|
public void onChunk(ExecutionChunk chunk) {
|
||||||
// 1. Push processor records immediately (append-only)
|
// 1. Push processor records immediately (append-only)
|
||||||
|
boolean chunkHasTrace = false;
|
||||||
if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) {
|
if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) {
|
||||||
processorSink.accept(new ProcessorBatch(
|
processorSink.accept(new ProcessorBatch(
|
||||||
DEFAULT_TENANT,
|
DEFAULT_TENANT,
|
||||||
@@ -58,6 +59,8 @@ public class ChunkAccumulator {
|
|||||||
chunk.getApplicationId(),
|
chunk.getApplicationId(),
|
||||||
chunk.getStartTime(),
|
chunk.getStartTime(),
|
||||||
chunk.getProcessors()));
|
chunk.getProcessors()));
|
||||||
|
chunkHasTrace = chunk.getProcessors().stream()
|
||||||
|
.anyMatch(p -> isNonEmpty(p.getInputBody()) || isNonEmpty(p.getOutputBody()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Buffer/merge the exchange envelope
|
// 2. Buffer/merge the exchange envelope
|
||||||
@@ -67,17 +70,24 @@ public class ChunkAccumulator {
|
|||||||
ExecutionChunk merged = existing != null
|
ExecutionChunk merged = existing != null
|
||||||
? mergeEnvelopes(existing.envelope(), chunk)
|
? mergeEnvelopes(existing.envelope(), chunk)
|
||||||
: chunk;
|
: chunk;
|
||||||
executionSink.accept(toMergedExecution(merged));
|
boolean hasTrace = chunkHasTrace || (existing != null && existing.hasTraceData());
|
||||||
|
executionSink.accept(toMergedExecution(merged, hasTrace));
|
||||||
} else {
|
} else {
|
||||||
// Buffer the envelope for later merging
|
// Buffer the envelope for later merging
|
||||||
|
boolean trace = chunkHasTrace;
|
||||||
pending.merge(chunk.getExchangeId(),
|
pending.merge(chunk.getExchangeId(),
|
||||||
new PendingExchange(chunk, Instant.now()),
|
new PendingExchange(chunk, Instant.now(), trace),
|
||||||
(old, incoming) -> new PendingExchange(
|
(old, incoming) -> new PendingExchange(
|
||||||
mergeEnvelopes(old.envelope(), incoming.envelope()),
|
mergeEnvelopes(old.envelope(), incoming.envelope()),
|
||||||
old.receivedAt()));
|
old.receivedAt(),
|
||||||
|
old.hasTraceData() || incoming.hasTraceData()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isNonEmpty(String s) {
|
||||||
|
return s != null && !s.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush exchanges that have been pending longer than the stale threshold.
|
* Flush exchanges that have been pending longer than the stale threshold.
|
||||||
* Called periodically by a scheduled task.
|
* Called periodically by a scheduled task.
|
||||||
@@ -90,7 +100,7 @@ public class ChunkAccumulator {
|
|||||||
if (removed != null) {
|
if (removed != null) {
|
||||||
log.info("Flushing stale exchange {} (pending since {})",
|
log.info("Flushing stale exchange {} (pending since {})",
|
||||||
exchangeId, removed.receivedAt());
|
exchangeId, removed.receivedAt());
|
||||||
executionSink.accept(toMergedExecution(removed.envelope()));
|
executionSink.accept(toMergedExecution(removed.envelope(), removed.hasTraceData()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -142,7 +152,7 @@ public class ChunkAccumulator {
|
|||||||
|
|
||||||
// ---- Conversion to MergedExecution ----
|
// ---- Conversion to MergedExecution ----
|
||||||
|
|
||||||
private MergedExecution toMergedExecution(ExecutionChunk envelope) {
|
private MergedExecution toMergedExecution(ExecutionChunk envelope, boolean hasTraceData) {
|
||||||
String diagramHash = "";
|
String diagramHash = "";
|
||||||
try {
|
try {
|
||||||
diagramHash = diagramStore
|
diagramHash = diagramStore
|
||||||
@@ -179,7 +189,7 @@ public class ChunkAccumulator {
|
|||||||
serializeAttributes(envelope.getAttributes()),
|
serializeAttributes(envelope.getAttributes()),
|
||||||
envelope.getTraceId(),
|
envelope.getTraceId(),
|
||||||
envelope.getSpanId(),
|
envelope.getSpanId(),
|
||||||
false, // hasTraceData — not tracked at envelope level
|
hasTraceData,
|
||||||
envelope.getReplayExchangeId() != null, // isReplay
|
envelope.getReplayExchangeId() != null, // isReplay
|
||||||
envelope.getOriginalExchangeId(),
|
envelope.getOriginalExchangeId(),
|
||||||
envelope.getReplayExchangeId()
|
envelope.getReplayExchangeId()
|
||||||
@@ -215,5 +225,5 @@ public class ChunkAccumulator {
|
|||||||
/**
|
/**
|
||||||
* Envelope buffered while waiting for the final chunk.
|
* Envelope buffered while waiting for the final chunk.
|
||||||
*/
|
*/
|
||||||
private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {}
|
private record PendingExchange(ExecutionChunk envelope, Instant receivedAt, boolean hasTraceData) {}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user