feat: persist and display exchange properties from agent
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m59s
CI / docker (push) Successful in 2m13s
CI / deploy (push) Successful in 58s
CI / deploy-feature (push) Has been skipped

Add support for exchange properties sent by the agent alongside headers.
Properties flow through the same pipeline as headers: ClickHouse columns
(input_properties, output_properties) on both executions and
processor_executions tables, MergedExecution record, ChunkAccumulator
extraction, DetailService snapshot, and REST API response.

UI adds a Properties tab next to Headers in the process diagram detail
panel, with the same input/output split table layout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-14 14:23:53 +02:00
parent 199d0259cd
commit 0827fd21e3
16 changed files with 180 additions and 36 deletions

View File

@@ -41,11 +41,12 @@ public class ClickHouseExecutionStore implements ExecutionStore {
environment, status, correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
trace_id, span_id, has_trace_data, is_replay,
original_exchange_id, replay_exchange_id
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
executions.stream().map(e -> new Object[]{
nullToEmpty(e.tenantId()),
@@ -73,6 +74,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
nullToEmpty(e.outputBody()),
nullToEmpty(e.inputHeaders()),
nullToEmpty(e.outputHeaders()),
nullToEmpty(e.inputProperties()),
nullToEmpty(e.outputProperties()),
nullToEmpty(e.attributes()),
nullToEmpty(e.traceId()),
nullToEmpty(e.spanId()),
@@ -95,11 +98,12 @@ public class ClickHouseExecutionStore implements ExecutionStore {
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
processors.stream().map(p -> new Object[]{
nullToEmpty(tenantId),
@@ -127,6 +131,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
nullToEmpty(p.getOutputBody()),
mapToJson(p.getInputHeaders()),
mapToJson(p.getOutputHeaders()),
mapToJson(p.getInputProperties()),
mapToJson(p.getOutputProperties()),
mapToJson(p.getAttributes()),
nullToEmpty(p.getResolvedEndpointUri()),
nullToEmpty(p.getCircuitBreakerState()),
@@ -166,6 +172,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
nullToEmpty(p.getOutputBody()),
mapToJson(p.getInputHeaders()),
mapToJson(p.getOutputHeaders()),
mapToJson(p.getInputProperties()),
mapToJson(p.getOutputProperties()),
mapToJson(p.getAttributes()),
nullToEmpty(p.getResolvedEndpointUri()),
nullToEmpty(p.getCircuitBreakerState()),
@@ -182,11 +190,12 @@ public class ClickHouseExecutionStore implements ExecutionStore {
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", allRows);
}
@@ -198,7 +207,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
SELECT execution_id, route_id, instance_id, application_id, status,
correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id, has_trace_data, is_replay
FROM executions FINAL
@@ -218,7 +228,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
@@ -237,7 +248,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
@@ -257,7 +269,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
input_body, output_body, input_headers, output_headers,
input_properties, output_properties, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
@@ -305,6 +318,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
emptyToNull(rs.getString("output_body")),
emptyToNull(rs.getString("input_headers")),
emptyToNull(rs.getString("output_headers")),
emptyToNull(rs.getString("input_properties")),
emptyToNull(rs.getString("output_properties")),
emptyToNull(rs.getString("attributes")),
emptyToNull(rs.getString("error_type")),
emptyToNull(rs.getString("error_category")),
@@ -337,6 +352,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
emptyToNull(rs.getString("output_body")),
emptyToNull(rs.getString("input_headers")),
emptyToNull(rs.getString("output_headers")),
emptyToNull(rs.getString("input_properties")),
emptyToNull(rs.getString("output_properties")),
emptyToNull(rs.getString("attributes")),
null, // loopIndex
null, // loopSize

View File

@@ -362,6 +362,12 @@ SETTINGS index_granularity = 8192;
ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';
-- ── Exchange Properties (added for agent protocol v2) ──────────────────
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT '';
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT '';
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS input_properties String DEFAULT '';
ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS output_properties String DEFAULT '';
-- ── Usage Events ────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS usage_events (

View File

@@ -62,7 +62,7 @@ class ClickHouseSearchIndexIT {
500L,
"", "", "", "", "", "",
"hash-abc", "FULL",
"{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}",
"{\"order\":\"12345\"}", "", "", "", "", "", "{\"env\":\"prod\"}",
"", "",
false, false,
null, null
@@ -79,7 +79,7 @@ class ClickHouseSearchIndexIT {
"java.lang.NPE\n at Foo.bar(Foo.java:42)",
"NullPointerException", "RUNTIME", "", "",
"", "FULL",
"", "", "", "", "",
"", "", "", "", "", "", "",
"", "",
false, false,
null, null
@@ -94,7 +94,7 @@ class ClickHouseSearchIndexIT {
100L,
"", "", "", "", "", "",
"", "FULL",
"", "", "", "", "",
"", "", "", "", "", "", "",
"", "",
false, false,
null, null

View File

@@ -62,7 +62,7 @@ class ClickHouseExecutionReadIT {
1000L,
"", "", "", "", "", "",
"", "REGULAR",
"", "", "", "", "{}",
"", "", "", "", "", "", "{}",
"", "",
false, false,
null, null

View File

@@ -59,6 +59,7 @@ class ClickHouseExecutionStoreIT {
"hash-abc", "FULL",
"{\"key\":\"val\"}", "{\"out\":\"val\"}",
"{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}",
"", "",
"{\"attr\":\"val\"}",
"trace-123", "span-456",
true, false,
@@ -187,7 +188,7 @@ class ClickHouseExecutionStoreIT {
null, null,
"", "", "", "", "", "",
"", "FULL",
"", "", "", "", "",
"", "", "", "", "", "", "",
"", "",
false, false,
null, null
@@ -201,7 +202,7 @@ class ClickHouseExecutionStoreIT {
5000L,
"", "", "", "", "", "",
"", "FULL",
"", "", "", "", "",
"", "", "", "", "", "", "",
"", "",
false, false,
null, null

View File

@@ -44,6 +44,7 @@ public class DetailService {
exec.diagramContentHash(), processors,
exec.inputBody(), exec.outputBody(),
exec.inputHeaders(), exec.outputHeaders(),
exec.inputProperties(), exec.outputProperties(),
parseAttributes(exec.attributes()),
exec.errorType(), exec.errorCategory(),
exec.rootCauseType(), exec.rootCauseMessage(),
@@ -54,26 +55,23 @@ public class DetailService {
public Optional<Map<String, String>> getProcessorSnapshot(String executionId, String processorId) {
return executionStore.findProcessorById(executionId, processorId)
.map(p -> {
Map<String, String> snapshot = new LinkedHashMap<>();
if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody());
if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody());
if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders());
if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders());
return snapshot;
});
.map(DetailService::snapshotFromRecord);
}
public Optional<Map<String, String>> getProcessorSnapshotBySeq(String executionId, int seq) {
return executionStore.findProcessorBySeq(executionId, seq)
.map(p -> {
.map(DetailService::snapshotFromRecord);
}
private static Map<String, String> snapshotFromRecord(ProcessorRecord p) {
Map<String, String> snapshot = new LinkedHashMap<>();
if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody());
if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody());
if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders());
if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders());
if (p.inputProperties() != null) snapshot.put("inputProperties", p.inputProperties());
if (p.outputProperties() != null) snapshot.put("outputProperties", p.outputProperties());
return snapshot;
});
}
/** Parse the raw processor tree JSON stored alongside the execution. */

View File

@@ -47,6 +47,8 @@ public record ExecutionDetail(
String outputBody,
String inputHeaders,
String outputHeaders,
String inputProperties,
String outputProperties,
Map<String, String> attributes,
String errorType,
String errorCategory,

View File

@@ -201,6 +201,8 @@ public class ChunkAccumulator {
extractBody(envelope.getOutputSnapshot()),
extractHeaders(envelope.getInputSnapshot()),
extractHeaders(envelope.getOutputSnapshot()),
extractProperties(envelope.getInputSnapshot()),
extractProperties(envelope.getOutputSnapshot()),
serializeAttributes(envelope.getAttributes()),
envelope.getTraceId(),
envelope.getSpanId(),
@@ -226,6 +228,16 @@ public class ChunkAccumulator {
}
}
private static String extractProperties(ExchangeSnapshot snapshot) {
if (snapshot == null || snapshot.getProperties() == null) return "";
try {
return MAPPER.writeValueAsString(snapshot.getProperties());
} catch (JsonProcessingException e) {
log.warn("Failed to serialize snapshot properties", e);
return "";
}
}
private static String serializeAttributes(Map<String, String> attributes) {
if (attributes == null || attributes.isEmpty()) {
return "{}";

View File

@@ -87,17 +87,21 @@ public class IngestionService {
String outputBody = null;
String inputHeaders = null;
String outputHeaders = null;
String inputProperties = null;
String outputProperties = null;
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
if (inputSnapshot != null) {
inputBody = truncateBody(inputSnapshot.getBody());
inputHeaders = toJson(inputSnapshot.getHeaders());
inputProperties = toJson(inputSnapshot.getProperties());
}
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
if (outputSnapshot != null) {
outputBody = truncateBody(outputSnapshot.getBody());
outputHeaders = toJson(outputSnapshot.getHeaders());
outputProperties = toJson(outputSnapshot.getProperties());
}
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
@@ -118,6 +122,7 @@ public class IngestionService {
diagramHash,
exec.getEngineLevel(),
inputBody, outputBody, inputHeaders, outputHeaders,
inputProperties, outputProperties,
toJson(exec.getAttributes()),
exec.getErrorType(), exec.getErrorCategory(),
exec.getRootCauseType(), exec.getRootCauseMessage(),
@@ -153,6 +158,7 @@ public class IngestionService {
p.getErrorMessage(), p.getErrorStackTrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
null, null, // inputProperties, outputProperties (not on ProcessorExecution)
toJson(p.getAttributes()),
null, null, null, null, null,
p.getResolvedEndpointUri(),

View File

@@ -32,6 +32,8 @@ public record MergedExecution(
String outputBody,
String inputHeaders,
String outputHeaders,
String inputProperties,
String outputProperties,
String attributes,
String traceId,
String spanId,

View File

@@ -25,6 +25,7 @@ public interface ExecutionStore {
String errorMessage, String errorStacktrace, String diagramContentHash,
String engineLevel,
String inputBody, String outputBody, String inputHeaders, String outputHeaders,
String inputProperties, String outputProperties,
String attributes,
String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage,
@@ -41,6 +42,7 @@ public interface ExecutionStore {
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,
String inputBody, String outputBody, String inputHeaders, String outputHeaders,
String inputProperties, String outputProperties,
String attributes,
Integer loopIndex, Integer loopSize,
Integer splitIndex, Integer splitSize,

View File

@@ -27,7 +27,7 @@ class TreeReconstructionTest {
"exec-1", id, type,
"default", "route1", depth, parentId,
status, NOW, NOW, 10L,
null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null,
null, null, null, null, null, null
@@ -118,7 +118,7 @@ class TreeReconstructionTest {
return new ProcessorRecord(
"exec-1", id, type, "app", "route1",
0, null, status, NOW, NOW, 10L,
null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null,
seq, parentSeq, iteration, iterationSize, null, null

View File

@@ -1945,6 +1945,8 @@ export interface components {
outputBody: string;
inputHeaders: string;
outputHeaders: string;
inputProperties: string;
outputProperties: string;
attributes: {
[key: string]: string;
};

View File

@@ -3,6 +3,7 @@ import type { ProcessorNode, ExecutionDetail, DetailTab } from './types';
import { useProcessorSnapshotById } from '../../api/queries/executions';
import { InfoTab } from './tabs/InfoTab';
import { HeadersTab } from './tabs/HeadersTab';
import { PropertiesTab } from './tabs/PropertiesTab';
import { BodyTab } from './tabs/BodyTab';
import { ErrorTab } from './tabs/ErrorTab';
import { ConfigTab } from './tabs/ConfigTab';
@@ -20,6 +21,7 @@ interface DetailPanelProps {
const TABS: { key: DetailTab; label: string }[] = [
{ key: 'info', label: 'Info' },
{ key: 'headers', label: 'Headers' },
{ key: 'properties', label: 'Properties' },
{ key: 'input', label: 'Input' },
{ key: 'output', label: 'Output' },
{ key: 'error', label: 'Error' },
@@ -60,20 +62,24 @@ export function DetailPanel({
let inputBody: string | undefined;
let outputBody: string | undefined;
let hasHeaders = false;
let hasProperties = false;
if (selectedProcessor && snapshotQuery.data) {
inputBody = snapshotQuery.data.inputBody;
outputBody = snapshotQuery.data.outputBody;
hasHeaders = !!(snapshotQuery.data.inputHeaders || snapshotQuery.data.outputHeaders);
hasProperties = !!(snapshotQuery.data.inputProperties || snapshotQuery.data.outputProperties);
} else if (selectedProcessor && snapshotQuery.isLoading) {
// Still loading — keep tabs enabled
hasHeaders = true;
hasProperties = true;
inputBody = undefined;
outputBody = undefined;
} else if (!selectedProcessor) {
inputBody = executionDetail.inputBody;
outputBody = executionDetail.outputBody;
hasHeaders = !!(executionDetail.inputHeaders || executionDetail.outputHeaders);
hasProperties = !!(executionDetail.inputProperties || executionDetail.outputProperties);
}
const hasInput = !!inputBody;
@@ -82,9 +88,10 @@ export function DetailPanel({
// If active tab becomes disabled, fall back to info
useEffect(() => {
if (activeTab === 'headers' && !hasHeaders) setActiveTab('info');
if (activeTab === 'properties' && !hasProperties) setActiveTab('info');
if (activeTab === 'input' && !hasInput) setActiveTab('info');
if (activeTab === 'output' && !hasOutput) setActiveTab('info');
}, [hasHeaders, hasInput, hasOutput, activeTab]);
}, [hasHeaders, hasProperties, hasInput, hasOutput, activeTab]);
return (
<div className={styles.detailPanel}>
@@ -99,6 +106,7 @@ export function DetailPanel({
const isActive = activeTab === tab.key;
const isDisabled = tab.key === 'config'
|| (tab.key === 'headers' && !hasHeaders)
|| (tab.key === 'properties' && !hasProperties)
|| (tab.key === 'input' && !hasInput)
|| (tab.key === 'output' && !hasOutput);
const isError = tab.key === 'error' && hasError;
@@ -138,6 +146,14 @@ export function DetailPanel({
exchangeOutputHeaders={executionDetail.outputHeaders}
/>
)}
{activeTab === 'properties' && (
<PropertiesTab
executionId={executionId}
processorId={selectedProcessor?.processorId ?? null}
exchangeInputProperties={executionDetail.inputProperties}
exchangeOutputProperties={executionDetail.outputProperties}
/>
)}
{activeTab === 'input' && (
<BodyTab body={inputBody} label="Input" />
)}

View File

@@ -0,0 +1,80 @@
import { useProcessorSnapshotById } from '../../../api/queries/executions';
import styles from '../ExecutionDiagram.module.css';
interface PropertiesTabProps {
executionId: string;
processorId: string | null;
exchangeInputProperties?: string;
exchangeOutputProperties?: string;
}
function parseProperties(json: string | undefined): Record<string, string> {
if (!json) return {};
try {
return JSON.parse(json);
} catch {
return {};
}
}
function PropertiesTable({ properties }: { properties: Record<string, string> }) {
const entries = Object.entries(properties).sort(([a], [b]) => a.localeCompare(b));
if (entries.length === 0) {
return <div className={styles.emptyState}>No properties</div>;
}
return (
<table className={styles.headersTable}>
<tbody>
{entries.map(([k, v]) => (
<tr key={k}>
<td className={styles.headerKey}>{k}</td>
<td className={styles.headerVal}>{v}</td>
</tr>
))}
</tbody>
</table>
);
}
export function PropertiesTab({
executionId,
processorId,
exchangeInputProperties,
exchangeOutputProperties,
}: PropertiesTabProps) {
const snapshotQuery = useProcessorSnapshotById(
processorId ? executionId : null,
processorId,
);
let inputProperties: Record<string, string>;
let outputProperties: Record<string, string>;
if (processorId && snapshotQuery.data) {
inputProperties = parseProperties(snapshotQuery.data.inputProperties);
outputProperties = parseProperties(snapshotQuery.data.outputProperties);
} else if (!processorId) {
inputProperties = parseProperties(exchangeInputProperties);
outputProperties = parseProperties(exchangeOutputProperties);
} else {
inputProperties = {};
outputProperties = {};
}
if (processorId && snapshotQuery.isLoading) {
return <div className={styles.emptyState}>Loading properties...</div>;
}
return (
<div className={styles.headersSplit}>
<div className={styles.headersColumn}>
<div className={styles.headersColumnLabel}>Input Properties</div>
<PropertiesTable properties={inputProperties} />
</div>
<div className={styles.headersColumn}>
<div className={styles.headersColumnLabel}>Output Properties</div>
<PropertiesTable properties={outputProperties} />
</div>
</div>
);
}

View File

@@ -27,4 +27,4 @@ export interface IterationInfo {
type: 'loop' | 'split' | 'multicast';
}
export type DetailTab = 'info' | 'headers' | 'input' | 'output' | 'error' | 'config' | 'timeline' | 'log';
export type DetailTab = 'info' | 'headers' | 'properties' | 'input' | 'output' | 'error' | 'config' | 'timeline' | 'log';