# ClickHouse Phase 2: Executions + Search — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Ingest chunked execution data into ClickHouse and provide a ClickHouse-backed search implementation behind a feature flag. Replace the old `RouteExecution` tree ingestion with `ExecutionChunk` + `FlatProcessorRecord` flat ingestion.

**Architecture:** Agents send `ExecutionChunk` documents containing flat `FlatProcessorRecord` entries with `seq`/`parentSeq`/`iteration` fields. A new `ChunkIngestionController` accepts chunks at `POST /api/v1/data/chunks`. A `ChunkAccumulator` buffers exchange envelope data in-memory, inserts processor records immediately via WriteBuffer, and writes the execution row when the final chunk arrives. A `ClickHouseSearchIndex` implements the `SearchIndex` interface using SQL with ngram bloom filter acceleration. Feature flags control which search backend is active. The old `RouteExecution` ingestion path is removed (no backward compatibility needed).
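
For illustration, a final chunk as the DTOs below would deserialize it might look like this (all values are hypothetical; note that `isFinal` is serialized under the JSON name `final`):

```json
{
  "exchangeId": "ex-1",
  "applicationName": "order-service",
  "agentId": "pod-1",
  "routeId": "order-route",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00Z",
  "endTime": "2026-03-31T10:00:01Z",
  "durationMs": 1000,
  "chunkSeq": 2,
  "final": true,
  "processors": [
    { "seq": 1, "processorId": "log1", "processorType": "log",
      "status": "COMPLETED", "startTime": "2026-03-31T10:00:00.100Z", "durationMs": 5 },
    { "seq": 2, "parentSeq": 1, "parentProcessorId": "log1", "processorId": "to1",
      "processorType": "to", "status": "COMPLETED", "durationMs": 12 }
  ]
}
```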

**Tech Stack:** ClickHouse 24.12, clickhouse-jdbc 0.9.7 (all classifier), Spring JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`

**Note on cameleer-common:** The agent team is refactoring `cameleer-common` to add `ExecutionChunk` and `FlatProcessorRecord`. Until that library is published, this plan defines server-side DTOs in `cameleer-server-core` that mirror the common models. When the common lib is ready, swap the server DTOs for the shared classes (import change only).

---

## File Structure

| File | Responsibility |
|------|----------------|
| `cameleer-server-app/.../resources/clickhouse/V2__executions.sql` | DDL for `executions` table (ReplacingMergeTree) |
| `cameleer-server-app/.../resources/clickhouse/V3__processor_executions.sql` | DDL for `processor_executions` table (MergeTree) — uses seq/parentSeq/iteration |
| `cameleer-server-core/.../model/ExecutionChunk.java` | Server-side DTO mirroring agent's ExecutionChunk (temporary until common lib ready) |
| `cameleer-server-core/.../model/FlatProcessorRecord.java` | Server-side DTO mirroring agent's FlatProcessorRecord (temporary until common lib ready) |
| `cameleer-server-core/.../ingestion/ChunkAccumulator.java` | Accumulates exchange envelope across chunks, pushes processor records + final execution row to WriteBuffers |
| `cameleer-server-core/.../ingestion/MergedExecution.java` | Record holding merged execution envelope + version + tenant |
| `cameleer-server-app/.../storage/ClickHouseExecutionStore.java` | Batch INSERT for executions + processor_executions to ClickHouse |
| `cameleer-server-app/.../search/ClickHouseSearchIndex.java` | SearchIndex impl using SQL with ngram indexes |
| `cameleer-server-app/.../ingestion/ExecutionFlushScheduler.java` | Drains execution + processor WriteBuffers → ClickHouseExecutionStore |
| `cameleer-server-app/.../controller/ChunkIngestionController.java` | REST endpoint `POST /api/v1/data/chunks` accepting ExecutionChunk |
| `cameleer-server-app/.../config/StorageBeanConfig.java` | Modified: add chunk accumulator + CH search beans |
| `cameleer-server-app/.../config/IngestionBeanConfig.java` | Modified: add execution + processor WriteBuffer beans |
| `cameleer-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.search` flag |
| `cameleer-server-app/...test.../storage/ClickHouseExecutionStoreIT.java` | Integration test for CH execution writes |
| `cameleer-server-app/...test.../search/ClickHouseSearchIndexIT.java` | Integration test for CH search |
| `cameleer-server-core/...test.../ingestion/ChunkAccumulatorTest.java` | Unit test for accumulator logic |

---

### Task 1: Server-Side DTOs (ExecutionChunk + FlatProcessorRecord)

**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java`

These mirror the agent's models exactly. When `cameleer-common` is published with these classes, delete these files and update imports.

- [ ] **Step 1: Create FlatProcessorRecord**

```java
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;

import java.time.Instant;
import java.util.Map;

/**
 * Flat processor execution record with seq/parentSeq for tree reconstruction.
 * Mirrors cameleer-common FlatProcessorRecord — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record FlatProcessorRecord(
        int seq,
        Integer parentSeq,
        String parentProcessorId,
        String processorId,
        String processorType,
        Integer iteration,
        Integer iterationSize,
        String status,
        Instant startTime,
        long durationMs,
        String resolvedEndpointUri,
        String inputBody,
        String outputBody,
        Map<String, String> inputHeaders,
        Map<String, String> outputHeaders,
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        Map<String, String> attributes,
        String circuitBreakerState,
        Boolean fallbackTriggered,
        Boolean filterMatched,
        Boolean duplicateMessage
) {}
```

- [ ] **Step 2: Create ExecutionChunk**

```java
// cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.List;
import java.util.Map;

/**
 * Chunk document: exchange envelope + list of FlatProcessorRecords.
 * Mirrors cameleer-common ExecutionChunk — replace with common lib import when available.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ExecutionChunk(
        String exchangeId,
        String applicationName,
        String agentId,
        String routeId,
        String correlationId,
        String status,
        Instant startTime,
        Instant endTime,
        Long durationMs,
        String engineLevel,
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        Map<String, String> attributes,
        String traceId,
        String spanId,
        String originalExchangeId,
        String replayExchangeId,
        int chunkSeq,
        @JsonProperty("final") boolean isFinal,
        List<FlatProcessorRecord> processors
) {}
```

- [ ] **Step 3: Write deserialization test**

```java
// cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
package com.cameleer.server.core.storage.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class ExecutionChunkDeserializationTest {

    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    @Test
    void roundTrip_fullChunk() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-1", "order-service", "pod-1", "order-route",
                "corr-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "REGULAR",
                null, null, null, null, null, null,
                Map.of("orderId", "ORD-1"),
                "trace-1", "span-1", null, null,
                2, true,
                List.of(new FlatProcessorRecord(
                        1, null, null, "log1", "log",
                        null, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                        null, "body", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.exchangeId()).isEqualTo("ex-1");
        assertThat(deserialized.isFinal()).isTrue();
        assertThat(deserialized.chunkSeq()).isEqualTo(2);
        assertThat(deserialized.processors()).hasSize(1);
        assertThat(deserialized.processors().get(0).seq()).isEqualTo(1);
        assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1");
        assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1");
    }

    @Test
    void roundTrip_runningChunkWithIterations() throws Exception {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-2", "app", "agent-1", "route-1",
                "ex-2", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"),
                null, null, "REGULAR",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false,
                List.of(
                        new FlatProcessorRecord(
                                1, null, null, "split1", "split",
                                null, 3, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 100L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(
                                2, 1, "split1", "log1", "log",
                                0, null, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                                null, "item-0", null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(
                                3, 1, "split1", "log1", "log",
                                1, null, "COMPLETED",
                                Instant.parse("2026-03-31T10:00:00Z"), 5L,
                                null, "item-1", null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null)));

        String json = MAPPER.writeValueAsString(chunk);
        ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);

        assertThat(deserialized.isFinal()).isFalse();
        assertThat(deserialized.processors()).hasSize(3);

        FlatProcessorRecord split = deserialized.processors().get(0);
        assertThat(split.iterationSize()).isEqualTo(3);
        assertThat(split.parentSeq()).isNull();

        FlatProcessorRecord child0 = deserialized.processors().get(1);
        assertThat(child0.parentSeq()).isEqualTo(1);
        assertThat(child0.parentProcessorId()).isEqualTo("split1");
        assertThat(child0.iteration()).isEqualTo(0);
    }

    @Test
    void deserialize_unknownFieldsIgnored() throws Exception {
        String json = """
                {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED",
                 "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true,
                 "futureField":"ignored","processors":[]}
                """;
        ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class);
        assertThat(chunk.exchangeId()).isEqualTo("ex-1");
        assertThat(chunk.isFinal()).isTrue();
    }
}
```

- [ ] **Step 4: Run tests**

```bash
mvn test -pl cameleer-server-core -Dtest=ExecutionChunkDeserializationTest
```

Expected: PASS (3 tests).

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/FlatProcessorRecord.java \
    cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionChunk.java \
    cameleer-server-core/src/test/java/com/cameleer/server/core/storage/model/ExecutionChunkDeserializationTest.java
git commit -m "feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs"
```

---

### Task 2: DDL Scripts for executions and processor_executions

**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql`
- Create: `cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql`

- [ ] **Step 1: Create executions DDL**

The executions table stores one row per exchange (written when the final chunk arrives). It uses `ReplacingMergeTree(_version)` so rare late corrections replace earlier rows.

```sql
-- V2__executions.sql
CREATE TABLE IF NOT EXISTS executions (
    tenant_id LowCardinality(String) DEFAULT 'default',
    execution_id String,
    start_time DateTime64(3),
    _version UInt64 DEFAULT 1,
    route_id LowCardinality(String),
    agent_id LowCardinality(String),
    application_name LowCardinality(String),
    status LowCardinality(String),
    correlation_id String DEFAULT '',
    exchange_id String DEFAULT '',
    end_time Nullable(DateTime64(3)),
    duration_ms Nullable(Int64),
    error_message String DEFAULT '',
    error_stacktrace String DEFAULT '',
    error_type LowCardinality(String) DEFAULT '',
    error_category LowCardinality(String) DEFAULT '',
    root_cause_type String DEFAULT '',
    root_cause_message String DEFAULT '',
    diagram_content_hash String DEFAULT '',
    engine_level LowCardinality(String) DEFAULT '',
    input_body String DEFAULT '',
    output_body String DEFAULT '',
    input_headers String DEFAULT '',
    output_headers String DEFAULT '',
    attributes String DEFAULT '',
    trace_id String DEFAULT '',
    span_id String DEFAULT '',
    has_trace_data Bool DEFAULT false,
    is_replay Bool DEFAULT false,

    _search_text String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers,
               ' ', output_headers, ' ', root_cause_message),

    INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_status status TYPE set(10) GRANULARITY 1,
    INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```

Note: Removed `processors_json` — with flat records in `processor_executions`, the nested JSON column is no longer needed.
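
Because `ReplacingMergeTree` deduplicates only during background merges, readers of this table should force deduplication at query time. A sketch (identifiers `exec-1` and the selected columns are illustrative):

```sql
-- Point read: FINAL collapses rows with the same ORDER BY key,
-- keeping the one with the highest _version.
SELECT status, duration_ms
FROM executions FINAL
WHERE tenant_id = 'default' AND execution_id = 'exec-1';
```

The integration test for the store below exercises the same semantics via `OPTIMIZE TABLE executions FINAL`.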
- [ ] **Step 2: Create processor_executions DDL**

Uses `seq`/`parentSeq`/`iteration` instead of `depth`/`loopIndex`/`splitIndex`/`multicastIndex`. ORDER BY ends with `seq` (unique per execution) instead of `processor_id` (which can repeat across iterations).

```sql
-- V3__processor_executions.sql
CREATE TABLE IF NOT EXISTS processor_executions (
    tenant_id LowCardinality(String) DEFAULT 'default',
    execution_id String,
    seq UInt32,
    parent_seq Nullable(UInt32),
    parent_processor_id String DEFAULT '',
    processor_id String,
    processor_type LowCardinality(String),
    start_time DateTime64(3),
    route_id LowCardinality(String),
    application_name LowCardinality(String),
    iteration Nullable(Int32),
    iteration_size Nullable(Int32),
    status LowCardinality(String),
    end_time Nullable(DateTime64(3)),
    duration_ms Nullable(Int64),
    error_message String DEFAULT '',
    error_stacktrace String DEFAULT '',
    error_type LowCardinality(String) DEFAULT '',
    error_category LowCardinality(String) DEFAULT '',
    root_cause_type String DEFAULT '',
    root_cause_message String DEFAULT '',
    input_body String DEFAULT '',
    output_body String DEFAULT '',
    input_headers String DEFAULT '',
    output_headers String DEFAULT '',
    attributes String DEFAULT '',
    resolved_endpoint_uri String DEFAULT '',
    circuit_breaker_state LowCardinality(String) DEFAULT '',
    fallback_triggered Bool DEFAULT false,
    filter_matched Bool DEFAULT false,
    duplicate_message Bool DEFAULT false,

    _search_text String MATERIALIZED
        concat(error_message, ' ', error_stacktrace, ' ', attributes,
               ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers),

    INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
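
To make the seq/parentSeq design concrete: a reader fetches one execution's rows ordered by `seq` and links each row's `parent_seq` back to its parent's `seq` to rebuild the tree client-side. A sketch (filter values are illustrative):

```sql
-- All processor rows for one execution, in insertion order.
-- Rows with parent_seq IS NULL are roots; iteration distinguishes
-- repeated visits to the same processor_id under split/loop parents.
SELECT seq, parent_seq, parent_processor_id, processor_id,
       iteration, iteration_size, status, duration_ms
FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = 'exec-1'
ORDER BY seq;
```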
- [ ] **Step 3: Verify DDL loads**

```bash
mvn clean compile -pl cameleer-server-app
```

`ClickHouseSchemaInitializer` scans `classpath:clickhouse/*.sql` automatically.

- [ ] **Step 4: Commit**

```bash
git add cameleer-server-app/src/main/resources/clickhouse/V2__executions.sql \
    cameleer-server-app/src/main/resources/clickhouse/V3__processor_executions.sql
git commit -m "feat(clickhouse): add executions and processor_executions DDL for chunked transport"
```

---

### Task 3: MergedExecution + ClickHouseExecutionStore

**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java`

The store handles batch INSERT for both `executions` (from MergedExecution) and `processor_executions` (from FlatProcessorRecord). It does NOT implement the `ExecutionStore` interface — it has its own batch API consumed by the flush scheduler.

- [ ] **Step 1: Create MergedExecution record**

```java
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java
package com.cameleer.server.core.ingestion;

import java.time.Instant;

/**
 * A merged execution envelope ready for ClickHouse insertion.
 * Produced by {@link ChunkAccumulator} after receiving the final chunk.
 */
public record MergedExecution(
        String tenantId,
        long version,
        String executionId,
        String routeId,
        String agentId,
        String applicationName,
        String status,
        String correlationId,
        String exchangeId,
        Instant startTime,
        Instant endTime,
        Long durationMs,
        String errorMessage,
        String errorStacktrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        String diagramContentHash,
        String engineLevel,
        String inputBody,
        String outputBody,
        String inputHeaders,
        String outputHeaders,
        String attributes,
        String traceId,
        String spanId,
        boolean hasTraceData,
        boolean isReplay
) {}
```

- [ ] **Step 2: Write the failing integration test**

```java
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseExecutionStoreIT {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore store;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);

        String execDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        String procDdl = new String(getClass().getResourceAsStream(
                "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");

        store = new ClickHouseExecutionStore(jdbc);
    }

    @Test
    void insertExecutionBatch_writesToClickHouse() {
        MergedExecution exec = new MergedExecution(
                "default", 1,
                "exec-1", "route-timer", "agent-a", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                null, null, null, null, null, null,
                "hash-abc", "DEEP",
                "{\"key\":\"val\"}", "{\"out\":\"data\"}", "{\"h\":\"1\"}", "{\"h\":\"2\"}",
                "{\"attr\":\"val\"}",
                "trace-1", "span-1", true, false);

        store.insertExecutionBatch(List.of(exec));

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_writesToClickHouse() {
        FlatProcessorRecord proc = new FlatProcessorRecord(
                1, null, null, "proc-1", "to",
                null, null, "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 500L,
                "http://localhost:8080/api",
                "input body", "output body",
                Map.of("Content-Type", "application/json"), null,
                null, null, null, null, null, null,
                null, null, null, null, null);

        store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"), List.of(proc));

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);

        // Verify seq and parent_seq are stored
        Integer seq = jdbc.queryForObject(
                "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(seq).isEqualTo(1);
    }

    @Test
    void insertProcessorBatch_withIterations() {
        List<FlatProcessorRecord> procs = List.of(
                new FlatProcessorRecord(1, null, null, "split1", "split",
                        null, 3, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 100L,
                        null, null, null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(2, 1, "split1", "log1", "log",
                        0, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-0", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(3, 1, "split1", "log1", "log",
                        1, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-1", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null),
                new FlatProcessorRecord(4, 1, "split1", "log1", "log",
                        2, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
                        null, "item-2", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null));

        store.insertProcessorBatch("default", "exec-split", "route-1", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"), procs);

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-split'",
                Integer.class);
        assertThat(count).isEqualTo(4);

        // Verify iteration data
        Integer iterSize = jdbc.queryForObject(
                "SELECT iteration_size FROM processor_executions WHERE execution_id = 'exec-split' AND seq = 1",
                Integer.class);
        assertThat(iterSize).isEqualTo(3);
    }

    @Test
    void insertExecutionBatch_emptyList_doesNothing() {
        store.insertExecutionBatch(List.of());
        Integer count = jdbc.queryForObject("SELECT count() FROM executions", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    @Test
    void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
        Instant startTime = Instant.parse("2026-03-31T10:00:00Z");
        MergedExecution v1 = new MergedExecution(
                "default", 1,
                "exec-dup", "route-1", "agent-a", "my-app",
                "RUNNING", null, "exchange-1",
                startTime, null, null,
                null, null, null, null, null, null,
                null, null,
                null, null, null, null, null,
                null, null, false, false);
        MergedExecution v2 = new MergedExecution(
                "default", 2,
                "exec-dup", "route-1", "agent-a", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                startTime, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                null, null, null, null, null, null,
                null, null,
                null, null, null, null, null,
                null, null, false, false);

        store.insertExecutionBatch(List.of(v1, v2));
        jdbc.execute("OPTIMIZE TABLE executions FINAL");

        String status = jdbc.queryForObject(
                "SELECT status FROM executions WHERE execution_id = 'exec-dup'",
                String.class);
        assertThat(status).isEqualTo("COMPLETED");
    }
}
```

- [ ] **Step 3: Run test to verify it fails**

```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
```

Expected: compilation error — `ClickHouseExecutionStore` does not exist.

- [ ] **Step 4: Implement ClickHouseExecutionStore**

```java
// cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;

public class ClickHouseExecutionStore {

    private static final ObjectMapper JSON = new ObjectMapper();
    private final JdbcTemplate jdbc;

    public ClickHouseExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public void insertExecutionBatch(List<MergedExecution> executions) {
        if (executions.isEmpty()) return;

        jdbc.batchUpdate("""
                INSERT INTO executions (
                    tenant_id, execution_id, start_time, _version,
                    route_id, agent_id, application_name, status,
                    correlation_id, exchange_id, end_time, duration_ms,
                    error_message, error_stacktrace, error_type, error_category,
                    root_cause_type, root_cause_message,
                    diagram_content_hash, engine_level,
                    input_body, output_body, input_headers, output_headers,
                    attributes, trace_id, span_id,
                    has_trace_data, is_replay
                ) VALUES (
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?,
                    ?, ?
                )
                """,
                executions.stream().map(e -> new Object[]{
                        e.tenantId(),
                        e.executionId(),
                        Timestamp.from(e.startTime()),
                        e.version(),
                        orEmpty(e.routeId()),
                        orEmpty(e.agentId()),
                        orEmpty(e.applicationName()),
                        orEmpty(e.status()),
                        orEmpty(e.correlationId()),
                        orEmpty(e.exchangeId()),
                        e.endTime() != null ? Timestamp.from(e.endTime()) : null,
                        e.durationMs(),
                        orEmpty(e.errorMessage()),
                        orEmpty(e.errorStacktrace()),
                        orEmpty(e.errorType()),
                        orEmpty(e.errorCategory()),
                        orEmpty(e.rootCauseType()),
                        orEmpty(e.rootCauseMessage()),
                        orEmpty(e.diagramContentHash()),
                        orEmpty(e.engineLevel()),
                        orEmpty(e.inputBody()),
                        orEmpty(e.outputBody()),
                        orEmpty(e.inputHeaders()),
                        orEmpty(e.outputHeaders()),
                        orEmpty(e.attributes()),
                        orEmpty(e.traceId()),
                        orEmpty(e.spanId()),
                        e.hasTraceData(),
                        e.isReplay()
                }).toList());
    }

    public void insertProcessorBatch(String tenantId, String executionId,
                                     String routeId, String applicationName,
                                     Instant execStartTime,
                                     List<FlatProcessorRecord> processors) {
        if (processors.isEmpty()) return;

        jdbc.batchUpdate("""
                INSERT INTO processor_executions (
                    tenant_id, execution_id, seq, parent_seq, parent_processor_id,
                    processor_id, processor_type, start_time,
                    route_id, application_name,
                    iteration, iteration_size, status,
                    end_time, duration_ms,
                    error_message, error_stacktrace, error_type, error_category,
                    root_cause_type, root_cause_message,
                    input_body, output_body, input_headers, output_headers,
                    attributes, resolved_endpoint_uri,
                    circuit_breaker_state, fallback_triggered,
                    filter_matched, duplicate_message
                ) VALUES (
                    ?, ?, ?, ?, ?,
                    ?, ?, ?,
                    ?, ?,
                    ?, ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?, ?, ?,
                    ?, ?,
                    ?, ?,
                    ?, ?
                )
                """,
                processors.stream().map(p -> new Object[]{
                        tenantId,
                        executionId,
                        p.seq(),
                        p.parentSeq(),
                        orEmpty(p.parentProcessorId()),
                        p.processorId(),
                        p.processorType(),
                        p.startTime() != null ? Timestamp.from(p.startTime()) : Timestamp.from(execStartTime),
                        routeId,
                        applicationName,
                        p.iteration(),
                        p.iterationSize(),
                        orEmpty(p.status()),
                        p.startTime() != null && p.durationMs() > 0
                                ? Timestamp.from(p.startTime().plusMillis(p.durationMs())) : null,
                        p.durationMs(),
                        orEmpty(p.errorMessage()),
                        orEmpty(p.errorStackTrace()),
                        orEmpty(p.errorType()),
                        orEmpty(p.errorCategory()),
                        orEmpty(p.rootCauseType()),
                        orEmpty(p.rootCauseMessage()),
                        orEmpty(p.inputBody()),
                        orEmpty(p.outputBody()),
                        headersToString(p.inputHeaders()),
                        headersToString(p.outputHeaders()),
                        mapToString(p.attributes()),
                        orEmpty(p.resolvedEndpointUri()),
                        orEmpty(p.circuitBreakerState()),
                        p.fallbackTriggered() != null ? p.fallbackTriggered() : false,
                        p.filterMatched() != null ? p.filterMatched() : false,
                        p.duplicateMessage() != null ? p.duplicateMessage() : false
                }).toList());
    }

    private static String orEmpty(String value) {
        return value != null ? value : "";
    }

    private static String headersToString(Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(headers);
        } catch (JsonProcessingException e) {
            return "";
        }
    }

    private static String mapToString(Map<String, String> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "";
        }
    }
}
```

- [ ] **Step 5: Run test to verify it passes**

```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire
```

Expected: all 5 tests PASS.

- [ ] **Step 6: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/MergedExecution.java \
    cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java \
    cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseExecutionStoreIT.java
git commit -m "feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format"
```

---

### Task 4: ChunkAccumulator

**Files:**
- Create: `cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java`

The ChunkAccumulator receives `ExecutionChunk` documents. For each chunk:
- Processor records are pushed to a sink immediately (they're append-only)
- Exchange envelope data is buffered/merged in a ConcurrentHashMap
- When `isFinal=true`, the merged envelope is pushed to an execution sink

A scheduled sweep flushes stale exchanges (no final chunk received within 5 minutes).

- [ ] **Step 1: Write the failing unit test**

```java
// cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
package com.cameleer.server.core.ingestion;

import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.assertj.core.api.Assertions.assertThat;

class ChunkAccumulatorTest {

    private List<MergedExecution> executionSink;
    private List<ChunkAccumulator.ProcessorBatch> processorSink;
    private ChunkAccumulator accumulator;

    @BeforeEach
    void setUp() {
        executionSink = new CopyOnWriteArrayList<>();
        processorSink = new CopyOnWriteArrayList<>();
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMinutes(5));
    }

    @Test
    void singleFinalChunk_producesExecutionAndProcessors() {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-1", "my-app", "agent-a", "route-1",
                "corr-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "DEEP",
                null, null, null, null, null, null,
                Map.of("env", "prod"),
                "trace-1", "span-1", null, null,
                0, true,
                List.of(new FlatProcessorRecord(
                        1, null, null, "log1", "log",
                        null, null, "COMPLETED",
                        Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
                        null, "body", null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));

        accumulator.onChunk(chunk);

        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.executionId()).isEqualTo("ex-1");
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(1000L);
        assertThat(merged.version()).isEqualTo(1);

        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(1);
        assertThat(processorSink.get(0).executionId()).isEqualTo("ex-1");
    }

    @Test
    void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
        Instant start = Instant.parse("2026-03-31T10:00:00Z");

        // Chunk 0: RUNNING with 2 processors
        ExecutionChunk chunk0 = new ExecutionChunk(
                "ex-multi", "my-app", "agent-a", "route-1",
                "corr-1", "RUNNING",
                start, null, null, "DEEP",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false,
                List.of(
                        new FlatProcessorRecord(1, null, null, "log1", "log",
                                null, null, "COMPLETED", start, 5L,
                                null, null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null),
                        new FlatProcessorRecord(2, null, null, "to1", "to",
                                null, null, "COMPLETED", start.plusMillis(5), 10L,
                                "http://svc/api", null, null, null, null,
                                null, null, null, null, null, null,
                                null, null, null, null, null)));
        accumulator.onChunk(chunk0);

        // Processors inserted immediately
        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(2);
        // Execution NOT yet flushed
        assertThat(executionSink).isEmpty();

        // Chunk 1: COMPLETED (final) with 1 more processor
        ExecutionChunk chunk1 = new ExecutionChunk(
                "ex-multi", "my-app", "agent-a", "route-1",
                "corr-1", "COMPLETED",
                start, start.plusMillis(500), 500L, "DEEP",
                null, null, null, null, null, null,
                Map.of("result", "ok"),
                null, null, null, null,
                1, true,
                List.of(new FlatProcessorRecord(3, null, null, "log2", "log",
                        null, null, "COMPLETED", start.plusMillis(100), 2L,
                        null, null, null, null, null,
                        null, null, null, null, null, null,
                        null, null, null, null, null)));
        accumulator.onChunk(chunk1);

        // Final chunk triggers execution flush
        assertThat(executionSink).hasSize(1);
        MergedExecution merged = executionSink.get(0);
        assertThat(merged.status()).isEqualTo("COMPLETED");
        assertThat(merged.durationMs()).isEqualTo(500L);
        assertThat(merged.version()).isEqualTo(1);

        // Second processor batch
        assertThat(processorSink).hasSize(2);
        assertThat(processorSink.get(1).processors()).hasSize(1);
    }

    @Test
    void staleExchange_flushedBySweep() {
        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMillis(1));

        ExecutionChunk chunk = new ExecutionChunk(
                "ex-stale", "my-app", "agent-a", "route-1",
                "ex-stale", "RUNNING",
                Instant.parse("2026-03-31T09:50:00Z"),
                null, null, "REGULAR",
                null, null, null, null, null, null,
                null, null, null, null, null,
                0, false, List.of());
        accumulator.onChunk(chunk);

        try { Thread.sleep(5); } catch (InterruptedException ignored) {}

        accumulator.sweepStale();

        assertThat(executionSink).hasSize(1);
        assertThat(executionSink.get(0).status()).isEqualTo("RUNNING");
        assertThat(executionSink.get(0).version()).isEqualTo(1);
    }

    @Test
    void finalChunkWithErrors_populatesErrorFields() {
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-err", "my-app", "agent-a", "route-1",
                "ex-err", "FAILED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:00.200Z"), 200L,
                "REGULAR",
                "Connection refused", "java.net.ConnectException...",
                "java.net.ConnectException", "CONNECTION",
                "java.net.ConnectException", "Connection refused",
                null, null, null, null, null,
                0, true, List.of());
        accumulator.onChunk(chunk);

        MergedExecution merged = executionSink.get(0);
        assertThat(merged.errorMessage()).isEqualTo("Connection refused");
        assertThat(merged.errorType()).isEqualTo("java.net.ConnectException");
        assertThat(merged.errorCategory()).isEqualTo("CONNECTION");
    }

    @Test
    void getPendingCount_tracksBufferedExchanges() {
        Instant t = Instant.parse("2026-03-31T10:00:00Z");
        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "RUNNING",
                t, null, null, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 0, false, List.of()));
        accumulator.onChunk(new ExecutionChunk("e2", "app", "a", "r", "e2", "RUNNING",
                t, null, null, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 0, false, List.of()));
        assertThat(accumulator.getPendingCount()).isEqualTo(2);

        accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "COMPLETED",
                t, t.plusMillis(100), 100L, "REGULAR", null, null, null, null, null, null,
                null, null, null, null, null, 1, true, List.of()));
        assertThat(accumulator.getPendingCount()).isEqualTo(1);
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

```bash
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest -DfailIfNoTests=false
```

Expected: compilation error — `ChunkAccumulator` does not exist.

- [ ] **Step 3: Implement ChunkAccumulator**

```java
// cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java
package com.cameleer.server.core.ingestion;

import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Accumulates ExecutionChunk documents per exchange.
 * <p>
 * Processor records are pushed to the processor sink immediately (append-only).
 * Exchange envelope data is buffered and merged across chunks.
 * When the final chunk arrives, the merged envelope is pushed to the execution sink.
 */
public class ChunkAccumulator {

    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
    private static final ObjectMapper JSON = new ObjectMapper();
    private static final String DEFAULT_TENANT = "default";

    private final Consumer<MergedExecution> executionSink;
    private final Consumer<ProcessorBatch> processorSink;
    private final Duration staleThreshold;
    private final Map<String, PendingExchange> pending = new ConcurrentHashMap<>();

    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
                            Consumer<ProcessorBatch> processorSink,
                            Duration staleThreshold) {
        this.executionSink = executionSink;
        this.processorSink = processorSink;
        this.staleThreshold = staleThreshold;
    }

    public void onChunk(ExecutionChunk chunk) {
        String exchangeId = chunk.exchangeId();

        // Insert processor records immediately (append-only)
        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
            processorSink.accept(new ProcessorBatch(
                    DEFAULT_TENANT, exchangeId,
                    coalesce(chunk.routeId(), ""),
                    coalesce(chunk.applicationName(), ""),
                    chunk.startTime(),
                    chunk.processors()));
        }

        if (chunk.isFinal()) {
            // Merge with any pending state and flush execution
            PendingExchange pendingExchange = pending.remove(exchangeId);
            MergedExecution merged = buildMergedExecution(chunk, pendingExchange);
            executionSink.accept(merged);
        } else {
            // Buffer/update exchange envelope
            pending.compute(exchangeId, (id, existing) -> {
                if (existing == null) {
                    return new PendingExchange(chunk, Instant.now());
                }
                return existing.mergeWith(chunk);
            });
        }
    }

    public void sweepStale() {
        Instant cutoff = Instant.now().minus(staleThreshold);
        List<String> staleIds = new ArrayList<>();

        pending.forEach((id, pe) -> {
            if (pe.receivedAt().isBefore(cutoff)) {
                staleIds.add(id);
            }
        });

        for (String id : staleIds) {
            PendingExchange stale = pending.remove(id);
            if (stale != null) {
                log.info("Flushing stale exchange {}", id);
                executionSink.accept(buildMergedExecution(stale.envelope(), null));
            }
        }
    }

    public int getPendingCount() {
        return pending.size();
    }

    private MergedExecution buildMergedExecution(ExecutionChunk finalChunk,
                                                 PendingExchange pendingState) {
        ExecutionChunk base = pendingState != null ? pendingState.envelope() : null;

        String attributes = serializeMap(finalChunk.attributes());
        if (attributes.isEmpty() && base != null) {
            attributes = serializeMap(base.attributes());
        }

        boolean hasTraceData = false;
        boolean isReplay = finalChunk.replayExchangeId() != null;

        return new MergedExecution(
                DEFAULT_TENANT, 1,
                finalChunk.exchangeId(),
                coalesce(finalChunk.routeId(), base != null ? base.routeId() : null),
                coalesce(finalChunk.agentId(), base != null ? base.agentId() : null),
                coalesce(finalChunk.applicationName(), base != null ? base.applicationName() : null),
                coalesce(finalChunk.status(), base != null ? base.status() : "RUNNING"),
                coalesce(finalChunk.correlationId(), base != null ? base.correlationId() : null),
                finalChunk.exchangeId(),
                coalesce(finalChunk.startTime(), base != null ? base.startTime() : null),
                coalesce(finalChunk.endTime(), base != null ? base.endTime() : null),
                coalesce(finalChunk.durationMs(), base != null ? base.durationMs() : null),
                coalesce(finalChunk.errorMessage(), base != null ? base.errorMessage() : null),
                coalesce(finalChunk.errorStackTrace(), base != null ? base.errorStackTrace() : null),
                coalesce(finalChunk.errorType(), base != null ? base.errorType() : null),
                coalesce(finalChunk.errorCategory(), base != null ? base.errorCategory() : null),
                coalesce(finalChunk.rootCauseType(), base != null ? base.rootCauseType() : null),
                coalesce(finalChunk.rootCauseMessage(), base != null ? base.rootCauseMessage() : null),
                "", // diagramContentHash — server-side lookup, not in chunk
                coalesce(finalChunk.engineLevel(), base != null ? base.engineLevel() : null),
                "", "", "", "", // input/output body/headers — on processor records now, not envelope
                coalesce(attributes, ""),
                coalesce(finalChunk.traceId(), base != null ? base.traceId() : null),
                coalesce(finalChunk.spanId(), base != null ? base.spanId() : null),
                hasTraceData,
                isReplay
        );
    }

    private static String serializeMap(Map<String, String> map) {
        if (map == null || map.isEmpty()) return "";
        try {
            return JSON.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "";
        }
    }

    private static <T> T coalesce(T a, T b) {
        return a != null ? a : b;
    }

    /**
     * A batch of processor records for a single exchange, ready for insertion.
     */
    public record ProcessorBatch(
            String tenantId,
            String executionId,
            String routeId,
            String applicationName,
            Instant execStartTime,
            List<FlatProcessorRecord> processors
    ) {}

    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {
        PendingExchange mergeWith(ExecutionChunk newer) {
            // Keep the latest envelope data (later chunkSeq has more complete info)
            ExecutionChunk merged = new ExecutionChunk(
                    envelope.exchangeId(),
                    coalesce(newer.applicationName(), envelope.applicationName()),
                    coalesce(newer.agentId(), envelope.agentId()),
                    coalesce(newer.routeId(), envelope.routeId()),
                    coalesce(newer.correlationId(), envelope.correlationId()),
                    coalesce(newer.status(), envelope.status()),
                    coalesce(envelope.startTime(), newer.startTime()),
                    coalesce(newer.endTime(), envelope.endTime()),
                    coalesce(newer.durationMs(), envelope.durationMs()),
                    coalesce(newer.engineLevel(), envelope.engineLevel()),
                    coalesce(newer.errorMessage(), envelope.errorMessage()),
                    coalesce(newer.errorStackTrace(), envelope.errorStackTrace()),
                    coalesce(newer.errorType(), envelope.errorType()),
                    coalesce(newer.errorCategory(), envelope.errorCategory()),
                    coalesce(newer.rootCauseType(), envelope.rootCauseType()),
                    coalesce(newer.rootCauseMessage(), envelope.rootCauseMessage()),
                    newer.attributes() != null ? newer.attributes() : envelope.attributes(),
                    coalesce(newer.traceId(), envelope.traceId()),
                    coalesce(newer.spanId(), envelope.spanId()),
                    coalesce(newer.originalExchangeId(), envelope.originalExchangeId()),
                    coalesce(newer.replayExchangeId(), envelope.replayExchangeId()),
                    newer.chunkSeq(),
                    newer.isFinal(),
                    List.of());
            return new PendingExchange(merged, receivedAt);
        }
    }
}
```

- [ ] **Step 4: Run test to verify it passes**

```bash
mvn test -pl cameleer-server-core -Dtest=ChunkAccumulatorTest
```

Expected: all 5 tests PASS.

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/ChunkAccumulator.java \
    cameleer-server-core/src/test/java/com/cameleer/server/core/ingestion/ChunkAccumulatorTest.java
git commit -m "feat(clickhouse): add ChunkAccumulator for chunked execution ingestion"
```

---

### Task 5: ExecutionFlushScheduler + ChunkIngestionController

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java`

- [ ] **Step 1: Implement ExecutionFlushScheduler**

Follows the `MetricsFlushScheduler` pattern. Drains two WriteBuffers (executions + processor batches) and calls ClickHouseExecutionStore. Also runs the stale sweep.

```java
// cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java
package com.cameleer.server.app.ingestion;

import com.cameleer.server.app.config.IngestionConfig;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                   ClickHouseExecutionStore executionStore,
                                   ChunkAccumulator accumulator,
                                   IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        flushExecutions();
        flushProcessors();
    }

    private void flushExecutions() {
        try {
            List<MergedExecution> batch = executionBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                executionStore.insertExecutionBatch(batch);
                log.debug("Flushed {} executions to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush executions to ClickHouse", e);
        }
    }

    private void flushProcessors() {
        try {
            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
            for (ChunkAccumulator.ProcessorBatch batch : batches) {
                if (!batch.processors().isEmpty()) {
                    executionStore.insertProcessorBatch(
                            batch.tenantId(), batch.executionId(),
                            batch.routeId(), batch.applicationName(),
                            batch.execStartTime(), batch.processors());
                }
            }
            if (!batches.isEmpty()) {
                int totalProcs = batches.stream().mapToInt(b -> b.processors().size()).sum();
                log.debug("Flushed {} processor batches ({} records) to ClickHouse",
                        batches.size(), totalProcs);
            }
        } catch (Exception e) {
            log.error("Failed to flush processors to ClickHouse", e);
        }
    }

    @Scheduled(fixedDelay = 60_000)
    public void sweepStale() {
        try {
            accumulator.sweepStale();
        } catch (Exception e) {
            log.error("Failed to sweep stale exchanges", e);
        }
    }

    @Override public void start() { running = true; }

    @Override
    public void stop() {
        flush();
        running = false;
    }

    @Override public boolean isRunning() { return running; }
    @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
```

- [ ] **Step 2: Implement ChunkIngestionController**

```java
// cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
package com.cameleer.server.app.controller;

import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/api/v1/data")
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {

    // Shared, thread-safe mapper — building one per request is wasteful
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .registerModule(new JavaTimeModule());

    private final ChunkAccumulator accumulator;

    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
    }

    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk (single or array)")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        try {
            String trimmed = body.trim();
            if (trimmed.startsWith("[")) {
                List<ExecutionChunk> chunks = MAPPER.readValue(trimmed,
                        MAPPER.getTypeFactory().constructCollectionType(List.class, ExecutionChunk.class));
                for (ExecutionChunk chunk : chunks) {
                    accumulator.onChunk(chunk);
                }
            } else {
                accumulator.onChunk(MAPPER.readValue(trimmed, ExecutionChunk.class));
            }
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            return ResponseEntity.badRequest().build();
        }
    }
}
```
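
For reference, a minimal final chunk as the controller would deserialize it might look like the following. This is a sketch: field names assume Jackson's default record-component naming for the server-side `ExecutionChunk` DTO (note `isFinal` in particular — verify the wire spelling against the agent serializer once the `cameleer-common` models land):

```json
{
  "exchangeId": "ex-1",
  "applicationName": "order-service",
  "agentId": "pod-1",
  "routeId": "order-route",
  "correlationId": "ex-1",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00Z",
  "endTime": "2026-03-31T10:00:01Z",
  "durationMs": 1000,
  "engineLevel": "DEEP",
  "chunkSeq": 0,
  "isFinal": true,
  "processors": [
    { "seq": 1, "processorId": "log1", "processorType": "log", "status": "COMPLETED", "durationMs": 5 }
  ]
}
```

A smoke test against a locally running server would be `curl -X POST localhost:8080/api/v1/data/chunks -H 'Content-Type: application/json' -d @chunk.json`, which should answer `202 Accepted` (or `400` on malformed JSON).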

- [ ] **Step 3: Compile**

```bash
mvn clean compile -pl cameleer-server-app
```

- [ ] **Step 4: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ExecutionFlushScheduler.java \
    cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ChunkIngestionController.java
git commit -m "feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController"
```

---

### Task 6: ClickHouseSearchIndex

**Files:**
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java`

Same as the original plan — implements `SearchIndex` using SQL against ClickHouse. The search query patterns are unchanged: `_search_text LIKE '%term%'` on executions, subquery join on processor_executions for body/header/error scoped searches.

This task is identical to Task 5 in the original plan. Refer to that task's complete code for the `ClickHouseSearchIndex` and `ClickHouseSearchIndexIT` implementations. The only difference is that `processor_executions` now uses `seq`/`iteration` columns instead of `depth`/`loopIndex`/etc., but the search queries only use `_search_text`, `execution_id`, `input_body`, `output_body`, `input_headers`, `output_headers`, `error_message`, and `error_stacktrace` — none of which changed.
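
To make the query shapes concrete, the two patterns look roughly like this (a sketch only — table and column names come from the DDL tasks earlier in this plan; the actual WHERE-clause builder lives in the original plan's `ClickHouseSearchIndex`):

```sql
-- Global term search on the execution envelope;
-- the ngram bloom filter index on _search_text prunes granules before the LIKE scan
SELECT execution_id, route_id, status, start_time, duration_ms
FROM executions
WHERE tenant_id = 'default'
  AND _search_text LIKE '%connection refused%'
ORDER BY start_time DESC
LIMIT 50;

-- Body-scoped search via subquery join on processor_executions
SELECT e.execution_id, e.route_id, e.status, e.start_time
FROM executions e
WHERE e.execution_id IN (
    SELECT execution_id
    FROM processor_executions
    WHERE input_body LIKE '%orderId=42%' OR output_body LIKE '%orderId=42%'
)
LIMIT 50;
```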

- [ ] **Step 1: Write the failing integration test**

Use the same test class from the original plan's Task 5, Step 1. The test seeds data via `ClickHouseExecutionStore` using the new `MergedExecution` and `FlatProcessorRecord` types. Refer to the original plan for the complete test code.

- [ ] **Step 2: Run test to verify it fails**

```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
```

- [ ] **Step 3: Implement ClickHouseSearchIndex**

Use the same implementation from the original plan's Task 5, Step 3. The SQL queries and WHERE clause building are identical.

- [ ] **Step 4: Run test to verify it passes**

```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire
```

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java \
    cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java
git commit -m "feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search"
```

---

### Task 7: Feature Flag Wiring

**Files:**
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java`
- Modify: `cameleer-server-app/src/main/resources/application.yml`
- Modify: `deploy/base/server.yaml`

Wire up the ChunkAccumulator, WriteBuffers, flush scheduler, and search switching.

- [ ] **Step 1: Add execution + processor WriteBuffer beans to IngestionBeanConfig**

```java
// Add to IngestionBeanConfig.java
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
```

- [ ] **Step 2: Add CH beans to StorageBeanConfig**

```java
// Add to StorageBeanConfig.java

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseExecutionStore clickHouseExecutionStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseExecutionStore(clickHouseJdbc);
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
    return new ChunkAccumulator(
            executionBuffer::offer,
            processorBatchBuffer::offer,
            java.time.Duration.ofMinutes(5));
}

@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ExecutionFlushScheduler executionFlushScheduler(
        WriteBuffer<MergedExecution> executionBuffer,
        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
        ClickHouseExecutionStore executionStore,
        ChunkAccumulator accumulator,
        IngestionConfig config) {
    return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
            executionStore, accumulator, config);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
public SearchIndex clickHouseSearchIndex(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseSearchIndex(clickHouseJdbc);
}
```

- [ ] **Step 3: Add ConditionalOnProperty to OpenSearchIndex**

```java
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true)
public class OpenSearchIndex implements SearchIndex {
```

- [ ] **Step 4: Update application.yml**

```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
```
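
With these flags in place, switching a deployment to the ClickHouse backends is a configuration-only change; for example (property names as wired in the steps above — `clickhouse.enabled` gates the ingestion beans, `cameleer.storage.search` selects the `SearchIndex` implementation):

```yaml
clickhouse:
  enabled: true
cameleer:
  storage:
    search: clickhouse
```

Rolling back is the inverse: set `search` back to `opensearch` (the default) and restart, since the two toggles are independent.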

- [ ] **Step 5: Update deploy/base/server.yaml**

Add env var:
```yaml
- name: CAMELEER_STORAGE_SEARCH
  value: "opensearch"
```

- [ ] **Step 6: Compile and verify all tests pass**

```bash
mvn clean verify -DskipITs
```

- [ ] **Step 7: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java \
    cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java \
    cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java \
    cameleer-server-app/src/main/resources/application.yml \
    deploy/base/server.yaml
git commit -m "feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag"
```

---

### Task 8: End-to-End Integration Test
|
|
|
|
**Files:**
|
|
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java`
|
|
|
|
Validates the full pipeline: ChunkAccumulator → WriteBuffers → ClickHouseExecutionStore → ClickHouseSearchIndex.
|
|
|
|
- [ ] **Step 1: Write the integration test**

```java
// cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.search.ClickHouseSearchIndex;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.model.ExecutionChunk;
import com.cameleer.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseChunkPipelineIT {

    @Container
    static final ClickHouseContainer clickhouse =
        new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore executionStore;
    private ClickHouseSearchIndex searchIndex;
    private ChunkAccumulator accumulator;
    private List<MergedExecution> executionBuffer;
    private List<ChunkAccumulator.ProcessorBatch> processorBuffer;

    @BeforeEach
    void setUp() throws IOException {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);

        String execDdl = new String(getClass().getResourceAsStream(
            "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        String procDdl = new String(getClass().getResourceAsStream(
            "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");

        executionStore = new ClickHouseExecutionStore(jdbc);
        searchIndex = new ClickHouseSearchIndex(jdbc);

        executionBuffer = new ArrayList<>();
        processorBuffer = new ArrayList<>();
        accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5));
    }

    @Test
    void fullPipeline_chunkedIngestion_thenSearch() {
        Instant start = Instant.parse("2026-03-31T12:00:00Z");

        // Chunk 0: RUNNING with initial processors
        accumulator.onChunk(new ExecutionChunk(
            "pipeline-1", "order-service", "pod-1", "order-route",
            "corr-1", "RUNNING",
            start, null, null, "DEEP",
            null, null, null, null, null, null,
            Map.of("orderId", "ORD-123"),
            null, null, null, null,
            0, false,
            List.of(
                new FlatProcessorRecord(1, null, null, "log1", "log",
                    null, null, "COMPLETED", start, 2L,
                    null, null, null, null, null,
                    null, null, null, null, null, null,
                    null, null, null, null, null),
                new FlatProcessorRecord(2, null, null, "split1", "split",
                    null, 3, "COMPLETED", start.plusMillis(2), 100L,
                    null, null, null, null, null,
                    null, null, null, null, null, null,
                    null, null, null, null, null),
                new FlatProcessorRecord(3, 2, "split1", "to1", "to",
                    0, null, "COMPLETED", start.plusMillis(5), 30L,
                    "http://inventory/api",
                    "order ABC-123 check stock", "stock available",
                    null, null,
                    null, null, null, null, null, null,
                    null, null, null, null, null))));

        // Processors should be flushed immediately
        assertThat(processorBuffer).hasSize(1);
        assertThat(executionBuffer).isEmpty();

        // Chunk 1: COMPLETED (final)
        accumulator.onChunk(new ExecutionChunk(
            "pipeline-1", "order-service", "pod-1", "order-route",
            "corr-1", "COMPLETED",
            start, start.plusMillis(750), 750L, "DEEP",
            null, null, null, null, null, null,
            null, null, null, null, null,
            1, true,
            List.of(
                new FlatProcessorRecord(4, 2, "split1", "to1", "to",
                    1, null, "COMPLETED", start.plusMillis(40), 25L,
                    "http://inventory/api",
                    "order DEF-456 check stock", "stock available",
                    null, null,
                    null, null, null, null, null, null,
                    null, null, null, null, null))));

        assertThat(executionBuffer).hasSize(1);
        assertThat(processorBuffer).hasSize(2);

        // Flush to ClickHouse
        executionStore.insertExecutionBatch(executionBuffer);
        for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) {
            executionStore.insertProcessorBatch(
                batch.tenantId(), batch.executionId(),
                batch.routeId(), batch.applicationName(),
                batch.execStartTime(), batch.processors());
        }

        // Search by order ID in attributes
        SearchResult<ExecutionSummary> result = searchIndex.search(new SearchRequest(
            null, null, null, null, null, null,
            "ORD-123", null, null, null,
            null, null, null, null, null,
            0, 50, null, null));
        assertThat(result.total()).isEqualTo(1);
        assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
        assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
        assertThat(result.data().get(0).durationMs()).isEqualTo(750L);

        // Search in processor body
        SearchResult<ExecutionSummary> bodyResult = searchIndex.search(new SearchRequest(
            null, null, null, null, null, null,
            null, "ABC-123", null, null,
            null, null, null, null, null,
            0, 50, null, null));
        assertThat(bodyResult.total()).isEqualTo(1);

        // Verify iteration data in processor_executions
        Integer iterSize = jdbc.queryForObject(
            "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2",
            Integer.class);
        assertThat(iterSize).isEqualTo(3);

        Integer iter0 = jdbc.queryForObject(
            "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3",
            Integer.class);
        assertThat(iter0).isEqualTo(0);
    }
}
```
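
The test above asserts `seq`, `parentSeq`, and `iteration` values, but it never shows how a consumer of `processor_executions` turns flat records back into a processor tree. Below is a minimal sketch of that reconstruction; `FlatTreeSketch` and its `Node` record are hypothetical stand-ins (not the plan's DTOs), and it assumes every `parentSeq` references a `seq` that appears earlier in the batch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlatTreeSketch {

    // Minimal stand-in for FlatProcessorRecord: only the tree-relevant fields.
    record Node(int seq, Integer parentSeq, String id, List<Node> children) {}

    // A record with parentSeq == null is a root; otherwise it attaches to the
    // node whose seq matches its parentSeq.
    static List<Node> buildTree(List<Node> flat) {
        Map<Integer, Node> bySeq = new HashMap<>();
        for (Node n : flat) {
            bySeq.put(n.seq(), n);
        }
        List<Node> roots = new ArrayList<>();
        for (Node n : flat) {
            if (n.parentSeq() == null) {
                roots.add(n);
            } else {
                bySeq.get(n.parentSeq()).children().add(n);
            }
        }
        return roots;
    }

    public static void main(String[] args) {
        // Same shape as the test data: log1 and split1 at the top level,
        // two split iterations of to1 (seq 3 and 4) under split1 (seq 2).
        List<Node> flat = List.of(
            new Node(1, null, "log1", new ArrayList<>()),
            new Node(2, null, "split1", new ArrayList<>()),
            new Node(3, 2, "to1", new ArrayList<>()),
            new Node(4, 2, "to1", new ArrayList<>()));
        List<Node> roots = buildTree(flat);
        System.out.println(roots.size() + " roots, "
            + roots.get(1).children().size() + " children under split1");
        // prints "2 roots, 2 children under split1"
    }
}
```

Because records arrive in `seq` order and iterations are flattened, a single pass with a `seq -> node` map is enough; no recursion over a nested document is needed.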

- [ ] **Step 2: Run the integration test**

```bash
mvn test -pl cameleer-server-app -Dtest=ClickHouseChunkPipelineIT -Dfailsafe.provider=surefire
```

Expected: PASS.

- [ ] **Step 3: Commit**

```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java
git commit -m "test(clickhouse): add end-to-end chunk pipeline integration test"
```

---

## Verification Checklist

After all tasks are complete, verify:

1. **Chunk ingestion**: `POST /api/v1/data/chunks` accepts both a single ExecutionChunk and an array of them
2. **Processor immediate insert**: Processor records are inserted as chunks arrive (append-only)
3. **Envelope accumulation**: Multiple non-final chunks merge envelope data correctly
4. **Final flush**: The final chunk triggers the execution row write with version=1
5. **Stale sweep**: Exchanges without a final chunk for 5 minutes are flushed as RUNNING
6. **Search**: All filter types work: status, time range, duration, correlation ID, application, text, textInBody, textInHeaders, textInErrors
7. **Highlighting**: Search results include 120-char context snippets
8. **Feature flag**: `cameleer.storage.search=opensearch` uses OpenSearch, `=clickhouse` uses ClickHouse
9. **Backward compat**: With `clickhouse.enabled=false`, the server starts without CH beans (PG + OpenSearch only)
10. **seq/parentSeq**: Processor records correctly store seq, parentSeq, iteration, and iterationSize
11. **CI**: `mvn clean verify -DskipITs` passes