From 0827fd21e32aae1f6da81c1beb0ce2c76c9b8c90 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 14 Apr 2026 14:23:53 +0200 Subject: [PATCH] feat: persist and display exchange properties from agent Add support for exchange properties sent by the agent alongside headers. Properties flow through the same pipeline as headers: ClickHouse columns (input_properties, output_properties) on both executions and processor_executions tables, MergedExecution record, ChunkAccumulator extraction, DetailService snapshot, and REST API response. UI adds a Properties tab next to Headers in the process diagram detail panel, with the same input/output split table layout. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/ClickHouseExecutionStore.java | 37 ++++++--- .../src/main/resources/clickhouse/init.sql | 6 ++ .../app/search/ClickHouseSearchIndexIT.java | 6 +- .../storage/ClickHouseExecutionReadIT.java | 2 +- .../storage/ClickHouseExecutionStoreIT.java | 5 +- .../server/core/detail/DetailService.java | 30 ++++--- .../server/core/detail/ExecutionDetail.java | 2 + .../core/ingestion/ChunkAccumulator.java | 12 +++ .../core/ingestion/IngestionService.java | 6 ++ .../core/ingestion/MergedExecution.java | 2 + .../server/core/storage/ExecutionStore.java | 2 + .../core/detail/TreeReconstructionTest.java | 4 +- ui/src/api/schema.d.ts | 2 + .../ExecutionDiagram/DetailPanel.tsx | 18 ++++- .../ExecutionDiagram/tabs/PropertiesTab.tsx | 80 +++++++++++++++++++ ui/src/components/ExecutionDiagram/types.ts | 2 +- 16 files changed, 180 insertions(+), 36 deletions(-) create mode 100644 ui/src/components/ExecutionDiagram/tabs/PropertiesTab.tsx diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index d2e2b12d..4f5b837a 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -41,11 +41,12 @@ public class ClickHouseExecutionStore implements ExecutionStore { environment, status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, diagram_content_hash, engine_level, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, trace_id, span_id, has_trace_data, is_replay, original_exchange_id, replay_exchange_id ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, executions.stream().map(e -> new Object[]{ nullToEmpty(e.tenantId()), @@ -73,6 +74,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { nullToEmpty(e.outputBody()), nullToEmpty(e.inputHeaders()), nullToEmpty(e.outputHeaders()), + nullToEmpty(e.inputProperties()), + nullToEmpty(e.outputProperties()), nullToEmpty(e.attributes()), nullToEmpty(e.traceId()), nullToEmpty(e.spanId()), @@ -95,11 +98,12 @@ public class ClickHouseExecutionStore implements ExecutionStore { iteration, iteration_size, status, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, processors.stream().map(p -> new Object[]{ nullToEmpty(tenantId), @@ -127,6 +131,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { nullToEmpty(p.getOutputBody()), mapToJson(p.getInputHeaders()), mapToJson(p.getOutputHeaders()), + mapToJson(p.getInputProperties()), + mapToJson(p.getOutputProperties()), mapToJson(p.getAttributes()), nullToEmpty(p.getResolvedEndpointUri()), nullToEmpty(p.getCircuitBreakerState()), @@ -166,6 +172,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { nullToEmpty(p.getOutputBody()), mapToJson(p.getInputHeaders()), mapToJson(p.getOutputHeaders()), + mapToJson(p.getInputProperties()), + mapToJson(p.getOutputProperties()), mapToJson(p.getAttributes()), nullToEmpty(p.getResolvedEndpointUri()), nullToEmpty(p.getCircuitBreakerState()), @@ -182,11 +190,12 @@ public class ClickHouseExecutionStore implements ExecutionStore { iteration, iteration_size, status, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, allRows); } @@ -198,7 +207,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { SELECT execution_id, route_id, instance_id, application_id, status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, diagram_content_hash, engine_level, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, error_type, error_category, root_cause_type, root_cause_message, trace_id, span_id, has_trace_data, is_replay FROM executions FINAL @@ -218,7 +228,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { iteration, iteration_size, status, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions @@ -237,7 +248,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { iteration, iteration_size, status, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions @@ -257,7 +269,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { iteration, iteration_size, status, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, - input_body, output_body, input_headers, output_headers, attributes, + input_body, output_body, input_headers, output_headers, + input_properties, output_properties, attributes, resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions @@ -305,6 +318,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { emptyToNull(rs.getString("output_body")), emptyToNull(rs.getString("input_headers")), emptyToNull(rs.getString("output_headers")), + emptyToNull(rs.getString("input_properties")), + emptyToNull(rs.getString("output_properties")), emptyToNull(rs.getString("attributes")), emptyToNull(rs.getString("error_type")), emptyToNull(rs.getString("error_category")), @@ -337,6 +352,8 @@ public class ClickHouseExecutionStore implements ExecutionStore { emptyToNull(rs.getString("output_body")), emptyToNull(rs.getString("input_headers")), emptyToNull(rs.getString("output_headers")), + emptyToNull(rs.getString("input_properties")), + emptyToNull(rs.getString("output_properties")), emptyToNull(rs.getString("attributes")), null, // loopIndex null, // loopSize diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index 26ed083b..38ac19d1 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -362,6 +362,12 @@ SETTINGS index_granularity = 8192; ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app'; +-- ── Exchange Properties (added for agent protocol v2) ────────────────── +ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT ''; +ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT ''; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT ''; +ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT ''; + -- ── Usage Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS usage_events ( diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java index 17eebe26..e7bd9036 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -62,7 +62,7 @@ class ClickHouseSearchIndexIT { 500L, "", "", "", "", "", "", "hash-abc", "FULL", - "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", + "{\"order\":\"12345\"}", "", "", "", "", "", "{\"env\":\"prod\"}", "", "", false, false, null, null @@ -79,7 +79,7 @@ class ClickHouseSearchIndexIT { "java.lang.NPE\n at Foo.bar(Foo.java:42)", "NullPointerException", "RUNTIME", "", "", "", "FULL", - "", "", "", "", "", + "", "", "", "", "", "", "", "", "", false, false, null, null @@ -94,7 +94,7 @@ class ClickHouseSearchIndexIT { 100L, "", "", "", "", "", "", "", "FULL", - "", "", "", "", "", + "", "", "", "", "", "", "", "", "", false, false, null, null diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java index 6174a47d..c5507bb5 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java @@ -62,7 +62,7 @@ class ClickHouseExecutionReadIT { 1000L, "", "", "", "", "", "", "", "REGULAR", - "", "", "", "", "{}", + "", "", "", "", "", "", "{}", "", "", false, false, null, null diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java index 6ebf7481..e3fcf676 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -59,6 +59,7 @@ class ClickHouseExecutionStoreIT { "hash-abc", "FULL", "{\"key\":\"val\"}", "{\"out\":\"val\"}", "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", + "", "", "{\"attr\":\"val\"}", "trace-123", "span-456", true, false, @@ -187,7 +188,7 @@ class ClickHouseExecutionStoreIT { null, null, "", "", "", "", "", "", "", "FULL", - "", "", "", "", "", + "", "", "", "", "", "", "", "", "", false, false, null, null @@ -201,7 +202,7 @@ class ClickHouseExecutionStoreIT { 5000L, "", "", "", "", "", "", "", "FULL", - "", "", "", "", "", + "", "", "", "", "", "", "", "", "", false, false, null, null diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java index 855d8a02..b170bc75 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java @@ -44,6 +44,7 @@ public class DetailService { exec.diagramContentHash(), processors, exec.inputBody(), exec.outputBody(), exec.inputHeaders(), exec.outputHeaders(), + exec.inputProperties(), exec.outputProperties(), parseAttributes(exec.attributes()), exec.errorType(), exec.errorCategory(), exec.rootCauseType(), exec.rootCauseMessage(), @@ -54,26 +55,23 @@ public class DetailService { public Optional> getProcessorSnapshot(String executionId, String processorId) { return executionStore.findProcessorById(executionId, processorId) - .map(p -> { - Map snapshot = new LinkedHashMap<>(); - if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody()); - if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody()); - if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders()); - if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders()); - return snapshot; - }); + .map(DetailService::snapshotFromRecord); } public Optional> getProcessorSnapshotBySeq(String executionId, int seq) { return executionStore.findProcessorBySeq(executionId, seq) - .map(p -> { - Map snapshot = new LinkedHashMap<>(); - if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody()); - if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody()); - if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders()); - if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders()); - return snapshot; - }); + .map(DetailService::snapshotFromRecord); + } + + private static Map snapshotFromRecord(ProcessorRecord p) { + Map snapshot = new LinkedHashMap<>(); + if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody()); + if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody()); + if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders()); + if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders()); + if (p.inputProperties() != null) snapshot.put("inputProperties", p.inputProperties()); + if (p.outputProperties() != null) snapshot.put("outputProperties", p.outputProperties()); + return snapshot; } /** Parse the raw processor tree JSON stored alongside the execution. */ diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java index 862e8a51..09d2ca64 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java @@ -47,6 +47,8 @@ public record ExecutionDetail( String outputBody, String inputHeaders, String outputHeaders, + String inputProperties, + String outputProperties, Map attributes, String errorType, String errorCategory, diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index f9986f5e..3729720a 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -201,6 +201,8 @@ public class ChunkAccumulator { extractBody(envelope.getOutputSnapshot()), extractHeaders(envelope.getInputSnapshot()), extractHeaders(envelope.getOutputSnapshot()), + extractProperties(envelope.getInputSnapshot()), + extractProperties(envelope.getOutputSnapshot()), serializeAttributes(envelope.getAttributes()), envelope.getTraceId(), envelope.getSpanId(), @@ -226,6 +228,16 @@ public class ChunkAccumulator { } } + private static String extractProperties(ExchangeSnapshot snapshot) { + if (snapshot == null || snapshot.getProperties() == null) return ""; + try { + return MAPPER.writeValueAsString(snapshot.getProperties()); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize snapshot properties", e); + return ""; + } + } + private static String serializeAttributes(Map attributes) { if (attributes == null || attributes.isEmpty()) { return "{}"; diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 41b1291f..63df17bf 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -87,17 +87,21 @@ public class IngestionService { String outputBody = null; String inputHeaders = null; String outputHeaders = null; + String inputProperties = null; + String outputProperties = null; ExchangeSnapshot inputSnapshot = exec.getInputSnapshot(); if (inputSnapshot != null) { inputBody = truncateBody(inputSnapshot.getBody()); inputHeaders = toJson(inputSnapshot.getHeaders()); + inputProperties = toJson(inputSnapshot.getProperties()); } ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot(); if (outputSnapshot != null) { outputBody = truncateBody(outputSnapshot.getBody()); outputHeaders = toJson(outputSnapshot.getHeaders()); + outputProperties = toJson(outputSnapshot.getProperties()); } boolean hasTraceData = hasAnyTraceData(exec.getProcessors()); @@ -118,6 +122,7 @@ public class IngestionService { diagramHash, exec.getEngineLevel(), inputBody, outputBody, inputHeaders, outputHeaders, + inputProperties, outputProperties, toJson(exec.getAttributes()), exec.getErrorType(), exec.getErrorCategory(), exec.getRootCauseType(), exec.getRootCauseMessage(), @@ -153,6 +158,7 @@ public class IngestionService { p.getErrorMessage(), p.getErrorStackTrace(), truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()), + null, null, // inputProperties, outputProperties (not on ProcessorExecution) toJson(p.getAttributes()), null, null, null, null, null, p.getResolvedEndpointUri(), diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java index d8e55dee..34d27689 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -32,6 +32,8 @@ public record MergedExecution( String outputBody, String inputHeaders, String outputHeaders, + String inputProperties, + String outputProperties, String attributes, String traceId, String spanId, diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index bab18656..5e8cdc1d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -25,6 +25,7 @@ public interface ExecutionStore { String errorMessage, String errorStacktrace, String diagramContentHash, String engineLevel, String inputBody, String outputBody, String inputHeaders, String outputHeaders, + String inputProperties, String outputProperties, String attributes, String errorType, String errorCategory, String rootCauseType, String rootCauseMessage, @@ -41,6 +42,7 @@ public interface ExecutionStore { Instant startTime, Instant endTime, Long durationMs, String errorMessage, String errorStacktrace, String inputBody, String outputBody, String inputHeaders, String outputHeaders, + String inputProperties, String outputProperties, String attributes, Integer loopIndex, Integer loopSize, Integer splitIndex, Integer splitSize, diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java index 0fdc29e9..dfb2448c 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java @@ -27,7 +27,7 @@ class TreeReconstructionTest { "exec-1", id, type, "default", "route1", depth, parentId, status, NOW, NOW, 10L, - null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null @@ -118,7 +118,7 @@ class TreeReconstructionTest { return new ProcessorRecord( "exec-1", id, type, "app", "route1", 0, null, status, NOW, NOW, 10L, - null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, seq, parentSeq, iteration, iterationSize, null, null diff --git a/ui/src/api/schema.d.ts b/ui/src/api/schema.d.ts index 64eeae63..732d591a 100644 --- a/ui/src/api/schema.d.ts +++ b/ui/src/api/schema.d.ts @@ -1945,6 +1945,8 @@ export interface components { outputBody: string; inputHeaders: string; outputHeaders: string; + inputProperties: string; + outputProperties: string; attributes: { [key: string]: string; }; diff --git a/ui/src/components/ExecutionDiagram/DetailPanel.tsx b/ui/src/components/ExecutionDiagram/DetailPanel.tsx index 9de312bc..a15465a5 100644 --- a/ui/src/components/ExecutionDiagram/DetailPanel.tsx +++ b/ui/src/components/ExecutionDiagram/DetailPanel.tsx @@ -3,6 +3,7 @@ import type { ProcessorNode, ExecutionDetail, DetailTab } from './types'; import { useProcessorSnapshotById } from '../../api/queries/executions'; import { InfoTab } from './tabs/InfoTab'; import { HeadersTab } from './tabs/HeadersTab'; +import { PropertiesTab } from './tabs/PropertiesTab'; import { BodyTab } from './tabs/BodyTab'; import { ErrorTab } from './tabs/ErrorTab'; import { ConfigTab } from './tabs/ConfigTab'; @@ -20,6 +21,7 @@ interface DetailPanelProps { const TABS: { key: DetailTab; label: string }[] = [ { key: 'info', label: 'Info' }, { key: 'headers', label: 'Headers' }, + { key: 'properties', label: 'Properties' }, { key: 'input', label: 'Input' }, { key: 'output', label: 'Output' }, { key: 'error', label: 'Error' }, @@ -60,20 +62,24 @@ export function DetailPanel({ let inputBody: string | undefined; let outputBody: string | undefined; let hasHeaders = false; + let hasProperties = false; if (selectedProcessor && snapshotQuery.data) { inputBody = snapshotQuery.data.inputBody; outputBody = snapshotQuery.data.outputBody; hasHeaders = !!(snapshotQuery.data.inputHeaders || snapshotQuery.data.outputHeaders); + hasProperties = !!(snapshotQuery.data.inputProperties || snapshotQuery.data.outputProperties); } else if (selectedProcessor && snapshotQuery.isLoading) { // Still loading — keep tabs enabled hasHeaders = true; + hasProperties = true; inputBody = undefined; outputBody = undefined; } else if (!selectedProcessor) { inputBody = executionDetail.inputBody; outputBody = executionDetail.outputBody; hasHeaders = !!(executionDetail.inputHeaders || executionDetail.outputHeaders); + hasProperties = !!(executionDetail.inputProperties || executionDetail.outputProperties); } const hasInput = !!inputBody; @@ -82,9 +88,10 @@ export function DetailPanel({ // If active tab becomes disabled, fall back to info useEffect(() => { if (activeTab === 'headers' && !hasHeaders) setActiveTab('info'); + if (activeTab === 'properties' && !hasProperties) setActiveTab('info'); if (activeTab === 'input' && !hasInput) setActiveTab('info'); if (activeTab === 'output' && !hasOutput) setActiveTab('info'); - }, [hasHeaders, hasInput, hasOutput, activeTab]); + }, [hasHeaders, hasProperties, hasInput, hasOutput, activeTab]); return (
@@ -99,6 +106,7 @@ export function DetailPanel({ const isActive = activeTab === tab.key; const isDisabled = tab.key === 'config' || (tab.key === 'headers' && !hasHeaders) + || (tab.key === 'properties' && !hasProperties) || (tab.key === 'input' && !hasInput) || (tab.key === 'output' && !hasOutput); const isError = tab.key === 'error' && hasError; @@ -138,6 +146,14 @@ export function DetailPanel({ exchangeOutputHeaders={executionDetail.outputHeaders} /> )} + {activeTab === 'properties' && ( + + )} {activeTab === 'input' && ( )} diff --git a/ui/src/components/ExecutionDiagram/tabs/PropertiesTab.tsx b/ui/src/components/ExecutionDiagram/tabs/PropertiesTab.tsx new file mode 100644 index 00000000..d91b47dd --- /dev/null +++ b/ui/src/components/ExecutionDiagram/tabs/PropertiesTab.tsx @@ -0,0 +1,80 @@ +import { useProcessorSnapshotById } from '../../../api/queries/executions'; +import styles from '../ExecutionDiagram.module.css'; + +interface PropertiesTabProps { + executionId: string; + processorId: string | null; + exchangeInputProperties?: string; + exchangeOutputProperties?: string; +} + +function parseProperties(json: string | undefined): Record { + if (!json) return {}; + try { + return JSON.parse(json); + } catch { + return {}; + } +} + +function PropertiesTable({ properties }: { properties: Record }) { + const entries = Object.entries(properties).sort(([a], [b]) => a.localeCompare(b)); + if (entries.length === 0) { + return
No properties
; + } + return ( + + + {entries.map(([k, v]) => ( + + + + + ))} + +
{k}{v}
+ ); +} + +export function PropertiesTab({ + executionId, + processorId, + exchangeInputProperties, + exchangeOutputProperties, +}: PropertiesTabProps) { + const snapshotQuery = useProcessorSnapshotById( + processorId ? executionId : null, + processorId, + ); + + let inputProperties: Record; + let outputProperties: Record; + + if (processorId && snapshotQuery.data) { + inputProperties = parseProperties(snapshotQuery.data.inputProperties); + outputProperties = parseProperties(snapshotQuery.data.outputProperties); + } else if (!processorId) { + inputProperties = parseProperties(exchangeInputProperties); + outputProperties = parseProperties(exchangeOutputProperties); + } else { + inputProperties = {}; + outputProperties = {}; + } + + if (processorId && snapshotQuery.isLoading) { + return
Loading properties...
; + } + + return ( +
+
+
Input Properties
+ +
+
+
Output Properties
+ +
+
+ ); +} diff --git a/ui/src/components/ExecutionDiagram/types.ts b/ui/src/components/ExecutionDiagram/types.ts index 9404b88a..4848c355 100644 --- a/ui/src/components/ExecutionDiagram/types.ts +++ b/ui/src/components/ExecutionDiagram/types.ts @@ -27,4 +27,4 @@ export interface IterationInfo { type: 'loop' | 'split' | 'multicast'; } -export type DetailTab = 'info' | 'headers' | 'input' | 'output' | 'error' | 'config' | 'timeline' | 'log'; +export type DetailTab = 'info' | 'headers' | 'properties' | 'input' | 'output' | 'error' | 'config' | 'timeline' | 'log';