diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 3f81fa3c..423bd106 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -28,8 +28,8 @@ public class PostgresExecutionStore implements ExecutionStore { status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, diagram_content_hash, engine_level, input_body, output_body, input_headers, output_headers, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now()) + attributes, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, now(), now()) ON CONFLICT (execution_id, start_time) DO UPDATE SET status = CASE WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') @@ -48,6 +48,7 @@ public class PostgresExecutionStore implements ExecutionStore { output_body = COALESCE(EXCLUDED.output_body, executions.output_body), input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers), output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers), + attributes = COALESCE(EXCLUDED.attributes, executions.attributes), updated_at = now() """, execution.executionId(), execution.routeId(), execution.agentId(), @@ -59,7 +60,8 @@ public class PostgresExecutionStore implements ExecutionStore { execution.errorStacktrace(), execution.diagramContentHash(), execution.engineLevel(), execution.inputBody(), execution.outputBody(), - execution.inputHeaders(), execution.outputHeaders()); + execution.inputHeaders(), execution.outputHeaders(), + execution.attributes()); } @Override @@ -70,8 +72,8 @@ public class PostgresExecutionStore implements ExecutionStore { INSERT INTO processor_executions (execution_id, processor_id, processor_type, diagram_node_id, application_name, route_id, depth, parent_processor_id, status, start_time, end_time, duration_ms, error_message, error_stacktrace, - input_body, output_body, input_headers, output_headers) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb) + input_body, output_body, input_headers, output_headers, attributes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb) ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET status = EXCLUDED.status, end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), @@ -81,7 +83,8 @@ public class PostgresExecutionStore implements ExecutionStore { input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body), output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body), input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers), - output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers) + output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers), + attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes) """, processors.stream().map(p -> new Object[]{ p.executionId(), p.processorId(), p.processorType(), @@ -90,7 +93,8 @@ public class PostgresExecutionStore implements ExecutionStore { Timestamp.from(p.startTime()), p.endTime() != null ? Timestamp.from(p.endTime()) : null, p.durationMs(), p.errorMessage(), p.errorStacktrace(), - p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders() + p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(), + p.attributes() }).toList()); } @@ -121,7 +125,8 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getString("diagram_content_hash"), rs.getString("engine_level"), rs.getString("input_body"), rs.getString("output_body"), - rs.getString("input_headers"), rs.getString("output_headers")); + rs.getString("input_headers"), rs.getString("output_headers"), + rs.getString("attributes")); private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord( @@ -134,7 +139,8 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, rs.getString("error_message"), rs.getString("error_stacktrace"), rs.getString("input_body"), rs.getString("output_body"), - rs.getString("input_headers"), rs.getString("output_headers")); + rs.getString("input_headers"), rs.getString("output_headers"), + rs.getString("attributes")); private static Instant toInstant(ResultSet rs, String column) throws SQLException { Timestamp ts = rs.getTimestamp(column); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 119defcc..2b992270 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -106,7 +106,8 @@ public class IngestionService { exec.getErrorMessage(), exec.getErrorStackTrace(), diagramHash, exec.getEngineLevel(), - inputBody, outputBody, inputHeaders, outputHeaders + inputBody, outputBody, inputHeaders, outputHeaders, + null // attributes: populated once cameleer3-common exposes getAttributes() ); } @@ -126,7 +127,8 @@ public class IngestionService { p.getDurationMs(), p.getErrorMessage(), p.getErrorStackTrace(), truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), - toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()) + toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()), + null // attributes: populated once cameleer3-common exposes getAttributes() )); if (p.getChildren() != null) { flat.addAll(flattenProcessors(