feat(ingestion): wire attributes through ingestion pipeline into PostgreSQL (Task 3)
IngestionService passes attributes (currently null, pending cameleer3-common update) to ExecutionRecord and ProcessorRecord. PostgresExecutionStore includes the attributes column in INSERT and ON CONFLICT UPDATE (with COALESCE), and reads it back in both row mappers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -28,8 +28,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
status, correlation_id, exchange_id, start_time, end_time,
|
status, correlation_id, exchange_id, start_time, end_time,
|
||||||
duration_ms, error_message, error_stacktrace, diagram_content_hash,
|
duration_ms, error_message, error_stacktrace, diagram_content_hash,
|
||||||
engine_level, input_body, output_body, input_headers, output_headers,
|
engine_level, input_body, output_body, input_headers, output_headers,
|
||||||
created_at, updated_at)
|
attributes, created_at, updated_at)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now())
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, now(), now())
|
||||||
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
||||||
status = CASE
|
status = CASE
|
||||||
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
||||||
@@ -48,6 +48,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
|
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
|
||||||
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
|
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
|
||||||
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
|
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
|
||||||
|
attributes = COALESCE(EXCLUDED.attributes, executions.attributes),
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
""",
|
""",
|
||||||
execution.executionId(), execution.routeId(), execution.agentId(),
|
execution.executionId(), execution.routeId(), execution.agentId(),
|
||||||
@@ -59,7 +60,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
execution.errorStacktrace(), execution.diagramContentHash(),
|
execution.errorStacktrace(), execution.diagramContentHash(),
|
||||||
execution.engineLevel(),
|
execution.engineLevel(),
|
||||||
execution.inputBody(), execution.outputBody(),
|
execution.inputBody(), execution.outputBody(),
|
||||||
execution.inputHeaders(), execution.outputHeaders());
|
execution.inputHeaders(), execution.outputHeaders(),
|
||||||
|
execution.attributes());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -70,8 +72,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
|
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
|
||||||
diagram_node_id, application_name, route_id, depth, parent_processor_id,
|
diagram_node_id, application_name, route_id, depth, parent_processor_id,
|
||||||
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
||||||
input_body, output_body, input_headers, output_headers)
|
input_body, output_body, input_headers, output_headers, attributes)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb)
|
||||||
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
|
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
|
||||||
status = EXCLUDED.status,
|
status = EXCLUDED.status,
|
||||||
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
|
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
|
||||||
@@ -81,7 +83,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
|
input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
|
||||||
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
|
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
|
||||||
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
|
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
|
||||||
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
|
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers),
|
||||||
|
attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes)
|
||||||
""",
|
""",
|
||||||
processors.stream().map(p -> new Object[]{
|
processors.stream().map(p -> new Object[]{
|
||||||
p.executionId(), p.processorId(), p.processorType(),
|
p.executionId(), p.processorId(), p.processorType(),
|
||||||
@@ -90,7 +93,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
Timestamp.from(p.startTime()),
|
Timestamp.from(p.startTime()),
|
||||||
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
|
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
|
||||||
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
|
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
|
||||||
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
|
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(),
|
||||||
|
p.attributes()
|
||||||
}).toList());
|
}).toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,7 +125,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
rs.getString("diagram_content_hash"),
|
rs.getString("diagram_content_hash"),
|
||||||
rs.getString("engine_level"),
|
rs.getString("engine_level"),
|
||||||
rs.getString("input_body"), rs.getString("output_body"),
|
rs.getString("input_body"), rs.getString("output_body"),
|
||||||
rs.getString("input_headers"), rs.getString("output_headers"));
|
rs.getString("input_headers"), rs.getString("output_headers"),
|
||||||
|
rs.getString("attributes"));
|
||||||
|
|
||||||
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
||||||
new ProcessorRecord(
|
new ProcessorRecord(
|
||||||
@@ -134,7 +139,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||||
rs.getString("error_message"), rs.getString("error_stacktrace"),
|
rs.getString("error_message"), rs.getString("error_stacktrace"),
|
||||||
rs.getString("input_body"), rs.getString("output_body"),
|
rs.getString("input_body"), rs.getString("output_body"),
|
||||||
rs.getString("input_headers"), rs.getString("output_headers"));
|
rs.getString("input_headers"), rs.getString("output_headers"),
|
||||||
|
rs.getString("attributes"));
|
||||||
|
|
||||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||||
Timestamp ts = rs.getTimestamp(column);
|
Timestamp ts = rs.getTimestamp(column);
|
||||||
|
|||||||
@@ -106,7 +106,8 @@ public class IngestionService {
|
|||||||
exec.getErrorMessage(), exec.getErrorStackTrace(),
|
exec.getErrorMessage(), exec.getErrorStackTrace(),
|
||||||
diagramHash,
|
diagramHash,
|
||||||
exec.getEngineLevel(),
|
exec.getEngineLevel(),
|
||||||
inputBody, outputBody, inputHeaders, outputHeaders
|
inputBody, outputBody, inputHeaders, outputHeaders,
|
||||||
|
null // attributes: populated once cameleer3-common exposes getAttributes()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,7 +127,8 @@ public class IngestionService {
|
|||||||
p.getDurationMs(),
|
p.getDurationMs(),
|
||||||
p.getErrorMessage(), p.getErrorStackTrace(),
|
p.getErrorMessage(), p.getErrorStackTrace(),
|
||||||
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
||||||
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders())
|
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
|
||||||
|
null // attributes: populated once cameleer3-common exposes getAttributes()
|
||||||
));
|
));
|
||||||
if (p.getChildren() != null) {
|
if (p.getChildren() != null) {
|
||||||
flat.addAll(flattenProcessors(
|
flat.addAll(flattenProcessors(
|
||||||
|
|||||||
Reference in New Issue
Block a user