feat: add iteration fields to processor execution storage

Add loop_index, loop_size, split_index, split_size, multicast_index
columns to processor_executions table and thread them through the
full storage → ingestion → detail pipeline. These fields enable
execution overlay to display iteration context for loop, split,
and multicast EIPs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-27 18:32:47 +01:00
parent 889f0e5263
commit edd841ffeb
6 changed files with 54 additions and 9 deletions

View File

@@ -72,8 +72,9 @@ public class PostgresExecutionStore implements ExecutionStore {
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
application_name, route_id, depth, parent_processor_id,
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers, attributes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb)
input_body, output_body, input_headers, output_headers, attributes,
loop_index, loop_size, split_index, split_size, multicast_index)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
status = EXCLUDED.status,
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
@@ -84,7 +85,12 @@ public class PostgresExecutionStore implements ExecutionStore {
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers),
attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes)
attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes),
loop_index = COALESCE(EXCLUDED.loop_index, processor_executions.loop_index),
loop_size = COALESCE(EXCLUDED.loop_size, processor_executions.loop_size),
split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index),
split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size),
multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index)
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
@@ -94,7 +100,9 @@ public class PostgresExecutionStore implements ExecutionStore {
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(),
p.attributes()
p.attributes(),
p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(),
p.multicastIndex()
}).toList());
}
@@ -140,7 +148,12 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"),
rs.getString("attributes"));
rs.getString("attributes"),
rs.getObject("loop_index") != null ? rs.getInt("loop_index") : null,
rs.getObject("loop_size") != null ? rs.getInt("loop_size") : null,
rs.getObject("split_index") != null ? rs.getInt("split_index") : null,
rs.getObject("split_size") != null ? rs.getInt("split_size") : null,
rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null);
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);

View File

@@ -0,0 +1,5 @@
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_index INTEGER;
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_size INTEGER;
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_index INTEGER;
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_size INTEGER;
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS multicast_index INTEGER;

View File

@@ -48,7 +48,10 @@ public class DetailService {
p.startTime(), p.endTime(),
p.durationMs() != null ? p.durationMs() : 0L,
p.errorMessage(), p.errorStacktrace(),
parseAttributes(p.attributes())
parseAttributes(p.attributes()),
p.loopIndex(), p.loopSize(),
p.splitIndex(), p.splitSize(),
p.multicastIndex()
));
}

View File

@@ -22,12 +22,20 @@ public final class ProcessorNode {
private final String errorMessage;
private final String errorStackTrace;
private final Map<String, String> attributes;
private final Integer loopIndex;
private final Integer loopSize;
private final Integer splitIndex;
private final Integer splitSize;
private final Integer multicastIndex;
private final List<ProcessorNode> children;
public ProcessorNode(String processorId, String processorType, String status,
Instant startTime, Instant endTime, long durationMs,
String errorMessage, String errorStackTrace,
Map<String, String> attributes) {
Map<String, String> attributes,
Integer loopIndex, Integer loopSize,
Integer splitIndex, Integer splitSize,
Integer multicastIndex) {
this.processorId = processorId;
this.processorType = processorType;
this.status = status;
@@ -37,6 +45,11 @@ public final class ProcessorNode {
this.errorMessage = errorMessage;
this.errorStackTrace = errorStackTrace;
this.attributes = attributes;
this.loopIndex = loopIndex;
this.loopSize = loopSize;
this.splitIndex = splitIndex;
this.splitSize = splitSize;
this.multicastIndex = multicastIndex;
this.children = new ArrayList<>();
}
@@ -53,5 +66,10 @@ public final class ProcessorNode {
public String getErrorMessage() { return errorMessage; }
public String getErrorStackTrace() { return errorStackTrace; }
public Map<String, String> getAttributes() { return attributes; }
public Integer getLoopIndex() { return loopIndex; }
public Integer getLoopSize() { return loopSize; }
public Integer getSplitIndex() { return splitIndex; }
public Integer getSplitSize() { return splitSize; }
public Integer getMulticastIndex() { return multicastIndex; }
public List<ProcessorNode> getChildren() { return List.copyOf(children); }
}

View File

@@ -128,7 +128,10 @@ public class IngestionService {
p.getErrorMessage(), p.getErrorStackTrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
toJson(p.getAttributes())
toJson(p.getAttributes()),
p.getLoopIndex(), p.getLoopSize(),
p.getSplitIndex(), p.getSplitSize(),
p.getMulticastIndex()
));
if (p.getChildren() != null) {
flat.addAll(flattenProcessors(

View File

@@ -33,6 +33,9 @@ public interface ExecutionStore {
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,
String inputBody, String outputBody, String inputHeaders, String outputHeaders,
String attributes
String attributes,
Integer loopIndex, Integer loopSize,
Integer splitIndex, Integer splitSize,
Integer multicastIndex
) {}
}