feat: add engine_level and route-level snapshot columns to PostgresExecutionStore
Some checks failed
CI / docker (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled

Add engine_level, input_body, output_body, input_headers, output_headers
to the executions INSERT/SELECT/UPSERT and row mapper. Required for
REGULAR mode where route-level payloads exist but no processor records.

Note: requires ALTER TABLE migration to add the new columns.
This commit is contained in:
2026-03-24 16:12:46 +01:00
parent e7835e1100
commit b1679b110c

View File

@@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore {
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
engine_level, input_body, output_body, input_headers, output_headers,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore {
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level),
input_body = COALESCE(EXCLUDED.input_body, executions.input_body),
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore {
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
execution.durationMs(), execution.errorMessage(),
execution.errorStacktrace(), execution.diagramContentHash());
execution.errorStacktrace(), execution.diagramContentHash(),
execution.engineLevel(),
execution.inputBody(), execution.outputBody(),
execution.inputHeaders(), execution.outputHeaders());
}
@Override
@@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore {
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"));
rs.getString("diagram_content_hash"),
rs.getString("engine_level"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(