diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java new file mode 100644 index 00000000..219f836b --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -0,0 +1,169 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseChunkPipelineIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore executionStore; + private ClickHouseSearchIndex searchIndex; + private ChunkAccumulator accumulator; + private List executionBuffer; + private List processorBuffer; + + @BeforeEach + void setUp() throws IOException { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + jdbc = new JdbcTemplate(ds); + + String execDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); + String procDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); + jdbc.execute(execDdl); + jdbc.execute(procDdl); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + executionStore = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + executionBuffer = new ArrayList<>(); + processorBuffer = new ArrayList<>(); + accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5)); + } + + @Test + void fullPipeline_chunkedIngestion_thenSearch() { + Instant start = Instant.parse("2026-03-31T12:00:00Z"); + + // Chunk 0: RUNNING with initial processors + accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "RUNNING", + start, null, null, "DEEP", + null, null, null, null, null, null, + Map.of("orderId", "ORD-123"), + null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord(1, null, null, "log1", "log", + null, null, "COMPLETED", start, 2L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(2, null, null, "split1", "split", + null, 3, "COMPLETED", start.plusMillis(2), 100L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(3, 2, "split1", "to1", "to", + 0, null, "COMPLETED", start.plusMillis(5), 30L, + "http://inventory/api", + "order ABC-123 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + // Processors should be buffered immediately + assertThat(processorBuffer).hasSize(1); + assertThat(executionBuffer).isEmpty(); + + // Chunk 1: COMPLETED (final) + accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "COMPLETED", + start, start.plusMillis(750), 750L, "DEEP", + null, null, null, null, null, null, + null, null, null, null, null, + 1, true, + List.of( + new FlatProcessorRecord(4, 2, "split1", "to1", "to", + 1, null, "COMPLETED", start.plusMillis(40), 25L, + "http://inventory/api", + "order DEF-456 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + assertThat(executionBuffer).hasSize(1); + assertThat(processorBuffer).hasSize(2); + + // Flush to ClickHouse (simulating ExecutionFlushScheduler) + executionStore.insertExecutionBatch(executionBuffer); + for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) { + executionStore.insertProcessorBatch( + batch.tenantId(), batch.executionId(), + batch.routeId(), batch.applicationName(), + batch.execStartTime(), batch.processors()); + } + + // Search by order ID in attributes (via _search_text on executions) + SearchResult result = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + "ORD-123", null, null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); + assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); + assertThat(result.data().get(0).durationMs()).isEqualTo(750L); + + // Search in processor body + SearchResult bodyResult = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + null, "ABC-123", null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(bodyResult.total()).isEqualTo(1); + + // Verify iteration data in processor_executions + Integer iterSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2", + Integer.class); + assertThat(iterSize).isEqualTo(3); + + Integer iter0 = jdbc.queryForObject( + "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3", + Integer.class); + assertThat(iter0).isEqualTo(0); + + // Verify total processor count + Integer procCount = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'pipeline-1'", + Integer.class); + assertThat(procCount).isEqualTo(4); + } +}