feat: seq-based tree reconstruction for ClickHouse flat processor model

Dual-mode buildTree: detects seq presence and uses seq/parentSeq linkage
instead of processorId map. Handles duplicate processorIds across
iterations correctly. Old processorId-based mode kept for PG compat.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 00:07:20 +02:00
parent 0661fd995f
commit 151b96a680
2 changed files with 166 additions and 2 deletions

View File

@@ -64,6 +64,18 @@ public class DetailService {
});
}
/**
 * Looks up one processor record through the execution store by execution id and
 * sequence number, and collects its captured payload fields.
 *
 * @param executionId execution identifier handed to the store lookup
 * @param seq sequence number handed to the store lookup
 * @return empty when the store yields no record; otherwise a map containing only the
 *         non-null values of inputBody, outputBody, inputHeaders and outputHeaders,
 *         inserted in that order (the map may be empty)
 */
public Optional<Map<String, String>> getProcessorSnapshotBySeq(String executionId, int seq) {
    return executionStore.findProcessorBySeq(executionId, seq).map(proc -> {
        Map<String, String> captured = new LinkedHashMap<>();

        String inBody = proc.inputBody();
        if (inBody != null) {
            captured.put("inputBody", inBody);
        }

        String outBody = proc.outputBody();
        if (outBody != null) {
            captured.put("outputBody", outBody);
        }

        String inHeaders = proc.inputHeaders();
        if (inHeaders != null) {
            captured.put("inputHeaders", inHeaders);
        }

        String outHeaders = proc.outputHeaders();
        if (outHeaders != null) {
            captured.put("outputHeaders", outHeaders);
        }

        return captured;
    });
}
/** Parse the raw processor tree JSON stored alongside the execution. */
private List<ProcessorNode> parseProcessorsJson(String json) {
if (json == null || json.isBlank()) return null;
@@ -104,12 +116,68 @@ public class DetailService {
}
/**
 * Reconstruct the processor tree from flat records.
 * Detects whether the records use the seq-based model (ClickHouse) or the
 * processorId-based model (PostgreSQL) and delegates accordingly: the seq-based
 * path is chosen as soon as at least one record carries a non-null seq.
 *
 * @param processors flat processor records; {@code null} is treated like an empty list
 * @return the root nodes of the reconstructed tree, or an empty list when there are
 *         no records
 */
List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
    // A missing list carries the same information as an empty one; do not NPE on it.
    if (processors == null || processors.isEmpty()) {
        return List.of();
    }
    boolean hasSeq = processors.stream().anyMatch(p -> p.seq() != null);
    return hasSeq ? buildTreeBySeq(processors) : buildTreeByProcessorId(processors);
}
/**
 * Seq-based tree reconstruction for the ClickHouse flat processor model.
 * Uses seq/parentSeq linkage, so records sharing a processorId (e.g., the same
 * processor inside a split running N times) each get their own node.
 *
 * <p>Every record yields exactly one node in the result, in input order:
 * <ul>
 *   <li>a record with a null parentSeq becomes a root;</li>
 *   <li>a record whose parentSeq matches no known seq becomes a root (orphan safety);</li>
 *   <li>a record whose parentSeq resolves to its own node becomes a root, so no
 *       self-referencing cycle is created;</li>
 *   <li>records with a null seq still get a node but cannot be referenced as a parent;</li>
 *   <li>if a seq value is repeated, the last record carrying it is the one children
 *       attach to.</li>
 * </ul>
 *
 * @param processors flat processor records, non-null
 * @return root nodes with their children attached
 */
private List<ProcessorNode> buildTreeBySeq(List<ProcessorRecord> processors) {
    // Nodes are kept positionally aligned with their records. Looking a record's own
    // node back up by seq would collapse records whose seq is null or repeated onto a
    // single map entry, dropping some nodes and attaching others more than once.
    List<ProcessorNode> nodes = new ArrayList<>(processors.size());
    Map<Integer, ProcessorNode> nodeBySeq = new LinkedHashMap<>();

    for (ProcessorRecord p : processors) {
        boolean hasTrace = p.inputBody() != null || p.outputBody() != null
                || p.inputHeaders() != null || p.outputHeaders() != null;

        ProcessorNode node = new ProcessorNode(
                p.processorId(), p.processorType(), p.status(),
                p.startTime(), p.endTime(),
                p.durationMs() != null ? p.durationMs() : 0L,
                p.errorMessage(), p.errorStacktrace(),
                parseAttributes(p.attributes()),
                p.iteration(), p.iterationSize(),
                null, null, null,
                p.resolvedEndpointUri(),
                p.errorType(), p.errorCategory(),
                p.rootCauseType(), p.rootCauseMessage(),
                null, p.circuitBreakerState(),
                p.fallbackTriggered(),
                p.filterMatched(), p.duplicateMessage(),
                hasTrace
        );

        nodes.add(node);
        // Only a real seq can serve as a parent reference.
        if (p.seq() != null) {
            nodeBySeq.put(p.seq(), node);
        }
    }

    List<ProcessorNode> roots = new ArrayList<>();
    int index = 0;
    for (ProcessorRecord p : processors) {
        ProcessorNode node = nodes.get(index++);
        ProcessorNode parent = p.parentSeq() == null ? null : nodeBySeq.get(p.parentSeq());

        if (parent == null || parent == node) {
            // No parent, unknown parent (orphan safety), or self-reference.
            roots.add(node);
        } else {
            parent.addChild(node);
        }
    }
    return roots;
}
/**
* ProcessorId-based tree reconstruction for PostgreSQL flat records.
* Note: this loses iteration context for processors with the same ID across iterations.
*/
private List<ProcessorNode> buildTreeByProcessorId(List<ProcessorRecord> processors) {
Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
for (ProcessorRecord p : processors) {
boolean hasTrace = p.inputBody() != null || p.outputBody() != null

View File

@@ -109,4 +109,100 @@ class TreeReconstructionTest {
List<ProcessorNode> roots = detailService.buildTree(List.of());
assertThat(roots).isEmpty();
}
// --- seq-based model tests (ClickHouse) ---
/**
 * Test fixture: builds a ProcessorRecord for the seq-based model tests.
 * Only the values passed in are varied; every other constructor argument is a
 * fixed literal ("exec-1", "app", "route1", 0, NOW, NOW, 10L) or null.
 *
 * @param id value passed as the record's second constructor argument (used by the
 *        tests as the processorId)
 * @param type processor type string
 * @param status processor status string
 * @param seq sequence number of this record
 * @param parentSeq sequence number of the parent record, or null
 * @param iteration iteration value, or null
 * @param iterationSize iteration size value, or null
 * @return the constructed record
 */
private ProcessorRecord procWithSeq(String id, String type, String status,
int seq, Integer parentSeq,
Integer iteration, Integer iterationSize) {
// NOTE(review): the constructor is purely positional; the meaning of each null below
// depends on ProcessorRecord's component order, which is not visible here — confirm
// against the record definition before adding or removing arguments.
return new ProcessorRecord(
"exec-1", id, type, "app", "route1",
// presumably NOW/NOW are start and end time and 10L the duration — TODO confirm
0, null, status, NOW, NOW, 10L,
null, null, null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null,
// seq linkage and iteration values supplied by the caller
seq, parentSeq, iteration, iterationSize, null, null
);
}
@Test
void buildTree_seqBasedModel_linearChain() {
    // from (seq 1) -> log1 (seq 2) -> to1 (seq 3), linked only through parentSeq.
    ProcessorRecord fromRecord = procWithSeq("from", "from", "COMPLETED", 1, null, null, null);
    ProcessorRecord logRecord = procWithSeq("log1", "log", "COMPLETED", 2, 1, null, null);
    ProcessorRecord toRecord = procWithSeq("to1", "to", "COMPLETED", 3, 2, null, null);

    List<ProcessorNode> tree = detailService.buildTree(List.of(fromRecord, logRecord, toRecord));

    // Level 0: the single root.
    assertThat(tree).hasSize(1);
    ProcessorNode top = tree.get(0);
    assertThat(top.getProcessorId()).isEqualTo("from");
    assertThat(top.getChildren()).hasSize(1);

    // Level 1: the only child of the root.
    ProcessorNode middle = top.getChildren().get(0);
    assertThat(middle.getProcessorId()).isEqualTo("log1");
    assertThat(middle.getChildren()).hasSize(1);

    // Level 2: the leaf.
    ProcessorNode leaf = middle.getChildren().get(0);
    assertThat(leaf.getProcessorId()).isEqualTo("to1");
    assertThat(leaf.getChildren()).isEmpty();
}
@Test
void buildTree_seqBasedModel_sameProcessorIdMultipleIterations() {
    // A split processor (seq 1) with 3 child processors all having the SAME
    // processorId but different seq values — this is the key scenario that
    // breaks the old processorId-based approach.
    List<ProcessorRecord> processors = List.of(
            procWithSeq("split1", "split", "COMPLETED", 1, null, null, null),
            procWithSeq("log-inside", "log", "COMPLETED", 2, 1, 0, 3),
            procWithSeq("log-inside", "log", "COMPLETED", 3, 1, 1, 3),
            procWithSeq("log-inside", "log", "COMPLETED", 4, 1, 2, 3)
    );

    List<ProcessorNode> roots = detailService.buildTree(processors);

    assertThat(roots).hasSize(1);
    ProcessorNode split = roots.get(0);
    assertThat(split.getProcessorId()).isEqualTo("split1");

    List<ProcessorNode> iterations = split.getChildren();
    assertThat(iterations).hasSize(3);

    // Checking the shared processorId alone would also pass if one node were attached
    // three times; the per-child loop index proves each iteration kept its own node,
    // in input order.
    for (int i = 0; i < iterations.size(); i++) {
        ProcessorNode child = iterations.get(i);
        assertThat(child.getProcessorId()).isEqualTo("log-inside");
        assertThat(child.getLoopIndex()).isEqualTo(i);
        assertThat(child.getLoopSize()).isEqualTo(3);
    }
}
@Test
void buildTree_seqBasedModel_orphanSafety() {
    // The second record names parentSeq 999, which no record in the input carries.
    ProcessorRecord realRoot = procWithSeq("root", "from", "COMPLETED", 1, null, null, null);
    ProcessorRecord dangling = procWithSeq("orphan", "log", "COMPLETED", 2, 999, null, null);

    List<ProcessorNode> topLevel = detailService.buildTree(List.of(realRoot, dangling));

    // The orphan is not dropped: it is promoted to the root list, after the real root.
    assertThat(topLevel).hasSize(2);
    ProcessorNode first = topLevel.get(0);
    ProcessorNode second = topLevel.get(1);
    assertThat(first.getProcessorId()).isEqualTo("root");
    assertThat(second.getProcessorId()).isEqualTo("orphan");
}
@Test
void buildTree_seqBasedModel_iterationFields() {
    // The record's iteration/iterationSize must surface on the node as loopIndex/loopSize.
    ProcessorRecord loopRecord = procWithSeq("loop1", "loop", "COMPLETED", 1, null, null, null);
    ProcessorRecord bodyRecord = procWithSeq("body", "log", "COMPLETED", 2, 1, 5, 10);

    List<ProcessorNode> tree = detailService.buildTree(List.of(loopRecord, bodyRecord));

    assertThat(tree).hasSize(1);
    ProcessorNode loopNode = tree.get(0);
    ProcessorNode bodyNode = loopNode.getChildren().get(0);
    assertThat(bodyNode.getLoopIndex()).isEqualTo(5);
    assertThat(bodyNode.getLoopSize()).isEqualTo(10);
}
}