From 20c8e1784303b11ca9dd124c5fd975a62552bc86 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:02:47 +0200 Subject: [PATCH 1/9] feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs Co-Authored-By: Claude Opus 4.6 (1M context) --- cameleer3-server-core/pom.xml | 5 + .../core/storage/model/ExecutionChunk.java | 42 +++++++ .../storage/model/FlatProcessorRecord.java | 42 +++++++ .../ExecutionChunkDeserializationTest.java | 109 ++++++++++++++++++ 4 files changed, 198 insertions(+) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java create mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java diff --git a/cameleer3-server-core/pom.xml b/cameleer3-server-core/pom.xml index 5e2e517c..fcc2542d 100644 --- a/cameleer3-server-core/pom.xml +++ b/cameleer3-server-core/pom.xml @@ -37,6 +37,11 @@ spring-security-core provided + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + test + org.junit.jupiter junit-jupiter diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java new file mode 100644 index 00000000..20066f09 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java @@ -0,0 +1,42 @@ +package com.cameleer3.server.core.storage.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * Chunk document: exchange envelope + list of 
FlatProcessorRecords.
+ * Mirrors cameleer3-common ExecutionChunk — replace with common lib import when available.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public record ExecutionChunk(
+        String exchangeId,
+        String applicationName,
+        String agentId,
+        String routeId,
+        String correlationId,
+        String status,
+        Instant startTime,
+        Instant endTime,
+        Long durationMs,
+        String engineLevel,
+        String errorMessage,
+        String errorStackTrace,
+        String errorType,
+        String errorCategory,
+        String rootCauseType,
+        String rootCauseMessage,
+        Map<String, String> attributes,
+        String traceId,
+        String spanId,
+        String originalExchangeId,
+        String replayExchangeId,
+        int chunkSeq,
+        @JsonProperty("final") boolean isFinal,
+        List<FlatProcessorRecord> processors
+) {}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java
new file mode 100644
index 00000000..deb89221
--- /dev/null
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java
@@ -0,0 +1,42 @@
+package com.cameleer3.server.core.storage.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.time.Instant;
+import java.util.Map;
+
+/**
+ * Flat processor execution record with seq/parentSeq for tree reconstruction.
+ * Mirrors cameleer3-common FlatProcessorRecord — replace with common lib import when available.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public record FlatProcessorRecord(
+        int seq,
+        Integer parentSeq,
+        String parentProcessorId,
+        String processorId,
+        String processorType,
+        Integer iteration,
+        Integer iterationSize,
+        String status,
+        Instant startTime,
+        long durationMs,
+        String resolvedEndpointUri,
+        String inputBody,
+        String outputBody,
+        Map<String, String> inputHeaders,
+        Map<String, String> outputHeaders,
+        String errorMessage,
+        String errorStackTrace,
+        String errorType,
+        String errorCategory,
+        String rootCauseType,
+        String rootCauseMessage,
+        Map<String, String> attributes,
+        String circuitBreakerState,
+        Boolean fallbackTriggered,
+        Boolean filterMatched,
+        Boolean duplicateMessage
+) {}
diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java
new file mode 100644
index 00000000..de0a50cf
--- /dev/null
+++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java
@@ -0,0 +1,109 @@
+package com.cameleer3.server.core.storage.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ExecutionChunkDeserializationTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
+
+    @Test
+    void roundTrip_fullChunk() throws Exception {
+        ExecutionChunk chunk = new ExecutionChunk(
+                "ex-1", "order-service", "pod-1", "order-route",
+                "corr-1", "COMPLETED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+                "REGULAR",
+                null, null, null, null, null, null,
Map.of("orderId", "ORD-1"), + "trace-1", "span-1", null, null, + 2, true, + List.of(new FlatProcessorRecord( + 1, null, null, "log1", "log", + null, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00.100Z"), 5L, + null, "body", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + + String json = MAPPER.writeValueAsString(chunk); + ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); + + assertThat(deserialized.exchangeId()).isEqualTo("ex-1"); + assertThat(deserialized.isFinal()).isTrue(); + assertThat(deserialized.chunkSeq()).isEqualTo(2); + assertThat(deserialized.processors()).hasSize(1); + assertThat(deserialized.processors().get(0).seq()).isEqualTo(1); + assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1"); + assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1"); + } + + @Test + void roundTrip_runningChunkWithIterations() throws Exception { + ExecutionChunk chunk = new ExecutionChunk( + "ex-2", "app", "agent-1", "route-1", + "ex-2", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord( + 1, null, null, "split1", "split", + null, 3, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 100L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord( + 2, 1, "split1", "log1", "log", + 0, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 5L, + null, "item-0", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord( + 3, 1, "split1", "log1", "log", + 1, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 5L, + null, "item-1", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + + String json = MAPPER.writeValueAsString(chunk); + 
ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); + + assertThat(deserialized.isFinal()).isFalse(); + assertThat(deserialized.processors()).hasSize(3); + + FlatProcessorRecord split = deserialized.processors().get(0); + assertThat(split.iterationSize()).isEqualTo(3); + assertThat(split.parentSeq()).isNull(); + + FlatProcessorRecord child0 = deserialized.processors().get(1); + assertThat(child0.parentSeq()).isEqualTo(1); + assertThat(child0.parentProcessorId()).isEqualTo("split1"); + assertThat(child0.iteration()).isEqualTo(0); + } + + @Test + void deserialize_unknownFieldsIgnored() throws Exception { + String json = """ + {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED", + "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true, + "futureField":"ignored","processors":[]} + """; + ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class); + assertThat(chunk.exchangeId()).isEqualTo("ex-1"); + assertThat(chunk.isFinal()).isTrue(); + } +} From b30dfa39f448e717ba89be05f777be9fc42e8224 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:04:19 +0200 Subject: [PATCH 2/9] feat(clickhouse): add executions and processor_executions DDL for chunked transport Co-Authored-By: Claude Opus 4.6 (1M context) --- .../resources/clickhouse/V2__executions.sql | 48 +++++++++++++++++++ .../clickhouse/V3__processor_executions.sql | 45 +++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql new file mode 100644 index 00000000..e9b31f9e --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql @@ -0,0 +1,48 @@ +CREATE 
TABLE IF NOT EXISTS executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + start_time DateTime64(3), + _version UInt64 DEFAULT 1, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + status LowCardinality(String), + correlation_id String DEFAULT '', + exchange_id String DEFAULT '', + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + diagram_content_hash String DEFAULT '', + engine_level LowCardinality(String) DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + trace_id String DEFAULT '', + span_id String DEFAULT '', + has_trace_data Bool DEFAULT false, + is_replay Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, + ' ', output_headers, ' ', root_cause_message), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_status status TYPE set(10) GRANULARITY 1, + INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(_version) +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) +TTL toDateTime(start_time) + INTERVAL 365 DAY 
DELETE +SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql new file mode 100644 index 00000000..2f4ea1ec --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql @@ -0,0 +1,45 @@ +CREATE TABLE IF NOT EXISTS processor_executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + seq UInt32, + parent_seq Nullable(UInt32), + parent_processor_id String DEFAULT '', + processor_id String, + processor_type LowCardinality(String), + start_time DateTime64(3), + route_id LowCardinality(String), + application_name LowCardinality(String), + iteration Nullable(Int32), + iteration_size Nullable(Int32), + status LowCardinality(String), + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + resolved_endpoint_uri String DEFAULT '', + circuit_breaker_state LowCardinality(String) DEFAULT '', + fallback_triggered Bool DEFAULT false, + filter_matched Bool DEFAULT false, + duplicate_message Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, 
start_time, application_name, route_id, execution_id, seq) +TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; From 81f7f8afe1195070977a637d17a9a9cc3757b8d8 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:07:33 +0200 Subject: [PATCH 3/9] feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/ClickHouseExecutionStore.java | 151 ++++++++++++ .../storage/ClickHouseExecutionStoreIT.java | 231 ++++++++++++++++++ .../core/ingestion/MergedExecution.java | 39 +++ 3 files changed, 421 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java new file mode 100644 index 00000000..287bd18c --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -0,0 +1,151 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public class ClickHouseExecutionStore { + + private final JdbcTemplate jdbc; + private final ObjectMapper objectMapper; + 
+ public ClickHouseExecutionStore(JdbcTemplate jdbc) { + this(jdbc, new ObjectMapper()); + } + + public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) { + this.jdbc = jdbc; + this.objectMapper = objectMapper; + } + + public void insertExecutionBatch(List executions) { + if (executions.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO executions ( + tenant_id, _version, execution_id, route_id, agent_id, application_name, + status, correlation_id, exchange_id, start_time, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, diagram_content_hash, engine_level, + input_body, output_body, input_headers, output_headers, attributes, + trace_id, span_id, has_trace_data, is_replay + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + executions.stream().map(e -> new Object[]{ + nullToEmpty(e.tenantId()), + e.version(), + nullToEmpty(e.executionId()), + nullToEmpty(e.routeId()), + nullToEmpty(e.agentId()), + nullToEmpty(e.applicationName()), + nullToEmpty(e.status()), + nullToEmpty(e.correlationId()), + nullToEmpty(e.exchangeId()), + Timestamp.from(e.startTime()), + e.endTime() != null ? 
Timestamp.from(e.endTime()) : null, + e.durationMs(), + nullToEmpty(e.errorMessage()), + nullToEmpty(e.errorStacktrace()), + nullToEmpty(e.errorType()), + nullToEmpty(e.errorCategory()), + nullToEmpty(e.rootCauseType()), + nullToEmpty(e.rootCauseMessage()), + nullToEmpty(e.diagramContentHash()), + nullToEmpty(e.engineLevel()), + nullToEmpty(e.inputBody()), + nullToEmpty(e.outputBody()), + nullToEmpty(e.inputHeaders()), + nullToEmpty(e.outputHeaders()), + nullToEmpty(e.attributes()), + nullToEmpty(e.traceId()), + nullToEmpty(e.spanId()), + e.hasTraceData(), + e.isReplay() + }).toList()); + } + + public void insertProcessorBatch(String tenantId, String executionId, String routeId, + String applicationName, Instant execStartTime, + List processors) { + if (processors.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO processor_executions ( + tenant_id, execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + processors.stream().map(p -> new Object[]{ + nullToEmpty(tenantId), + nullToEmpty(executionId), + p.seq(), + p.parentSeq(), + nullToEmpty(p.parentProcessorId()), + nullToEmpty(p.processorId()), + nullToEmpty(p.processorType()), + Timestamp.from(p.startTime() != null ? 
p.startTime() : execStartTime), + nullToEmpty(routeId), + nullToEmpty(applicationName), + p.iteration(), + p.iterationSize(), + nullToEmpty(p.status()), + computeEndTime(p.startTime(), p.durationMs()), + p.durationMs(), + nullToEmpty(p.errorMessage()), + nullToEmpty(p.errorStackTrace()), + nullToEmpty(p.errorType()), + nullToEmpty(p.errorCategory()), + nullToEmpty(p.rootCauseType()), + nullToEmpty(p.rootCauseMessage()), + nullToEmpty(p.inputBody()), + nullToEmpty(p.outputBody()), + mapToJson(p.inputHeaders()), + mapToJson(p.outputHeaders()), + mapToJson(p.attributes()), + nullToEmpty(p.resolvedEndpointUri()), + nullToEmpty(p.circuitBreakerState()), + boolOrFalse(p.fallbackTriggered()), + boolOrFalse(p.filterMatched()), + boolOrFalse(p.duplicateMessage()) + }).toList()); + } + + private static String nullToEmpty(String value) { + return value != null ? value : ""; + } + + private static boolean boolOrFalse(Boolean value) { + return value != null && value; + } + + private static Timestamp computeEndTime(Instant startTime, long durationMs) { + if (startTime != null && durationMs > 0) { + return Timestamp.from(startTime.plusMillis(durationMs)); + } + return null; + } + + private String mapToJson(Map map) { + if (map == null || map.isEmpty()) return ""; + try { + return objectMapper.writeValueAsString(map); + } catch (JsonProcessingException e) { + return ""; + } + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java new file mode 100644 index 00000000..0904507e --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -0,0 +1,231 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import 
com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseExecutionStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore(jdbc); + } + + @Test + void insertExecutionBatch_writesToClickHouse() { + MergedExecution exec = new MergedExecution( + "default", 1L, "exec-1", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), + 1000L, + "some error", "stack trace", "IOException", "IO", + "FileNotFoundException", "file not 
found", + "hash-abc", "FULL", + "{\"key\":\"val\"}", "{\"out\":\"val\"}", + "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", + "{\"attr\":\"val\"}", + "trace-123", "span-456", + true, false + ); + + store.insertExecutionBatch(List.of(exec)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertProcessorBatch_writesToClickHouse() { + FlatProcessorRecord proc = new FlatProcessorRecord( + 1, null, null, + "proc-1", "to", null, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 50L, + "http://example.com", + "input body", "output body", + Map.of("h1", "v1"), Map.of("h2", "v2"), + null, null, null, null, null, null, + Map.of("a1", "v1"), + null, null, null, null + ); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(proc)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + + // Verify seq is stored + Integer seq = jdbc.queryForObject( + "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(seq).isEqualTo(1); + } + + @Test + void insertProcessorBatch_withIterations() { + FlatProcessorRecord splitContainer = new FlatProcessorRecord( + 1, null, null, + "split-1", "split", null, 3, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 300L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child0 = new FlatProcessorRecord( + 2, 1, "split-1", + "child-proc", "to", 0, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.100Z"), 80L, + "http://svc-a", "body0", "out0", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child1 = new FlatProcessorRecord( + 3, 1, 
"split-1", + "child-proc", "to", 1, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.200Z"), 90L, + "http://svc-a", "body1", "out1", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + FlatProcessorRecord child2 = new FlatProcessorRecord( + 4, 1, "split-1", + "child-proc", "to", 2, null, + "COMPLETED", + Instant.parse("2026-03-31T10:00:00.300Z"), 100L, + "http://svc-a", "body2", "out2", + null, null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + store.insertProcessorBatch( + "default", "exec-2", "route-b", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(splitContainer, child0, child1, child2)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-2'", + Integer.class); + assertThat(count).isEqualTo(4); + + // Verify iteration data on the split container + Integer iterationSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 1", + Integer.class); + assertThat(iterationSize).isEqualTo(3); + + // Verify iteration index on a child + Integer iteration = jdbc.queryForObject( + "SELECT iteration FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 3", + Integer.class); + assertThat(iteration).isEqualTo(1); + } + + @Test + void insertExecutionBatch_emptyList_doesNothing() { + store.insertExecutionBatch(List.of()); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { + MergedExecution v1 = new MergedExecution( + "default", 1L, "exec-r", "route-a", "agent-1", "my-app", + "RUNNING", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + MergedExecution v2 = new 
MergedExecution( + "default", 2L, "exec-r", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:05Z"), + 5000L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(v1)); + store.insertExecutionBatch(List.of(v2)); + + // Force merge to apply ReplacingMergeTree deduplication + jdbc.execute("OPTIMIZE TABLE executions FINAL"); + + String status = jdbc.queryForObject( + "SELECT status FROM executions " + + "WHERE execution_id = 'exec-r'", + String.class); + assertThat(status).isEqualTo("COMPLETED"); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java new file mode 100644 index 00000000..d5227ab8 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -0,0 +1,39 @@ +package com.cameleer3.server.core.ingestion; + +import java.time.Instant; + +/** + * A merged execution envelope ready for ClickHouse insertion. + * Produced by ChunkAccumulator after receiving the final chunk. 
+ */ +public record MergedExecution( + String tenantId, + long version, + String executionId, + String routeId, + String agentId, + String applicationName, + String status, + String correlationId, + String exchangeId, + Instant startTime, + Instant endTime, + Long durationMs, + String errorMessage, + String errorStacktrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + String diagramContentHash, + String engineLevel, + String inputBody, + String outputBody, + String inputHeaders, + String outputHeaders, + String attributes, + String traceId, + String spanId, + boolean hasTraceData, + boolean isReplay +) {} From 62420cf0c21197c9952f9d800cfb8b92e8092b75 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:10:21 +0200 Subject: [PATCH 4/9] feat(clickhouse): add ChunkAccumulator for chunked execution ingestion Co-Authored-By: Claude Opus 4.6 (1M context) --- .../core/ingestion/ChunkAccumulator.java | 205 ++++++++++++++++ .../core/ingestion/ChunkAccumulatorTest.java | 226 ++++++++++++++++++ 2 files changed, 431 insertions(+) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java create mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java new file mode 100644 index 00000000..35eccbb9 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -0,0 +1,205 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; 
+import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * Accumulates {@link ExecutionChunk} documents and produces: + *
+ * <ul>
+ *   <li>{@link ProcessorBatch} — pushed immediately for each chunk (append-only)</li>
+ *   <li>{@link MergedExecution} — pushed when the final chunk arrives or on stale sweep</li>
+ * </ul>
+ */
+public class ChunkAccumulator {
+
+    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
+    private static final String DEFAULT_TENANT = "default";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final Consumer<MergedExecution> executionSink;
+    private final Consumer<ProcessorBatch> processorSink;
+    private final Duration staleThreshold;
+    private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
+
+    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
+                            Consumer<ProcessorBatch> processorSink,
+                            Duration staleThreshold) {
+        this.executionSink = executionSink;
+        this.processorSink = processorSink;
+        this.staleThreshold = staleThreshold;
+    }
+
+    /**
+     * Process an incoming chunk: push processors immediately,
+     * buffer/merge the envelope, and emit when final.
+     */
+    public void onChunk(ExecutionChunk chunk) {
+        // 1. Push processor records immediately (append-only)
+        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
+            processorSink.accept(new ProcessorBatch(
+                    DEFAULT_TENANT,
+                    chunk.exchangeId(),
+                    chunk.routeId(),
+                    chunk.applicationName(),
+                    chunk.startTime(),
+                    chunk.processors()));
+        }
+
+        // 2. Buffer/merge the exchange envelope
+        if (chunk.isFinal()) {
+            // Merge with any pending envelope, then emit
+            PendingExchange existing = pending.remove(chunk.exchangeId());
+            ExecutionChunk merged = existing != null
+                    ? mergeEnvelopes(existing.envelope(), chunk)
+                    : chunk;
+            executionSink.accept(toMergedExecution(merged));
+        } else {
+            // Buffer the envelope for later merging
+            pending.merge(chunk.exchangeId(),
+                    new PendingExchange(chunk, Instant.now()),
+                    (old, incoming) -> new PendingExchange(
+                            mergeEnvelopes(old.envelope(), incoming.envelope()),
+                            old.receivedAt()));
+        }
+    }
+
+    /**
+     * Flush exchanges that have been pending longer than the stale threshold.
+     * Called periodically by a scheduled task.
+ */ + public void sweepStale() { + Instant cutoff = Instant.now().minus(staleThreshold); + pending.forEach((exchangeId, pe) -> { + if (pe.receivedAt().isBefore(cutoff)) { + PendingExchange removed = pending.remove(exchangeId); + if (removed != null) { + log.info("Flushing stale exchange {} (pending since {})", + exchangeId, removed.receivedAt()); + executionSink.accept(toMergedExecution(removed.envelope())); + } + } + }); + } + + /** Number of exchanges awaiting a final chunk. */ + public int getPendingCount() { + return pending.size(); + } + + // ---- Merge logic ---- + + /** + * COALESCE merge: for each field, prefer the newer value if non-null, else keep older. + * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs. + */ + private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) { + return new ExecutionChunk( + coalesce(newer.exchangeId(), older.exchangeId()), + coalesce(newer.applicationName(), older.applicationName()), + coalesce(newer.agentId(), older.agentId()), + coalesce(newer.routeId(), older.routeId()), + coalesce(newer.correlationId(), older.correlationId()), + coalesce(newer.status(), older.status()), + coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime + coalesce(newer.endTime(), older.endTime()), + coalesce(newer.durationMs(), older.durationMs()), + coalesce(newer.engineLevel(), older.engineLevel()), + coalesce(newer.errorMessage(), older.errorMessage()), + coalesce(newer.errorStackTrace(), older.errorStackTrace()), + coalesce(newer.errorType(), older.errorType()), + coalesce(newer.errorCategory(), older.errorCategory()), + coalesce(newer.rootCauseType(), older.rootCauseType()), + coalesce(newer.rootCauseMessage(), older.rootCauseMessage()), + coalesce(newer.attributes(), older.attributes()), + coalesce(newer.traceId(), older.traceId()), + coalesce(newer.spanId(), older.spanId()), + coalesce(newer.originalExchangeId(), older.originalExchangeId()), + 
coalesce(newer.replayExchangeId(), older.replayExchangeId()), + Math.max(newer.chunkSeq(), older.chunkSeq()), + newer.isFinal() || older.isFinal(), + List.of() // processors are handled separately + ); + } + + private static T coalesce(T a, T b) { + return a != null ? a : b; + } + + // ---- Conversion to MergedExecution ---- + + private static MergedExecution toMergedExecution(ExecutionChunk envelope) { + return new MergedExecution( + DEFAULT_TENANT, + 1L, + envelope.exchangeId(), + envelope.routeId(), + envelope.agentId(), + envelope.applicationName(), + envelope.status(), + envelope.correlationId(), + envelope.exchangeId(), + envelope.startTime(), + envelope.endTime(), + envelope.durationMs(), + envelope.errorMessage(), + envelope.errorStackTrace(), + envelope.errorType(), + envelope.errorCategory(), + envelope.rootCauseType(), + envelope.rootCauseMessage(), + "", // diagramContentHash — server-side lookup, not in chunk + envelope.engineLevel(), + "", // inputBody — on processor records now + "", // outputBody + "", // inputHeaders + "", // outputHeaders + serializeAttributes(envelope.attributes()), + envelope.traceId(), + envelope.spanId(), + false, // hasTraceData — not tracked at envelope level + envelope.replayExchangeId() != null // isReplay + ); + } + + private static String serializeAttributes(Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return "{}"; + } + try { + return MAPPER.writeValueAsString(attributes); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize attributes, falling back to empty object", e); + return "{}"; + } + } + + // ---- Inner types ---- + + /** + * A batch of processor records from a single chunk, ready for ClickHouse insertion. + */ + public record ProcessorBatch( + String tenantId, + String executionId, + String routeId, + String applicationName, + Instant execStartTime, + List processors + ) {} + + /** + * Envelope buffered while waiting for the final chunk. 
+ */ + private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {} +} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java new file mode 100644 index 00000000..fd9cdca8 --- /dev/null +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java @@ -0,0 +1,226 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.assertj.core.api.Assertions.assertThat; + +class ChunkAccumulatorTest { + + private CopyOnWriteArrayList executionSink; + private CopyOnWriteArrayList processorSink; + private ChunkAccumulator accumulator; + + @BeforeEach + void setUp() { + executionSink = new CopyOnWriteArrayList<>(); + processorSink = new CopyOnWriteArrayList<>(); + accumulator = new ChunkAccumulator( + executionSink::add, processorSink::add, Duration.ofMinutes(5)); + } + + @Test + void singleFinalChunk_producesExecutionAndProcessors() { + ExecutionChunk chunk = new ExecutionChunk( + "ex-1", "order-service", "agent-1", "route-1", + "corr-1", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + "REGULAR", + null, null, null, null, null, null, + Map.of("orderId", "ORD-1"), + "trace-1", "span-1", null, null, + 0, true, + List.of(proc(1, null, "log1", "log", "COMPLETED", 5L))); + + accumulator.onChunk(chunk); + + // Processor sink should receive 1 batch with 1 record + assertThat(processorSink).hasSize(1); + ChunkAccumulator.ProcessorBatch batch = 
processorSink.get(0); + assertThat(batch.tenantId()).isEqualTo("default"); + assertThat(batch.executionId()).isEqualTo("ex-1"); + assertThat(batch.routeId()).isEqualTo("route-1"); + assertThat(batch.applicationName()).isEqualTo("order-service"); + assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z")); + assertThat(batch.processors()).hasSize(1); + + // Execution sink should receive 1 merged execution + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.tenantId()).isEqualTo("default"); + assertThat(exec.version()).isEqualTo(1L); + assertThat(exec.executionId()).isEqualTo("ex-1"); + assertThat(exec.routeId()).isEqualTo("route-1"); + assertThat(exec.status()).isEqualTo("COMPLETED"); + assertThat(exec.durationMs()).isEqualTo(1000L); + assertThat(exec.traceId()).isEqualTo("trace-1"); + assertThat(exec.spanId()).isEqualTo("span-1"); + assertThat(exec.attributes()).contains("orderId"); + } + + @Test + void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { + ExecutionChunk chunk0 = new ExecutionChunk( + "ex-2", "app", "agent-1", "route-1", + "ex-2", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of( + proc(1, null, "log1", "log", "COMPLETED", 5L), + proc(2, null, "log2", "log", "COMPLETED", 3L))); + + accumulator.onChunk(chunk0); + + // Processors pushed immediately on chunk 0 + assertThat(processorSink).hasSize(1); + assertThat(processorSink.get(0).processors()).hasSize(2); + + // No execution yet (not final) + assertThat(executionSink).isEmpty(); + + ExecutionChunk chunk1 = new ExecutionChunk( + "ex-2", "app", "agent-1", "route-1", + "ex-2", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:02Z"), 2000L, + "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 1, true, + List.of(proc(3, null, 
"log3", "log", "COMPLETED", 7L))); + + accumulator.onChunk(chunk1); + + // Processors from chunk 1 also pushed + assertThat(processorSink).hasSize(2); + assertThat(processorSink.get(1).processors()).hasSize(1); + + // Now execution is emitted + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("COMPLETED"); + assertThat(exec.durationMs()).isEqualTo(2000L); + } + + @Test + void staleExchange_flushedBySweep() throws Exception { + ChunkAccumulator staleAccumulator = new ChunkAccumulator( + executionSink::add, processorSink::add, Duration.ofMillis(1)); + + ExecutionChunk chunk = new ExecutionChunk( + "ex-3", "app", "agent-1", "route-1", + "ex-3", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of()); + + staleAccumulator.onChunk(chunk); + assertThat(executionSink).isEmpty(); + + Thread.sleep(5); + staleAccumulator.sweepStale(); + + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("RUNNING"); + assertThat(exec.executionId()).isEqualTo("ex-3"); + } + + @Test + void finalChunkWithErrors_populatesErrorFields() { + ExecutionChunk chunk = new ExecutionChunk( + "ex-4", "app", "agent-1", "route-1", + "ex-4", "FAILED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + "REGULAR", + "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)", + "NullPointerException", "RUNTIME", + "NullPointerException", "null value at index 0", + null, null, null, null, null, + 0, true, + List.of()); + + accumulator.onChunk(chunk); + + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("FAILED"); + assertThat(exec.errorMessage()).isEqualTo("NullPointerException"); + assertThat(exec.errorStacktrace()).isEqualTo("at 
com.foo.Bar.baz(Bar.java:42)"); + assertThat(exec.errorType()).isEqualTo("NullPointerException"); + assertThat(exec.errorCategory()).isEqualTo("RUNTIME"); + assertThat(exec.rootCauseType()).isEqualTo("NullPointerException"); + assertThat(exec.rootCauseMessage()).isEqualTo("null value at index 0"); + } + + @Test + void getPendingCount_tracksBufferedExchanges() { + ExecutionChunk running1 = new ExecutionChunk( + "ex-5", "app", "agent-1", "route-1", + "ex-5", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of()); + + ExecutionChunk running2 = new ExecutionChunk( + "ex-6", "app", "agent-1", "route-2", + "ex-6", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of()); + + accumulator.onChunk(running1); + accumulator.onChunk(running2); + assertThat(accumulator.getPendingCount()).isEqualTo(2); + + // Send final for ex-5 + ExecutionChunk final5 = new ExecutionChunk( + "ex-5", "app", "agent-1", "route-1", + "ex-5", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 1, true, + List.of()); + + accumulator.onChunk(final5); + assertThat(accumulator.getPendingCount()).isEqualTo(1); + } + + /** Helper to create a FlatProcessorRecord with minimal fields. 
*/ + private static FlatProcessorRecord proc(int seq, Integer parentSeq, + String processorId, String processorType, + String status, long durationMs) { + return new FlatProcessorRecord( + seq, parentSeq, null, processorId, processorType, + null, null, status, + Instant.parse("2026-03-31T10:00:00.100Z"), durationMs, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null); + } +} From 776f2ce90d180f867c08e72bed53aacba783418f Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:12:38 +0200 Subject: [PATCH 5/9] feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController ExecutionFlushScheduler drains MergedExecution and ProcessorBatch write buffers on a fixed interval and delegates batch inserts to ClickHouseExecutionStore. Also sweeps stale exchanges every 60s. ChunkIngestionController exposes POST /api/v1/data/chunks, accepts single or array ExecutionChunk payloads, and feeds them into the ChunkAccumulator. Conditional on ChunkAccumulator bean (clickhouse.enabled). 
package com.cameleer3.server.app.controller;

import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * Ingestion endpoint for execution chunk data (ClickHouse pipeline).
 *
 * <p>Accepts single or array {@link ExecutionChunk} payloads and feeds them
 * into the {@link ChunkAccumulator}. Only active when
 * {@code clickhouse.enabled=true} (conditional on the accumulator bean).
 */
@RestController
@RequestMapping("/api/v1/data")
@ConditionalOnBean(ChunkAccumulator.class)
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {

    private static final Logger log = LoggerFactory.getLogger(ChunkIngestionController.class);

    private final ChunkAccumulator accumulator;
    private final ObjectMapper objectMapper;

    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
        // Dedicated mapper (not the Spring-managed one) so Instant handling is
        // self-contained; JavaTimeModule is required for ISO-8601 timestamps.
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    /**
     * Ingests one chunk (JSON object) or many chunks (JSON array).
     *
     * @return 202 Accepted on success, 400 Bad Request on a malformed payload.
     *         Accumulator failures are NOT mapped to 400: they are server-side
     *         faults and propagate to the framework's error handling (500).
     */
    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        List<ExecutionChunk> chunks;
        try {
            chunks = parseChunks(body);
        } catch (Exception e) {
            // Malformed payload is a client error; the message suffices, no stack trace.
            log.warn("Failed to parse execution chunk payload: {}", e.getMessage());
            return ResponseEntity.badRequest().build();
        }

        // Deliberately outside the catch: a downstream failure must not be
        // masked as a client error.
        for (ExecutionChunk chunk : chunks) {
            accumulator.onChunk(chunk);
        }

        return ResponseEntity.accepted().build();
    }

    /** Parses either a single ExecutionChunk object or a JSON array of them. */
    private List<ExecutionChunk> parseChunks(String body) throws Exception {
        String trimmed = body.strip();
        if (trimmed.startsWith("[")) {
            return objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
        }
        return List.of(objectMapper.readValue(trimmed, ExecutionChunk.class));
    }
}
com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

/**
 * Scheduled flush task for ClickHouse execution and processor write buffers.
 *
 * <p>Drains both buffers on a fixed interval and delegates batch inserts to
 * {@link ClickHouseExecutionStore}. Also periodically sweeps stale exchanges
 * from the {@link ChunkAccumulator}.
 *
 * <p>Not a {@code @Component} — instantiated as a {@code @Bean} in StorageBeanConfig.
 */
public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                   ClickHouseExecutionStore executionStore,
                                   ChunkAccumulator accumulator,
                                   IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    /** Periodic drain of both buffers; each buffer fails independently. */
    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        try {
            flushExecutionsOnce();
        } catch (Exception e) {
            log.error("Failed to flush executions", e);
        }

        try {
            flushProcessorBatchesOnce();
        } catch (Exception e) {
            log.error("Failed to flush processor batches", e);
        }
    }

    /**
     * Drains and inserts at most one batch of executions.
     *
     * @return number of executions drained (0 when the buffer was empty)
     */
    private int flushExecutionsOnce() {
        List<MergedExecution> executions = executionBuffer.drain(batchSize);
        if (!executions.isEmpty()) {
            executionStore.insertExecutionBatch(executions);
            log.debug("Flushed {} executions to ClickHouse", executions.size());
        }
        return executions.size();
    }

    /**
     * Drains and inserts at most one batch of processor batches.
     *
     * @return number of processor batches drained (0 when the buffer was empty)
     */
    private int flushProcessorBatchesOnce() {
        List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
        for (ChunkAccumulator.ProcessorBatch batch : batches) {
            executionStore.insertProcessorBatch(
                    batch.tenantId(),
                    batch.executionId(),
                    batch.routeId(),
                    batch.applicationName(),
                    batch.execStartTime(),
                    batch.processors());
        }
        if (!batches.isEmpty()) {
            log.debug("Flushed {} processor batches to ClickHouse", batches.size());
        }
        return batches.size();
    }

    /** Force-flushes exchanges that never received a final chunk. */
    @Scheduled(fixedDelay = 60_000)
    public void sweepStale() {
        try {
            accumulator.sweepStale();
        } catch (Exception e) {
            log.error("Failed to sweep stale exchanges", e);
        }
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        // Drain remaining executions on shutdown; bail out on the first failure
        // so a broken store cannot stall application shutdown indefinitely.
        try {
            while (executionBuffer.size() > 0 && flushExecutionsOnce() > 0) {
                // keep draining
            }
        } catch (Exception e) {
            log.error("Failed to flush executions during shutdown", e);
        }
        // Drain remaining processor batches on shutdown, same failure policy.
        try {
            while (processorBuffer.size() > 0 && flushProcessorBatchesOnce() > 0) {
                // keep draining
            }
        } catch (Exception e) {
            log.error("Failed to flush processor batches during shutdown", e);
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        // Stop very late so producers feeding the buffers shut down first.
        return Integer.MAX_VALUE - 1;
    }
}
package com.cameleer3.server.app.search;

import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * ClickHouse-backed implementation of {@link SearchIndex}.
 *
 * <p>Queries the {@code executions} and {@code processor_executions} tables directly
 * using SQL with ngram bloom-filter indexes for full-text search acceleration.
 *
 * <p>The {@link #index} and {@link #delete} methods are no-ops because data is
 * written by the accumulator/store pipeline, not the search index.
 */
public class ClickHouseSearchIndex implements SearchIndex {

    private static final Logger log = LoggerFactory.getLogger(ClickHouseSearchIndex.class);
    private static final ObjectMapper JSON = new ObjectMapper();
    private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};
    private static final int HIGHLIGHT_CONTEXT_CHARS = 120;

    // Whitelist mapping of API sort fields to column names; anything else falls
    // back to start_time, so ORDER BY is never built from raw user input.
    private static final Map<String, String> SORT_FIELD_MAP = Map.of(
            "startTime", "start_time",
            "durationMs", "duration_ms",
            "status", "status",
            "agentId", "agent_id",
            "routeId", "route_id",
            "correlationId", "correlation_id",
            "executionId", "execution_id",
            "applicationName", "application_name"
    );

    private final JdbcTemplate jdbc;

    public ClickHouseSearchIndex(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void index(ExecutionDocument document) {
        // No-op: data is written by ClickHouseExecutionStore
    }

    @Override
    public void delete(String executionId) {
        // No-op: ClickHouse ReplacingMergeTree handles versioning
    }

    @Override
    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        try {
            List<Object> params = new ArrayList<>();
            String whereClause = buildWhereClause(request, params);
            String searchTerm = request.text();

            long total = countMatching(whereClause, params);
            if (total == 0) {
                return SearchResult.empty(request.offset(), request.limit());
            }

            // Data query — sort column/direction come from whitelists, never raw input.
            String sortColumn = SORT_FIELD_MAP.getOrDefault(request.sortField(), "start_time");
            String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";

            String dataSql = "SELECT execution_id, route_id, agent_id, application_name, "
                    + "status, start_time, end_time, duration_ms, correlation_id, "
                    + "error_message, error_stacktrace, diagram_content_hash, attributes, "
                    + "has_trace_data, is_replay, "
                    + "input_body, output_body, input_headers, output_headers, root_cause_message "
                    + "FROM executions FINAL WHERE " + whereClause
                    + " ORDER BY " + sortColumn + " " + sortDir
                    + " LIMIT ? OFFSET ?";

            List<Object> dataParams = new ArrayList<>(params);
            dataParams.add(request.limit());
            dataParams.add(request.offset());

            // Args-last overload: the (sql, Object[], RowMapper) form is deprecated
            // since Spring Framework 5.3.
            List<ExecutionSummary> data = jdbc.query(
                    dataSql,
                    (rs, rowNum) -> mapRow(rs, searchTerm),
                    dataParams.toArray());

            return new SearchResult<>(data, total, request.offset(), request.limit());
        } catch (Exception e) {
            log.error("ClickHouse search failed", e);
            return SearchResult.empty(request.offset(), request.limit());
        }
    }

    @Override
    public long count(SearchRequest request) {
        try {
            List<Object> params = new ArrayList<>();
            String whereClause = buildWhereClause(request, params);
            return countMatching(whereClause, params);
        } catch (Exception e) {
            log.error("ClickHouse count failed", e);
            return 0L;
        }
    }

    /** Runs the shared count query for a prepared WHERE clause; null-safe. */
    private long countMatching(String whereClause, List<Object> params) {
        Long total = jdbc.queryForObject(
                "SELECT count() FROM executions FINAL WHERE " + whereClause,
                Long.class, params.toArray());
        return total != null ? total : 0L;
    }

    /**
     * Builds the WHERE clause for the request, appending bind values to {@code params}
     * in the same order as the generated {@code ?} placeholders.
     */
    private String buildWhereClause(SearchRequest request, List<Object> params) {
        List<String> conditions = new ArrayList<>();
        conditions.add("tenant_id = 'default'");

        if (request.timeFrom() != null) {
            conditions.add("start_time >= ?");
            params.add(Timestamp.from(request.timeFrom()));
        }
        if (request.timeTo() != null) {
            conditions.add("start_time <= ?");
            params.add(Timestamp.from(request.timeTo()));
        }

        if (request.status() != null && !request.status().isBlank()) {
            String[] statuses = request.status().split(",");
            if (statuses.length == 1) {
                conditions.add("status = ?");
                params.add(statuses[0].trim());
            } else {
                String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?"));
                conditions.add("status IN (" + placeholders + ")");
                for (String s : statuses) {
                    params.add(s.trim());
                }
            }
        }

        if (request.routeId() != null) {
            conditions.add("route_id = ?");
            params.add(request.routeId());
        }

        if (request.agentId() != null) {
            conditions.add("agent_id = ?");
            params.add(request.agentId());
        }

        if (request.correlationId() != null) {
            conditions.add("correlation_id = ?");
            params.add(request.correlationId());
        }

        if (request.application() != null && !request.application().isBlank()) {
            conditions.add("application_name = ?");
            params.add(request.application());
        }

        if (request.agentIds() != null && !request.agentIds().isEmpty()) {
            String placeholders = String.join(", ", Collections.nCopies(request.agentIds().size(), "?"));
            conditions.add("agent_id IN (" + placeholders + ")");
            params.addAll(request.agentIds());
        }

        if (request.durationMin() != null) {
            conditions.add("duration_ms >= ?");
            params.add(request.durationMin());
        }

        if (request.durationMax() != null) {
            conditions.add("duration_ms <= ?");
            params.add(request.durationMax());
        }

        // Global full-text search: execution-level _search_text OR processor-level _search_text
        if (request.text() != null && !request.text().isBlank()) {
            String likeTerm = "%" + escapeLike(request.text()) + "%";
            conditions.add("(_search_text LIKE ? OR execution_id IN ("
                    + "SELECT DISTINCT execution_id FROM processor_executions "
                    + "WHERE tenant_id = 'default' AND _search_text LIKE ?))");
            params.add(likeTerm);
            params.add(likeTerm);
        }

        // Scoped body search in processor_executions
        if (request.textInBody() != null && !request.textInBody().isBlank()) {
            String likeTerm = "%" + escapeLike(request.textInBody()) + "%";
            conditions.add("execution_id IN ("
                    + "SELECT DISTINCT execution_id FROM processor_executions "
                    + "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))");
            params.add(likeTerm);
            params.add(likeTerm);
        }

        // Scoped headers search in processor_executions
        if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
            String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%";
            conditions.add("execution_id IN ("
                    + "SELECT DISTINCT execution_id FROM processor_executions "
                    + "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))");
            params.add(likeTerm);
            params.add(likeTerm);
        }

        // Scoped error search: execution-level + processor-level
        if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
            String likeTerm = "%" + escapeLike(request.textInErrors()) + "%";
            conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN ("
                    + "SELECT DISTINCT execution_id FROM processor_executions "
                    + "WHERE tenant_id = 'default' AND (error_message LIKE ? OR error_stacktrace LIKE ?)))");
            params.add(likeTerm);
            params.add(likeTerm);
            params.add(likeTerm);
            params.add(likeTerm);
        }

        return String.join(" AND ", conditions);
    }

    /** Maps one result row to an ExecutionSummary, adding a search highlight if requested. */
    private ExecutionSummary mapRow(ResultSet rs, String searchTerm) throws SQLException {
        String executionId = rs.getString("execution_id");
        String routeId = rs.getString("route_id");
        String agentId = rs.getString("agent_id");
        String applicationName = rs.getString("application_name");
        String status = rs.getString("status");

        Timestamp startTs = rs.getTimestamp("start_time");
        Instant startTime = startTs != null ? startTs.toInstant() : null;

        Timestamp endTs = rs.getTimestamp("end_time");
        Instant endTime = endTs != null ? endTs.toInstant() : null;

        long durationMs = rs.getLong("duration_ms");
        String correlationId = rs.getString("correlation_id");
        String errorMessage = rs.getString("error_message");
        String errorStacktrace = rs.getString("error_stacktrace");
        String diagramContentHash = rs.getString("diagram_content_hash");
        String attributesJson = rs.getString("attributes");
        boolean hasTraceData = rs.getBoolean("has_trace_data");
        boolean isReplay = rs.getBoolean("is_replay");
        String inputBody = rs.getString("input_body");
        String outputBody = rs.getString("output_body");
        String inputHeaders = rs.getString("input_headers");
        String outputHeaders = rs.getString("output_headers");
        String rootCauseMessage = rs.getString("root_cause_message");

        Map<String, String> attributes = parseAttributesJson(attributesJson);

        // Application-side highlighting: first field containing the term wins.
        String highlight = null;
        if (searchTerm != null && !searchTerm.isBlank()) {
            highlight = findHighlight(searchTerm, errorMessage, errorStacktrace,
                    inputBody, outputBody, inputHeaders, outputHeaders, attributesJson, rootCauseMessage);
        }

        return new ExecutionSummary(
                executionId, routeId, agentId, applicationName, status,
                startTime, endTime, durationMs,
                correlationId, errorMessage, diagramContentHash,
                highlight, attributes, hasTraceData, isReplay
        );
    }

    /** Returns a snippet from the first field that contains the term, or null. */
    private String findHighlight(String searchTerm, String... fields) {
        for (String field : fields) {
            String snippet = extractSnippet(field, searchTerm, HIGHLIGHT_CONTEXT_CHARS);
            if (snippet != null) {
                return snippet;
            }
        }
        return null;
    }

    /**
     * Case-insensitive snippet extraction: the match plus up to {@code contextChars}
     * of surrounding text, with ellipses where the snippet is truncated.
     */
    static String extractSnippet(String text, String searchTerm, int contextChars) {
        if (text == null || text.isEmpty() || searchTerm == null) return null;
        int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
        if (idx < 0) return null;
        int start = Math.max(0, idx - contextChars / 2);
        int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
        return (start > 0 ? "..." : "") + text.substring(start, end) + (end < text.length() ? "..." : "");
    }

    /** Escapes LIKE wildcards so user input is matched literally. */
    private static String escapeLike(String term) {
        return term.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_");
    }

    /** Parses the attributes JSON column; returns null on blank or malformed input. */
    private static Map<String, String> parseAttributesJson(String json) {
        if (json == null || json.isBlank()) return null;
        try {
            return JSON.readValue(json, STR_MAP);
        } catch (Exception e) {
            return null;
        }
    }
}
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseSearchIndexIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseSearchIndex searchIndex; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + // Seed test data + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + + // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes + MergedExecution exec1 = new MergedExecution( + "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", + "COMPLETED", "corr-1", "exchange-1", + baseTime, + 
baseTime.plusMillis(500), + 500L, + "", "", "", "", "", "", + "hash-abc", "FULL", + "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", + "", "", + false, false + ); + + // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error + MergedExecution exec2 = new MergedExecution( + "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", + "FAILED", "corr-2", "exchange-2", + baseTime.plusSeconds(1), + baseTime.plusSeconds(1).plusMillis(200), + 200L, + "NullPointerException at line 42", + "java.lang.NPE\n at Foo.bar(Foo.java:42)", + "NullPointerException", "RUNTIME", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error + MergedExecution exec3 = new MergedExecution( + "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", + "COMPLETED", "", "exchange-3", + baseTime.plusSeconds(2), + baseTime.plusSeconds(2).plusMillis(100), + 100L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(exec1, exec2, exec3)); + + // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token + FlatProcessorRecord proc1 = new FlatProcessorRecord( + 1, null, null, + "proc-1", "to", null, null, + "COMPLETED", + baseTime, 50L, + null, + "Hello World request body", "", + Map.of("Authorization", "Bearer secret-token"), null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1)); + } + + @Test + void search_withNoFilters_returnsAllExecutions() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(3); + 
} + + @Test + void search_byStatus_filtersCorrectly() { + SearchRequest request = new SearchRequest( + "FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data()).hasSize(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_byTimeRange_filtersCorrectly() { + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + // Time window covering exec-1 and exec-2 but not exec-3 + SearchRequest request = new SearchRequest( + null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(2); + assertThat(result.data()).extracting(ExecutionSummary::executionId) + .containsExactlyInAnyOrder("exec-1", "exec-2"); + } + + @Test + void search_fullTextSearch_findsInErrorMessage() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_fullTextSearch_findsInInputBody() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "12345", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInBody_searchesProcessorBodies() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, "Hello World", null, 
null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInHeaders_searchesProcessorHeaders() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, "secret-token", null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInErrors_searchesErrorFields() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, "Foo.bar", + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_withHighlight_returnsSnippet() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).highlight()).contains("NullPointerException"); + } + + @Test + void search_pagination_works() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 2, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(2); + assertThat(result.offset()).isEqualTo(0); + assertThat(result.limit()).isEqualTo(2); + } + + @Test + void search_byApplication_filtersCorrectly() { + SearchRequest request = new SearchRequest( + 
null, null, null, null, null, null, null, null, null, null, + null, null, null, "other-app", null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void search_byAgentIds_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, List.of("agent-b"), 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void count_returnsMatchingCount() { + SearchRequest request = new SearchRequest( + "COMPLETED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + long count = searchIndex.count(request); + + assertThat(count).isEqualTo(2); + } + + @Test + void search_multipleStatusFilter_works() { + SearchRequest request = new SearchRequest( + "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + } + + @Test + void search_byCorrelationId_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, "corr-1", null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_byDurationRange_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, 300L, 600L, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); 
+ + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } +} From 31f7113b3fd6f897e0a3a111fa3ee4eafed5d95b Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:21:19 +0200 Subject: [PATCH 7/9] feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/config/IngestionBeanConfig.java | 15 +++++++ .../server/app/config/StorageBeanConfig.java | 44 +++++++++++++++++++ .../server/app/search/OpenSearchIndex.java | 2 + .../src/main/resources/application.yml | 1 + deploy/base/server.yaml | 2 + 5 files changed, 64 insertions(+) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index c0d3a479..3f38c4b6 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,7 +1,10 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -19,4 +22,16 @@ public class IngestionBeanConfig { public WriteBuffer metricsBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer executionBuffer(IngestionConfig config) { + return new 
WriteBuffer<>(config.getBufferCapacity()); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 71b5bf7d..ab733408 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -8,7 +8,12 @@ import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.indexing.SearchIndexer; +import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; import com.cameleer3.server.core.storage.model.MetricsSnapshot; @@ -74,4 +79,43 @@ public class StorageBeanConfig { public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { return new PostgresMetricsQueryStore(jdbc); } + + // ── ClickHouse Execution Store ────────────────────────────────────── + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ClickHouseExecutionStore clickHouseExecutionStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseExecutionStore(clickHouseJdbc); + } + + @Bean + 
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ChunkAccumulator chunkAccumulator( + WriteBuffer<MergedExecution> executionBuffer, + WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) { + return new ChunkAccumulator( + executionBuffer::offer, + processorBatchBuffer::offer, + java.time.Duration.ofMinutes(5)); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ExecutionFlushScheduler executionFlushScheduler( + WriteBuffer<MergedExecution> executionBuffer, + WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer, + ClickHouseExecutionStore executionStore, + ChunkAccumulator accumulator, + IngestionConfig config) { + return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, + executionStore, accumulator, config); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse") + public SearchIndex clickHouseSearchIndex( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseSearchIndex(clickHouseJdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 2421d7da..d63f90f9 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -20,6 +20,7 @@ import org.opensearch.client.opensearch.indices.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Repository; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.*; import java.util.stream.Collectors; @Repository +@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true) public class OpenSearchIndex 
implements SearchIndex { private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index abab463e..55cd27e1 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -50,6 +50,7 @@ cameleer: retention-days: ${CAMELEER_RETENTION_DAYS:30} storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} + search: ${CAMELEER_STORAGE_SEARCH:opensearch} security: access-token-expiry-ms: 3600000 diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 79228066..06c131a3 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -91,6 +91,8 @@ spec: key: CLICKHOUSE_PASSWORD - name: CAMELEER_STORAGE_METRICS value: "postgres" + - name: CAMELEER_STORAGE_SEARCH + value: "opensearch" resources: requests: From 38551eac9d228e061b58136d6edb471eb79b520f Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:24:55 +0200 Subject: [PATCH 8/9] test(clickhouse): add end-to-end chunk pipeline integration test Co-Authored-By: Claude Opus 4.6 (1M context) --- .../storage/ClickHouseChunkPipelineIT.java | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java new file mode 100644 index 00000000..219f836b --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -0,0 +1,169 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import 
com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseChunkPipelineIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore executionStore; + private ClickHouseSearchIndex searchIndex; + private ChunkAccumulator accumulator; + private List<MergedExecution> executionBuffer; + private List<ChunkAccumulator.ProcessorBatch> processorBuffer; + + @BeforeEach + void setUp() throws IOException { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + jdbc = new JdbcTemplate(ds); + + String execDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); + String procDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); + 
jdbc.execute(execDdl); + jdbc.execute(procDdl); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + executionStore = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + executionBuffer = new ArrayList<>(); + processorBuffer = new ArrayList<>(); + accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5)); + } + + @Test + void fullPipeline_chunkedIngestion_thenSearch() { + Instant start = Instant.parse("2026-03-31T12:00:00Z"); + + // Chunk 0: RUNNING with initial processors + accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "RUNNING", + start, null, null, "DEEP", + null, null, null, null, null, null, + Map.of("orderId", "ORD-123"), + null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord(1, null, null, "log1", "log", + null, null, "COMPLETED", start, 2L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(2, null, null, "split1", "split", + null, 3, "COMPLETED", start.plusMillis(2), 100L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(3, 2, "split1", "to1", "to", + 0, null, "COMPLETED", start.plusMillis(5), 30L, + "http://inventory/api", + "order ABC-123 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + // Processors should be buffered immediately + assertThat(processorBuffer).hasSize(1); + assertThat(executionBuffer).isEmpty(); + + // Chunk 1: COMPLETED (final) + accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "COMPLETED", + start, start.plusMillis(750), 750L, "DEEP", + null, null, null, null, null, null, + null, null, null, null, null, + 1, true, + List.of( + new 
FlatProcessorRecord(4, 2, "split1", "to1", "to", + 1, null, "COMPLETED", start.plusMillis(40), 25L, + "http://inventory/api", + "order DEF-456 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + assertThat(executionBuffer).hasSize(1); + assertThat(processorBuffer).hasSize(2); + + // Flush to ClickHouse (simulating ExecutionFlushScheduler) + executionStore.insertExecutionBatch(executionBuffer); + for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) { + executionStore.insertProcessorBatch( + batch.tenantId(), batch.executionId(), + batch.routeId(), batch.applicationName(), + batch.execStartTime(), batch.processors()); + } + + // Search by order ID in attributes (via _search_text on executions) + SearchResult result = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + "ORD-123", null, null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); + assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); + assertThat(result.data().get(0).durationMs()).isEqualTo(750L); + + // Search in processor body + SearchResult bodyResult = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + null, "ABC-123", null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(bodyResult.total()).isEqualTo(1); + + // Verify iteration data in processor_executions + Integer iterSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2", + Integer.class); + assertThat(iterSize).isEqualTo(3); + + Integer iter0 = jdbc.queryForObject( + "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3", + Integer.class); + assertThat(iter0).isEqualTo(0); + + // Verify total processor count + Integer procCount = 
jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'pipeline-1'", + Integer.class); + assertThat(procCount).isEqualTo(4); + } +} From 07f215b0fdb093b7afa741d9dd6832c08fd82692 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:33:49 +0200 Subject: [PATCH 9/9] refactor: replace server-side DTOs with cameleer3-common ExecutionChunk and FlatProcessorRecord Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controller/ChunkIngestionController.java | 2 +- .../app/storage/ClickHouseExecutionStore.java | 56 ++++---- .../app/search/ClickHouseSearchIndexIT.java | 21 ++- .../storage/ClickHouseChunkPipelineIT.java | 110 ++++++++++------ .../storage/ClickHouseExecutionStoreIT.java | 99 +++++++------- .../core/ingestion/ChunkAccumulator.java | 114 ++++++++-------- .../core/storage/model/ExecutionChunk.java | 42 ------ .../storage/model/FlatProcessorRecord.java | 42 ------ .../core/ingestion/ChunkAccumulatorTest.java | 123 +++++++++--------- .../ExecutionChunkDeserializationTest.java | 109 ---------------- 10 files changed, 270 insertions(+), 448 deletions(-) delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java delete mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java index 21ed7602..24cace8a 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java @@ -1,7 
+1,7 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.core.ingestion.ChunkAccumulator; -import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionChunk; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index 287bd18c..dc64b148 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -1,7 +1,7 @@ package com.cameleer3.server.app.storage; import com.cameleer3.server.core.ingestion.MergedExecution; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.FlatProcessorRecord; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.jdbc.core.JdbcTemplate; @@ -93,35 +93,35 @@ public class ClickHouseExecutionStore { processors.stream().map(p -> new Object[]{ nullToEmpty(tenantId), nullToEmpty(executionId), - p.seq(), - p.parentSeq(), - nullToEmpty(p.parentProcessorId()), - nullToEmpty(p.processorId()), - nullToEmpty(p.processorType()), - Timestamp.from(p.startTime() != null ? p.startTime() : execStartTime), + p.getSeq(), + p.getParentSeq(), + nullToEmpty(p.getParentProcessorId()), + nullToEmpty(p.getProcessorId()), + nullToEmpty(p.getProcessorType()), + Timestamp.from(p.getStartTime() != null ? 
p.getStartTime() : execStartTime), nullToEmpty(routeId), nullToEmpty(applicationName), - p.iteration(), - p.iterationSize(), - nullToEmpty(p.status()), - computeEndTime(p.startTime(), p.durationMs()), - p.durationMs(), - nullToEmpty(p.errorMessage()), - nullToEmpty(p.errorStackTrace()), - nullToEmpty(p.errorType()), - nullToEmpty(p.errorCategory()), - nullToEmpty(p.rootCauseType()), - nullToEmpty(p.rootCauseMessage()), - nullToEmpty(p.inputBody()), - nullToEmpty(p.outputBody()), - mapToJson(p.inputHeaders()), - mapToJson(p.outputHeaders()), - mapToJson(p.attributes()), - nullToEmpty(p.resolvedEndpointUri()), - nullToEmpty(p.circuitBreakerState()), - boolOrFalse(p.fallbackTriggered()), - boolOrFalse(p.filterMatched()), - boolOrFalse(p.duplicateMessage()) + p.getIteration(), + p.getIterationSize(), + p.getStatus() != null ? p.getStatus().name() : "", + computeEndTime(p.getStartTime(), p.getDurationMs()), + p.getDurationMs(), + nullToEmpty(p.getErrorMessage()), + nullToEmpty(p.getErrorStackTrace()), + nullToEmpty(p.getErrorType()), + nullToEmpty(p.getErrorCategory()), + nullToEmpty(p.getRootCauseType()), + nullToEmpty(p.getRootCauseMessage()), + nullToEmpty(p.getInputBody()), + nullToEmpty(p.getOutputBody()), + mapToJson(p.getInputHeaders()), + mapToJson(p.getOutputHeaders()), + mapToJson(p.getAttributes()), + nullToEmpty(p.getResolvedEndpointUri()), + nullToEmpty(p.getCircuitBreakerState()), + boolOrFalse(p.getFallbackTriggered()), + boolOrFalse(p.getFilterMatched()), + boolOrFalse(p.getDuplicateMessage()) }).toList()); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java index 31a9580c..4cb2de53 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -5,7 
+5,8 @@ import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,17 +107,13 @@ class ClickHouseSearchIndexIT { store.insertExecutionBatch(List.of(exec1, exec2, exec3)); // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token - FlatProcessorRecord proc1 = new FlatProcessorRecord( - 1, null, null, - "proc-1", "to", null, null, - "COMPLETED", - baseTime, 50L, - null, - "Hello World request body", "", - Map.of("Authorization", "Bearer secret-token"), null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord proc1 = new FlatProcessorRecord(1, "proc-1", "to"); + proc1.setStatus(ExecutionStatus.COMPLETED); + proc1.setStartTime(baseTime); + proc1.setDurationMs(50L); + proc1.setInputBody("Hello World request body"); + proc1.setOutputBody(""); + proc1.setInputHeaders(Map.of("Authorization", "Bearer secret-token")); store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1)); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java index 219f836b..2227ad7f 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -6,8 +6,9 @@ import 
com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,53 +71,76 @@ class ClickHouseChunkPipelineIT { Instant start = Instant.parse("2026-03-31T12:00:00Z"); // Chunk 0: RUNNING with initial processors - accumulator.onChunk(new ExecutionChunk( - "pipeline-1", "order-service", "pod-1", "order-route", - "corr-1", "RUNNING", - start, null, null, "DEEP", - null, null, null, null, null, null, - Map.of("orderId", "ORD-123"), - null, null, null, null, - 0, false, - List.of( - new FlatProcessorRecord(1, null, null, "log1", "log", - null, null, "COMPLETED", start, 2L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord(2, null, null, "split1", "split", - null, 3, "COMPLETED", start.plusMillis(2), 100L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord(3, 2, "split1", "to1", "to", - 0, null, "COMPLETED", start.plusMillis(5), 30L, - "http://inventory/api", - "order ABC-123 check stock", "stock available", - null, null, - null, null, null, null, null, null, - null, null, null, null, null)))); + ExecutionChunk chunk0 = new ExecutionChunk(); + chunk0.setExchangeId("pipeline-1"); + chunk0.setApplicationName("order-service"); + chunk0.setAgentId("pod-1"); + chunk0.setRouteId("order-route"); + chunk0.setCorrelationId("corr-1"); + 
chunk0.setStatus(ExecutionStatus.RUNNING); + chunk0.setStartTime(start); + chunk0.setEngineLevel("DEEP"); + chunk0.setAttributes(Map.of("orderId", "ORD-123")); + chunk0.setChunkSeq(0); + chunk0.setFinal(false); + + FlatProcessorRecord p1 = new FlatProcessorRecord(1, "log1", "log"); + p1.setStatus(ExecutionStatus.COMPLETED); + p1.setStartTime(start); + p1.setDurationMs(2L); + + FlatProcessorRecord p2 = new FlatProcessorRecord(2, "split1", "split"); + p2.setIterationSize(3); + p2.setStatus(ExecutionStatus.COMPLETED); + p2.setStartTime(start.plusMillis(2)); + p2.setDurationMs(100L); + + FlatProcessorRecord p3 = new FlatProcessorRecord(3, "to1", "to"); + p3.setParentSeq(2); + p3.setParentProcessorId("split1"); + p3.setIteration(0); + p3.setStatus(ExecutionStatus.COMPLETED); + p3.setStartTime(start.plusMillis(5)); + p3.setDurationMs(30L); + p3.setResolvedEndpointUri("http://inventory/api"); + p3.setInputBody("order ABC-123 check stock"); + p3.setOutputBody("stock available"); + + chunk0.setProcessors(List.of(p1, p2, p3)); + accumulator.onChunk(chunk0); // Processors should be buffered immediately assertThat(processorBuffer).hasSize(1); assertThat(executionBuffer).isEmpty(); // Chunk 1: COMPLETED (final) - accumulator.onChunk(new ExecutionChunk( - "pipeline-1", "order-service", "pod-1", "order-route", - "corr-1", "COMPLETED", - start, start.plusMillis(750), 750L, "DEEP", - null, null, null, null, null, null, - null, null, null, null, null, - 1, true, - List.of( - new FlatProcessorRecord(4, 2, "split1", "to1", "to", - 1, null, "COMPLETED", start.plusMillis(40), 25L, - "http://inventory/api", - "order DEF-456 check stock", "stock available", - null, null, - null, null, null, null, null, null, - null, null, null, null, null)))); + ExecutionChunk chunk1 = new ExecutionChunk(); + chunk1.setExchangeId("pipeline-1"); + chunk1.setApplicationName("order-service"); + chunk1.setAgentId("pod-1"); + chunk1.setRouteId("order-route"); + chunk1.setCorrelationId("corr-1"); + 
chunk1.setStatus(ExecutionStatus.COMPLETED); + chunk1.setStartTime(start); + chunk1.setEndTime(start.plusMillis(750)); + chunk1.setDurationMs(750L); + chunk1.setEngineLevel("DEEP"); + chunk1.setChunkSeq(1); + chunk1.setFinal(true); + + FlatProcessorRecord p4 = new FlatProcessorRecord(4, "to1", "to"); + p4.setParentSeq(2); + p4.setParentProcessorId("split1"); + p4.setIteration(1); + p4.setStatus(ExecutionStatus.COMPLETED); + p4.setStartTime(start.plusMillis(40)); + p4.setDurationMs(25L); + p4.setResolvedEndpointUri("http://inventory/api"); + p4.setInputBody("order DEF-456 check stock"); + p4.setOutputBody("stock available"); + + chunk1.setProcessors(List.of(p4)); + accumulator.onChunk(chunk1); assertThat(executionBuffer).hasSize(1); assertThat(processorBuffer).hasSize(2); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java index 0904507e..8b8ede77 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.storage; import com.cameleer3.server.core.ingestion.MergedExecution; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,18 +81,16 @@ class ClickHouseExecutionStoreIT { @Test void insertProcessorBatch_writesToClickHouse() { - FlatProcessorRecord proc = new FlatProcessorRecord( - 1, null, null, - "proc-1", "to", null, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 50L, - "http://example.com", - "input body", "output body", - Map.of("h1", 
"v1"), Map.of("h2", "v2"), - null, null, null, null, null, null, - Map.of("a1", "v1"), - null, null, null, null - ); + FlatProcessorRecord proc = new FlatProcessorRecord(1, "proc-1", "to"); + proc.setStatus(ExecutionStatus.COMPLETED); + proc.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + proc.setDurationMs(50L); + proc.setResolvedEndpointUri("http://example.com"); + proc.setInputBody("input body"); + proc.setOutputBody("output body"); + proc.setInputHeaders(Map.of("h1", "v1")); + proc.setOutputHeaders(Map.of("h2", "v2")); + proc.setAttributes(Map.of("a1", "v1")); store.insertProcessorBatch( "default", "exec-1", "route-a", "my-app", @@ -112,48 +111,44 @@ class ClickHouseExecutionStoreIT { @Test void insertProcessorBatch_withIterations() { - FlatProcessorRecord splitContainer = new FlatProcessorRecord( - 1, null, null, - "split-1", "split", null, 3, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 300L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord splitContainer = new FlatProcessorRecord(1, "split-1", "split"); + splitContainer.setIterationSize(3); + splitContainer.setStatus(ExecutionStatus.COMPLETED); + splitContainer.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + splitContainer.setDurationMs(300L); - FlatProcessorRecord child0 = new FlatProcessorRecord( - 2, 1, "split-1", - "child-proc", "to", 0, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.100Z"), 80L, - "http://svc-a", "body0", "out0", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child0 = new FlatProcessorRecord(2, "child-proc", "to"); + child0.setParentSeq(1); + child0.setParentProcessorId("split-1"); + child0.setIteration(0); + child0.setStatus(ExecutionStatus.COMPLETED); + child0.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + child0.setDurationMs(80L); + child0.setResolvedEndpointUri("http://svc-a"); + 
child0.setInputBody("body0"); + child0.setOutputBody("out0"); - FlatProcessorRecord child1 = new FlatProcessorRecord( - 3, 1, "split-1", - "child-proc", "to", 1, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.200Z"), 90L, - "http://svc-a", "body1", "out1", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child1 = new FlatProcessorRecord(3, "child-proc", "to"); + child1.setParentSeq(1); + child1.setParentProcessorId("split-1"); + child1.setIteration(1); + child1.setStatus(ExecutionStatus.COMPLETED); + child1.setStartTime(Instant.parse("2026-03-31T10:00:00.200Z")); + child1.setDurationMs(90L); + child1.setResolvedEndpointUri("http://svc-a"); + child1.setInputBody("body1"); + child1.setOutputBody("out1"); - FlatProcessorRecord child2 = new FlatProcessorRecord( - 4, 1, "split-1", - "child-proc", "to", 2, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.300Z"), 100L, - "http://svc-a", "body2", "out2", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child2 = new FlatProcessorRecord(4, "child-proc", "to"); + child2.setParentSeq(1); + child2.setParentProcessorId("split-1"); + child2.setIteration(2); + child2.setStatus(ExecutionStatus.COMPLETED); + child2.setStartTime(Instant.parse("2026-03-31T10:00:00.300Z")); + child2.setDurationMs(100L); + child2.setResolvedEndpointUri("http://svc-a"); + child2.setInputBody("body2"); + child2.setOutputBody("out2"); store.insertProcessorBatch( "default", "exec-2", "route-b", "my-app", diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index 35eccbb9..dcc7d486 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ 
b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -1,7 +1,7 @@ package com.cameleer3.server.core.ingestion; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.FlatProcessorRecord; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -46,27 +46,27 @@ public class ChunkAccumulator { */ public void onChunk(ExecutionChunk chunk) { // 1. Push processor records immediately (append-only) - if (chunk.processors() != null && !chunk.processors().isEmpty()) { + if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { processorSink.accept(new ProcessorBatch( DEFAULT_TENANT, - chunk.exchangeId(), - chunk.routeId(), - chunk.applicationName(), - chunk.startTime(), - chunk.processors())); + chunk.getExchangeId(), + chunk.getRouteId(), + chunk.getApplicationName(), + chunk.getStartTime(), + chunk.getProcessors())); } // 2. Buffer/merge the exchange envelope if (chunk.isFinal()) { // Merge with any pending envelope, then emit - PendingExchange existing = pending.remove(chunk.exchangeId()); + PendingExchange existing = pending.remove(chunk.getExchangeId()); ExecutionChunk merged = existing != null ? mergeEnvelopes(existing.envelope(), chunk) : chunk; executionSink.accept(toMergedExecution(merged)); } else { // Buffer the envelope for later merging - pending.merge(chunk.exchangeId(), + pending.merge(chunk.getExchangeId(), new PendingExchange(chunk, Instant.now()), (old, incoming) -> new PendingExchange( mergeEnvelopes(old.envelope(), incoming.envelope()), @@ -104,32 +104,32 @@ public class ChunkAccumulator { * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs. 
*/ private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) { - return new ExecutionChunk( - coalesce(newer.exchangeId(), older.exchangeId()), - coalesce(newer.applicationName(), older.applicationName()), - coalesce(newer.agentId(), older.agentId()), - coalesce(newer.routeId(), older.routeId()), - coalesce(newer.correlationId(), older.correlationId()), - coalesce(newer.status(), older.status()), - coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime - coalesce(newer.endTime(), older.endTime()), - coalesce(newer.durationMs(), older.durationMs()), - coalesce(newer.engineLevel(), older.engineLevel()), - coalesce(newer.errorMessage(), older.errorMessage()), - coalesce(newer.errorStackTrace(), older.errorStackTrace()), - coalesce(newer.errorType(), older.errorType()), - coalesce(newer.errorCategory(), older.errorCategory()), - coalesce(newer.rootCauseType(), older.rootCauseType()), - coalesce(newer.rootCauseMessage(), older.rootCauseMessage()), - coalesce(newer.attributes(), older.attributes()), - coalesce(newer.traceId(), older.traceId()), - coalesce(newer.spanId(), older.spanId()), - coalesce(newer.originalExchangeId(), older.originalExchangeId()), - coalesce(newer.replayExchangeId(), older.replayExchangeId()), - Math.max(newer.chunkSeq(), older.chunkSeq()), - newer.isFinal() || older.isFinal(), - List.of() // processors are handled separately - ); + ExecutionChunk merged = new ExecutionChunk(); + merged.setExchangeId(coalesce(newer.getExchangeId(), older.getExchangeId())); + merged.setApplicationName(coalesce(newer.getApplicationName(), older.getApplicationName())); + merged.setAgentId(coalesce(newer.getAgentId(), older.getAgentId())); + merged.setRouteId(coalesce(newer.getRouteId(), older.getRouteId())); + merged.setCorrelationId(coalesce(newer.getCorrelationId(), older.getCorrelationId())); + merged.setStatus(coalesce(newer.getStatus(), older.getStatus())); + merged.setStartTime(coalesce(older.getStartTime(), 
newer.getStartTime())); // prefer earliest startTime + merged.setEndTime(coalesce(newer.getEndTime(), older.getEndTime())); + merged.setDurationMs(coalesce(newer.getDurationMs(), older.getDurationMs())); + merged.setEngineLevel(coalesce(newer.getEngineLevel(), older.getEngineLevel())); + merged.setErrorMessage(coalesce(newer.getErrorMessage(), older.getErrorMessage())); + merged.setErrorStackTrace(coalesce(newer.getErrorStackTrace(), older.getErrorStackTrace())); + merged.setErrorType(coalesce(newer.getErrorType(), older.getErrorType())); + merged.setErrorCategory(coalesce(newer.getErrorCategory(), older.getErrorCategory())); + merged.setRootCauseType(coalesce(newer.getRootCauseType(), older.getRootCauseType())); + merged.setRootCauseMessage(coalesce(newer.getRootCauseMessage(), older.getRootCauseMessage())); + merged.setAttributes(coalesce(newer.getAttributes(), older.getAttributes())); + merged.setTraceId(coalesce(newer.getTraceId(), older.getTraceId())); + merged.setSpanId(coalesce(newer.getSpanId(), older.getSpanId())); + merged.setOriginalExchangeId(coalesce(newer.getOriginalExchangeId(), older.getOriginalExchangeId())); + merged.setReplayExchangeId(coalesce(newer.getReplayExchangeId(), older.getReplayExchangeId())); + merged.setChunkSeq(Math.max(newer.getChunkSeq(), older.getChunkSeq())); + merged.setFinal(newer.isFinal() || older.isFinal()); + merged.setProcessors(List.of()); // processors are handled separately + return merged; } private static T coalesce(T a, T b) { @@ -142,33 +142,33 @@ public class ChunkAccumulator { return new MergedExecution( DEFAULT_TENANT, 1L, - envelope.exchangeId(), - envelope.routeId(), - envelope.agentId(), - envelope.applicationName(), - envelope.status(), - envelope.correlationId(), - envelope.exchangeId(), - envelope.startTime(), - envelope.endTime(), - envelope.durationMs(), - envelope.errorMessage(), - envelope.errorStackTrace(), - envelope.errorType(), - envelope.errorCategory(), - envelope.rootCauseType(), - 
envelope.rootCauseMessage(), + envelope.getExchangeId(), + envelope.getRouteId(), + envelope.getAgentId(), + envelope.getApplicationName(), + envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING", + envelope.getCorrelationId(), + envelope.getExchangeId(), + envelope.getStartTime(), + envelope.getEndTime(), + envelope.getDurationMs(), + envelope.getErrorMessage(), + envelope.getErrorStackTrace(), + envelope.getErrorType(), + envelope.getErrorCategory(), + envelope.getRootCauseType(), + envelope.getRootCauseMessage(), "", // diagramContentHash — server-side lookup, not in chunk - envelope.engineLevel(), + envelope.getEngineLevel(), "", // inputBody — on processor records now "", // outputBody "", // inputHeaders "", // outputHeaders - serializeAttributes(envelope.attributes()), - envelope.traceId(), - envelope.spanId(), + serializeAttributes(envelope.getAttributes()), + envelope.getTraceId(), + envelope.getSpanId(), false, // hasTraceData — not tracked at envelope level - envelope.replayExchangeId() != null // isReplay + envelope.getReplayExchangeId() != null // isReplay ); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java deleted file mode 100644 index 20066f09..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.time.Instant; -import java.util.List; -import java.util.Map; - -/** - * Chunk document: exchange envelope + list of FlatProcessorRecords. - * Mirrors cameleer3-common ExecutionChunk — replace with common lib import when available. 
- */ -@JsonIgnoreProperties(ignoreUnknown = true) -@JsonInclude(JsonInclude.Include.NON_NULL) -public record ExecutionChunk( - String exchangeId, - String applicationName, - String agentId, - String routeId, - String correlationId, - String status, - Instant startTime, - Instant endTime, - Long durationMs, - String engineLevel, - String errorMessage, - String errorStackTrace, - String errorType, - String errorCategory, - String rootCauseType, - String rootCauseMessage, - Map attributes, - String traceId, - String spanId, - String originalExchangeId, - String replayExchangeId, - int chunkSeq, - @JsonProperty("final") boolean isFinal, - List processors -) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java deleted file mode 100644 index deb89221..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; - -import java.time.Instant; -import java.util.Map; - -/** - * Flat processor execution record with seq/parentSeq for tree reconstruction. - * Mirrors cameleer3-common FlatProcessorRecord — replace with common lib import when available. 
- */ -@JsonIgnoreProperties(ignoreUnknown = true) -@JsonInclude(JsonInclude.Include.NON_NULL) -public record FlatProcessorRecord( - int seq, - Integer parentSeq, - String parentProcessorId, - String processorId, - String processorType, - Integer iteration, - Integer iterationSize, - String status, - Instant startTime, - long durationMs, - String resolvedEndpointUri, - String inputBody, - String outputBody, - Map inputHeaders, - Map outputHeaders, - String errorMessage, - String errorStackTrace, - String errorType, - String errorCategory, - String rootCauseType, - String rootCauseMessage, - Map attributes, - String circuitBreakerState, - Boolean fallbackTriggered, - Boolean filterMatched, - Boolean duplicateMessage -) {} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java index fd9cdca8..75771697 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java @@ -1,7 +1,8 @@ package com.cameleer3.server.core.ingestion; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,17 +30,15 @@ class ChunkAccumulatorTest { @Test void singleFinalChunk_producesExecutionAndProcessors() { - ExecutionChunk chunk = new ExecutionChunk( - "ex-1", "order-service", "agent-1", "route-1", - "corr-1", "COMPLETED", + ExecutionChunk chunk = chunk("ex-1", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, 
null, null, null, null, null, - Map.of("orderId", "ORD-1"), - "trace-1", "span-1", null, null, 0, true, List.of(proc(1, null, "log1", "log", "COMPLETED", 5L))); + chunk.setCorrelationId("corr-1"); + chunk.setAttributes(Map.of("orderId", "ORD-1")); + chunk.setTraceId("trace-1"); + chunk.setSpanId("span-1"); accumulator.onChunk(chunk); @@ -69,17 +68,14 @@ class ChunkAccumulatorTest { @Test void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { - ExecutionChunk chunk0 = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "RUNNING", + ExecutionChunk chunk0 = chunk("ex-2", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of( proc(1, null, "log1", "log", "COMPLETED", 5L), proc(2, null, "log2", "log", "COMPLETED", 3L))); + chunk0.setCorrelationId("ex-2"); accumulator.onChunk(chunk0); @@ -90,16 +86,12 @@ class ChunkAccumulatorTest { // No execution yet (not final) assertThat(executionSink).isEmpty(); - ExecutionChunk chunk1 = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "COMPLETED", + ExecutionChunk chunk1 = chunk("ex-2", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:02Z"), 2000L, - "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, 1, true, List.of(proc(3, null, "log3", "log", "COMPLETED", 7L))); + chunk1.setCorrelationId("ex-2"); accumulator.onChunk(chunk1); @@ -119,17 +111,14 @@ class ChunkAccumulatorTest { ChunkAccumulator staleAccumulator = new ChunkAccumulator( executionSink::add, processorSink::add, Duration.ofMillis(1)); - ExecutionChunk chunk = new ExecutionChunk( - "ex-3", "app", "agent-1", "route-1", - "ex-3", "RUNNING", + ExecutionChunk c = chunk("ex-3", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, 
false, List.of()); + c.setCorrelationId("ex-3"); - staleAccumulator.onChunk(chunk); + staleAccumulator.onChunk(c); assertThat(executionSink).isEmpty(); Thread.sleep(5); @@ -143,20 +132,20 @@ class ChunkAccumulatorTest { @Test void finalChunkWithErrors_populatesErrorFields() { - ExecutionChunk chunk = new ExecutionChunk( - "ex-4", "app", "agent-1", "route-1", - "ex-4", "FAILED", + ExecutionChunk c = chunk("ex-4", "FAILED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)", - "NullPointerException", "RUNTIME", - "NullPointerException", "null value at index 0", - null, null, null, null, null, 0, true, List.of()); + c.setCorrelationId("ex-4"); + c.setErrorMessage("NullPointerException"); + c.setErrorStackTrace("at com.foo.Bar.baz(Bar.java:42)"); + c.setErrorType("NullPointerException"); + c.setErrorCategory("RUNTIME"); + c.setRootCauseType("NullPointerException"); + c.setRootCauseMessage("null value at index 0"); - accumulator.onChunk(chunk); + accumulator.onChunk(c); assertThat(executionSink).hasSize(1); MergedExecution exec = executionSink.get(0); @@ -171,56 +160,66 @@ class ChunkAccumulatorTest { @Test void getPendingCount_tracksBufferedExchanges() { - ExecutionChunk running1 = new ExecutionChunk( - "ex-5", "app", "agent-1", "route-1", - "ex-5", "RUNNING", + ExecutionChunk running1 = chunk("ex-5", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of()); + running1.setCorrelationId("ex-5"); - ExecutionChunk running2 = new ExecutionChunk( - "ex-6", "app", "agent-1", "route-2", - "ex-6", "RUNNING", + ExecutionChunk running2 = chunk("ex-6", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of()); + 
running2.setCorrelationId("ex-6"); + running2.setRouteId("route-2"); accumulator.onChunk(running1); accumulator.onChunk(running2); assertThat(accumulator.getPendingCount()).isEqualTo(2); // Send final for ex-5 - ExecutionChunk final5 = new ExecutionChunk( - "ex-5", "app", "agent-1", "route-1", - "ex-5", "COMPLETED", + ExecutionChunk final5 = chunk("ex-5", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, 1, true, List.of()); + final5.setCorrelationId("ex-5"); accumulator.onChunk(final5); assertThat(accumulator.getPendingCount()).isEqualTo(1); } + /** Helper to create an ExecutionChunk with common fields. */ + private static ExecutionChunk chunk(String exchangeId, String status, Instant start, Instant end, Long duration, + int chunkSeq, boolean isFinal, List processors) { + ExecutionChunk c = new ExecutionChunk(); + c.setExchangeId(exchangeId); + c.setApplicationName(exchangeId.equals("ex-1") ? "order-service" : "app"); + c.setAgentId("agent-1"); + c.setRouteId("route-1"); + c.setCorrelationId(null); + c.setStatus(ExecutionStatus.valueOf(status)); + c.setStartTime(start); + c.setEndTime(end); + c.setDurationMs(duration); + c.setEngineLevel("REGULAR"); + c.setChunkSeq(chunkSeq); + c.setFinal(isFinal); + c.setProcessors(processors); + return c; + } + /** Helper to create a FlatProcessorRecord with minimal fields. 
*/ private static FlatProcessorRecord proc(int seq, Integer parentSeq, String processorId, String processorType, String status, long durationMs) { - return new FlatProcessorRecord( - seq, parentSeq, null, processorId, processorType, - null, null, status, - Instant.parse("2026-03-31T10:00:00.100Z"), durationMs, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null); + FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType); + p.setParentSeq(parentSeq); + p.setStatus(ExecutionStatus.valueOf(status)); + p.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + p.setDurationMs(durationMs); + return p; } } diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java deleted file mode 100644 index de0a50cf..00000000 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.Test; - -import java.time.Instant; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -class ExecutionChunkDeserializationTest { - - private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); - - @Test - void roundTrip_fullChunk() throws Exception { - ExecutionChunk chunk = new ExecutionChunk( - "ex-1", "order-service", "pod-1", "order-route", - "corr-1", "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), - Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, null, null, null, null, null, - Map.of("orderId", "ORD-1"), - "trace-1", "span-1", null, 
null, - 2, true, - List.of(new FlatProcessorRecord( - 1, null, null, "log1", "log", - null, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00.100Z"), 5L, - null, "body", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null))); - - String json = MAPPER.writeValueAsString(chunk); - ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); - - assertThat(deserialized.exchangeId()).isEqualTo("ex-1"); - assertThat(deserialized.isFinal()).isTrue(); - assertThat(deserialized.chunkSeq()).isEqualTo(2); - assertThat(deserialized.processors()).hasSize(1); - assertThat(deserialized.processors().get(0).seq()).isEqualTo(1); - assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1"); - assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1"); - } - - @Test - void roundTrip_runningChunkWithIterations() throws Exception { - ExecutionChunk chunk = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "RUNNING", - Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, - 0, false, - List.of( - new FlatProcessorRecord( - 1, null, null, "split1", "split", - null, 3, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 100L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord( - 2, 1, "split1", "log1", "log", - 0, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 5L, - null, "item-0", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord( - 3, 1, "split1", "log1", "log", - 1, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 5L, - null, "item-1", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null))); - - String json = MAPPER.writeValueAsString(chunk); - ExecutionChunk deserialized = MAPPER.readValue(json, 
ExecutionChunk.class); - - assertThat(deserialized.isFinal()).isFalse(); - assertThat(deserialized.processors()).hasSize(3); - - FlatProcessorRecord split = deserialized.processors().get(0); - assertThat(split.iterationSize()).isEqualTo(3); - assertThat(split.parentSeq()).isNull(); - - FlatProcessorRecord child0 = deserialized.processors().get(1); - assertThat(child0.parentSeq()).isEqualTo(1); - assertThat(child0.parentProcessorId()).isEqualTo("split1"); - assertThat(child0.iteration()).isEqualTo(0); - } - - @Test - void deserialize_unknownFieldsIgnored() throws Exception { - String json = """ - {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED", - "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true, - "futureField":"ignored","processors":[]} - """; - ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class); - assertThat(chunk.exchangeId()).isEqualTo("ex-1"); - assertThat(chunk.isFinal()).isTrue(); - } -}