diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 78827a60..001f666e 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -107,10 +107,12 @@ public class StorageBeanConfig { @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) public ChunkAccumulator chunkAccumulator( WriteBuffer executionBuffer, - WriteBuffer processorBatchBuffer) { + WriteBuffer processorBatchBuffer, + DiagramStore diagramStore) { return new ChunkAccumulator( executionBuffer::offer, processorBatchBuffer::offer, + diagramStore, java.time.Duration.ofMinutes(5)); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index 9d6ed225..bfbca40d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion; import com.cameleer3.common.model.ExecutionChunk; import com.cameleer3.common.model.FlatProcessorRecord; +import com.cameleer3.server.core.storage.DiagramStore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -29,14 +30,17 @@ public class ChunkAccumulator { private final Consumer executionSink; private final Consumer processorSink; + private final DiagramStore diagramStore; private final Duration staleThreshold; private final ConcurrentHashMap pending = new ConcurrentHashMap<>(); public ChunkAccumulator(Consumer executionSink, Consumer processorSink, + DiagramStore diagramStore, Duration staleThreshold) { this.executionSink = executionSink; this.processorSink = processorSink; + this.diagramStore = diagramStore; this.staleThreshold = staleThreshold; } @@ -138,7 +142,15 @@ public class ChunkAccumulator { // ---- Conversion to MergedExecution ---- - private static MergedExecution toMergedExecution(ExecutionChunk envelope) { + private MergedExecution toMergedExecution(ExecutionChunk envelope) { + String diagramHash = ""; + try { + diagramHash = diagramStore + .findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId()) + .orElse(""); + } catch (Exception e) { + log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId()); + } return new MergedExecution( DEFAULT_TENANT, 1L, @@ -158,7 +170,7 @@ public class ChunkAccumulator { envelope.getErrorCategory(), envelope.getRootCauseType(), envelope.getRootCauseMessage(), - "", // diagramContentHash — server-side lookup, not in chunk + diagramHash, envelope.getEngineLevel(), "", // inputBody — on processor records now "", // outputBody