feat: implement PostgresExecutionStore with upsert and dedup
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.storage.ExecutionStore;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Repository
|
||||
public class PostgresExecutionStore implements ExecutionStore {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresExecutionStore(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsert(ExecutionRecord execution) {
|
||||
jdbc.update("""
|
||||
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
|
||||
status, correlation_id, exchange_id, start_time, end_time,
|
||||
duration_ms, error_message, error_stacktrace, diagram_content_hash,
|
||||
created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
|
||||
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
||||
status = CASE
|
||||
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
||||
AND executions.status = 'RUNNING'
|
||||
THEN EXCLUDED.status
|
||||
WHEN EXCLUDED.status = executions.status THEN executions.status
|
||||
ELSE EXCLUDED.status
|
||||
END,
|
||||
end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
|
||||
duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
|
||||
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
|
||||
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
|
||||
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
|
||||
updated_at = now()
|
||||
""",
|
||||
execution.executionId(), execution.routeId(), execution.agentId(),
|
||||
execution.groupName(), execution.status(), execution.correlationId(),
|
||||
execution.exchangeId(),
|
||||
Timestamp.from(execution.startTime()),
|
||||
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
|
||||
execution.durationMs(), execution.errorMessage(),
|
||||
execution.errorStacktrace(), execution.diagramContentHash());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsertProcessors(String executionId, Instant startTime,
|
||||
String groupName, String routeId,
|
||||
List<ProcessorRecord> processors) {
|
||||
jdbc.batchUpdate("""
|
||||
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
|
||||
diagram_node_id, group_name, route_id, depth, parent_processor_id,
|
||||
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
||||
input_body, output_body, input_headers, output_headers)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
|
||||
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
|
||||
duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
|
||||
error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
|
||||
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
|
||||
input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
|
||||
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
|
||||
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
|
||||
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
|
||||
""",
|
||||
processors.stream().map(p -> new Object[]{
|
||||
p.executionId(), p.processorId(), p.processorType(),
|
||||
p.diagramNodeId(), p.groupName(), p.routeId(),
|
||||
p.depth(), p.parentProcessorId(), p.status(),
|
||||
Timestamp.from(p.startTime()),
|
||||
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
|
||||
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
|
||||
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
|
||||
}).toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ExecutionRecord> findById(String executionId) {
|
||||
List<ExecutionRecord> results = jdbc.query(
|
||||
"SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
|
||||
EXECUTION_MAPPER, executionId);
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ProcessorRecord> findProcessors(String executionId) {
|
||||
return jdbc.query(
|
||||
"SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
|
||||
PROCESSOR_MAPPER, executionId);
|
||||
}
|
||||
|
||||
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
|
||||
new ExecutionRecord(
|
||||
rs.getString("execution_id"), rs.getString("route_id"),
|
||||
rs.getString("agent_id"), rs.getString("group_name"),
|
||||
rs.getString("status"), rs.getString("correlation_id"),
|
||||
rs.getString("exchange_id"),
|
||||
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
|
||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||
rs.getString("error_message"), rs.getString("error_stacktrace"),
|
||||
rs.getString("diagram_content_hash"));
|
||||
|
||||
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
||||
new ProcessorRecord(
|
||||
rs.getString("execution_id"), rs.getString("processor_id"),
|
||||
rs.getString("processor_type"), rs.getString("diagram_node_id"),
|
||||
rs.getString("group_name"), rs.getString("route_id"),
|
||||
rs.getInt("depth"), rs.getString("parent_processor_id"),
|
||||
rs.getString("status"),
|
||||
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
|
||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||
rs.getString("error_message"), rs.getString("error_stacktrace"),
|
||||
rs.getString("input_body"), rs.getString("output_body"),
|
||||
rs.getString("input_headers"), rs.getString("output_headers"));
|
||||
|
||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||
Timestamp ts = rs.getTimestamp(column);
|
||||
return ts != null ? ts.toInstant() : null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.app.AbstractPostgresIT;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
class PostgresExecutionStoreIT extends AbstractPostgresIT {
|
||||
|
||||
@Autowired
|
||||
ExecutionStore executionStore;
|
||||
|
||||
@Test
|
||||
void upsertAndFindById() {
|
||||
Instant now = Instant.now();
|
||||
ExecutionRecord record = new ExecutionRecord(
|
||||
"exec-1", "route-a", "agent-1", "app-1",
|
||||
"COMPLETED", "corr-1", "exchange-1",
|
||||
now, now.plusMillis(100), 100L,
|
||||
null, null, null);
|
||||
|
||||
executionStore.upsert(record);
|
||||
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
|
||||
|
||||
assertTrue(found.isPresent());
|
||||
assertEquals("exec-1", found.get().executionId());
|
||||
assertEquals("COMPLETED", found.get().status());
|
||||
}
|
||||
|
||||
@Test
|
||||
void upsertDeduplicatesByExecutionId() {
|
||||
Instant now = Instant.now();
|
||||
ExecutionRecord first = new ExecutionRecord(
|
||||
"exec-dup", "route-a", "agent-1", "app-1",
|
||||
"RUNNING", null, null, now, null, null, null, null, null);
|
||||
ExecutionRecord second = new ExecutionRecord(
|
||||
"exec-dup", "route-a", "agent-1", "app-1",
|
||||
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);
|
||||
|
||||
executionStore.upsert(first);
|
||||
executionStore.upsert(second);
|
||||
|
||||
Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
|
||||
assertTrue(found.isPresent());
|
||||
assertEquals("COMPLETED", found.get().status());
|
||||
assertEquals(200L, found.get().durationMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
void upsertProcessorsAndFind() {
|
||||
Instant now = Instant.now();
|
||||
ExecutionRecord exec = new ExecutionRecord(
|
||||
"exec-proc", "route-a", "agent-1", "app-1",
|
||||
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
|
||||
executionStore.upsert(exec);
|
||||
|
||||
List<ProcessorRecord> processors = List.of(
|
||||
new ProcessorRecord("exec-proc", "proc-1", "log", null,
|
||||
"app-1", "route-a", 0, null, "COMPLETED",
|
||||
now, now.plusMillis(10), 10L, null, null,
|
||||
"input body", "output body", null, null),
|
||||
new ProcessorRecord("exec-proc", "proc-2", "to", null,
|
||||
"app-1", "route-a", 1, "proc-1", "COMPLETED",
|
||||
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
|
||||
null, null, null, null)
|
||||
);
|
||||
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);
|
||||
|
||||
List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
|
||||
assertEquals(2, found.size());
|
||||
assertEquals("proc-1", found.get(0).processorId());
|
||||
assertEquals("proc-2", found.get(1).processorId());
|
||||
assertEquals("proc-1", found.get(1).parentProcessorId());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user