diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
index 6841c683..c5e17e6f 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
@@ -1,113 +1,116 @@
package com.cameleer3.server.core.ingestion;
+import com.cameleer3.common.model.ProcessorExecution;
+import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
+import com.cameleer3.server.core.storage.DiagramStore;
+import com.cameleer3.server.core.storage.ExecutionStore;
+import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
+import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
-/**
- * Routes incoming data to the appropriate {@link WriteBuffer} instances.
- *
- * This is a plain class (no Spring annotations) -- it lives in the core module
- * and is wired as a bean by the app module configuration.
- */
public class IngestionService {
- private final WriteBuffer executionBuffer;
- private final WriteBuffer diagramBuffer;
+ private final ExecutionStore executionStore;
+ private final DiagramStore diagramStore;
private final WriteBuffer metricsBuffer;
+ private final Consumer eventPublisher;
+ private final int bodySizeLimit;
- public IngestionService(WriteBuffer executionBuffer,
- WriteBuffer diagramBuffer,
- WriteBuffer metricsBuffer) {
- this.executionBuffer = executionBuffer;
- this.diagramBuffer = diagramBuffer;
+ public IngestionService(ExecutionStore executionStore,
+ DiagramStore diagramStore,
+ WriteBuffer metricsBuffer,
+ Consumer eventPublisher,
+ int bodySizeLimit) {
+ this.executionStore = executionStore;
+ this.diagramStore = diagramStore;
this.metricsBuffer = metricsBuffer;
+ this.eventPublisher = eventPublisher;
+ this.bodySizeLimit = bodySizeLimit;
}
- /**
- * Accept a batch of tagged route executions into the buffer.
- *
- * @return true if all items were buffered, false if buffer is full (backpressure)
- */
- public boolean acceptExecutions(List executions) {
- return executionBuffer.offerBatch(executions);
+ public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
+ ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
+ executionStore.upsert(record);
+
+ if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
+ List processors = flattenProcessors(
+ execution.getProcessors(), record.executionId(),
+ record.startTime(), groupName, execution.getRouteId(),
+ null, 0);
+ executionStore.upsertProcessors(
+ record.executionId(), record.startTime(),
+ groupName, execution.getRouteId(), processors);
+ }
+
+ eventPublisher.accept(new ExecutionUpdatedEvent(
+ record.executionId(), record.startTime()));
}
- /**
- * Accept a single tagged route execution into the buffer.
- *
- * @return true if the item was buffered, false if buffer is full (backpressure)
- */
- public boolean acceptExecution(TaggedExecution execution) {
- return executionBuffer.offer(execution);
+ public void ingestDiagram(TaggedDiagram diagram) {
+ diagramStore.store(diagram);
}
- /**
- * Accept a single tagged route diagram into the buffer.
- *
- * @return true if the item was buffered, false if buffer is full (backpressure)
- */
- public boolean acceptDiagram(TaggedDiagram diagram) {
- return diagramBuffer.offer(diagram);
- }
-
- /**
- * Accept a batch of tagged route diagrams into the buffer.
- *
- * @return true if all items were buffered, false if buffer is full (backpressure)
- */
- public boolean acceptDiagrams(List diagrams) {
- return diagramBuffer.offerBatch(diagrams);
- }
-
- /**
- * Accept a batch of metrics snapshots into the buffer.
- *
- * @return true if all items were buffered, false if buffer is full (backpressure)
- */
public boolean acceptMetrics(List metrics) {
return metricsBuffer.offerBatch(metrics);
}
- /**
- * @return current number of items in the execution buffer
- */
- public int getExecutionBufferDepth() {
- return executionBuffer.size();
- }
-
- /**
- * @return current number of items in the diagram buffer
- */
- public int getDiagramBufferDepth() {
- return diagramBuffer.size();
- }
-
- /**
- * @return current number of items in the metrics buffer
- */
public int getMetricsBufferDepth() {
return metricsBuffer.size();
}
- /**
- * @return the execution write buffer (for use by flush scheduler)
- */
- public WriteBuffer getExecutionBuffer() {
- return executionBuffer;
- }
-
- /**
- * @return the diagram write buffer (for use by flush scheduler)
- */
- public WriteBuffer getDiagramBuffer() {
- return diagramBuffer;
- }
-
- /**
- * @return the metrics write buffer (for use by flush scheduler)
- */
public WriteBuffer getMetricsBuffer() {
return metricsBuffer;
}
+
+ private ExecutionRecord toExecutionRecord(String agentId, String groupName,
+ RouteExecution exec) {
+ return new ExecutionRecord(
+ exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
+ exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
+ exec.getCorrelationId(), exec.getExchangeId(),
+ exec.getStartTime(), exec.getEndTime(),
+ exec.getDurationMs(),
+ exec.getErrorMessage(), exec.getErrorStacktrace(),
+ null // diagramContentHash set separately
+ );
+ }
+
+ private List flattenProcessors(
+ List processors, String executionId,
+ java.time.Instant execStartTime, String groupName, String routeId,
+ String parentProcessorId, int depth) {
+ List flat = new ArrayList<>();
+ for (ProcessorExecution p : processors) {
+ flat.add(new ProcessorRecord(
+ executionId, p.getProcessorId(), p.getProcessorType(),
+ p.getDiagramNodeId(), groupName, routeId,
+ depth, parentProcessorId,
+ p.getStatus() != null ? p.getStatus().name() : "RUNNING",
+ p.getStartTime() != null ? p.getStartTime() : execStartTime,
+ p.getEndTime(),
+ p.getDurationMs(),
+ p.getErrorMessage(), p.getErrorStacktrace(),
+ truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
+ p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
+ p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
+ ));
+ if (p.getChildren() != null) {
+ flat.addAll(flattenProcessors(
+ p.getChildren(), executionId, execStartTime,
+ groupName, routeId, p.getProcessorId(), depth + 1));
+ }
+ }
+ return flat;
+ }
+
+ private String truncateBody(String body) {
+ if (body == null) return null;
+ if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
+ return body;
+ }
}