test(clickhouse): add end-to-end chunk pipeline integration test
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,169 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.search.ClickHouseSearchIndex;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
|
||||
@Testcontainers
|
||||
class ClickHouseChunkPipelineIT {
|
||||
|
||||
@Container
|
||||
static final ClickHouseContainer clickhouse =
|
||||
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||
|
||||
private JdbcTemplate jdbc;
|
||||
private ClickHouseExecutionStore executionStore;
|
||||
private ClickHouseSearchIndex searchIndex;
|
||||
private ChunkAccumulator accumulator;
|
||||
private List<MergedExecution> executionBuffer;
|
||||
private List<ChunkAccumulator.ProcessorBatch> processorBuffer;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws IOException {
|
||||
HikariDataSource ds = new HikariDataSource();
|
||||
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||
ds.setUsername(clickhouse.getUsername());
|
||||
ds.setPassword(clickhouse.getPassword());
|
||||
jdbc = new JdbcTemplate(ds);
|
||||
|
||||
String execDdl = new String(getClass().getResourceAsStream(
|
||||
"/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
|
||||
String procDdl = new String(getClass().getResourceAsStream(
|
||||
"/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
|
||||
jdbc.execute(execDdl);
|
||||
jdbc.execute(procDdl);
|
||||
jdbc.execute("TRUNCATE TABLE executions");
|
||||
jdbc.execute("TRUNCATE TABLE processor_executions");
|
||||
|
||||
executionStore = new ClickHouseExecutionStore(jdbc);
|
||||
searchIndex = new ClickHouseSearchIndex(jdbc);
|
||||
|
||||
executionBuffer = new ArrayList<>();
|
||||
processorBuffer = new ArrayList<>();
|
||||
accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
void fullPipeline_chunkedIngestion_thenSearch() {
|
||||
Instant start = Instant.parse("2026-03-31T12:00:00Z");
|
||||
|
||||
// Chunk 0: RUNNING with initial processors
|
||||
accumulator.onChunk(new ExecutionChunk(
|
||||
"pipeline-1", "order-service", "pod-1", "order-route",
|
||||
"corr-1", "RUNNING",
|
||||
start, null, null, "DEEP",
|
||||
null, null, null, null, null, null,
|
||||
Map.of("orderId", "ORD-123"),
|
||||
null, null, null, null,
|
||||
0, false,
|
||||
List.of(
|
||||
new FlatProcessorRecord(1, null, null, "log1", "log",
|
||||
null, null, "COMPLETED", start, 2L,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null,
|
||||
null, null, null, null, null),
|
||||
new FlatProcessorRecord(2, null, null, "split1", "split",
|
||||
null, 3, "COMPLETED", start.plusMillis(2), 100L,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null,
|
||||
null, null, null, null, null),
|
||||
new FlatProcessorRecord(3, 2, "split1", "to1", "to",
|
||||
0, null, "COMPLETED", start.plusMillis(5), 30L,
|
||||
"http://inventory/api",
|
||||
"order ABC-123 check stock", "stock available",
|
||||
null, null,
|
||||
null, null, null, null, null, null,
|
||||
null, null, null, null, null))));
|
||||
|
||||
// Processors should be buffered immediately
|
||||
assertThat(processorBuffer).hasSize(1);
|
||||
assertThat(executionBuffer).isEmpty();
|
||||
|
||||
// Chunk 1: COMPLETED (final)
|
||||
accumulator.onChunk(new ExecutionChunk(
|
||||
"pipeline-1", "order-service", "pod-1", "order-route",
|
||||
"corr-1", "COMPLETED",
|
||||
start, start.plusMillis(750), 750L, "DEEP",
|
||||
null, null, null, null, null, null,
|
||||
null, null, null, null, null,
|
||||
1, true,
|
||||
List.of(
|
||||
new FlatProcessorRecord(4, 2, "split1", "to1", "to",
|
||||
1, null, "COMPLETED", start.plusMillis(40), 25L,
|
||||
"http://inventory/api",
|
||||
"order DEF-456 check stock", "stock available",
|
||||
null, null,
|
||||
null, null, null, null, null, null,
|
||||
null, null, null, null, null))));
|
||||
|
||||
assertThat(executionBuffer).hasSize(1);
|
||||
assertThat(processorBuffer).hasSize(2);
|
||||
|
||||
// Flush to ClickHouse (simulating ExecutionFlushScheduler)
|
||||
executionStore.insertExecutionBatch(executionBuffer);
|
||||
for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) {
|
||||
executionStore.insertProcessorBatch(
|
||||
batch.tenantId(), batch.executionId(),
|
||||
batch.routeId(), batch.applicationName(),
|
||||
batch.execStartTime(), batch.processors());
|
||||
}
|
||||
|
||||
// Search by order ID in attributes (via _search_text on executions)
|
||||
SearchResult<ExecutionSummary> result = searchIndex.search(new SearchRequest(
|
||||
null, null, null, null, null, null,
|
||||
"ORD-123", null, null, null,
|
||||
null, null, null, null, null,
|
||||
0, 50, null, null));
|
||||
assertThat(result.total()).isEqualTo(1);
|
||||
assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
|
||||
assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
|
||||
assertThat(result.data().get(0).durationMs()).isEqualTo(750L);
|
||||
|
||||
// Search in processor body
|
||||
SearchResult<ExecutionSummary> bodyResult = searchIndex.search(new SearchRequest(
|
||||
null, null, null, null, null, null,
|
||||
null, "ABC-123", null, null,
|
||||
null, null, null, null, null,
|
||||
0, 50, null, null));
|
||||
assertThat(bodyResult.total()).isEqualTo(1);
|
||||
|
||||
// Verify iteration data in processor_executions
|
||||
Integer iterSize = jdbc.queryForObject(
|
||||
"SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2",
|
||||
Integer.class);
|
||||
assertThat(iterSize).isEqualTo(3);
|
||||
|
||||
Integer iter0 = jdbc.queryForObject(
|
||||
"SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3",
|
||||
Integer.class);
|
||||
assertThat(iter0).isEqualTo(0);
|
||||
|
||||
// Verify total processor count
|
||||
Integer procCount = jdbc.queryForObject(
|
||||
"SELECT count() FROM processor_executions WHERE execution_id = 'pipeline-1'",
|
||||
Integer.class);
|
||||
assertThat(procCount).isEqualTo(4);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user