From b1679b110c1335afe2e356adb6d6e0ae8ce38f33 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:12:46 +0100 Subject: [PATCH] feat: add engine_level and route-level snapshot columns to PostgresExecutionStore Add engine_level, input_body, output_body, input_headers, output_headers to the executions INSERT/SELECT/UPSERT and row mapper. Required for REGULAR mode where route-level payloads exist but no processor records. Note: requires ALTER TABLE migration to add the new columns. --- .../app/storage/PostgresExecutionStore.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 53535d1d..3f81fa3c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore { INSERT INTO executions (execution_id, route_id, agent_id, application_name, status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, diagram_content_hash, + engine_level, input_body, output_body, input_headers, output_headers, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now()) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now()) ON CONFLICT (execution_id, start_time) DO UPDATE SET status = CASE WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') @@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore { error_message = COALESCE(EXCLUDED.error_message, executions.error_message), error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace), diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash), + engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level), + input_body = COALESCE(EXCLUDED.input_body, executions.input_body), + output_body = COALESCE(EXCLUDED.output_body, executions.output_body), + input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers), + output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers), updated_at = now() """, execution.executionId(), execution.routeId(), execution.agentId(), @@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore { Timestamp.from(execution.startTime()), execution.endTime() != null ? Timestamp.from(execution.endTime()) : null, execution.durationMs(), execution.errorMessage(), - execution.errorStacktrace(), execution.diagramContentHash()); + execution.errorStacktrace(), execution.diagramContentHash(), + execution.engineLevel(), + execution.inputBody(), execution.outputBody(), + execution.inputHeaders(), execution.outputHeaders()); } @Override @@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore { toInstant(rs, "start_time"), toInstant(rs, "end_time"), rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, rs.getString("error_message"), rs.getString("error_stacktrace"), - rs.getString("diagram_content_hash")); + rs.getString("diagram_content_hash"), + rs.getString("engine_level"), + rs.getString("input_body"), rs.getString("output_body"), + rs.getString("input_headers"), rs.getString("output_headers")); private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord(