fix: populate diagramContentHash in chunked ingestion pipeline
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 43s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

ChunkAccumulator now injects DiagramStore and looks up the content hash
when converting to MergedExecution. Without this, the detail page had
no diagram hash, so the overlay couldn't find the route diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 10:50:34 +02:00
parent 59374482bc
commit d4dbfa7ae6
2 changed files with 17 additions and 3 deletions

View File

@@ -107,10 +107,12 @@ public class StorageBeanConfig {
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
public ChunkAccumulator chunkAccumulator(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
DiagramStore diagramStore) {
return new ChunkAccumulator(
executionBuffer::offer,
processorBatchBuffer::offer,
diagramStore,
java.time.Duration.ofMinutes(5));
}

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.cameleer3.server.core.storage.DiagramStore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
@@ -29,14 +30,17 @@ public class ChunkAccumulator {
private final Consumer<MergedExecution> executionSink;
private final Consumer<ProcessorBatch> processorSink;
private final DiagramStore diagramStore;
private final Duration staleThreshold;
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
public ChunkAccumulator(Consumer<MergedExecution> executionSink,
Consumer<ProcessorBatch> processorSink,
DiagramStore diagramStore,
Duration staleThreshold) {
this.executionSink = executionSink;
this.processorSink = processorSink;
this.diagramStore = diagramStore;
this.staleThreshold = staleThreshold;
}
@@ -138,7 +142,15 @@ public class ChunkAccumulator {
// ---- Conversion to MergedExecution ----
private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
private MergedExecution toMergedExecution(ExecutionChunk envelope) {
String diagramHash = "";
try {
diagramHash = diagramStore
.findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
.orElse("");
} catch (Exception e) {
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
}
return new MergedExecution(
DEFAULT_TENANT,
1L,
@@ -158,7 +170,7 @@ public class ChunkAccumulator {
envelope.getErrorCategory(),
envelope.getRootCauseType(),
envelope.getRootCauseMessage(),
"", // diagramContentHash — server-side lookup, not in chunk
diagramHash,
envelope.getEngineLevel(),
"", // inputBody — on processor records now
"", // outputBody