test(clickhouse): add integration tests for execution read path and tree reconstruction
Some checks failed
Some checks failed
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,262 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.model.ExecutionStatus;
|
||||
import com.cameleer3.common.model.FlatProcessorRecord;
|
||||
import com.cameleer3.server.core.detail.DetailService;
|
||||
import com.cameleer3.server.core.detail.ExecutionDetail;
|
||||
import com.cameleer3.server.core.detail.ProcessorNode;
|
||||
import com.cameleer3.server.core.ingestion.MergedExecution;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Testcontainers
|
||||
class ClickHouseExecutionReadIT {
|
||||
|
||||
@Container
|
||||
static final ClickHouseContainer clickhouse =
|
||||
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||
|
||||
private JdbcTemplate jdbc;
|
||||
private ClickHouseExecutionStore store;
|
||||
private DetailService detailService;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
HikariDataSource ds = new HikariDataSource();
|
||||
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||
ds.setUsername(clickhouse.getUsername());
|
||||
ds.setPassword(clickhouse.getPassword());
|
||||
jdbc = new JdbcTemplate(ds);
|
||||
|
||||
// Load DDL for both tables
|
||||
String execDdl = new ClassPathResource("clickhouse/V2__executions.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
String procDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
// Also load V5 for replay fields
|
||||
String replayDdl = new ClassPathResource("clickhouse/V5__replay_fields.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
|
||||
jdbc.execute(execDdl);
|
||||
jdbc.execute(procDdl);
|
||||
// V5 has ALTER TABLE statements — execute each separately
|
||||
for (String stmt : replayDdl.split(";")) {
|
||||
String trimmed = stmt.trim();
|
||||
if (!trimmed.isEmpty()) jdbc.execute(trimmed);
|
||||
}
|
||||
|
||||
jdbc.execute("TRUNCATE TABLE executions");
|
||||
jdbc.execute("TRUNCATE TABLE processor_executions");
|
||||
|
||||
store = new ClickHouseExecutionStore(jdbc);
|
||||
detailService = new DetailService(store);
|
||||
}
|
||||
|
||||
// --- Helper factory methods ---
|
||||
|
||||
private MergedExecution minimalExecution(String executionId) {
|
||||
return new MergedExecution(
|
||||
"default", 1L, executionId, "route-a", "agent-1", "my-app",
|
||||
"COMPLETED", "corr-1", "exchange-1",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
Instant.parse("2026-04-01T10:00:01Z"),
|
||||
1000L,
|
||||
"", "", "", "", "", "",
|
||||
"", "REGULAR",
|
||||
"", "", "", "", "{}",
|
||||
"", "",
|
||||
false, false,
|
||||
null, null
|
||||
);
|
||||
}
|
||||
|
||||
private FlatProcessorRecord processor(int seq, String processorId, String processorType) {
|
||||
FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType);
|
||||
p.setStatus(ExecutionStatus.COMPLETED);
|
||||
p.setStartTime(Instant.parse("2026-04-01T10:00:00Z"));
|
||||
p.setDurationMs(10L);
|
||||
return p;
|
||||
}
|
||||
|
||||
// --- Tests ---
|
||||
|
||||
@Test
|
||||
void findById_returnsInsertedExecution() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
|
||||
store.findById("exec-1");
|
||||
|
||||
assertThat(result).isPresent();
|
||||
assertThat(result.get().executionId()).isEqualTo("exec-1");
|
||||
assertThat(result.get().routeId()).isEqualTo("route-a");
|
||||
assertThat(result.get().status()).isEqualTo("COMPLETED");
|
||||
assertThat(result.get().agentId()).isEqualTo("agent-1");
|
||||
assertThat(result.get().applicationName()).isEqualTo("my-app");
|
||||
assertThat(result.get().processorsJson()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findById_notFound_returnsEmpty() {
|
||||
Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
|
||||
store.findById("nonexistent");
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessors_returnsOrderedBySeq() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
FlatProcessorRecord p1 = processor(1, "log-1", "log");
|
||||
FlatProcessorRecord p2 = processor(2, "transform-1", "setBody");
|
||||
FlatProcessorRecord p3 = processor(3, "to-1", "to");
|
||||
p2.setParentSeq(1);
|
||||
p2.setParentProcessorId("log-1");
|
||||
p3.setParentSeq(1);
|
||||
p3.setParentProcessorId("log-1");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(p1, p2, p3));
|
||||
|
||||
List<ProcessorRecord> records = store.findProcessors("exec-1");
|
||||
|
||||
assertThat(records).hasSize(3);
|
||||
assertThat(records.get(0).seq()).isEqualTo(1);
|
||||
assertThat(records.get(0).processorId()).isEqualTo("log-1");
|
||||
assertThat(records.get(1).seq()).isEqualTo(2);
|
||||
assertThat(records.get(1).processorId()).isEqualTo("transform-1");
|
||||
assertThat(records.get(1).parentSeq()).isEqualTo(1);
|
||||
assertThat(records.get(2).seq()).isEqualTo(3);
|
||||
assertThat(records.get(2).processorId()).isEqualTo("to-1");
|
||||
assertThat(records.get(2).parentSeq()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessorBySeq_returnsCorrectRecord() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
FlatProcessorRecord p1 = processor(1, "log-1", "log");
|
||||
FlatProcessorRecord p2 = processor(2, "to-1", "to");
|
||||
FlatProcessorRecord p3 = processor(3, "log-2", "log");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(p1, p2, p3));
|
||||
|
||||
Optional<ProcessorRecord> result = store.findProcessorBySeq("exec-1", 2);
|
||||
|
||||
assertThat(result).isPresent();
|
||||
assertThat(result.get().seq()).isEqualTo(2);
|
||||
assertThat(result.get().processorId()).isEqualTo("to-1");
|
||||
assertThat(result.get().processorType()).isEqualTo("to");
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessorById_returnsFirstOccurrence() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
// Three processors with the same processorId (iteration scenario)
|
||||
FlatProcessorRecord iter0 = processor(1, "to-1", "to");
|
||||
iter0.setIteration(0);
|
||||
|
||||
FlatProcessorRecord iter1 = processor(2, "to-1", "to");
|
||||
iter1.setIteration(1);
|
||||
|
||||
FlatProcessorRecord iter2 = processor(3, "to-1", "to");
|
||||
iter2.setIteration(2);
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(iter0, iter1, iter2));
|
||||
|
||||
Optional<ProcessorRecord> result = store.findProcessorById("exec-1", "to-1");
|
||||
|
||||
assertThat(result).isPresent();
|
||||
// ClickHouse LIMIT 1 returns one record — verify it has the correct processorId
|
||||
assertThat(result.get().processorId()).isEqualTo("to-1");
|
||||
// The returned record should have the lowest seq (first occurrence)
|
||||
assertThat(result.get().seq()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void detailService_buildTree_withIterations() {
|
||||
// Insert an execution
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
// Build a split scenario:
|
||||
// seq=1: log-1 (root)
|
||||
// seq=2: split-1 (root)
|
||||
// seq=3: to-1 (child of split-1, iteration=0)
|
||||
// seq=4: to-1 (child of split-1, iteration=1)
|
||||
// seq=5: to-1 (child of split-1, iteration=2)
|
||||
// seq=6: log-2 (root)
|
||||
|
||||
FlatProcessorRecord log1 = processor(1, "log-1", "log");
|
||||
|
||||
FlatProcessorRecord split1 = processor(2, "split-1", "split");
|
||||
split1.setIterationSize(3);
|
||||
|
||||
FlatProcessorRecord to1iter0 = processor(3, "to-1", "to");
|
||||
to1iter0.setParentSeq(2);
|
||||
to1iter0.setParentProcessorId("split-1");
|
||||
to1iter0.setIteration(0);
|
||||
|
||||
FlatProcessorRecord to1iter1 = processor(4, "to-1", "to");
|
||||
to1iter1.setParentSeq(2);
|
||||
to1iter1.setParentProcessorId("split-1");
|
||||
to1iter1.setIteration(1);
|
||||
|
||||
FlatProcessorRecord to1iter2 = processor(5, "to-1", "to");
|
||||
to1iter2.setParentSeq(2);
|
||||
to1iter2.setParentProcessorId("split-1");
|
||||
to1iter2.setIteration(2);
|
||||
|
||||
FlatProcessorRecord log2 = processor(6, "log-2", "log");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(log1, split1, to1iter0, to1iter1, to1iter2, log2));
|
||||
|
||||
// Invoke DetailService
|
||||
Optional<ExecutionDetail> detail = detailService.getDetail("exec-1");
|
||||
|
||||
assertThat(detail).isPresent();
|
||||
|
||||
List<ProcessorNode> roots = detail.get().processors();
|
||||
assertThat(roots).hasSize(3);
|
||||
assertThat(roots.get(0).getProcessorId()).isEqualTo("log-1");
|
||||
assertThat(roots.get(1).getProcessorId()).isEqualTo("split-1");
|
||||
assertThat(roots.get(2).getProcessorId()).isEqualTo("log-2");
|
||||
|
||||
// Verify split-1 has 3 children (all with processorId "to-1")
|
||||
ProcessorNode splitNode = roots.get(1);
|
||||
List<ProcessorNode> children = splitNode.getChildren();
|
||||
assertThat(children).hasSize(3);
|
||||
assertThat(children).allMatch(c -> "to-1".equals(c.getProcessorId()));
|
||||
|
||||
// Verify iteration values via getLoopIndex() (iteration maps to loopIndex in the seq-based path)
|
||||
assertThat(children.get(0).getLoopIndex()).isEqualTo(0);
|
||||
assertThat(children.get(1).getLoopIndex()).isEqualTo(1);
|
||||
assertThat(children.get(2).getLoopIndex()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user