From d4dbfa7ae6cab5c42b9e5e564f6b65bdb5bda94d Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 10:50:34 +0200 Subject: [PATCH] fix: populate diagramContentHash in chunked ingestion pipeline ChunkAccumulator now injects DiagramStore and looks up the content hash when converting to MergedExecution. Without this, the detail page had no diagram hash, so the overlay couldn't find the route diagram. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/app/config/StorageBeanConfig.java | 4 +++- .../server/core/ingestion/ChunkAccumulator.java | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 78827a60..001f666e 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -107,10 +107,12 @@ public class StorageBeanConfig { @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) public ChunkAccumulator chunkAccumulator( WriteBuffer executionBuffer, - WriteBuffer processorBatchBuffer) { + WriteBuffer processorBatchBuffer, + DiagramStore diagramStore) { return new ChunkAccumulator( executionBuffer::offer, processorBatchBuffer::offer, + diagramStore, java.time.Duration.ofMinutes(5)); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index 9d6ed225..bfbca40d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion; import com.cameleer3.common.model.ExecutionChunk; import com.cameleer3.common.model.FlatProcessorRecord; +import com.cameleer3.server.core.storage.DiagramStore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -29,14 +30,17 @@ public class ChunkAccumulator { private final Consumer executionSink; private final Consumer processorSink; + private final DiagramStore diagramStore; private final Duration staleThreshold; private final ConcurrentHashMap pending = new ConcurrentHashMap<>(); public ChunkAccumulator(Consumer executionSink, Consumer processorSink, + DiagramStore diagramStore, Duration staleThreshold) { this.executionSink = executionSink; this.processorSink = processorSink; + this.diagramStore = diagramStore; this.staleThreshold = staleThreshold; } @@ -138,7 +142,15 @@ public class ChunkAccumulator { // ---- Conversion to MergedExecution ---- - private static MergedExecution toMergedExecution(ExecutionChunk envelope) { + private MergedExecution toMergedExecution(ExecutionChunk envelope) { + String diagramHash = ""; + try { + diagramHash = diagramStore + .findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId()) + .orElse(""); + } catch (Exception e) { + log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId()); + } return new MergedExecution( DEFAULT_TENANT, 1L, @@ -158,7 +170,7 @@ public class ChunkAccumulator { envelope.getErrorCategory(), envelope.getRootCauseType(), envelope.getRootCauseMessage(), - "", // diagramContentHash — server-side lookup, not in chunk + diagramHash, envelope.getEngineLevel(), "", // inputBody — on processor records now "", // outputBody