From 520181d241f12eba1350dabc57d128d11eeee88c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 00:11:44 +0200 Subject: [PATCH] test(clickhouse): add integration tests for execution read path and tree reconstruction Co-Authored-By: Claude Opus 4.6 (1M context) --- .../storage/ClickHouseExecutionReadIT.java | 262 ++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java new file mode 100644 index 00000000..e252a8de --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java @@ -0,0 +1,262 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.cameleer3.server.core.detail.DetailService; +import com.cameleer3.server.core.detail.ExecutionDetail; +import com.cameleer3.server.core.detail.ProcessorNode; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseExecutionReadIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore store; + private DetailService detailService; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + jdbc = new JdbcTemplate(ds); + + // Load DDL for both tables + String execDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String procDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + // Also load V5 for replay fields + String replayDdl = new ClassPathResource("clickhouse/V5__replay_fields.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(execDdl); + jdbc.execute(procDdl); + // V5 has ALTER TABLE statements — execute each separately + for (String stmt : replayDdl.split(";")) { + String trimmed = stmt.trim(); + if (!trimmed.isEmpty()) jdbc.execute(trimmed); + } + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore(jdbc); + detailService = new DetailService(store); + } + + // --- Helper factory methods --- + + private MergedExecution minimalExecution(String executionId) { + return new MergedExecution( + "default", 1L, executionId, "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-04-01T10:00:00Z"), + Instant.parse("2026-04-01T10:00:01Z"), + 1000L, + "", "", "", "", "", "", + "", "REGULAR", + "", "", "", "", "{}", + "", "", + false, false, + null, null + ); + } + + private FlatProcessorRecord processor(int seq, String processorId, String processorType) { + FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType); + p.setStatus(ExecutionStatus.COMPLETED); + p.setStartTime(Instant.parse("2026-04-01T10:00:00Z")); + p.setDurationMs(10L); + return p; + } + + // --- Tests --- + + @Test + void findById_returnsInsertedExecution() { + store.insertExecutionBatch(List.of(minimalExecution("exec-1"))); + + Optional result = + store.findById("exec-1"); + + assertThat(result).isPresent(); + assertThat(result.get().executionId()).isEqualTo("exec-1"); + assertThat(result.get().routeId()).isEqualTo("route-a"); + assertThat(result.get().status()).isEqualTo("COMPLETED"); + assertThat(result.get().agentId()).isEqualTo("agent-1"); + assertThat(result.get().applicationName()).isEqualTo("my-app"); + assertThat(result.get().processorsJson()).isNull(); + } + + @Test + void findById_notFound_returnsEmpty() { + Optional result = + store.findById("nonexistent"); + + assertThat(result).isEmpty(); + } + + @Test + void findProcessors_returnsOrderedBySeq() { + store.insertExecutionBatch(List.of(minimalExecution("exec-1"))); + + FlatProcessorRecord p1 = processor(1, "log-1", "log"); + FlatProcessorRecord p2 = processor(2, "transform-1", "setBody"); + FlatProcessorRecord p3 = processor(3, "to-1", "to"); + p2.setParentSeq(1); + p2.setParentProcessorId("log-1"); + p3.setParentSeq(1); + p3.setParentProcessorId("log-1"); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-04-01T10:00:00Z"), + List.of(p1, p2, p3)); + + List records = store.findProcessors("exec-1"); + + assertThat(records).hasSize(3); + assertThat(records.get(0).seq()).isEqualTo(1); + assertThat(records.get(0).processorId()).isEqualTo("log-1"); + assertThat(records.get(1).seq()).isEqualTo(2); + assertThat(records.get(1).processorId()).isEqualTo("transform-1"); + assertThat(records.get(1).parentSeq()).isEqualTo(1); + assertThat(records.get(2).seq()).isEqualTo(3); + assertThat(records.get(2).processorId()).isEqualTo("to-1"); + assertThat(records.get(2).parentSeq()).isEqualTo(1); + } + + @Test + void findProcessorBySeq_returnsCorrectRecord() { + store.insertExecutionBatch(List.of(minimalExecution("exec-1"))); + + FlatProcessorRecord p1 = processor(1, "log-1", "log"); + FlatProcessorRecord p2 = processor(2, "to-1", "to"); + FlatProcessorRecord p3 = processor(3, "log-2", "log"); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-04-01T10:00:00Z"), + List.of(p1, p2, p3)); + + Optional result = store.findProcessorBySeq("exec-1", 2); + + assertThat(result).isPresent(); + assertThat(result.get().seq()).isEqualTo(2); + assertThat(result.get().processorId()).isEqualTo("to-1"); + assertThat(result.get().processorType()).isEqualTo("to"); + } + + @Test + void findProcessorById_returnsFirstOccurrence() { + store.insertExecutionBatch(List.of(minimalExecution("exec-1"))); + + // Three processors with the same processorId (iteration scenario) + FlatProcessorRecord iter0 = processor(1, "to-1", "to"); + iter0.setIteration(0); + + FlatProcessorRecord iter1 = processor(2, "to-1", "to"); + iter1.setIteration(1); + + FlatProcessorRecord iter2 = processor(3, "to-1", "to"); + iter2.setIteration(2); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-04-01T10:00:00Z"), + List.of(iter0, iter1, iter2)); + + Optional result = store.findProcessorById("exec-1", "to-1"); + + assertThat(result).isPresent(); + // ClickHouse LIMIT 1 returns one record — verify it has the correct processorId + assertThat(result.get().processorId()).isEqualTo("to-1"); + // The returned record should have the lowest seq (first occurrence) + assertThat(result.get().seq()).isEqualTo(1); + } + + @Test + void detailService_buildTree_withIterations() { + // Insert an execution + store.insertExecutionBatch(List.of(minimalExecution("exec-1"))); + + // Build a split scenario: + // seq=1: log-1 (root) + // seq=2: split-1 (root) + // seq=3: to-1 (child of split-1, iteration=0) + // seq=4: to-1 (child of split-1, iteration=1) + // seq=5: to-1 (child of split-1, iteration=2) + // seq=6: log-2 (root) + + FlatProcessorRecord log1 = processor(1, "log-1", "log"); + + FlatProcessorRecord split1 = processor(2, "split-1", "split"); + split1.setIterationSize(3); + + FlatProcessorRecord to1iter0 = processor(3, "to-1", "to"); + to1iter0.setParentSeq(2); + to1iter0.setParentProcessorId("split-1"); + to1iter0.setIteration(0); + + FlatProcessorRecord to1iter1 = processor(4, "to-1", "to"); + to1iter1.setParentSeq(2); + to1iter1.setParentProcessorId("split-1"); + to1iter1.setIteration(1); + + FlatProcessorRecord to1iter2 = processor(5, "to-1", "to"); + to1iter2.setParentSeq(2); + to1iter2.setParentProcessorId("split-1"); + to1iter2.setIteration(2); + + FlatProcessorRecord log2 = processor(6, "log-2", "log"); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-04-01T10:00:00Z"), + List.of(log1, split1, to1iter0, to1iter1, to1iter2, log2)); + + // Invoke DetailService + Optional detail = detailService.getDetail("exec-1"); + + assertThat(detail).isPresent(); + + List roots = detail.get().processors(); + assertThat(roots).hasSize(3); + assertThat(roots.get(0).getProcessorId()).isEqualTo("log-1"); + assertThat(roots.get(1).getProcessorId()).isEqualTo("split-1"); + assertThat(roots.get(2).getProcessorId()).isEqualTo("log-2"); + + // Verify split-1 has 3 children (all with processorId "to-1") + ProcessorNode splitNode = roots.get(1); + List children = splitNode.getChildren(); + assertThat(children).hasSize(3); + assertThat(children).allMatch(c -> "to-1".equals(c.getProcessorId())); + + // Verify iteration values via getLoopIndex() (iteration maps to loopIndex in the seq-based path) + assertThat(children.get(0).getLoopIndex()).isEqualTo(0); + assertThat(children.get(1).getLoopIndex()).isEqualTo(1); + assertThat(children.get(2).getLoopIndex()).isEqualTo(2); + } +}