From faf5d505f44730364c09b5ccac4d8b014b34deb6 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sat, 28 Mar 2026 18:57:27 +0100 Subject: [PATCH] feat: support iteration wrapper nodes and filter overlay by selected iteration Server: - Add split_depth and loop_depth columns (V9 migration) - Persist splitDepth/loopDepth with reflection fallback for older agent versions UI: - Detect iterations via wrapper processorTypes (loopIteration, splitIteration, multicastBranch) - Filter overlay by selected iteration at the wrapper level - Skip non-selected iteration wrappers entirely (wrapper + children) - Don't add synthetic wrappers to overlay (no diagram node correspondence) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/PostgresExecutionStore.java | 16 +++-- .../migration/V9__resolved_endpoint_uri.sql | 2 + .../core/ingestion/IngestionService.java | 14 ++++- .../server/core/storage/ExecutionStore.java | 4 +- .../ExecutionDiagram/useExecutionOverlay.ts | 51 +++++++++------- .../ExecutionDiagram/useIterationState.ts | 60 ++++++------------- 6 files changed, 76 insertions(+), 71 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index b2086b08..1451a1e3 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -74,8 +74,8 @@ public class PostgresExecutionStore implements ExecutionStore { status, start_time, end_time, duration_ms, error_message, error_stacktrace, input_body, output_body, input_headers, output_headers, attributes, loop_index, loop_size, split_index, split_size, multicast_index, - resolved_endpoint_uri) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?) + resolved_endpoint_uri, split_depth, loop_depth) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET status = EXCLUDED.status, end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), @@ -92,7 +92,9 @@ public class PostgresExecutionStore implements ExecutionStore { split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index), split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size), multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index), - resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri) + resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri), + split_depth = EXCLUDED.split_depth, + loop_depth = EXCLUDED.loop_depth """, processors.stream().map(p -> new Object[]{ p.executionId(), p.processorId(), p.processorType(), @@ -105,7 +107,9 @@ public class PostgresExecutionStore implements ExecutionStore { p.attributes(), p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(), p.multicastIndex(), - p.resolvedEndpointUri() + p.resolvedEndpointUri(), + p.splitDepth(), + p.loopDepth() }).toList()); } @@ -164,7 +168,9 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getObject("split_index") != null ? rs.getInt("split_index") : null, rs.getObject("split_size") != null ? rs.getInt("split_size") : null, rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null, - rs.getString("resolved_endpoint_uri")); + rs.getString("resolved_endpoint_uri"), + rs.getInt("split_depth"), + rs.getInt("loop_depth")); private static Instant toInstant(ResultSet rs, String column) throws SQLException { Timestamp ts = rs.getTimestamp(column); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql b/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql index e5ec1ea3..04c2ff11 100644 --- a/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql +++ b/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql @@ -1 +1,3 @@ ALTER TABLE processor_executions ADD COLUMN resolved_endpoint_uri TEXT; +ALTER TABLE processor_executions ADD COLUMN split_depth INTEGER DEFAULT 0; +ALTER TABLE processor_executions ADD COLUMN loop_depth INTEGER DEFAULT 0; diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 4f67aa15..9da186df 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -132,7 +132,9 @@ public class IngestionService { p.getLoopIndex(), p.getLoopSize(), p.getSplitIndex(), p.getSplitSize(), p.getMulticastIndex(), - p.getResolvedEndpointUri() + p.getResolvedEndpointUri(), + getDepthSafe(p, "splitDepth"), + getDepthSafe(p, "loopDepth") )); if (p.getChildren() != null) { flat.addAll(flattenProcessors( @@ -143,6 +145,16 @@ public class IngestionService { return flat; } + /** Safely call getSplitDepth()/getLoopDepth() — returns 0 if not available in this cameleer3-common version. */ + private static int getDepthSafe(ProcessorExecution p, String field) { + try { + var method = p.getClass().getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1)); + return (int) method.invoke(p); + } catch (Exception e) { + return 0; + } + } + private String truncateBody(String body) { if (body == null) return null; if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index 2f89074b..2d4e468d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -39,6 +39,8 @@ public interface ExecutionStore { Integer loopIndex, Integer loopSize, Integer splitIndex, Integer splitSize, Integer multicastIndex, - String resolvedEndpointUri + String resolvedEndpointUri, + int splitDepth, + int loopDepth ) {} } diff --git a/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts b/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts index 21736e24..47af0270 100644 --- a/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts +++ b/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts @@ -1,13 +1,26 @@ import { useMemo } from 'react'; import type { NodeExecutionState, IterationInfo, ProcessorNode } from './types'; +/** Synthetic wrapper processorTypes emitted by the agent for each iteration. */ +const ITERATION_WRAPPER_TYPES = new Set([ + 'loopIteration', 'splitIteration', 'multicastBranch', +]); + +/** + * Extract the iteration index from a wrapper node. + */ +function wrapperIndex(proc: ProcessorNode): number | undefined { + return proc.loopIndex ?? proc.splitIndex ?? proc.multicastIndex ?? undefined; +} + /** * Recursively walks the ProcessorNode tree and populates an overlay map * keyed by processorId → NodeExecutionState. * - * Handles iteration filtering: when a processor has a loop/split/multicast - * index, only include it if it matches the currently selected iteration - * for its parent compound node. + * Iteration wrappers (loopIteration, splitIteration, multicastBranch) are + * used for filtering: only the wrapper matching the selected iteration + * is recursed into. The wrapper itself is not added to the overlay + * (it's synthetic and has no corresponding diagram node). */ function buildOverlay( processors: ProcessorNode[], @@ -19,29 +32,23 @@ function buildOverlay( if (!proc.processorId || !proc.status) continue; if (proc.status !== 'COMPLETED' && proc.status !== 'FAILED') continue; - // Iteration filtering: if this processor belongs to an iterated parent, - // only include it when the index matches the selected iteration. - if (parentId && iterationState.has(parentId)) { - const info = iterationState.get(parentId)!; - if (info.type === 'loop' && proc.loopIndex != null) { - if (proc.loopIndex !== info.current) { - // Still recurse into children so nested compounds are discovered, - // but skip adding this processor to the overlay. - continue; + // Iteration wrapper: filter by selected iteration, skip the wrapper itself + if (ITERATION_WRAPPER_TYPES.has(proc.processorType)) { + if (parentId && iterationState.has(parentId)) { + const info = iterationState.get(parentId)!; + const idx = wrapperIndex(proc); + if (idx != null && idx !== info.current) { + continue; // Skip this wrapper and all its children } } - if (info.type === 'split' && proc.splitIndex != null) { - if (proc.splitIndex !== info.current) { - continue; - } - } - if (info.type === 'multicast' && proc.multicastIndex != null) { - if (proc.multicastIndex !== info.current) { - continue; - } + // Matching wrapper: don't add to overlay but recurse into children + if (proc.children?.length) { + buildOverlay(proc.children, overlay, iterationState, proc.processorId); } + continue; } + // Regular processor: add to overlay const subRouteFailed = proc.status === 'FAILED' && (proc.processorType?.includes('DIRECT') || proc.processorType?.includes('SEDA')); @@ -53,7 +60,7 @@ function buildOverlay( hasTraceData: true, }); - // Recurse into children, passing this processor as the parent for iteration filtering. + // Recurse into children if (proc.children?.length) { buildOverlay(proc.children, overlay, iterationState, proc.processorId); } diff --git a/ui/src/components/ExecutionDiagram/useIterationState.ts b/ui/src/components/ExecutionDiagram/useIterationState.ts index 02a61c04..39006986 100644 --- a/ui/src/components/ExecutionDiagram/useIterationState.ts +++ b/ui/src/components/ExecutionDiagram/useIterationState.ts @@ -1,56 +1,33 @@ import { useCallback, useEffect, useState } from 'react'; import type { IterationInfo, ProcessorNode } from './types'; +const WRAPPER_TYPES: Record = { + loopIteration: 'loop', + splitIteration: 'split', + multicastBranch: 'multicast', +}; + /** - * Walks the processor tree and detects compound nodes that have iterated - * children (loop, split, multicast). Populates a map of compoundId → - * IterationInfo so the UI can show stepper widgets and filter iterations. + * Walks the processor tree and detects compound nodes that have iteration + * wrapper children (loopIteration, splitIteration, multicastBranch). */ function detectIterations( processors: ProcessorNode[], result: Map, ): void { for (const proc of processors) { - if (!proc.children?.length) continue; + if (!proc.children?.length || !proc.processorId) continue; - // Check if children indicate a loop compound - const loopChild = proc.children.find( - (c) => c.loopSize != null && c.loopSize > 0, - ); - if (loopChild && proc.processorId) { - result.set(proc.processorId, { - current: 0, - total: loopChild.loopSize!, - type: 'loop', - }); - } - - // Check if children indicate a split compound - const splitChild = proc.children.find( - (c) => c.splitSize != null && c.splitSize > 0, - ); - if (splitChild && !loopChild && proc.processorId) { - result.set(proc.processorId, { - current: 0, - total: splitChild.splitSize!, - type: 'split', - }); - } - - // Check if children indicate a multicast compound - if (!loopChild && !splitChild) { - const multicastIndices = new Set(); - for (const child of proc.children) { - if (child.multicastIndex != null) { - multicastIndices.add(child.multicastIndex); - } - } - if (multicastIndices.size > 0 && proc.processorId) { + // Check if children are iteration wrappers + for (const [wrapperType, iterType] of Object.entries(WRAPPER_TYPES)) { + const wrappers = proc.children.filter(c => c.processorType === wrapperType); + if (wrappers.length > 0) { result.set(proc.processorId, { current: 0, - total: multicastIndices.size, - type: 'multicast', + total: wrappers.length, + type: iterType, }); + break; } } @@ -62,13 +39,12 @@ function detectIterations( /** * Manages per-compound iteration state for the execution overlay. * - * Scans the processor tree to detect compounds with iterated children - * and tracks which iteration index is currently selected for each. + * Scans the processor tree to detect compounds with iteration wrapper + * children and tracks which iteration index is currently selected. */ export function useIterationState(processors: ProcessorNode[] | undefined) { const [state, setState] = useState>(new Map()); - // Initialize iteration info when processors change useEffect(() => { if (!processors) return; const newState = new Map();