feat: store raw processor tree JSON and add error categorization fields
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m0s
CI / docker (push) Successful in 53s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 37s

Fixes iteration overlay corruption caused by flat storage collapsing
duplicate processorIds across loop iterations.

Server:
- Store raw processor tree as processors_json JSONB on executions table
- Detail endpoint serves from processors_json (faithful tree), falls back
  to flat record reconstruction for older executions
- V10 migration: processors_json, error categorization (errorType,
  errorCategory, rootCauseType, rootCauseMessage), OTel (traceId, spanId),
  circuit breaker (circuitBreakerState, fallbackTriggered), drops
  erroneous splitDepth/loopDepth columns
- Add all new fields through full ingestion/storage/API chain

UI:
- Fix overlay wrapper filtering: check wrapper type before status filter
- Add new fields to schema.d.ts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-28 21:44:54 +01:00
parent f12f5f3c8d
commit 30344d29b1
12 changed files with 226 additions and 48 deletions

View File

@@ -28,8 +28,13 @@ public class PostgresExecutionStore implements ExecutionStore {
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
engine_level, input_body, output_body, input_headers, output_headers,
attributes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, now(), now())
attributes,
error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id,
processors_json,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
?, ?, ?, ?, ?, ?, ?::jsonb, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -49,6 +54,13 @@ public class PostgresExecutionStore implements ExecutionStore {
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
attributes = COALESCE(EXCLUDED.attributes, executions.attributes),
error_type = COALESCE(EXCLUDED.error_type, executions.error_type),
error_category = COALESCE(EXCLUDED.error_category, executions.error_category),
root_cause_type = COALESCE(EXCLUDED.root_cause_type, executions.root_cause_type),
root_cause_message = COALESCE(EXCLUDED.root_cause_message, executions.root_cause_message),
trace_id = COALESCE(EXCLUDED.trace_id, executions.trace_id),
span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -61,7 +73,11 @@ public class PostgresExecutionStore implements ExecutionStore {
execution.engineLevel(),
execution.inputBody(), execution.outputBody(),
execution.inputHeaders(), execution.outputHeaders(),
execution.attributes());
execution.attributes(),
execution.errorType(), execution.errorCategory(),
execution.rootCauseType(), execution.rootCauseMessage(),
execution.traceId(), execution.spanId(),
execution.processorsJson());
}
@Override
@@ -74,8 +90,11 @@ public class PostgresExecutionStore implements ExecutionStore {
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers, attributes,
loop_index, loop_size, split_index, split_size, multicast_index,
resolved_endpoint_uri, split_depth, loop_depth)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?, ?, ?)
resolved_endpoint_uri,
error_type, error_category, root_cause_type, root_cause_message,
error_handler_type, circuit_breaker_state, fallback_triggered)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
status = EXCLUDED.status,
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
@@ -93,8 +112,13 @@ public class PostgresExecutionStore implements ExecutionStore {
split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size),
multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index),
resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri),
split_depth = EXCLUDED.split_depth,
loop_depth = EXCLUDED.loop_depth
error_type = COALESCE(EXCLUDED.error_type, processor_executions.error_type),
error_category = COALESCE(EXCLUDED.error_category, processor_executions.error_category),
root_cause_type = COALESCE(EXCLUDED.root_cause_type, processor_executions.root_cause_type),
root_cause_message = COALESCE(EXCLUDED.root_cause_message, processor_executions.root_cause_message),
error_handler_type = COALESCE(EXCLUDED.error_handler_type, processor_executions.error_handler_type),
circuit_breaker_state = COALESCE(EXCLUDED.circuit_breaker_state, processor_executions.circuit_breaker_state),
fallback_triggered = COALESCE(EXCLUDED.fallback_triggered, processor_executions.fallback_triggered)
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
@@ -108,8 +132,10 @@ public class PostgresExecutionStore implements ExecutionStore {
p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(),
p.multicastIndex(),
p.resolvedEndpointUri(),
p.splitDepth(),
p.loopDepth()
p.errorType(), p.errorCategory(),
p.rootCauseType(), p.rootCauseMessage(),
p.errorHandlerType(), p.circuitBreakerState(),
p.fallbackTriggered()
}).toList());
}
@@ -148,7 +174,11 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("engine_level"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"),
rs.getString("attributes"));
rs.getString("attributes"),
rs.getString("error_type"), rs.getString("error_category"),
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("trace_id"), rs.getString("span_id"),
rs.getString("processors_json"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(
@@ -169,8 +199,10 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getObject("split_size") != null ? rs.getInt("split_size") : null,
rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null,
rs.getString("resolved_endpoint_uri"),
rs.getInt("split_depth"),
rs.getInt("loop_depth"));
rs.getString("error_type"), rs.getString("error_category"),
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"),
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null);
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);

View File

@@ -0,0 +1,23 @@
-- executions: store raw processor tree for faithful detail response
ALTER TABLE executions ADD COLUMN processors_json JSONB;
-- executions: error categorization + OTel tracing
ALTER TABLE executions ADD COLUMN error_type TEXT;
ALTER TABLE executions ADD COLUMN error_category TEXT;
ALTER TABLE executions ADD COLUMN root_cause_type TEXT;
ALTER TABLE executions ADD COLUMN root_cause_message TEXT;
ALTER TABLE executions ADD COLUMN trace_id TEXT;
ALTER TABLE executions ADD COLUMN span_id TEXT;
-- processor_executions: error categorization + circuit breaker
ALTER TABLE processor_executions ADD COLUMN error_type TEXT;
ALTER TABLE processor_executions ADD COLUMN error_category TEXT;
ALTER TABLE processor_executions ADD COLUMN root_cause_type TEXT;
ALTER TABLE processor_executions ADD COLUMN root_cause_message TEXT;
ALTER TABLE processor_executions ADD COLUMN error_handler_type TEXT;
ALTER TABLE processor_executions ADD COLUMN circuit_breaker_state TEXT;
ALTER TABLE processor_executions ADD COLUMN fallback_triggered BOOLEAN;
-- Remove erroneous depth columns from V9
ALTER TABLE processor_executions DROP COLUMN IF EXISTS split_depth;
ALTER TABLE processor_executions DROP COLUMN IF EXISTS loop_depth;

View File

@@ -26,7 +26,8 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
"COMPLETED", "corr-1", "exchange-1",
now, now.plusMillis(100), 100L,
null, null, null,
"REGULAR", null, null, null, null, null);
"REGULAR", null, null, null, null, null,
null, null, null, null, null, null, null);
executionStore.upsert(record);
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
@@ -43,11 +44,13 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
ExecutionRecord first = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"RUNNING", null, null, now, null, null, null, null, null,
null, null, null, null, null, null);
null, null, null, null, null, null,
null, null, null, null, null, null, null);
ExecutionRecord second = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null,
"COMPLETE", null, null, null, null, null);
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null);
executionStore.upsert(first);
executionStore.upsert(second);
@@ -64,7 +67,8 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
ExecutionRecord exec = new ExecutionRecord(
"exec-proc", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null,
"COMPLETE", null, null, null, null, null);
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null);
executionStore.upsert(exec);
List<ProcessorRecord> processors = List.of(
@@ -73,13 +77,13 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
now, now.plusMillis(10), 10L, null, null,
"input body", "output body", null, null, null,
null, null, null, null, null,
null, 0, 0),
null, null, null, null, null, null, null, null),
new ProcessorRecord("exec-proc", "proc-2", "to",
"app-1", "route-a", 1, "proc-1", "COMPLETED",
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
null, null, null, null, null,
null, null, null, null, null,
null, 0, 0)
null, null, null, null, null, null, null, null)
);
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

View File

@@ -60,6 +60,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
id, routeId, "agent-1", applicationName, status, null, null,
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null,
null, null, null, null, null, null));
null, null, null, null, null, null,
null, null, null, null, null, null, null));
}
}

View File

@@ -1,16 +1,20 @@
package com.cameleer3.server.core.detail;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
public class DetailService {
private static final ObjectMapper JSON = new ObjectMapper();
private static final ObjectMapper JSON = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};
private static final TypeReference<List<ProcessorExecution>> PROCESSOR_EXEC_LIST = new TypeReference<>() {};
private final ExecutionStore executionStore;
@@ -21,8 +25,14 @@ public class DetailService {
public Optional<ExecutionDetail> getDetail(String executionId) {
return executionStore.findById(executionId)
.map(exec -> {
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
List<ProcessorNode> roots = buildTree(processors);
// Prefer the raw processor tree (faithful to agent data) over
// flat-record reconstruction (which loses iteration context).
List<ProcessorNode> processors = parseProcessorsJson(exec.processorsJson());
if (processors == null) {
// Fallback for executions ingested before processors_json was added
List<ProcessorRecord> records = executionStore.findProcessors(executionId);
processors = buildTree(records);
}
return new ExecutionDetail(
exec.executionId(), exec.routeId(), exec.agentId(),
exec.applicationName(),
@@ -30,10 +40,13 @@ public class DetailService {
exec.durationMs() != null ? exec.durationMs() : 0L,
exec.correlationId(), exec.exchangeId(),
exec.errorMessage(), exec.errorStacktrace(),
exec.diagramContentHash(), roots,
exec.diagramContentHash(), processors,
exec.inputBody(), exec.outputBody(),
exec.inputHeaders(), exec.outputHeaders(),
parseAttributes(exec.attributes())
parseAttributes(exec.attributes()),
exec.errorType(), exec.errorCategory(),
exec.rootCauseType(), exec.rootCauseMessage(),
exec.traceId(), exec.spanId()
);
});
}
@@ -50,6 +63,50 @@ public class DetailService {
});
}
/** Parse the raw processor tree JSON stored alongside the execution. */
private List<ProcessorNode> parseProcessorsJson(String json) {
if (json == null || json.isBlank()) return null;
try {
List<ProcessorExecution> executions = JSON.readValue(json, PROCESSOR_EXEC_LIST);
return convertProcessors(executions);
} catch (Exception e) {
return null;
}
}
/** Convert agent ProcessorExecution tree to detail ProcessorNode tree. */
private List<ProcessorNode> convertProcessors(List<ProcessorExecution> executions) {
if (executions == null) return List.of();
List<ProcessorNode> result = new ArrayList<>();
for (ProcessorExecution p : executions) {
ProcessorNode node = new ProcessorNode(
p.getProcessorId(), p.getProcessorType(),
p.getStatus() != null ? p.getStatus().name() : null,
p.getStartTime(), p.getEndTime(),
p.getDurationMs(),
p.getErrorMessage(), p.getErrorStackTrace(),
p.getAttributes() != null ? new LinkedHashMap<>(p.getAttributes()) : null,
p.getLoopIndex(), p.getLoopSize(),
p.getSplitIndex(), p.getSplitSize(),
p.getMulticastIndex(),
p.getResolvedEndpointUri(),
p.getErrorType(), p.getErrorCategory(),
p.getRootCauseType(), p.getRootCauseMessage(),
p.getErrorHandlerType(), p.getCircuitBreakerState(),
p.getFallbackTriggered()
);
for (ProcessorNode child : convertProcessors(p.getChildren())) {
node.addChild(child);
}
result.add(node);
}
return result;
}
/**
* Fallback: reconstruct processor tree from flat records.
* Note: this loses iteration context for processors with the same ID across iterations.
*/
List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
if (processors.isEmpty()) return List.of();
@@ -64,7 +121,11 @@ public class DetailService {
p.loopIndex(), p.loopSize(),
p.splitIndex(), p.splitSize(),
p.multicastIndex(),
p.resolvedEndpointUri()
p.resolvedEndpointUri(),
p.errorType(), p.errorCategory(),
p.rootCauseType(), p.rootCauseMessage(),
p.errorHandlerType(), p.circuitBreakerState(),
p.fallbackTriggered()
));
}

View File

@@ -47,6 +47,12 @@ public record ExecutionDetail(
String outputBody,
String inputHeaders,
String outputHeaders,
Map<String, String> attributes
Map<String, String> attributes,
String errorType,
String errorCategory,
String rootCauseType,
String rootCauseMessage,
String traceId,
String spanId
) {
}

View File

@@ -28,6 +28,13 @@ public final class ProcessorNode {
private final Integer splitSize;
private final Integer multicastIndex;
private final String resolvedEndpointUri;
private final String errorType;
private final String errorCategory;
private final String rootCauseType;
private final String rootCauseMessage;
private final String errorHandlerType;
private final String circuitBreakerState;
private final Boolean fallbackTriggered;
private final List<ProcessorNode> children;
public ProcessorNode(String processorId, String processorType, String status,
@@ -37,7 +44,11 @@ public final class ProcessorNode {
Integer loopIndex, Integer loopSize,
Integer splitIndex, Integer splitSize,
Integer multicastIndex,
String resolvedEndpointUri) {
String resolvedEndpointUri,
String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage,
String errorHandlerType, String circuitBreakerState,
Boolean fallbackTriggered) {
this.processorId = processorId;
this.processorType = processorType;
this.status = status;
@@ -53,6 +64,13 @@ public final class ProcessorNode {
this.splitSize = splitSize;
this.multicastIndex = multicastIndex;
this.resolvedEndpointUri = resolvedEndpointUri;
this.errorType = errorType;
this.errorCategory = errorCategory;
this.rootCauseType = rootCauseType;
this.rootCauseMessage = rootCauseMessage;
this.errorHandlerType = errorHandlerType;
this.circuitBreakerState = circuitBreakerState;
this.fallbackTriggered = fallbackTriggered;
this.children = new ArrayList<>();
}
@@ -75,5 +93,12 @@ public final class ProcessorNode {
public Integer getSplitSize() { return splitSize; }
public Integer getMulticastIndex() { return multicastIndex; }
public String getResolvedEndpointUri() { return resolvedEndpointUri; }
public String getErrorType() { return errorType; }
public String getErrorCategory() { return errorCategory; }
public String getRootCauseType() { return rootCauseType; }
public String getRootCauseMessage() { return rootCauseMessage; }
public String getErrorHandlerType() { return errorHandlerType; }
public String getCircuitBreakerState() { return circuitBreakerState; }
public Boolean getFallbackTriggered() { return fallbackTriggered; }
public List<ProcessorNode> getChildren() { return List.copyOf(children); }
}

View File

@@ -107,7 +107,11 @@ public class IngestionService {
diagramHash,
exec.getEngineLevel(),
inputBody, outputBody, inputHeaders, outputHeaders,
toJson(exec.getAttributes())
toJson(exec.getAttributes()),
exec.getErrorType(), exec.getErrorCategory(),
exec.getRootCauseType(), exec.getRootCauseMessage(),
exec.getTraceId(), exec.getSpanId(),
toJsonObject(exec.getProcessors())
);
}
@@ -133,8 +137,10 @@ public class IngestionService {
p.getSplitIndex(), p.getSplitSize(),
p.getMulticastIndex(),
p.getResolvedEndpointUri(),
getDepthSafe(p, "splitDepth"),
getDepthSafe(p, "loopDepth")
p.getErrorType(), p.getErrorCategory(),
p.getRootCauseType(), p.getRootCauseMessage(),
p.getErrorHandlerType(), p.getCircuitBreakerState(),
p.getFallbackTriggered()
));
if (p.getChildren() != null) {
flat.addAll(flattenProcessors(
@@ -145,16 +151,6 @@ public class IngestionService {
return flat;
}
/** Safely call getSplitDepth()/getLoopDepth() — returns 0 if not available in this cameleer3-common version. */
private static int getDepthSafe(ProcessorExecution p, String field) {
try {
var method = p.getClass().getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1));
return (int) method.invoke(p);
} catch (Exception e) {
return 0;
}
}
private String truncateBody(String body) {
if (body == null) return null;
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
@@ -169,4 +165,13 @@ public class IngestionService {
return "{}";
}
}
private static String toJsonObject(Object obj) {
if (obj == null) return null;
try {
return JSON.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return null;
}
}
}

View File

@@ -25,7 +25,11 @@ public interface ExecutionStore {
String errorMessage, String errorStacktrace, String diagramContentHash,
String engineLevel,
String inputBody, String outputBody, String inputHeaders, String outputHeaders,
String attributes
String attributes,
String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage,
String traceId, String spanId,
String processorsJson
) {}
record ProcessorRecord(
@@ -40,7 +44,9 @@ public interface ExecutionStore {
Integer splitIndex, Integer splitSize,
Integer multicastIndex,
String resolvedEndpointUri,
int splitDepth,
int loopDepth
String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage,
String errorHandlerType, String circuitBreakerState,
Boolean fallbackTriggered
) {}
}

View File

@@ -29,7 +29,7 @@ class TreeReconstructionTest {
status, NOW, NOW, 10L,
null, null, null, null, null, null, null,
null, null, null, null, null,
null, 0, 0
null, null, null, null, null, null, null, null
);
}

View File

@@ -1626,6 +1626,12 @@ export interface components {
attributes: {
[key: string]: string;
};
errorType?: string;
errorCategory?: string;
rootCauseType?: string;
rootCauseMessage?: string;
traceId?: string;
spanId?: string;
};
ProcessorNode: {
processorId: string;
@@ -1653,6 +1659,13 @@ export interface components {
[key: string]: string;
};
resolvedEndpointUri?: string;
errorType?: string;
errorCategory?: string;
rootCauseType?: string;
rootCauseMessage?: string;
errorHandlerType?: string;
circuitBreakerState?: string;
fallbackTriggered?: boolean;
children: components["schemas"]["ProcessorNode"][];
};
DiagramLayout: {

View File

@@ -29,10 +29,10 @@ function buildOverlay(
parentId?: string,
): void {
for (const proc of processors) {
if (!proc.processorId || !proc.status) continue;
if (proc.status !== 'COMPLETED' && proc.status !== 'FAILED') continue;
if (!proc.processorId) continue;
// Iteration wrapper: filter by selected iteration, skip the wrapper itself
// Iteration wrapper: filter by selected iteration, skip the wrapper itself.
// Must be checked before the status filter — wrappers may not have a status.
if (ITERATION_WRAPPER_TYPES.has(proc.processorType)) {
if (parentId && iterationState.has(parentId)) {
const info = iterationState.get(parentId)!;
@@ -48,7 +48,9 @@ function buildOverlay(
continue;
}
// Regular processor: add to overlay
// Regular processor: only include completed/failed nodes
if (!proc.status || (proc.status !== 'COMPLETED' && proc.status !== 'FAILED')) continue;
const subRouteFailed =
proc.status === 'FAILED' &&
(proc.processorType?.includes('DIRECT') || proc.processorType?.includes('SEDA'));