refactor: IngestionService uses synchronous ExecutionStore writes with event publishing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-16 18:18:54 +01:00
parent adf4b44d78
commit 85ebe76111

View File

@@ -1,113 +1,116 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer3.server.core.storage.DiagramStore;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
 * Routes incoming data to its destination: route executions and diagrams are
 * written synchronously to their stores, while metrics snapshots still go
 * through a {@link WriteBuffer}.
 * <p>
 * This is a plain class (no Spring annotations) -- it lives in the core module
 * and is wired as a bean by the app module configuration.
 */
public class IngestionService {
// Buffers from the pre-refactor design; still referenced by the accept*/get*Buffer
// methods that this diff shows as removed.
private final WriteBuffer<TaggedExecution> executionBuffer;
private final WriteBuffer<TaggedDiagram> diagramBuffer;
// Synchronous stores introduced by this refactor (see ingestExecution/ingestDiagram).
private final ExecutionStore executionStore;
private final DiagramStore diagramStore;
// Metrics snapshots still go through a write buffer.
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
// Receives an ExecutionUpdatedEvent after each execution upsert (see ingestExecution).
private final Consumer<ExecutionUpdatedEvent> eventPublisher;
// Maximum number of characters kept for processor input/output bodies (see truncateBody).
private final int bodySizeLimit;
// NOTE(review): the lines below are a unified-diff artifact -- the OLD
// three-argument constructor's opening lines are interleaved with the NEW
// five-argument constructor, and the old one is missing its closing brace.
// As rendered this is not valid Java; only the new constructor should
// survive reconciliation.
public IngestionService(WriteBuffer<TaggedExecution> executionBuffer,
WriteBuffer<TaggedDiagram> diagramBuffer,
WriteBuffer<MetricsSnapshot> metricsBuffer) {
this.executionBuffer = executionBuffer;
this.diagramBuffer = diagramBuffer;
public IngestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
Consumer<ExecutionUpdatedEvent> eventPublisher,
int bodySizeLimit) {
this.executionStore = executionStore;
this.diagramStore = diagramStore;
this.metricsBuffer = metricsBuffer;
this.eventPublisher = eventPublisher;
this.bodySizeLimit = bodySizeLimit;
}
/**
 * Accept a batch of tagged route executions into the buffer.
 *
 * @return true if all items were buffered, false if buffer is full (backpressure)
 */
public boolean acceptExecutions(List<TaggedExecution> executions) {
return executionBuffer.offerBatch(executions);
// NOTE(review): diff artifact -- the old acceptExecutions above is missing its
// closing brace; the method below is its synchronous replacement.
// ingestExecution: maps the payload to an ExecutionRecord, upserts it, then
// upserts the flattened processor tree (if any), and finally publishes an
// ExecutionUpdatedEvent so downstream indexing can react.
public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
executionStore.upsert(record);
// Only touch the processor table when the payload actually carries processors.
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
// Flatten the processor tree depth-first, starting at depth 0 with no parent.
List<ProcessorRecord> processors = flattenProcessors(
execution.getProcessors(), record.executionId(),
record.startTime(), groupName, execution.getRouteId(),
null, 0);
executionStore.upsertProcessors(
record.executionId(), record.startTime(),
groupName, execution.getRouteId(), processors);
}
// Publish after both upserts so listeners observe a fully written execution.
eventPublisher.accept(new ExecutionUpdatedEvent(
record.executionId(), record.startTime()));
}
/**
 * Accept a single tagged route execution into the buffer.
 *
 * @return true if the item was buffered, false if buffer is full (backpressure)
 */
public boolean acceptExecution(TaggedExecution execution) {
return executionBuffer.offer(execution);
// NOTE(review): diff artifact -- the old acceptExecution above is missing its
// closing brace; the method below is the new synchronous diagram path.
// ingestDiagram: writes the diagram straight to the DiagramStore (no buffering).
public void ingestDiagram(TaggedDiagram diagram) {
diagramStore.store(diagram);
}
/**
 * Offers one tagged route diagram to the diagram write buffer.
 *
 * @param diagram the diagram to enqueue
 * @return {@code true} when the diagram was enqueued; {@code false} when the
 *         buffer rejected it because it is full (caller should apply backpressure)
 */
public boolean acceptDiagram(TaggedDiagram diagram) {
final boolean accepted = diagramBuffer.offer(diagram);
return accepted;
}
/**
 * Offers a batch of tagged route diagrams to the diagram write buffer.
 *
 * @param diagrams the diagrams to enqueue
 * @return {@code true} when every diagram was enqueued; {@code false} when the
 *         buffer is full (backpressure)
 */
public boolean acceptDiagrams(List<TaggedDiagram> diagrams) {
final boolean allBuffered = diagramBuffer.offerBatch(diagrams);
return allBuffered;
}
/**
 * Offers a batch of metrics snapshots to the metrics write buffer.
 *
 * @param metrics the snapshots to enqueue
 * @return {@code true} when every snapshot was enqueued; {@code false} when the
 *         buffer is full (backpressure)
 */
public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
final boolean allBuffered = metricsBuffer.offerBatch(metrics);
return allBuffered;
}
/**
 * Reports how many items are currently queued in the execution buffer.
 *
 * @return the execution buffer's current depth
 */
public int getExecutionBufferDepth() {
return this.executionBuffer.size();
}
/**
 * Reports how many items are currently queued in the diagram buffer.
 *
 * @return the diagram buffer's current depth
 */
public int getDiagramBufferDepth() {
return this.diagramBuffer.size();
}
/**
 * Reports how many items are currently queued in the metrics buffer.
 *
 * @return the metrics buffer's current depth
 */
public int getMetricsBufferDepth() {
return this.metricsBuffer.size();
}
/**
 * Exposes the execution write buffer so the flush scheduler can drain it.
 *
 * @return the execution write buffer
 */
public WriteBuffer<TaggedExecution> getExecutionBuffer() {
return this.executionBuffer;
}
/**
 * Exposes the diagram write buffer so the flush scheduler can drain it.
 *
 * @return the diagram write buffer
 */
public WriteBuffer<TaggedDiagram> getDiagramBuffer() {
return this.diagramBuffer;
}
/**
 * Exposes the metrics write buffer so the flush scheduler can drain it.
 *
 * @return the metrics write buffer
 */
public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
return this.metricsBuffer;
}
/**
 * Maps a {@link RouteExecution} payload onto a storable {@link ExecutionRecord}.
 * A missing status defaults to "RUNNING"; the diagram content hash is not known
 * at ingestion time, so it is left {@code null} here and set separately.
 */
private ExecutionRecord toExecutionRecord(String agentId, String groupName,
RouteExecution exec) {
final String status = exec.getStatus() != null ? exec.getStatus().name() : "RUNNING";
return new ExecutionRecord(
exec.getExecutionId(),
exec.getRouteId(),
agentId,
groupName,
status,
exec.getCorrelationId(),
exec.getExchangeId(),
exec.getStartTime(),
exec.getEndTime(),
exec.getDurationMs(),
exec.getErrorMessage(),
exec.getErrorStacktrace(),
null // diagramContentHash is set separately
);
}
/**
 * Flattens a processor-execution tree into a depth-first ordered list of
 * {@link ProcessorRecord}s. Each record carries its nesting depth and the id
 * of its parent processor ({@code null} for roots). A processor without a
 * start time inherits the execution's start time; a missing status defaults
 * to "RUNNING". Input/output bodies are capped via {@code truncateBody}.
 */
private List<ProcessorRecord> flattenProcessors(
List<ProcessorExecution> processors, String executionId,
java.time.Instant execStartTime, String groupName, String routeId,
String parentProcessorId, int depth) {
List<ProcessorRecord> result = new ArrayList<>();
for (ProcessorExecution proc : processors) {
String status = proc.getStatus() != null ? proc.getStatus().name() : "RUNNING";
java.time.Instant startTime =
proc.getStartTime() != null ? proc.getStartTime() : execStartTime;
String inHeaders =
proc.getInputHeaders() != null ? proc.getInputHeaders().toString() : null;
String outHeaders =
proc.getOutputHeaders() != null ? proc.getOutputHeaders().toString() : null;
// Parent record is appended before its children so the DFS order is preserved.
result.add(new ProcessorRecord(
executionId, proc.getProcessorId(), proc.getProcessorType(),
proc.getDiagramNodeId(), groupName, routeId,
depth, parentProcessorId,
status,
startTime,
proc.getEndTime(),
proc.getDurationMs(),
proc.getErrorMessage(), proc.getErrorStacktrace(),
truncateBody(proc.getInputBody()), truncateBody(proc.getOutputBody()),
inHeaders,
outHeaders
));
List<ProcessorExecution> children = proc.getChildren();
if (children != null) {
result.addAll(flattenProcessors(
children, executionId, execStartTime,
groupName, routeId, proc.getProcessorId(), depth + 1));
}
}
return result;
}
/**
 * Caps a message body at {@code bodySizeLimit} characters; a {@code null}
 * body passes through unchanged.
 */
private String truncateBody(String body) {
if (body == null) {
return null;
}
return body.length() <= bodySizeLimit ? body : body.substring(0, bodySizeLimit);
}
}