diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java new file mode 100644 index 00000000..84170327 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -0,0 +1,131 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.ExecutionStore; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +@Repository +public class PostgresExecutionStore implements ExecutionStore { + + private final JdbcTemplate jdbc; + + public PostgresExecutionStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void upsert(ExecutionRecord execution) { + jdbc.update(""" + INSERT INTO executions (execution_id, route_id, agent_id, group_name, + status, correlation_id, exchange_id, start_time, end_time, + duration_ms, error_message, error_stacktrace, diagram_content_hash, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now()) + ON CONFLICT (execution_id, start_time) DO UPDATE SET + status = CASE + WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') + AND executions.status = 'RUNNING' + THEN EXCLUDED.status + WHEN EXCLUDED.status = executions.status THEN executions.status + ELSE EXCLUDED.status + END, + end_time = COALESCE(EXCLUDED.end_time, executions.end_time), + duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms), + error_message = COALESCE(EXCLUDED.error_message, executions.error_message), + error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace), + diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash), + updated_at = now() + """, + execution.executionId(), execution.routeId(), execution.agentId(), + execution.groupName(), execution.status(), execution.correlationId(), + execution.exchangeId(), + Timestamp.from(execution.startTime()), + execution.endTime() != null ? Timestamp.from(execution.endTime()) : null, + execution.durationMs(), execution.errorMessage(), + execution.errorStacktrace(), execution.diagramContentHash()); + } + + @Override + public void upsertProcessors(String executionId, Instant startTime, + String groupName, String routeId, + List processors) { + jdbc.batchUpdate(""" + INSERT INTO processor_executions (execution_id, processor_id, processor_type, + diagram_node_id, group_name, route_id, depth, parent_processor_id, + status, start_time, end_time, duration_ms, error_message, error_stacktrace, + input_body, output_body, input_headers, output_headers) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb) + ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET + status = EXCLUDED.status, + end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), + duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms), + error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message), + error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace), + input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body), + output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body), + input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers), + output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers) + """, + processors.stream().map(p -> new Object[]{ + p.executionId(), p.processorId(), p.processorType(), + p.diagramNodeId(), p.groupName(), p.routeId(), + p.depth(), p.parentProcessorId(), p.status(), + Timestamp.from(p.startTime()), + p.endTime() != null ? Timestamp.from(p.endTime()) : null, + p.durationMs(), p.errorMessage(), p.errorStacktrace(), + p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders() + }).toList()); + } + + @Override + public Optional findById(String executionId) { + List results = jdbc.query( + "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1", + EXECUTION_MAPPER, executionId); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public List findProcessors(String executionId) { + return jdbc.query( + "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time", + PROCESSOR_MAPPER, executionId); + } + + private static final RowMapper EXECUTION_MAPPER = (rs, rowNum) -> + new ExecutionRecord( + rs.getString("execution_id"), rs.getString("route_id"), + rs.getString("agent_id"), rs.getString("group_name"), + rs.getString("status"), rs.getString("correlation_id"), + rs.getString("exchange_id"), + toInstant(rs, "start_time"), toInstant(rs, "end_time"), + rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, + rs.getString("error_message"), rs.getString("error_stacktrace"), + rs.getString("diagram_content_hash")); + + private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> + new ProcessorRecord( + rs.getString("execution_id"), rs.getString("processor_id"), + rs.getString("processor_type"), rs.getString("diagram_node_id"), + rs.getString("group_name"), rs.getString("route_id"), + rs.getInt("depth"), rs.getString("parent_processor_id"), + rs.getString("status"), + toInstant(rs, "start_time"), toInstant(rs, "end_time"), + rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, + rs.getString("error_message"), rs.getString("error_stacktrace"), + rs.getString("input_body"), rs.getString("output_body"), + rs.getString("input_headers"), rs.getString("output_headers")); + + private static Instant toInstant(ResultSet rs, String column) throws SQLException { + Timestamp ts = rs.getTimestamp(column); + return ts != null ? ts.toInstant() : null; + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java new file mode 100644 index 00000000..8b698d5e --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java @@ -0,0 +1,83 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; + +class PostgresExecutionStoreIT extends AbstractPostgresIT { + + @Autowired + ExecutionStore executionStore; + + @Test + void upsertAndFindById() { + Instant now = Instant.now(); + ExecutionRecord record = new ExecutionRecord( + "exec-1", "route-a", "agent-1", "app-1", + "COMPLETED", "corr-1", "exchange-1", + now, now.plusMillis(100), 100L, + null, null, null); + + executionStore.upsert(record); + Optional found = executionStore.findById("exec-1"); + + assertTrue(found.isPresent()); + assertEquals("exec-1", found.get().executionId()); + assertEquals("COMPLETED", found.get().status()); + } + + @Test + void upsertDeduplicatesByExecutionId() { + Instant now = Instant.now(); + ExecutionRecord first = new ExecutionRecord( + "exec-dup", "route-a", "agent-1", "app-1", + "RUNNING", null, null, now, null, null, null, null, null); + ExecutionRecord second = new ExecutionRecord( + "exec-dup", "route-a", "agent-1", "app-1", + "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null); + + executionStore.upsert(first); + executionStore.upsert(second); + + Optional found = executionStore.findById("exec-dup"); + assertTrue(found.isPresent()); + assertEquals("COMPLETED", found.get().status()); + assertEquals(200L, found.get().durationMs()); + } + + @Test + void upsertProcessorsAndFind() { + Instant now = Instant.now(); + ExecutionRecord exec = new ExecutionRecord( + "exec-proc", "route-a", "agent-1", "app-1", + "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null); + executionStore.upsert(exec); + + List processors = List.of( + new ProcessorRecord("exec-proc", "proc-1", "log", null, + "app-1", "route-a", 0, null, "COMPLETED", + now, now.plusMillis(10), 10L, null, null, + "input body", "output body", null, null), + new ProcessorRecord("exec-proc", "proc-2", "to", null, + "app-1", "route-a", 1, "proc-1", "COMPLETED", + now.plusMillis(10), now.plusMillis(30), 20L, null, null, + null, null, null, null) + ); + executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors); + + List found = executionStore.findProcessors("exec-proc"); + assertEquals(2, found.size()); + assertEquals("proc-1", found.get(0).processorId()); + assertEquals("proc-2", found.get(1).processorId()); + assertEquals("proc-1", found.get(1).parentProcessorId()); + } +}