From 85ebe7611115172df2da1a6da9a47b1c6745628f Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:18:54 +0100 Subject: [PATCH] refactor: IngestionService uses synchronous ExecutionStore writes with event publishing Co-Authored-By: Claude Sonnet 4.6 --- .../core/ingestion/IngestionService.java | 171 +++++++++--------- 1 file changed, 87 insertions(+), 84 deletions(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 6841c683..c5e17e6f 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -1,113 +1,116 @@ package com.cameleer3.server.core.ingestion; +import com.cameleer3.common.model.ProcessorExecution; +import com.cameleer3.common.model.RouteExecution; +import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent; +import com.cameleer3.server.core.storage.DiagramStore; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; -/** - * Routes incoming data to the appropriate {@link WriteBuffer} instances. - *

- * This is a plain class (no Spring annotations) -- it lives in the core module - * and is wired as a bean by the app module configuration. - */ public class IngestionService { - private final WriteBuffer executionBuffer; - private final WriteBuffer diagramBuffer; + private final ExecutionStore executionStore; + private final DiagramStore diagramStore; private final WriteBuffer metricsBuffer; + private final Consumer eventPublisher; + private final int bodySizeLimit; - public IngestionService(WriteBuffer executionBuffer, - WriteBuffer diagramBuffer, - WriteBuffer metricsBuffer) { - this.executionBuffer = executionBuffer; - this.diagramBuffer = diagramBuffer; + public IngestionService(ExecutionStore executionStore, + DiagramStore diagramStore, + WriteBuffer metricsBuffer, + Consumer eventPublisher, + int bodySizeLimit) { + this.executionStore = executionStore; + this.diagramStore = diagramStore; this.metricsBuffer = metricsBuffer; + this.eventPublisher = eventPublisher; + this.bodySizeLimit = bodySizeLimit; } - /** - * Accept a batch of tagged route executions into the buffer. - * - * @return true if all items were buffered, false if buffer is full (backpressure) - */ - public boolean acceptExecutions(List executions) { - return executionBuffer.offerBatch(executions); + public void ingestExecution(String agentId, String groupName, RouteExecution execution) { + ExecutionRecord record = toExecutionRecord(agentId, groupName, execution); + executionStore.upsert(record); + + if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) { + List processors = flattenProcessors( + execution.getProcessors(), record.executionId(), + record.startTime(), groupName, execution.getRouteId(), + null, 0); + executionStore.upsertProcessors( + record.executionId(), record.startTime(), + groupName, execution.getRouteId(), processors); + } + + eventPublisher.accept(new ExecutionUpdatedEvent( + record.executionId(), record.startTime())); } - /** - * Accept a single tagged route execution into the buffer. - * - * @return true if the item was buffered, false if buffer is full (backpressure) - */ - public boolean acceptExecution(TaggedExecution execution) { - return executionBuffer.offer(execution); + public void ingestDiagram(TaggedDiagram diagram) { + diagramStore.store(diagram); } - /** - * Accept a single tagged route diagram into the buffer. - * - * @return true if the item was buffered, false if buffer is full (backpressure) - */ - public boolean acceptDiagram(TaggedDiagram diagram) { - return diagramBuffer.offer(diagram); - } - - /** - * Accept a batch of tagged route diagrams into the buffer. - * - * @return true if all items were buffered, false if buffer is full (backpressure) - */ - public boolean acceptDiagrams(List diagrams) { - return diagramBuffer.offerBatch(diagrams); - } - - /** - * Accept a batch of metrics snapshots into the buffer. - * - * @return true if all items were buffered, false if buffer is full (backpressure) - */ public boolean acceptMetrics(List metrics) { return metricsBuffer.offerBatch(metrics); } - /** - * @return current number of items in the execution buffer - */ - public int getExecutionBufferDepth() { - return executionBuffer.size(); - } - - /** - * @return current number of items in the diagram buffer - */ - public int getDiagramBufferDepth() { - return diagramBuffer.size(); - } - - /** - * @return current number of items in the metrics buffer - */ public int getMetricsBufferDepth() { return metricsBuffer.size(); } - /** - * @return the execution write buffer (for use by flush scheduler) - */ - public WriteBuffer getExecutionBuffer() { - return executionBuffer; - } - - /** - * @return the diagram write buffer (for use by flush scheduler) - */ - public WriteBuffer getDiagramBuffer() { - return diagramBuffer; - } - - /** - * @return the metrics write buffer (for use by flush scheduler) - */ public WriteBuffer getMetricsBuffer() { return metricsBuffer; } + + private ExecutionRecord toExecutionRecord(String agentId, String groupName, + RouteExecution exec) { + return new ExecutionRecord( + exec.getExecutionId(), exec.getRouteId(), agentId, groupName, + exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", + exec.getCorrelationId(), exec.getExchangeId(), + exec.getStartTime(), exec.getEndTime(), + exec.getDurationMs(), + exec.getErrorMessage(), exec.getErrorStacktrace(), + null // diagramContentHash set separately + ); + } + + private List flattenProcessors( + List processors, String executionId, + java.time.Instant execStartTime, String groupName, String routeId, + String parentProcessorId, int depth) { + List flat = new ArrayList<>(); + for (ProcessorExecution p : processors) { + flat.add(new ProcessorRecord( + executionId, p.getProcessorId(), p.getProcessorType(), + p.getDiagramNodeId(), groupName, routeId, + depth, parentProcessorId, + p.getStatus() != null ? p.getStatus().name() : "RUNNING", + p.getStartTime() != null ? p.getStartTime() : execStartTime, + p.getEndTime(), + p.getDurationMs(), + p.getErrorMessage(), p.getErrorStacktrace(), + truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), + p.getInputHeaders() != null ? p.getInputHeaders().toString() : null, + p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null + )); + if (p.getChildren() != null) { + flat.addAll(flattenProcessors( + p.getChildren(), executionId, execStartTime, + groupName, routeId, p.getProcessorId(), depth + 1)); + } + } + return flat; + } + + private String truncateBody(String body) { + if (body == null) return null; + if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); + return body; + } }