From edd841ffeb8d67140bb687e0597cf65a42504dfc Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Fri, 27 Mar 2026 18:32:47 +0100 Subject: [PATCH] feat: add iteration fields to processor execution storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add loop_index, loop_size, split_index, split_size, multicast_index columns to processor_executions table and thread them through the full storage → ingestion → detail pipeline. These fields enable execution overlay to display iteration context for loop, split, and multicast EIPs. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/PostgresExecutionStore.java | 23 +++++++++++++++---- .../V8__processor_iteration_fields.sql | 5 ++++ .../server/core/detail/DetailService.java | 5 +++- .../server/core/detail/ProcessorNode.java | 20 +++++++++++++++- .../core/ingestion/IngestionService.java | 5 +++- .../server/core/storage/ExecutionStore.java | 5 +++- 6 files changed, 54 insertions(+), 9 deletions(-) create mode 100644 cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 760cb5e2..5060b718 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -72,8 +72,9 @@ public class PostgresExecutionStore implements ExecutionStore { INSERT INTO processor_executions (execution_id, processor_id, processor_type, application_name, route_id, depth, parent_processor_id, status, start_time, end_time, duration_ms, error_message, error_stacktrace, - input_body, output_body, input_headers, output_headers, attributes) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb) + input_body, output_body, input_headers, output_headers, attributes, + loop_index, loop_size, split_index, split_size, multicast_index) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?) ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET status = EXCLUDED.status, end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), @@ -84,7 +85,12 @@ public class PostgresExecutionStore implements ExecutionStore { output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body), input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers), output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers), - attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes) + attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes), + loop_index = COALESCE(EXCLUDED.loop_index, processor_executions.loop_index), + loop_size = COALESCE(EXCLUDED.loop_size, processor_executions.loop_size), + split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index), + split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size), + multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index) """, processors.stream().map(p -> new Object[]{ p.executionId(), p.processorId(), p.processorType(), @@ -94,7 +100,9 @@ public class PostgresExecutionStore implements ExecutionStore { p.endTime() != null ? Timestamp.from(p.endTime()) : null, p.durationMs(), p.errorMessage(), p.errorStacktrace(), p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(), - p.attributes() + p.attributes(), + p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(), + p.multicastIndex() }).toList()); } @@ -140,7 +148,12 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getString("error_message"), rs.getString("error_stacktrace"), rs.getString("input_body"), rs.getString("output_body"), rs.getString("input_headers"), rs.getString("output_headers"), - rs.getString("attributes")); + rs.getString("attributes"), + rs.getObject("loop_index") != null ? rs.getInt("loop_index") : null, + rs.getObject("loop_size") != null ? rs.getInt("loop_size") : null, + rs.getObject("split_index") != null ? rs.getInt("split_index") : null, + rs.getObject("split_size") != null ? rs.getInt("split_size") : null, + rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null); private static Instant toInstant(ResultSet rs, String column) throws SQLException { Timestamp ts = rs.getTimestamp(column); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql b/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql new file mode 100644 index 00000000..5adb0cce --- /dev/null +++ b/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql @@ -0,0 +1,5 @@ +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_index INTEGER; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_size INTEGER; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_index INTEGER; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_size INTEGER; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS multicast_index INTEGER; diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java index fd26e183..c12d3be6 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java @@ -48,7 +48,10 @@ public class DetailService { p.startTime(), p.endTime(), p.durationMs() != null ? p.durationMs() : 0L, p.errorMessage(), p.errorStacktrace(), - parseAttributes(p.attributes()) + parseAttributes(p.attributes()), + p.loopIndex(), p.loopSize(), + p.splitIndex(), p.splitSize(), + p.multicastIndex() )); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java index 32cd14c4..f071352d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java @@ -22,12 +22,20 @@ public final class ProcessorNode { private final String errorMessage; private final String errorStackTrace; private final Map attributes; + private final Integer loopIndex; + private final Integer loopSize; + private final Integer splitIndex; + private final Integer splitSize; + private final Integer multicastIndex; private final List children; public ProcessorNode(String processorId, String processorType, String status, Instant startTime, Instant endTime, long durationMs, String errorMessage, String errorStackTrace, - Map attributes) { + Map attributes, + Integer loopIndex, Integer loopSize, + Integer splitIndex, Integer splitSize, + Integer multicastIndex) { this.processorId = processorId; this.processorType = processorType; this.status = status; @@ -37,6 +45,11 @@ public final class ProcessorNode { this.errorMessage = errorMessage; this.errorStackTrace = errorStackTrace; this.attributes = attributes; + this.loopIndex = loopIndex; + this.loopSize = loopSize; + this.splitIndex = splitIndex; + this.splitSize = splitSize; + this.multicastIndex = multicastIndex; this.children = new ArrayList<>(); } @@ -53,5 +66,10 @@ public final class ProcessorNode { public String getErrorMessage() { return errorMessage; } public String getErrorStackTrace() { return errorStackTrace; } public Map getAttributes() { return attributes; } + public Integer getLoopIndex() { return loopIndex; } + public Integer getLoopSize() { return loopSize; } + public Integer getSplitIndex() { return splitIndex; } + public Integer getSplitSize() { return splitSize; } + public Integer getMulticastIndex() { return multicastIndex; } public List getChildren() { return List.copyOf(children); } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index c4fad6fa..f07f67a4 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -128,7 +128,10 @@ public class IngestionService { p.getErrorMessage(), p.getErrorStackTrace(), truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()), - toJson(p.getAttributes()) + toJson(p.getAttributes()), + p.getLoopIndex(), p.getLoopSize(), + p.getSplitIndex(), p.getSplitSize(), + p.getMulticastIndex() )); if (p.getChildren() != null) { flat.addAll(flattenProcessors( diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index ecc076dd..75971586 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -33,6 +33,9 @@ public interface ExecutionStore { Instant startTime, Instant endTime, Long durationMs, String errorMessage, String errorStacktrace, String inputBody, String outputBody, String inputHeaders, String outputHeaders, - String attributes + String attributes, + Integer loopIndex, Integer loopSize, + Integer splitIndex, Integer splitSize, + Integer multicastIndex ) {} }