diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java new file mode 100644 index 00000000..287bd18c --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -0,0 +1,151 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public class ClickHouseExecutionStore { + + private final JdbcTemplate jdbc; + private final ObjectMapper objectMapper; + + public ClickHouseExecutionStore(JdbcTemplate jdbc) { + this(jdbc, new ObjectMapper()); + } + + public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) { + this.jdbc = jdbc; + this.objectMapper = objectMapper; + } + + public void insertExecutionBatch(List executions) { + if (executions.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO executions ( + tenant_id, _version, execution_id, route_id, agent_id, application_name, + status, correlation_id, exchange_id, start_time, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, diagram_content_hash, engine_level, + input_body, output_body, input_headers, output_headers, attributes, + trace_id, span_id, has_trace_data, is_replay + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + executions.stream().map(e -> new Object[]{ + nullToEmpty(e.tenantId()), + e.version(), + nullToEmpty(e.executionId()), + nullToEmpty(e.routeId()), + nullToEmpty(e.agentId()), + nullToEmpty(e.applicationName()), + nullToEmpty(e.status()), + nullToEmpty(e.correlationId()), + nullToEmpty(e.exchangeId()), + Timestamp.from(e.startTime()), + e.endTime() != null ? Timestamp.from(e.endTime()) : null, + e.durationMs(), + nullToEmpty(e.errorMessage()), + nullToEmpty(e.errorStacktrace()), + nullToEmpty(e.errorType()), + nullToEmpty(e.errorCategory()), + nullToEmpty(e.rootCauseType()), + nullToEmpty(e.rootCauseMessage()), + nullToEmpty(e.diagramContentHash()), + nullToEmpty(e.engineLevel()), + nullToEmpty(e.inputBody()), + nullToEmpty(e.outputBody()), + nullToEmpty(e.inputHeaders()), + nullToEmpty(e.outputHeaders()), + nullToEmpty(e.attributes()), + nullToEmpty(e.traceId()), + nullToEmpty(e.spanId()), + e.hasTraceData(), + e.isReplay() + }).toList()); + } + + public void insertProcessorBatch(String tenantId, String executionId, String routeId, + String applicationName, Instant execStartTime, + List processors) { + if (processors.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO processor_executions ( + tenant_id, execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + processors.stream().map(p -> new Object[]{ + nullToEmpty(tenantId), + nullToEmpty(executionId), + p.seq(), + p.parentSeq(), + nullToEmpty(p.parentProcessorId()), + nullToEmpty(p.processorId()), + nullToEmpty(p.processorType()), + Timestamp.from(p.startTime() != null ? p.startTime() : execStartTime), + nullToEmpty(routeId), + nullToEmpty(applicationName), + p.iteration(), + p.iterationSize(), + nullToEmpty(p.status()), + computeEndTime(p.startTime(), p.durationMs()), + p.durationMs(), + nullToEmpty(p.errorMessage()), + nullToEmpty(p.errorStackTrace()), + nullToEmpty(p.errorType()), + nullToEmpty(p.errorCategory()), + nullToEmpty(p.rootCauseType()), + nullToEmpty(p.rootCauseMessage()), + nullToEmpty(p.inputBody()), + nullToEmpty(p.outputBody()), + mapToJson(p.inputHeaders()), + mapToJson(p.outputHeaders()), + mapToJson(p.attributes()), + nullToEmpty(p.resolvedEndpointUri()), + nullToEmpty(p.circuitBreakerState()), + boolOrFalse(p.fallbackTriggered()), + boolOrFalse(p.filterMatched()), + boolOrFalse(p.duplicateMessage()) + }).toList()); + } + + private static String nullToEmpty(String value) { + return value != null ? value : ""; + } + + private static boolean boolOrFalse(Boolean value) { + return value != null && value; + } + + private static Timestamp computeEndTime(Instant startTime, long durationMs) { + if (startTime != null && durationMs > 0) { + return Timestamp.from(startTime.plusMillis(durationMs)); + } + return null; + } + + private String mapToJson(Map map) { + if (map == null || map.isEmpty()) return ""; + try { + return objectMapper.writeValueAsString(map); + } catch (JsonProcessingException e) { + return ""; + } + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java new file mode 100644 index 00000000..0904507e --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -0,0 +1,231 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseExecutionStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore(jdbc); + } + + @Test + void insertExecutionBatch_writesToClickHouse() { + MergedExecution exec = new MergedExecution( + "default", 1L, "exec-1", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), + 1000L, + "some error", "stack trace", "IOException", "IO", + "FileNotFoundException", "file not found", + "hash-abc", "FULL", + "{\"key\":\"val\"}", "{\"out\":\"val\"}", + "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", + "{\"attr\":\"val\"}", + "trace-123", "span-456", + true, false + ); + + store.insertExecutionBatch(List.of(exec)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertProcessorBatch_writesToClickHouse() { + FlatProcessorRecord proc = new FlatProcessorRecord( + 1, null, null, + "proc-1", "to", null, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 50L, + "http://example.com", + "input body", "output body", + Map.of("h1", "v1"), Map.of("h2", "v2"), + null, null, null, null, null, null, + Map.of("a1", "v1"), + null, null, null, null + ); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(proc)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + + // Verify seq is stored + Integer seq = jdbc.queryForObject( + "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(seq).isEqualTo(1); + } + + @Test + void insertProcessorBatch_withIterations() { + FlatProcessorRecord splitContainer = new FlatProcessorRecord( + 1, null, null, + "split-1", "split", null, 3, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 300L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child0 = new FlatProcessorRecord( + 2, 1, "split-1", + "child-proc", "to", 0, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.100Z"), 80L, + "http://svc-a", "body0", "out0", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child1 = new FlatProcessorRecord( + 3, 1, "split-1", + "child-proc", "to", 1, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.200Z"), 90L, + "http://svc-a", "body1", "out1", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child2 = new FlatProcessorRecord( + 4, 1, "split-1", + "child-proc", "to", 2, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.300Z"), 100L, + "http://svc-a", "body2", "out2", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + store.insertProcessorBatch( + "default", "exec-2", "route-b", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(splitContainer, child0, child1, child2)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-2'", + Integer.class); + assertThat(count).isEqualTo(4); + + // Verify iteration data on the split container + Integer iterationSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 1", + Integer.class); + assertThat(iterationSize).isEqualTo(3); + + // Verify iteration index on a child + Integer iteration = jdbc.queryForObject( + "SELECT iteration FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 3", + Integer.class); + assertThat(iteration).isEqualTo(1); + } + + @Test + void insertExecutionBatch_emptyList_doesNothing() { + store.insertExecutionBatch(List.of()); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { + MergedExecution v1 = new MergedExecution( + "default", 1L, "exec-r", "route-a", "agent-1", "my-app", + "RUNNING", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + MergedExecution v2 = new MergedExecution( + "default", 2L, "exec-r", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:05Z"), + 5000L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(v1)); + store.insertExecutionBatch(List.of(v2)); + + // Force merge to apply ReplacingMergeTree deduplication + jdbc.execute("OPTIMIZE TABLE executions FINAL"); + + String status = jdbc.queryForObject( + "SELECT status FROM executions " + + "WHERE execution_id = 'exec-r'", + String.class); + assertThat(status).isEqualTo("COMPLETED"); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java new file mode 100644 index 00000000..d5227ab8 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -0,0 +1,39 @@ +package com.cameleer3.server.core.ingestion; + +import java.time.Instant; + +/** + * A merged execution envelope ready for ClickHouse insertion. + * Produced by ChunkAccumulator after receiving the final chunk. + */ +public record MergedExecution( + String tenantId, + long version, + String executionId, + String routeId, + String agentId, + String applicationName, + String status, + String correlationId, + String exchangeId, + Instant startTime, + Instant endTime, + Long durationMs, + String errorMessage, + String errorStacktrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + String diagramContentHash, + String engineLevel, + String inputBody, + String outputBody, + String inputHeaders, + String outputHeaders, + String attributes, + String traceId, + String spanId, + boolean hasTraceData, + boolean isReplay +) {}