feat: expose iteration/iterationSize fields for diagram overlay
Replace synthetic wrapper node approach with direct iteration fields: - ProcessorNode gains iteration (child's index) and iterationSize (container's total) fields, populated from ClickHouse flat records - Frontend hooks detect iteration containers from iterationSize != null instead of scanning for wrapper processorTypes - useExecutionOverlay filters children by iteration field instead of wrapper nodes, eliminating ITERATION_WRAPPER_TYPES entirely - Cleaner data contract: API returns exactly what the DB stores Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -101,7 +101,7 @@ public class DetailService {
|
|||||||
p.getDurationMs(),
|
p.getDurationMs(),
|
||||||
p.getErrorMessage(), p.getErrorStackTrace(),
|
p.getErrorMessage(), p.getErrorStackTrace(),
|
||||||
p.getAttributes() != null ? new LinkedHashMap<>(p.getAttributes()) : null,
|
p.getAttributes() != null ? new LinkedHashMap<>(p.getAttributes()) : null,
|
||||||
null, null, null, null, null,
|
null, null, null, null, null, null, null,
|
||||||
p.getResolvedEndpointUri(),
|
p.getResolvedEndpointUri(),
|
||||||
p.getErrorType(), p.getErrorCategory(),
|
p.getErrorType(), p.getErrorCategory(),
|
||||||
p.getRootCauseType(), p.getRootCauseMessage(),
|
p.getRootCauseType(), p.getRootCauseMessage(),
|
||||||
@@ -144,7 +144,7 @@ public class DetailService {
|
|||||||
p.errorMessage(), p.errorStacktrace(),
|
p.errorMessage(), p.errorStacktrace(),
|
||||||
parseAttributes(p.attributes()),
|
parseAttributes(p.attributes()),
|
||||||
p.iteration(), p.iterationSize(),
|
p.iteration(), p.iterationSize(),
|
||||||
null, null, null,
|
null, null, null, null, null,
|
||||||
p.resolvedEndpointUri(),
|
p.resolvedEndpointUri(),
|
||||||
p.errorType(), p.errorCategory(),
|
p.errorType(), p.errorCategory(),
|
||||||
p.rootCauseType(), p.rootCauseMessage(),
|
p.rootCauseType(), p.rootCauseMessage(),
|
||||||
@@ -188,6 +188,7 @@ public class DetailService {
|
|||||||
p.durationMs() != null ? p.durationMs() : 0L,
|
p.durationMs() != null ? p.durationMs() : 0L,
|
||||||
p.errorMessage(), p.errorStacktrace(),
|
p.errorMessage(), p.errorStacktrace(),
|
||||||
parseAttributes(p.attributes()),
|
parseAttributes(p.attributes()),
|
||||||
|
null, null,
|
||||||
p.loopIndex(), p.loopSize(),
|
p.loopIndex(), p.loopSize(),
|
||||||
p.splitIndex(), p.splitSize(),
|
p.splitIndex(), p.splitSize(),
|
||||||
p.multicastIndex(),
|
p.multicastIndex(),
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ public final class ProcessorNode {
|
|||||||
private final String errorMessage;
|
private final String errorMessage;
|
||||||
private final String errorStackTrace;
|
private final String errorStackTrace;
|
||||||
private final Map<String, String> attributes;
|
private final Map<String, String> attributes;
|
||||||
|
private final Integer iteration;
|
||||||
|
private final Integer iterationSize;
|
||||||
private final Integer loopIndex;
|
private final Integer loopIndex;
|
||||||
private final Integer loopSize;
|
private final Integer loopSize;
|
||||||
private final Integer splitIndex;
|
private final Integer splitIndex;
|
||||||
@@ -44,6 +46,7 @@ public final class ProcessorNode {
|
|||||||
Instant startTime, Instant endTime, long durationMs,
|
Instant startTime, Instant endTime, long durationMs,
|
||||||
String errorMessage, String errorStackTrace,
|
String errorMessage, String errorStackTrace,
|
||||||
Map<String, String> attributes,
|
Map<String, String> attributes,
|
||||||
|
Integer iteration, Integer iterationSize,
|
||||||
Integer loopIndex, Integer loopSize,
|
Integer loopIndex, Integer loopSize,
|
||||||
Integer splitIndex, Integer splitSize,
|
Integer splitIndex, Integer splitSize,
|
||||||
Integer multicastIndex,
|
Integer multicastIndex,
|
||||||
@@ -63,6 +66,8 @@ public final class ProcessorNode {
|
|||||||
this.errorMessage = errorMessage;
|
this.errorMessage = errorMessage;
|
||||||
this.errorStackTrace = errorStackTrace;
|
this.errorStackTrace = errorStackTrace;
|
||||||
this.attributes = attributes;
|
this.attributes = attributes;
|
||||||
|
this.iteration = iteration;
|
||||||
|
this.iterationSize = iterationSize;
|
||||||
this.loopIndex = loopIndex;
|
this.loopIndex = loopIndex;
|
||||||
this.loopSize = loopSize;
|
this.loopSize = loopSize;
|
||||||
this.splitIndex = splitIndex;
|
this.splitIndex = splitIndex;
|
||||||
@@ -95,6 +100,8 @@ public final class ProcessorNode {
|
|||||||
public String getErrorMessage() { return errorMessage; }
|
public String getErrorMessage() { return errorMessage; }
|
||||||
public String getErrorStackTrace() { return errorStackTrace; }
|
public String getErrorStackTrace() { return errorStackTrace; }
|
||||||
public Map<String, String> getAttributes() { return attributes; }
|
public Map<String, String> getAttributes() { return attributes; }
|
||||||
|
public Integer getIteration() { return iteration; }
|
||||||
|
public Integer getIterationSize() { return iterationSize; }
|
||||||
public Integer getLoopIndex() { return loopIndex; }
|
public Integer getLoopIndex() { return loopIndex; }
|
||||||
public Integer getLoopSize() { return loopSize; }
|
public Integer getLoopSize() { return loopSize; }
|
||||||
public Integer getSplitIndex() { return splitIndex; }
|
public Integer getSplitIndex() { return splitIndex; }
|
||||||
|
|||||||
@@ -192,17 +192,23 @@ class TreeReconstructionTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void buildTree_seqBasedModel_iterationFields() {
|
void buildTree_seqBasedModel_iterationFields() {
|
||||||
// Verify iteration/iterationSize are populated as loopIndex/loopSize
|
// Verify iteration/iterationSize are populated on the correct nodes
|
||||||
List<ProcessorRecord> processors = List.of(
|
List<ProcessorRecord> processors = List.of(
|
||||||
procWithSeq("loop1", "loop", "COMPLETED", 1, null, null, null),
|
procWithSeq("split1", "split", "COMPLETED", 1, null, null, 3),
|
||||||
procWithSeq("body", "log", "COMPLETED", 2, 1, 5, 10)
|
procWithSeq("body", "log", "COMPLETED", 2, 1, 0, null),
|
||||||
|
procWithSeq("body", "log", "COMPLETED", 3, 1, 1, null),
|
||||||
|
procWithSeq("body", "log", "COMPLETED", 4, 1, 2, null)
|
||||||
);
|
);
|
||||||
|
|
||||||
List<ProcessorNode> roots = detailService.buildTree(processors);
|
List<ProcessorNode> roots = detailService.buildTree(processors);
|
||||||
|
|
||||||
assertThat(roots).hasSize(1);
|
assertThat(roots).hasSize(1);
|
||||||
ProcessorNode child = roots.get(0).getChildren().get(0);
|
ProcessorNode container = roots.get(0);
|
||||||
assertThat(child.getLoopIndex()).isEqualTo(5);
|
assertThat(container.getIterationSize()).isEqualTo(3);
|
||||||
assertThat(child.getLoopSize()).isEqualTo(10);
|
assertThat(container.getIteration()).isNull();
|
||||||
|
assertThat(container.getChildren()).hasSize(3);
|
||||||
|
assertThat(container.getChildren().get(0).getIteration()).isEqualTo(0);
|
||||||
|
assertThat(container.getChildren().get(1).getIteration()).isEqualTo(1);
|
||||||
|
assertThat(container.getChildren().get(2).getIteration()).isEqualTo(2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
4
ui/src/api/schema.d.ts
vendored
4
ui/src/api/schema.d.ts
vendored
@@ -1852,6 +1852,10 @@ export interface components {
|
|||||||
[key: string]: string;
|
[key: string]: string;
|
||||||
};
|
};
|
||||||
/** Format: int32 */
|
/** Format: int32 */
|
||||||
|
iteration: number;
|
||||||
|
/** Format: int32 */
|
||||||
|
iterationSize: number;
|
||||||
|
/** Format: int32 */
|
||||||
loopIndex: number;
|
loopIndex: number;
|
||||||
/** Format: int32 */
|
/** Format: int32 */
|
||||||
loopSize: number;
|
loopSize: number;
|
||||||
|
|||||||
@@ -20,18 +20,10 @@ interface ExecutionDiagramProps {
|
|||||||
className?: string;
|
className?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const ITERATION_WRAPPER_TYPES = new Set([
|
|
||||||
'loopIteration', 'splitIteration', 'multicastBranch',
|
|
||||||
]);
|
|
||||||
|
|
||||||
function wrapperIndex(proc: ProcessorNode): number | undefined {
|
|
||||||
return proc.loopIndex ?? proc.splitIndex ?? proc.multicastIndex ?? undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find a processor in the tree, respecting iteration filtering.
|
* Find a processor in the tree, respecting iteration filtering.
|
||||||
* Only recurses into the selected iteration wrapper so the returned
|
* Only recurses into children matching the selected iteration index
|
||||||
* ProcessorNode has data from the correct iteration.
|
* so the returned ProcessorNode has data from the correct iteration.
|
||||||
*/
|
*/
|
||||||
function findProcessorInTree(
|
function findProcessorInTree(
|
||||||
nodes: ProcessorNode[] | undefined,
|
nodes: ProcessorNode[] | undefined,
|
||||||
@@ -43,18 +35,10 @@ function findProcessorInTree(
|
|||||||
for (const n of nodes) {
|
for (const n of nodes) {
|
||||||
if (!n.processorId) continue;
|
if (!n.processorId) continue;
|
||||||
|
|
||||||
// Iteration wrapper: only recurse into the selected iteration
|
// If parent is an iteration container, skip children from other iterations
|
||||||
if (ITERATION_WRAPPER_TYPES.has(n.processorType)) {
|
if (parentId && iterationState?.has(parentId)) {
|
||||||
if (parentId && iterationState?.has(parentId)) {
|
const info = iterationState.get(parentId)!;
|
||||||
const info = iterationState.get(parentId)!;
|
if (n.iteration != null && n.iteration !== info.current) continue;
|
||||||
const idx = wrapperIndex(n);
|
|
||||||
if (idx != null && idx !== info.current) continue;
|
|
||||||
}
|
|
||||||
if (n.children) {
|
|
||||||
const found = findProcessorInTree(n.children, processorId, iterationState, n.processorId);
|
|
||||||
if (found) return found;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n.processorId === processorId) return n;
|
if (n.processorId === processorId) return n;
|
||||||
|
|||||||
@@ -1,26 +1,12 @@
|
|||||||
import { useMemo } from 'react';
|
import { useMemo } from 'react';
|
||||||
import type { NodeExecutionState, IterationInfo, ProcessorNode } from './types';
|
import type { NodeExecutionState, IterationInfo, ProcessorNode } from './types';
|
||||||
|
|
||||||
/** Synthetic wrapper processorTypes emitted by the agent for each iteration. */
|
|
||||||
const ITERATION_WRAPPER_TYPES = new Set([
|
|
||||||
'loopIteration', 'splitIteration', 'multicastBranch',
|
|
||||||
]);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Extract the iteration index from a wrapper node.
|
|
||||||
*/
|
|
||||||
function wrapperIndex(proc: ProcessorNode): number | undefined {
|
|
||||||
return proc.loopIndex ?? proc.splitIndex ?? proc.multicastIndex ?? undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recursively walks the ProcessorNode tree and populates an overlay map
|
* Recursively walks the ProcessorNode tree and populates an overlay map
|
||||||
* keyed by processorId → NodeExecutionState.
|
* keyed by processorId → NodeExecutionState.
|
||||||
*
|
*
|
||||||
* Iteration wrappers (loopIteration, splitIteration, multicastBranch) are
|
* For iteration containers (nodes with iterationSize), only children
|
||||||
* used for filtering: only the wrapper matching the selected iteration
|
* whose `iteration` matches the selected index are included.
|
||||||
* is recursed into. The wrapper itself is not added to the overlay
|
|
||||||
* (it's synthetic and has no corresponding diagram node).
|
|
||||||
*/
|
*/
|
||||||
function buildOverlay(
|
function buildOverlay(
|
||||||
processors: ProcessorNode[],
|
processors: ProcessorNode[],
|
||||||
@@ -31,21 +17,12 @@ function buildOverlay(
|
|||||||
for (const proc of processors) {
|
for (const proc of processors) {
|
||||||
if (!proc.processorId) continue;
|
if (!proc.processorId) continue;
|
||||||
|
|
||||||
// Iteration wrapper: filter by selected iteration, skip the wrapper itself.
|
// If this node's parent is an iteration container, filter by selected iteration
|
||||||
// Must be checked before the status filter — wrappers may not have a status.
|
if (parentId && iterationState.has(parentId)) {
|
||||||
if (ITERATION_WRAPPER_TYPES.has(proc.processorType)) {
|
const info = iterationState.get(parentId)!;
|
||||||
if (parentId && iterationState.has(parentId)) {
|
if (proc.iteration != null && proc.iteration !== info.current) {
|
||||||
const info = iterationState.get(parentId)!;
|
continue; // Different iteration than selected — skip this subtree
|
||||||
const idx = wrapperIndex(proc);
|
|
||||||
if (idx != null && idx !== info.current) {
|
|
||||||
continue; // Skip this wrapper and all its children
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Matching wrapper: don't add to overlay but recurse into children
|
|
||||||
if (proc.children?.length) {
|
|
||||||
buildOverlay(proc.children, overlay, iterationState, proc.processorId);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Regular processor: only include completed/failed nodes
|
// Regular processor: only include completed/failed nodes
|
||||||
|
|||||||
@@ -1,46 +1,45 @@
|
|||||||
import { useCallback, useEffect, useState } from 'react';
|
import { useCallback, useEffect, useState } from 'react';
|
||||||
import type { IterationInfo, ProcessorNode } from './types';
|
import type { IterationInfo, ProcessorNode } from './types';
|
||||||
|
|
||||||
const WRAPPER_TYPES: Record<string, IterationInfo['type']> = {
|
/** Map container processorType to iteration display type. */
|
||||||
loopIteration: 'loop',
|
function inferIterationType(processorType: string | undefined): IterationInfo['type'] {
|
||||||
splitIteration: 'split',
|
switch (processorType) {
|
||||||
multicastBranch: 'multicast',
|
case 'loop': return 'loop';
|
||||||
};
|
case 'multicast': return 'multicast';
|
||||||
|
default: return 'split'; // split, recipientList, and any future iterating EIP
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Walks the processor tree and detects compound nodes that have iteration
|
* Walks the processor tree and detects iteration containers —
|
||||||
* wrapper children (loopIteration, splitIteration, multicastBranch).
|
* any node with a non-null iterationSize.
|
||||||
*/
|
*/
|
||||||
function detectIterations(
|
function detectIterations(
|
||||||
processors: ProcessorNode[],
|
processors: ProcessorNode[],
|
||||||
result: Map<string, IterationInfo>,
|
result: Map<string, IterationInfo>,
|
||||||
): void {
|
): void {
|
||||||
for (const proc of processors) {
|
for (const proc of processors) {
|
||||||
if (!proc.children?.length || !proc.processorId) continue;
|
if (!proc.processorId) continue;
|
||||||
|
|
||||||
// Check if children are iteration wrappers
|
if (proc.iterationSize != null) {
|
||||||
for (const [wrapperType, iterType] of Object.entries(WRAPPER_TYPES)) {
|
result.set(proc.processorId, {
|
||||||
const wrappers = proc.children.filter(c => c.processorType === wrapperType);
|
current: 0,
|
||||||
if (wrappers.length > 0) {
|
total: proc.iterationSize,
|
||||||
result.set(proc.processorId, {
|
type: inferIterationType(proc.processorType),
|
||||||
current: 0,
|
});
|
||||||
total: wrappers.length,
|
|
||||||
type: iterType,
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recurse into children to find nested iterations
|
if (proc.children?.length) {
|
||||||
detectIterations(proc.children, result);
|
detectIterations(proc.children, result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages per-compound iteration state for the execution overlay.
|
* Manages per-compound iteration state for the execution overlay.
|
||||||
*
|
*
|
||||||
* Scans the processor tree to detect compounds with iteration wrapper
|
* Scans the processor tree to detect containers with iterationSize
|
||||||
* children and tracks which iteration index is currently selected.
|
* and tracks which iteration index is currently selected.
|
||||||
*/
|
*/
|
||||||
export function useIterationState(processors: ProcessorNode[] | undefined) {
|
export function useIterationState(processors: ProcessorNode[] | undefined) {
|
||||||
const [state, setState] = useState<Map<string, IterationInfo>>(new Map());
|
const [state, setState] = useState<Map<string, IterationInfo>>(new Map());
|
||||||
|
|||||||
Reference in New Issue
Block a user