feat: persist and expose resolvedEndpointUri for execution-level drill-down
Wire resolvedEndpointUri through the full chain: - V9 migration adds resolved_endpoint_uri column - IngestionService extracts from ProcessorExecution - PostgresExecutionStore persists and reads the column - ProcessorNode includes field in detail API response - UI schema updated for ProcessorNode and PositionedNode Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -73,8 +73,9 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
application_name, route_id, depth, parent_processor_id,
|
||||
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
loop_index, loop_size, split_index, split_size, multicast_index)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?)
|
||||
loop_index, loop_size, split_index, split_size, multicast_index,
|
||||
resolved_endpoint_uri)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
|
||||
@@ -90,7 +91,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
loop_size = COALESCE(EXCLUDED.loop_size, processor_executions.loop_size),
|
||||
split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index),
|
||||
split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size),
|
||||
multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index)
|
||||
multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index),
|
||||
resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri)
|
||||
""",
|
||||
processors.stream().map(p -> new Object[]{
|
||||
p.executionId(), p.processorId(), p.processorType(),
|
||||
@@ -102,7 +104,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(),
|
||||
p.attributes(),
|
||||
p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(),
|
||||
p.multicastIndex()
|
||||
p.multicastIndex(),
|
||||
p.resolvedEndpointUri()
|
||||
}).toList());
|
||||
}
|
||||
|
||||
@@ -160,7 +163,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
rs.getObject("loop_size") != null ? rs.getInt("loop_size") : null,
|
||||
rs.getObject("split_index") != null ? rs.getInt("split_index") : null,
|
||||
rs.getObject("split_size") != null ? rs.getInt("split_size") : null,
|
||||
rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null);
|
||||
rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null,
|
||||
rs.getString("resolved_endpoint_uri"));
|
||||
|
||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||
Timestamp ts = rs.getTimestamp(column);
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE processor_executions ADD COLUMN resolved_endpoint_uri TEXT;
|
||||
Reference in New Issue
Block a user