feat: support iteration wrapper nodes and filter overlay by selected iteration
Server: - Add split_depth and loop_depth columns (V9 migration) - Persist splitDepth/loopDepth with reflection fallback for older agent versions UI: - Detect iterations via wrapper processorTypes (loopIteration, splitIteration, multicastBranch) - Filter overlay by selected iteration at the wrapper level - Skip non-selected iteration wrappers entirely (wrapper + children) - Don't add synthetic wrappers to overlay (no diagram node correspondence) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -74,8 +74,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
loop_index, loop_size, split_index, split_size, multicast_index,
|
||||
resolved_endpoint_uri)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?)
|
||||
resolved_endpoint_uri, split_depth, loop_depth)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
|
||||
@@ -92,7 +92,9 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index),
|
||||
split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size),
|
||||
multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index),
|
||||
resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri)
|
||||
resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri),
|
||||
split_depth = EXCLUDED.split_depth,
|
||||
loop_depth = EXCLUDED.loop_depth
|
||||
""",
|
||||
processors.stream().map(p -> new Object[]{
|
||||
p.executionId(), p.processorId(), p.processorType(),
|
||||
@@ -105,7 +107,9 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
p.attributes(),
|
||||
p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(),
|
||||
p.multicastIndex(),
|
||||
p.resolvedEndpointUri()
|
||||
p.resolvedEndpointUri(),
|
||||
p.splitDepth(),
|
||||
p.loopDepth()
|
||||
}).toList());
|
||||
}
|
||||
|
||||
@@ -164,7 +168,9 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
rs.getObject("split_index") != null ? rs.getInt("split_index") : null,
|
||||
rs.getObject("split_size") != null ? rs.getInt("split_size") : null,
|
||||
rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null,
|
||||
rs.getString("resolved_endpoint_uri"));
|
||||
rs.getString("resolved_endpoint_uri"),
|
||||
rs.getInt("split_depth"),
|
||||
rs.getInt("loop_depth"));
|
||||
|
||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||
Timestamp ts = rs.getTimestamp(column);
|
||||
|
||||
@@ -1 +1,3 @@
|
||||
ALTER TABLE processor_executions ADD COLUMN resolved_endpoint_uri TEXT;
|
||||
ALTER TABLE processor_executions ADD COLUMN split_depth INTEGER DEFAULT 0;
|
||||
ALTER TABLE processor_executions ADD COLUMN loop_depth INTEGER DEFAULT 0;
|
||||
|
||||
@@ -132,7 +132,9 @@ public class IngestionService {
|
||||
p.getLoopIndex(), p.getLoopSize(),
|
||||
p.getSplitIndex(), p.getSplitSize(),
|
||||
p.getMulticastIndex(),
|
||||
p.getResolvedEndpointUri()
|
||||
p.getResolvedEndpointUri(),
|
||||
getDepthSafe(p, "splitDepth"),
|
||||
getDepthSafe(p, "loopDepth")
|
||||
));
|
||||
if (p.getChildren() != null) {
|
||||
flat.addAll(flattenProcessors(
|
||||
@@ -143,6 +145,16 @@ public class IngestionService {
|
||||
return flat;
|
||||
}
|
||||
|
||||
/** Safely call getSplitDepth()/getLoopDepth() — returns 0 if not available in this cameleer3-common version. */
|
||||
private static int getDepthSafe(ProcessorExecution p, String field) {
|
||||
try {
|
||||
var method = p.getClass().getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1));
|
||||
return (int) method.invoke(p);
|
||||
} catch (Exception e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private String truncateBody(String body) {
|
||||
if (body == null) return null;
|
||||
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
|
||||
|
||||
@@ -39,6 +39,8 @@ public interface ExecutionStore {
|
||||
Integer loopIndex, Integer loopSize,
|
||||
Integer splitIndex, Integer splitSize,
|
||||
Integer multicastIndex,
|
||||
String resolvedEndpointUri
|
||||
String resolvedEndpointUri,
|
||||
int splitDepth,
|
||||
int loopDepth
|
||||
) {}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,26 @@
|
||||
import { useMemo } from 'react';
|
||||
import type { NodeExecutionState, IterationInfo, ProcessorNode } from './types';
|
||||
|
||||
/** Synthetic wrapper processorTypes emitted by the agent for each iteration. */
|
||||
const ITERATION_WRAPPER_TYPES = new Set([
|
||||
'loopIteration', 'splitIteration', 'multicastBranch',
|
||||
]);
|
||||
|
||||
/**
|
||||
* Extract the iteration index from a wrapper node.
|
||||
*/
|
||||
function wrapperIndex(proc: ProcessorNode): number | undefined {
|
||||
return proc.loopIndex ?? proc.splitIndex ?? proc.multicastIndex ?? undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively walks the ProcessorNode tree and populates an overlay map
|
||||
* keyed by processorId → NodeExecutionState.
|
||||
*
|
||||
* Handles iteration filtering: when a processor has a loop/split/multicast
|
||||
* index, only include it if it matches the currently selected iteration
|
||||
* for its parent compound node.
|
||||
* Iteration wrappers (loopIteration, splitIteration, multicastBranch) are
|
||||
* used for filtering: only the wrapper matching the selected iteration
|
||||
* is recursed into. The wrapper itself is not added to the overlay
|
||||
* (it's synthetic and has no corresponding diagram node).
|
||||
*/
|
||||
function buildOverlay(
|
||||
processors: ProcessorNode[],
|
||||
@@ -19,29 +32,23 @@ function buildOverlay(
|
||||
if (!proc.processorId || !proc.status) continue;
|
||||
if (proc.status !== 'COMPLETED' && proc.status !== 'FAILED') continue;
|
||||
|
||||
// Iteration filtering: if this processor belongs to an iterated parent,
|
||||
// only include it when the index matches the selected iteration.
|
||||
if (parentId && iterationState.has(parentId)) {
|
||||
const info = iterationState.get(parentId)!;
|
||||
if (info.type === 'loop' && proc.loopIndex != null) {
|
||||
if (proc.loopIndex !== info.current) {
|
||||
// Still recurse into children so nested compounds are discovered,
|
||||
// but skip adding this processor to the overlay.
|
||||
continue;
|
||||
// Iteration wrapper: filter by selected iteration, skip the wrapper itself
|
||||
if (ITERATION_WRAPPER_TYPES.has(proc.processorType)) {
|
||||
if (parentId && iterationState.has(parentId)) {
|
||||
const info = iterationState.get(parentId)!;
|
||||
const idx = wrapperIndex(proc);
|
||||
if (idx != null && idx !== info.current) {
|
||||
continue; // Skip this wrapper and all its children
|
||||
}
|
||||
}
|
||||
if (info.type === 'split' && proc.splitIndex != null) {
|
||||
if (proc.splitIndex !== info.current) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (info.type === 'multicast' && proc.multicastIndex != null) {
|
||||
if (proc.multicastIndex !== info.current) {
|
||||
continue;
|
||||
}
|
||||
// Matching wrapper: don't add to overlay but recurse into children
|
||||
if (proc.children?.length) {
|
||||
buildOverlay(proc.children, overlay, iterationState, proc.processorId);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Regular processor: add to overlay
|
||||
const subRouteFailed =
|
||||
proc.status === 'FAILED' &&
|
||||
(proc.processorType?.includes('DIRECT') || proc.processorType?.includes('SEDA'));
|
||||
@@ -53,7 +60,7 @@ function buildOverlay(
|
||||
hasTraceData: true,
|
||||
});
|
||||
|
||||
// Recurse into children, passing this processor as the parent for iteration filtering.
|
||||
// Recurse into children
|
||||
if (proc.children?.length) {
|
||||
buildOverlay(proc.children, overlay, iterationState, proc.processorId);
|
||||
}
|
||||
|
||||
@@ -1,56 +1,33 @@
|
||||
import { useCallback, useEffect, useState } from 'react';
|
||||
import type { IterationInfo, ProcessorNode } from './types';
|
||||
|
||||
const WRAPPER_TYPES: Record<string, IterationInfo['type']> = {
|
||||
loopIteration: 'loop',
|
||||
splitIteration: 'split',
|
||||
multicastBranch: 'multicast',
|
||||
};
|
||||
|
||||
/**
|
||||
* Walks the processor tree and detects compound nodes that have iterated
|
||||
* children (loop, split, multicast). Populates a map of compoundId →
|
||||
* IterationInfo so the UI can show stepper widgets and filter iterations.
|
||||
* Walks the processor tree and detects compound nodes that have iteration
|
||||
* wrapper children (loopIteration, splitIteration, multicastBranch).
|
||||
*/
|
||||
function detectIterations(
|
||||
processors: ProcessorNode[],
|
||||
result: Map<string, IterationInfo>,
|
||||
): void {
|
||||
for (const proc of processors) {
|
||||
if (!proc.children?.length) continue;
|
||||
if (!proc.children?.length || !proc.processorId) continue;
|
||||
|
||||
// Check if children indicate a loop compound
|
||||
const loopChild = proc.children.find(
|
||||
(c) => c.loopSize != null && c.loopSize > 0,
|
||||
);
|
||||
if (loopChild && proc.processorId) {
|
||||
result.set(proc.processorId, {
|
||||
current: 0,
|
||||
total: loopChild.loopSize!,
|
||||
type: 'loop',
|
||||
});
|
||||
}
|
||||
|
||||
// Check if children indicate a split compound
|
||||
const splitChild = proc.children.find(
|
||||
(c) => c.splitSize != null && c.splitSize > 0,
|
||||
);
|
||||
if (splitChild && !loopChild && proc.processorId) {
|
||||
result.set(proc.processorId, {
|
||||
current: 0,
|
||||
total: splitChild.splitSize!,
|
||||
type: 'split',
|
||||
});
|
||||
}
|
||||
|
||||
// Check if children indicate a multicast compound
|
||||
if (!loopChild && !splitChild) {
|
||||
const multicastIndices = new Set<number>();
|
||||
for (const child of proc.children) {
|
||||
if (child.multicastIndex != null) {
|
||||
multicastIndices.add(child.multicastIndex);
|
||||
}
|
||||
}
|
||||
if (multicastIndices.size > 0 && proc.processorId) {
|
||||
// Check if children are iteration wrappers
|
||||
for (const [wrapperType, iterType] of Object.entries(WRAPPER_TYPES)) {
|
||||
const wrappers = proc.children.filter(c => c.processorType === wrapperType);
|
||||
if (wrappers.length > 0) {
|
||||
result.set(proc.processorId, {
|
||||
current: 0,
|
||||
total: multicastIndices.size,
|
||||
type: 'multicast',
|
||||
total: wrappers.length,
|
||||
type: iterType,
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,13 +39,12 @@ function detectIterations(
|
||||
/**
|
||||
* Manages per-compound iteration state for the execution overlay.
|
||||
*
|
||||
* Scans the processor tree to detect compounds with iterated children
|
||||
* and tracks which iteration index is currently selected for each.
|
||||
* Scans the processor tree to detect compounds with iteration wrapper
|
||||
* children and tracks which iteration index is currently selected.
|
||||
*/
|
||||
export function useIterationState(processors: ProcessorNode[] | undefined) {
|
||||
const [state, setState] = useState<Map<string, IterationInfo>>(new Map());
|
||||
|
||||
// Initialize iteration info when processors change
|
||||
useEffect(() => {
|
||||
if (!processors) return;
|
||||
const newState = new Map<string, IterationInfo>();
|
||||
|
||||
Reference in New Issue
Block a user