From 30344d29b147a209ace8d3b398633f0449739758 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sat, 28 Mar 2026 21:44:54 +0100 Subject: [PATCH] feat: store raw processor tree JSON and add error categorization fields Fixes iteration overlay corruption caused by flat storage collapsing duplicate processorIds across loop iterations. Server: - Store raw processor tree as processors_json JSONB on executions table - Detail endpoint serves from processors_json (faithful tree), falls back to flat record reconstruction for older executions - V10 migration: processors_json, error categorization (errorType, errorCategory, rootCauseType, rootCauseMessage), OTel (traceId, spanId), circuit breaker (circuitBreakerState, fallbackTriggered), drops erroneous splitDepth/loopDepth columns - Add all new fields through full ingestion/storage/API chain UI: - Fix overlay wrapper filtering: check wrapper type before status filter - Add new fields to schema.d.ts Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/PostgresExecutionStore.java | 56 +++++++++++--- .../V10__error_categorization_and_tracing.sql | 23 ++++++ .../app/storage/PostgresExecutionStoreIT.java | 16 ++-- .../app/storage/PostgresStatsStoreIT.java | 3 +- .../server/core/detail/DetailService.java | 73 +++++++++++++++++-- .../server/core/detail/ExecutionDetail.java | 8 +- .../server/core/detail/ProcessorNode.java | 27 ++++++- .../core/ingestion/IngestionService.java | 31 ++++---- .../server/core/storage/ExecutionStore.java | 12 ++- .../core/detail/TreeReconstructionTest.java | 2 +- ui/src/api/schema.d.ts | 13 ++++ .../ExecutionDiagram/useExecutionOverlay.ts | 10 ++- 12 files changed, 226 insertions(+), 48 deletions(-) create mode 100644 cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 1451a1e3..73c4b845 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -28,8 +28,13 @@ public class PostgresExecutionStore implements ExecutionStore { status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, diagram_content_hash, engine_level, input_body, output_body, input_headers, output_headers, - attributes, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, now(), now()) + attributes, + error_type, error_category, root_cause_type, root_cause_message, + trace_id, span_id, + processors_json, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, + ?, ?, ?, ?, ?, ?, ?::jsonb, now(), now()) ON CONFLICT (execution_id, start_time) DO UPDATE SET status = CASE WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') @@ -49,6 +54,13 @@ public class PostgresExecutionStore implements ExecutionStore { input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers), output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers), attributes = COALESCE(EXCLUDED.attributes, executions.attributes), + error_type = COALESCE(EXCLUDED.error_type, executions.error_type), + error_category = COALESCE(EXCLUDED.error_category, executions.error_category), + root_cause_type = COALESCE(EXCLUDED.root_cause_type, executions.root_cause_type), + root_cause_message = COALESCE(EXCLUDED.root_cause_message, executions.root_cause_message), + trace_id = COALESCE(EXCLUDED.trace_id, executions.trace_id), + span_id = COALESCE(EXCLUDED.span_id, executions.span_id), + processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json), updated_at = now() """, execution.executionId(), execution.routeId(), execution.agentId(), @@ -61,7 +73,11 @@ public class PostgresExecutionStore implements ExecutionStore { execution.engineLevel(), execution.inputBody(), execution.outputBody(), execution.inputHeaders(), execution.outputHeaders(), - execution.attributes()); + execution.attributes(), + execution.errorType(), execution.errorCategory(), + execution.rootCauseType(), execution.rootCauseMessage(), + execution.traceId(), execution.spanId(), + execution.processorsJson()); } @Override @@ -74,8 +90,11 @@ public class PostgresExecutionStore implements ExecutionStore { status, start_time, end_time, duration_ms, error_message, error_stacktrace, input_body, output_body, input_headers, output_headers, attributes, loop_index, loop_size, split_index, split_size, multicast_index, - resolved_endpoint_uri, split_depth, loop_depth) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?, ?, ?) + resolved_endpoint_uri, + error_type, error_category, root_cause_type, root_cause_message, + error_handler_type, circuit_breaker_state, fallback_triggered) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET status = EXCLUDED.status, end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), @@ -93,8 +112,13 @@ public class PostgresExecutionStore implements ExecutionStore { split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size), multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index), resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri), - split_depth = EXCLUDED.split_depth, - loop_depth = EXCLUDED.loop_depth + error_type = COALESCE(EXCLUDED.error_type, processor_executions.error_type), + error_category = COALESCE(EXCLUDED.error_category, processor_executions.error_category), + root_cause_type = COALESCE(EXCLUDED.root_cause_type, processor_executions.root_cause_type), + root_cause_message = COALESCE(EXCLUDED.root_cause_message, processor_executions.root_cause_message), + error_handler_type = COALESCE(EXCLUDED.error_handler_type, processor_executions.error_handler_type), + circuit_breaker_state = COALESCE(EXCLUDED.circuit_breaker_state, processor_executions.circuit_breaker_state), + fallback_triggered = COALESCE(EXCLUDED.fallback_triggered, processor_executions.fallback_triggered) """, processors.stream().map(p -> new Object[]{ p.executionId(), p.processorId(), p.processorType(), @@ -108,8 +132,10 @@ public class PostgresExecutionStore implements ExecutionStore { p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(), p.multicastIndex(), p.resolvedEndpointUri(), - p.splitDepth(), - p.loopDepth() + p.errorType(), p.errorCategory(), + p.rootCauseType(), p.rootCauseMessage(), + p.errorHandlerType(), p.circuitBreakerState(), + p.fallbackTriggered() }).toList()); } @@ -148,7 +174,11 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getString("engine_level"), rs.getString("input_body"), rs.getString("output_body"), rs.getString("input_headers"), rs.getString("output_headers"), - rs.getString("attributes")); + rs.getString("attributes"), + rs.getString("error_type"), rs.getString("error_category"), + rs.getString("root_cause_type"), rs.getString("root_cause_message"), + rs.getString("trace_id"), rs.getString("span_id"), + rs.getString("processors_json")); private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord( @@ -169,8 +199,10 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getObject("split_size") != null ? rs.getInt("split_size") : null, rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null, rs.getString("resolved_endpoint_uri"), - rs.getInt("split_depth"), - rs.getInt("loop_depth")); + rs.getString("error_type"), rs.getString("error_category"), + rs.getString("root_cause_type"), rs.getString("root_cause_message"), + rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"), + rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null); private static Instant toInstant(ResultSet rs, String column) throws SQLException { Timestamp ts = rs.getTimestamp(column); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql b/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql new file mode 100644 index 00000000..8aab749a --- /dev/null +++ b/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql @@ -0,0 +1,23 @@ +-- executions: store raw processor tree for faithful detail response +ALTER TABLE executions ADD COLUMN processors_json JSONB; + +-- executions: error categorization + OTel tracing +ALTER TABLE executions ADD COLUMN error_type TEXT; +ALTER TABLE executions ADD COLUMN error_category TEXT; +ALTER TABLE executions ADD COLUMN root_cause_type TEXT; +ALTER TABLE executions ADD COLUMN root_cause_message TEXT; +ALTER TABLE executions ADD COLUMN trace_id TEXT; +ALTER TABLE executions ADD COLUMN span_id TEXT; + +-- processor_executions: error categorization + circuit breaker +ALTER TABLE processor_executions ADD COLUMN error_type TEXT; +ALTER TABLE processor_executions ADD COLUMN error_category TEXT; +ALTER TABLE processor_executions ADD COLUMN root_cause_type TEXT; +ALTER TABLE processor_executions ADD COLUMN root_cause_message TEXT; +ALTER TABLE processor_executions ADD COLUMN error_handler_type TEXT; +ALTER TABLE processor_executions ADD COLUMN circuit_breaker_state TEXT; +ALTER TABLE processor_executions ADD COLUMN fallback_triggered BOOLEAN; + +-- Remove erroneous depth columns from V9 +ALTER TABLE processor_executions DROP COLUMN IF EXISTS split_depth; +ALTER TABLE processor_executions DROP COLUMN IF EXISTS loop_depth; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java index e1320b4a..957b3877 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java @@ -26,7 +26,8 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT { "COMPLETED", "corr-1", "exchange-1", now, now.plusMillis(100), 100L, null, null, null, - "REGULAR", null, null, null, null, null); + "REGULAR", null, null, null, null, null, + null, null, null, null, null, null, null); executionStore.upsert(record); Optional found = executionStore.findById("exec-1"); @@ -43,11 +44,13 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT { ExecutionRecord first = new ExecutionRecord( "exec-dup", "route-a", "agent-1", "app-1", "RUNNING", null, null, now, null, null, null, null, null, - null, null, null, null, null, null); + null, null, null, null, null, null, + null, null, null, null, null, null, null); ExecutionRecord second = new ExecutionRecord( "exec-dup", "route-a", "agent-1", "app-1", "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null, - "COMPLETE", null, null, null, null, null); + "COMPLETE", null, null, null, null, null, + null, null, null, null, null, null, null); executionStore.upsert(first); executionStore.upsert(second); @@ -64,7 +67,8 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT { ExecutionRecord exec = new ExecutionRecord( "exec-proc", "route-a", "agent-1", "app-1", "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null, - "COMPLETE", null, null, null, null, null); + "COMPLETE", null, null, null, null, null, + null, null, null, null, null, null, null); executionStore.upsert(exec); List processors = List.of( @@ -73,13 +77,13 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT { now, now.plusMillis(10), 10L, null, null, "input body", "output body", null, null, null, null, null, null, null, null, - null, 0, 0), + null, null, null, null, null, null, null, null), new ProcessorRecord("exec-proc", "proc-2", "to", "app-1", "route-a", 1, "proc-1", "COMPLETED", now.plusMillis(10), now.plusMillis(30), 20L, null, null, null, null, null, null, null, null, null, null, null, null, - null, 0, 0) + null, null, null, null, null, null, null, null) ); executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java index a7885bb8..6a448d9a 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java @@ -60,6 +60,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT { id, routeId, "agent-1", applicationName, status, null, null, startTime, startTime.plusMillis(durationMs), durationMs, status.equals("FAILED") ? "error" : null, null, null, - null, null, null, null, null, null)); + null, null, null, null, null, null, + null, null, null, null, null, null, null)); } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java index 339851d1..c5b9dba1 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java @@ -1,16 +1,20 @@ package com.cameleer3.server.core.detail; +import com.cameleer3.common.model.ProcessorExecution; import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.*; public class DetailService { - private static final ObjectMapper JSON = new ObjectMapper(); + private static final ObjectMapper JSON = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); private static final TypeReference> STR_MAP = new TypeReference<>() {}; + private static final TypeReference> PROCESSOR_EXEC_LIST = new TypeReference<>() {}; private final ExecutionStore executionStore; @@ -21,8 +25,14 @@ public class DetailService { public Optional getDetail(String executionId) { return executionStore.findById(executionId) .map(exec -> { - List processors = executionStore.findProcessors(executionId); - List roots = buildTree(processors); + // Prefer the raw processor tree (faithful to agent data) over + // flat-record reconstruction (which loses iteration context). + List processors = parseProcessorsJson(exec.processorsJson()); + if (processors == null) { + // Fallback for executions ingested before processors_json was added + List records = executionStore.findProcessors(executionId); + processors = buildTree(records); + } return new ExecutionDetail( exec.executionId(), exec.routeId(), exec.agentId(), exec.applicationName(), @@ -30,10 +40,13 @@ public class DetailService { exec.durationMs() != null ? exec.durationMs() : 0L, exec.correlationId(), exec.exchangeId(), exec.errorMessage(), exec.errorStacktrace(), - exec.diagramContentHash(), roots, + exec.diagramContentHash(), processors, exec.inputBody(), exec.outputBody(), exec.inputHeaders(), exec.outputHeaders(), - parseAttributes(exec.attributes()) + parseAttributes(exec.attributes()), + exec.errorType(), exec.errorCategory(), + exec.rootCauseType(), exec.rootCauseMessage(), + exec.traceId(), exec.spanId() ); }); } @@ -50,6 +63,50 @@ public class DetailService { }); } + /** Parse the raw processor tree JSON stored alongside the execution. */ + private List parseProcessorsJson(String json) { + if (json == null || json.isBlank()) return null; + try { + List executions = JSON.readValue(json, PROCESSOR_EXEC_LIST); + return convertProcessors(executions); + } catch (Exception e) { + return null; + } + } + + /** Convert agent ProcessorExecution tree to detail ProcessorNode tree. */ + private List convertProcessors(List executions) { + if (executions == null) return List.of(); + List result = new ArrayList<>(); + for (ProcessorExecution p : executions) { + ProcessorNode node = new ProcessorNode( + p.getProcessorId(), p.getProcessorType(), + p.getStatus() != null ? p.getStatus().name() : null, + p.getStartTime(), p.getEndTime(), + p.getDurationMs(), + p.getErrorMessage(), p.getErrorStackTrace(), + p.getAttributes() != null ? new LinkedHashMap<>(p.getAttributes()) : null, + p.getLoopIndex(), p.getLoopSize(), + p.getSplitIndex(), p.getSplitSize(), + p.getMulticastIndex(), + p.getResolvedEndpointUri(), + p.getErrorType(), p.getErrorCategory(), + p.getRootCauseType(), p.getRootCauseMessage(), + p.getErrorHandlerType(), p.getCircuitBreakerState(), + p.getFallbackTriggered() + ); + for (ProcessorNode child : convertProcessors(p.getChildren())) { + node.addChild(child); + } + result.add(node); + } + return result; + } + + /** + * Fallback: reconstruct processor tree from flat records. + * Note: this loses iteration context for processors with the same ID across iterations. + */ List buildTree(List processors) { if (processors.isEmpty()) return List.of(); @@ -64,7 +121,11 @@ public class DetailService { p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(), p.multicastIndex(), - p.resolvedEndpointUri() + p.resolvedEndpointUri(), + p.errorType(), p.errorCategory(), + p.rootCauseType(), p.rootCauseMessage(), + p.errorHandlerType(), p.circuitBreakerState(), + p.fallbackTriggered() )); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java index caf2522b..bd7ce786 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java @@ -47,6 +47,12 @@ public record ExecutionDetail( String outputBody, String inputHeaders, String outputHeaders, - Map attributes + Map attributes, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + String traceId, + String spanId ) { } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java index b33dace8..768a52a0 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java @@ -28,6 +28,13 @@ public final class ProcessorNode { private final Integer splitSize; private final Integer multicastIndex; private final String resolvedEndpointUri; + private final String errorType; + private final String errorCategory; + private final String rootCauseType; + private final String rootCauseMessage; + private final String errorHandlerType; + private final String circuitBreakerState; + private final Boolean fallbackTriggered; private final List children; public ProcessorNode(String processorId, String processorType, String status, @@ -37,7 +44,11 @@ public final class ProcessorNode { Integer loopIndex, Integer loopSize, Integer splitIndex, Integer splitSize, Integer multicastIndex, - String resolvedEndpointUri) { + String resolvedEndpointUri, + String errorType, String errorCategory, + String rootCauseType, String rootCauseMessage, + String errorHandlerType, String circuitBreakerState, + Boolean fallbackTriggered) { this.processorId = processorId; this.processorType = processorType; this.status = status; @@ -53,6 +64,13 @@ public final class ProcessorNode { this.splitSize = splitSize; this.multicastIndex = multicastIndex; this.resolvedEndpointUri = resolvedEndpointUri; + this.errorType = errorType; + this.errorCategory = errorCategory; + this.rootCauseType = rootCauseType; + this.rootCauseMessage = rootCauseMessage; + this.errorHandlerType = errorHandlerType; + this.circuitBreakerState = circuitBreakerState; + this.fallbackTriggered = fallbackTriggered; this.children = new ArrayList<>(); } @@ -75,5 +93,12 @@ public final class ProcessorNode { public Integer getSplitSize() { return splitSize; } public Integer getMulticastIndex() { return multicastIndex; } public String getResolvedEndpointUri() { return resolvedEndpointUri; } + public String getErrorType() { return errorType; } + public String getErrorCategory() { return errorCategory; } + public String getRootCauseType() { return rootCauseType; } + public String getRootCauseMessage() { return rootCauseMessage; } + public String getErrorHandlerType() { return errorHandlerType; } + public String getCircuitBreakerState() { return circuitBreakerState; } + public Boolean getFallbackTriggered() { return fallbackTriggered; } public List getChildren() { return List.copyOf(children); } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 9da186df..2eb47cd7 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -107,7 +107,11 @@ public class IngestionService { diagramHash, exec.getEngineLevel(), inputBody, outputBody, inputHeaders, outputHeaders, - toJson(exec.getAttributes()) + toJson(exec.getAttributes()), + exec.getErrorType(), exec.getErrorCategory(), + exec.getRootCauseType(), exec.getRootCauseMessage(), + exec.getTraceId(), exec.getSpanId(), + toJsonObject(exec.getProcessors()) ); } @@ -133,8 +137,10 @@ public class IngestionService { p.getSplitIndex(), p.getSplitSize(), p.getMulticastIndex(), p.getResolvedEndpointUri(), - getDepthSafe(p, "splitDepth"), - getDepthSafe(p, "loopDepth") + p.getErrorType(), p.getErrorCategory(), + p.getRootCauseType(), p.getRootCauseMessage(), + p.getErrorHandlerType(), p.getCircuitBreakerState(), + p.getFallbackTriggered() )); if (p.getChildren() != null) { flat.addAll(flattenProcessors( @@ -145,16 +151,6 @@ public class IngestionService { return flat; } - /** Safely call getSplitDepth()/getLoopDepth() — returns 0 if not available in this cameleer3-common version. */ - private static int getDepthSafe(ProcessorExecution p, String field) { - try { - var method = p.getClass().getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1)); - return (int) method.invoke(p); - } catch (Exception e) { - return 0; - } - } - private String truncateBody(String body) { if (body == null) return null; if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); @@ -169,4 +165,13 @@ public class IngestionService { return "{}"; } } + + private static String toJsonObject(Object obj) { + if (obj == null) return null; + try { + return JSON.writeValueAsString(obj); + } catch (JsonProcessingException e) { + return null; + } + } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index 2d4e468d..0850559d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -25,7 +25,11 @@ public interface ExecutionStore { String errorMessage, String errorStacktrace, String diagramContentHash, String engineLevel, String inputBody, String outputBody, String inputHeaders, String outputHeaders, - String attributes + String attributes, + String errorType, String errorCategory, + String rootCauseType, String rootCauseMessage, + String traceId, String spanId, + String processorsJson ) {} record ProcessorRecord( @@ -40,7 +44,9 @@ public interface ExecutionStore { Integer splitIndex, Integer splitSize, Integer multicastIndex, String resolvedEndpointUri, - int splitDepth, - int loopDepth + String errorType, String errorCategory, + String rootCauseType, String rootCauseMessage, + String errorHandlerType, String circuitBreakerState, + Boolean fallbackTriggered ) {} } diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java index 5913d6fc..a7480d73 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java @@ -29,7 +29,7 @@ class TreeReconstructionTest { status, NOW, NOW, 10L, null, null, null, null, null, null, null, null, null, null, null, null, - null, 0, 0 + null, null, null, null, null, null, null, null ); } diff --git a/ui/src/api/schema.d.ts b/ui/src/api/schema.d.ts index 62921ddb..02c1ea87 100644 --- a/ui/src/api/schema.d.ts +++ b/ui/src/api/schema.d.ts @@ -1626,6 +1626,12 @@ export interface components { attributes: { [key: string]: string; }; + errorType?: string; + errorCategory?: string; + rootCauseType?: string; + rootCauseMessage?: string; + traceId?: string; + spanId?: string; }; ProcessorNode: { processorId: string; @@ -1653,6 +1659,13 @@ export interface components { [key: string]: string; }; resolvedEndpointUri?: string; + errorType?: string; + errorCategory?: string; + rootCauseType?: string; + rootCauseMessage?: string; + errorHandlerType?: string; + circuitBreakerState?: string; + fallbackTriggered?: boolean; children: components["schemas"]["ProcessorNode"][]; }; DiagramLayout: { diff --git a/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts b/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts index 47af0270..3de89e73 100644 --- a/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts +++ b/ui/src/components/ExecutionDiagram/useExecutionOverlay.ts @@ -29,10 +29,10 @@ function buildOverlay( parentId?: string, ): void { for (const proc of processors) { - if (!proc.processorId || !proc.status) continue; - if (proc.status !== 'COMPLETED' && proc.status !== 'FAILED') continue; + if (!proc.processorId) continue; - // Iteration wrapper: filter by selected iteration, skip the wrapper itself + // Iteration wrapper: filter by selected iteration, skip the wrapper itself. + // Must be checked before the status filter — wrappers may not have a status. if (ITERATION_WRAPPER_TYPES.has(proc.processorType)) { if (parentId && iterationState.has(parentId)) { const info = iterationState.get(parentId)!; @@ -48,7 +48,9 @@ function buildOverlay( continue; } - // Regular processor: add to overlay + // Regular processor: only include completed/failed nodes + if (!proc.status || (proc.status !== 'COMPLETED' && proc.status !== 'FAILED')) continue; + const subRouteFailed = proc.status === 'FAILED' && (proc.processorType?.includes('DIRECT') || proc.processorType?.includes('SEDA'));