diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index c0d3a479..3f38c4b6 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,7 +1,10 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -19,4 +22,16 @@ public class IngestionBeanConfig { public WriteBuffer metricsBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer executionBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer processorBatchBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 71b5bf7d..ab733408 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -8,7 +8,12 @@ import 
com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.indexing.SearchIndexer; +import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; import com.cameleer3.server.core.storage.model.MetricsSnapshot; @@ -74,4 +79,43 @@ public class StorageBeanConfig { public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { return new PostgresMetricsQueryStore(jdbc); } + + // ── ClickHouse Execution Store ────────────────────────────────────── + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ClickHouseExecutionStore clickHouseExecutionStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseExecutionStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ChunkAccumulator chunkAccumulator( + WriteBuffer executionBuffer, + WriteBuffer processorBatchBuffer) { + return new ChunkAccumulator( + executionBuffer::offer, + processorBatchBuffer::offer, + java.time.Duration.ofMinutes(5)); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ExecutionFlushScheduler executionFlushScheduler( + WriteBuffer executionBuffer, + WriteBuffer processorBatchBuffer, + ClickHouseExecutionStore executionStore, + ChunkAccumulator accumulator, + IngestionConfig config) { + return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, + 
executionStore, accumulator, config); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse") + public SearchIndex clickHouseSearchIndex( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseSearchIndex(clickHouseJdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java new file mode 100644 index 00000000..24cace8a --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java @@ -0,0 +1,68 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.common.model.ExecutionChunk; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * Ingestion endpoint for execution chunk data (ClickHouse pipeline). + *

+ * Accepts single or array {@link ExecutionChunk} payloads and feeds them + * into the {@link ChunkAccumulator}. Only active when + * {@code clickhouse.enabled=true} (conditional on the accumulator bean). + */ +@RestController +@RequestMapping("/api/v1/data") +@ConditionalOnBean(ChunkAccumulator.class) +@Tag(name = "Ingestion", description = "Data ingestion endpoints") +public class ChunkIngestionController { + + private static final Logger log = LoggerFactory.getLogger(ChunkIngestionController.class); + + private final ChunkAccumulator accumulator; + private final ObjectMapper objectMapper; + + public ChunkIngestionController(ChunkAccumulator accumulator) { + this.accumulator = accumulator; + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + } + + @PostMapping("/chunks") + @Operation(summary = "Ingest execution chunk") + public ResponseEntity ingestChunks(@RequestBody String body) { + try { + String trimmed = body.strip(); + List chunks; + if (trimmed.startsWith("[")) { + chunks = objectMapper.readValue(trimmed, new TypeReference<>() {}); + } else { + ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class); + chunks = List.of(single); + } + + for (ExecutionChunk chunk : chunks) { + accumulator.onChunk(chunk); + } + + return ResponseEntity.accepted().build(); + } catch (Exception e) { + log.warn("Failed to parse execution chunk payload: {}", e.getMessage()); + return ResponseEntity.badRequest().build(); + } + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java new file mode 100644 index 00000000..31ce0fe4 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java @@ -0,0 +1,136 @@ +package com.cameleer3.server.app.ingestion; + +import 
com.cameleer3.server.app.config.IngestionConfig; +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.ingestion.WriteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.List; + +/** + * Scheduled flush task for ClickHouse execution and processor write buffers. + *

+ * Drains both buffers on a fixed interval and delegates batch inserts to + * {@link ClickHouseExecutionStore}. Also periodically sweeps stale exchanges + * from the {@link ChunkAccumulator}. + *

+ * Not a {@code @Component} — instantiated as a {@code @Bean} in StorageBeanConfig. + */ +public class ExecutionFlushScheduler implements SmartLifecycle { + + private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class); + + private final WriteBuffer executionBuffer; + private final WriteBuffer processorBuffer; + private final ClickHouseExecutionStore executionStore; + private final ChunkAccumulator accumulator; + private final int batchSize; + private volatile boolean running = false; + + public ExecutionFlushScheduler(WriteBuffer executionBuffer, + WriteBuffer processorBuffer, + ClickHouseExecutionStore executionStore, + ChunkAccumulator accumulator, + IngestionConfig config) { + this.executionBuffer = executionBuffer; + this.processorBuffer = processorBuffer; + this.executionStore = executionStore; + this.accumulator = accumulator; + this.batchSize = config.getBatchSize(); + } + + @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}") + public void flush() { + try { + List executions = executionBuffer.drain(batchSize); + if (!executions.isEmpty()) { + executionStore.insertExecutionBatch(executions); + log.debug("Flushed {} executions to ClickHouse", executions.size()); + } + } catch (Exception e) { + log.error("Failed to flush executions", e); + } + + try { + List batches = processorBuffer.drain(batchSize); + for (ChunkAccumulator.ProcessorBatch batch : batches) { + executionStore.insertProcessorBatch( + batch.tenantId(), + batch.executionId(), + batch.routeId(), + batch.applicationName(), + batch.execStartTime(), + batch.processors()); + } + if (!batches.isEmpty()) { + log.debug("Flushed {} processor batches to ClickHouse", batches.size()); + } + } catch (Exception e) { + log.error("Failed to flush processor batches", e); + } + } + + @Scheduled(fixedDelay = 60_000) + public void sweepStale() { + try { + accumulator.sweepStale(); + } catch (Exception e) { + log.error("Failed to sweep stale exchanges", e); + } + } + + 
@Override + public void start() { + running = true; + } + + @Override + public void stop() { + // Drain remaining executions on shutdown + while (executionBuffer.size() > 0) { + List batch = executionBuffer.drain(batchSize); + if (batch.isEmpty()) break; + try { + executionStore.insertExecutionBatch(batch); + } catch (Exception e) { + log.error("Failed to flush executions during shutdown", e); + break; + } + } + // Drain remaining processor batches on shutdown + while (processorBuffer.size() > 0) { + List batches = processorBuffer.drain(batchSize); + if (batches.isEmpty()) break; + try { + for (ChunkAccumulator.ProcessorBatch batch : batches) { + executionStore.insertProcessorBatch( + batch.tenantId(), + batch.executionId(), + batch.routeId(), + batch.applicationName(), + batch.execStartTime(), + batch.processors()); + } + } catch (Exception e) { + log.error("Failed to flush processor batches during shutdown", e); + break; + } + } + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE - 1; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java new file mode 100644 index 00000000..6dbdd3e6 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java @@ -0,0 +1,304 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.model.ExecutionDocument; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; 
+import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +/** + * ClickHouse-backed implementation of {@link SearchIndex}. + *

+ * Queries the {@code executions} and {@code processor_executions} tables directly + * using SQL with ngram bloom-filter indexes for full-text search acceleration. + *

+ * The {@link #index} and {@link #delete} methods are no-ops because data is + * written by the accumulator/store pipeline, not the search index. + */ +public class ClickHouseSearchIndex implements SearchIndex { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSearchIndex.class); + private static final ObjectMapper JSON = new ObjectMapper(); + private static final TypeReference> STR_MAP = new TypeReference<>() {}; + private static final int HIGHLIGHT_CONTEXT_CHARS = 120; + + private static final Map SORT_FIELD_MAP = Map.of( + "startTime", "start_time", + "durationMs", "duration_ms", + "status", "status", + "agentId", "agent_id", + "routeId", "route_id", + "correlationId", "correlation_id", + "executionId", "execution_id", + "applicationName", "application_name" + ); + + private final JdbcTemplate jdbc; + + public ClickHouseSearchIndex(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void index(ExecutionDocument document) { + // No-op: data is written by ClickHouseExecutionStore + } + + @Override + public void delete(String executionId) { + // No-op: ClickHouse ReplacingMergeTree handles versioning + } + + @Override + public SearchResult search(SearchRequest request) { + try { + List params = new ArrayList<>(); + String whereClause = buildWhereClause(request, params); + String searchTerm = request.text(); + + // Count query + String countSql = "SELECT count() FROM executions FINAL WHERE " + whereClause; + Long total = jdbc.queryForObject(countSql, Long.class, params.toArray()); + if (total == null || total == 0) { + return SearchResult.empty(request.offset(), request.limit()); + } + + // Data query + String sortColumn = SORT_FIELD_MAP.getOrDefault(request.sortField(), "start_time"); + String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? 
"ASC" : "DESC"; + + String dataSql = "SELECT execution_id, route_id, agent_id, application_name, " + + "status, start_time, end_time, duration_ms, correlation_id, " + + "error_message, error_stacktrace, diagram_content_hash, attributes, " + + "has_trace_data, is_replay, " + + "input_body, output_body, input_headers, output_headers, root_cause_message " + + "FROM executions FINAL WHERE " + whereClause + + " ORDER BY " + sortColumn + " " + sortDir + + " LIMIT ? OFFSET ?"; + + List dataParams = new ArrayList<>(params); + dataParams.add(request.limit()); + dataParams.add(request.offset()); + + List data = jdbc.query( + dataSql, dataParams.toArray(), + (rs, rowNum) -> mapRow(rs, searchTerm)); + + return new SearchResult<>(data, total, request.offset(), request.limit()); + } catch (Exception e) { + log.error("ClickHouse search failed", e); + return SearchResult.empty(request.offset(), request.limit()); + } + } + + @Override + public long count(SearchRequest request) { + try { + List params = new ArrayList<>(); + String whereClause = buildWhereClause(request, params); + String sql = "SELECT count() FROM executions FINAL WHERE " + whereClause; + Long result = jdbc.queryForObject(sql, Long.class, params.toArray()); + return result != null ? 
result : 0L; + } catch (Exception e) { + log.error("ClickHouse count failed", e); + return 0L; + } + } + + private String buildWhereClause(SearchRequest request, List params) { + List conditions = new ArrayList<>(); + conditions.add("tenant_id = 'default'"); + + if (request.timeFrom() != null) { + conditions.add("start_time >= ?"); + params.add(Timestamp.from(request.timeFrom())); + } + if (request.timeTo() != null) { + conditions.add("start_time <= ?"); + params.add(Timestamp.from(request.timeTo())); + } + + if (request.status() != null && !request.status().isBlank()) { + String[] statuses = request.status().split(","); + if (statuses.length == 1) { + conditions.add("status = ?"); + params.add(statuses[0].trim()); + } else { + String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?")); + conditions.add("status IN (" + placeholders + ")"); + for (String s : statuses) { + params.add(s.trim()); + } + } + } + + if (request.routeId() != null) { + conditions.add("route_id = ?"); + params.add(request.routeId()); + } + + if (request.agentId() != null) { + conditions.add("agent_id = ?"); + params.add(request.agentId()); + } + + if (request.correlationId() != null) { + conditions.add("correlation_id = ?"); + params.add(request.correlationId()); + } + + if (request.application() != null && !request.application().isBlank()) { + conditions.add("application_name = ?"); + params.add(request.application()); + } + + if (request.agentIds() != null && !request.agentIds().isEmpty()) { + String placeholders = String.join(", ", Collections.nCopies(request.agentIds().size(), "?")); + conditions.add("agent_id IN (" + placeholders + ")"); + params.addAll(request.agentIds()); + } + + if (request.durationMin() != null) { + conditions.add("duration_ms >= ?"); + params.add(request.durationMin()); + } + + if (request.durationMax() != null) { + conditions.add("duration_ms <= ?"); + params.add(request.durationMax()); + } + + // Global full-text search: execution-level 
_search_text OR processor-level _search_text + if (request.text() != null && !request.text().isBlank()) { + String likeTerm = "%" + escapeLike(request.text()) + "%"; + conditions.add("(_search_text LIKE ? OR execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND _search_text LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped body search in processor_executions + if (request.textInBody() != null && !request.textInBody().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInBody()) + "%"; + conditions.add("execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped headers search in processor_executions + if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%"; + conditions.add("execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped error search: execution-level + processor-level + if (request.textInErrors() != null && !request.textInErrors().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInErrors()) + "%"; + conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (error_message LIKE ? 
OR error_stacktrace LIKE ?)))"); + params.add(likeTerm); + params.add(likeTerm); + params.add(likeTerm); + params.add(likeTerm); + } + + return String.join(" AND ", conditions); + } + + private ExecutionSummary mapRow(ResultSet rs, String searchTerm) throws SQLException { + String executionId = rs.getString("execution_id"); + String routeId = rs.getString("route_id"); + String agentId = rs.getString("agent_id"); + String applicationName = rs.getString("application_name"); + String status = rs.getString("status"); + + Timestamp startTs = rs.getTimestamp("start_time"); + Instant startTime = startTs != null ? startTs.toInstant() : null; + + Timestamp endTs = rs.getTimestamp("end_time"); + Instant endTime = endTs != null ? endTs.toInstant() : null; + + long durationMs = rs.getLong("duration_ms"); + String correlationId = rs.getString("correlation_id"); + String errorMessage = rs.getString("error_message"); + String errorStacktrace = rs.getString("error_stacktrace"); + String diagramContentHash = rs.getString("diagram_content_hash"); + String attributesJson = rs.getString("attributes"); + boolean hasTraceData = rs.getBoolean("has_trace_data"); + boolean isReplay = rs.getBoolean("is_replay"); + String inputBody = rs.getString("input_body"); + String outputBody = rs.getString("output_body"); + String inputHeaders = rs.getString("input_headers"); + String outputHeaders = rs.getString("output_headers"); + String rootCauseMessage = rs.getString("root_cause_message"); + + Map attributes = parseAttributesJson(attributesJson); + + // Application-side highlighting + String highlight = null; + if (searchTerm != null && !searchTerm.isBlank()) { + highlight = findHighlight(searchTerm, errorMessage, errorStacktrace, + inputBody, outputBody, inputHeaders, outputHeaders, attributesJson, rootCauseMessage); + } + + return new ExecutionSummary( + executionId, routeId, agentId, applicationName, status, + startTime, endTime, durationMs, + correlationId, errorMessage, diagramContentHash, + 
highlight, attributes, hasTraceData, isReplay + ); + } + + private String findHighlight(String searchTerm, String... fields) { + for (String field : fields) { + String snippet = extractSnippet(field, searchTerm, HIGHLIGHT_CONTEXT_CHARS); + if (snippet != null) { + return snippet; + } + } + return null; + } + + static String extractSnippet(String text, String searchTerm, int contextChars) { + if (text == null || text.isEmpty() || searchTerm == null) return null; + int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase()); + if (idx < 0) return null; + int start = Math.max(0, idx - contextChars / 2); + int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2); + return (start > 0 ? "..." : "") + text.substring(start, end) + (end < text.length() ? "..." : ""); + } + + private static String escapeLike(String term) { + return term.replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_"); + } + + private static Map parseAttributesJson(String json) { + if (json == null || json.isBlank()) return null; + try { + return JSON.readValue(json, STR_MAP); + } catch (Exception e) { + return null; + } + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 2421d7da..d63f90f9 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -20,6 +20,7 @@ import org.opensearch.client.opensearch.indices.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Repository; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.*; import java.util.stream.Collectors; @Repository 
+@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true) public class OpenSearchIndex implements SearchIndex { private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java new file mode 100644 index 00000000..dc64b148 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -0,0 +1,151 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public class ClickHouseExecutionStore { + + private final JdbcTemplate jdbc; + private final ObjectMapper objectMapper; + + public ClickHouseExecutionStore(JdbcTemplate jdbc) { + this(jdbc, new ObjectMapper()); + } + + public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) { + this.jdbc = jdbc; + this.objectMapper = objectMapper; + } + + public void insertExecutionBatch(List executions) { + if (executions.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO executions ( + tenant_id, _version, execution_id, route_id, agent_id, application_name, + status, correlation_id, exchange_id, start_time, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, diagram_content_hash, engine_level, + input_body, output_body, input_headers, output_headers, attributes, + trace_id, span_id, has_trace_data, is_replay + ) + VALUES (?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + executions.stream().map(e -> new Object[]{ + nullToEmpty(e.tenantId()), + e.version(), + nullToEmpty(e.executionId()), + nullToEmpty(e.routeId()), + nullToEmpty(e.agentId()), + nullToEmpty(e.applicationName()), + nullToEmpty(e.status()), + nullToEmpty(e.correlationId()), + nullToEmpty(e.exchangeId()), + Timestamp.from(e.startTime()), + e.endTime() != null ? Timestamp.from(e.endTime()) : null, + e.durationMs(), + nullToEmpty(e.errorMessage()), + nullToEmpty(e.errorStacktrace()), + nullToEmpty(e.errorType()), + nullToEmpty(e.errorCategory()), + nullToEmpty(e.rootCauseType()), + nullToEmpty(e.rootCauseMessage()), + nullToEmpty(e.diagramContentHash()), + nullToEmpty(e.engineLevel()), + nullToEmpty(e.inputBody()), + nullToEmpty(e.outputBody()), + nullToEmpty(e.inputHeaders()), + nullToEmpty(e.outputHeaders()), + nullToEmpty(e.attributes()), + nullToEmpty(e.traceId()), + nullToEmpty(e.spanId()), + e.hasTraceData(), + e.isReplay() + }).toList()); + } + + public void insertProcessorBatch(String tenantId, String executionId, String routeId, + String applicationName, Instant execStartTime, + List processors) { + if (processors.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO processor_executions ( + tenant_id, execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + processors.stream().map(p -> new Object[]{ + nullToEmpty(tenantId), + nullToEmpty(executionId), + p.getSeq(), + p.getParentSeq(), + nullToEmpty(p.getParentProcessorId()), + nullToEmpty(p.getProcessorId()), + nullToEmpty(p.getProcessorType()), + Timestamp.from(p.getStartTime() != null ? p.getStartTime() : execStartTime), + nullToEmpty(routeId), + nullToEmpty(applicationName), + p.getIteration(), + p.getIterationSize(), + p.getStatus() != null ? p.getStatus().name() : "", + computeEndTime(p.getStartTime(), p.getDurationMs()), + p.getDurationMs(), + nullToEmpty(p.getErrorMessage()), + nullToEmpty(p.getErrorStackTrace()), + nullToEmpty(p.getErrorType()), + nullToEmpty(p.getErrorCategory()), + nullToEmpty(p.getRootCauseType()), + nullToEmpty(p.getRootCauseMessage()), + nullToEmpty(p.getInputBody()), + nullToEmpty(p.getOutputBody()), + mapToJson(p.getInputHeaders()), + mapToJson(p.getOutputHeaders()), + mapToJson(p.getAttributes()), + nullToEmpty(p.getResolvedEndpointUri()), + nullToEmpty(p.getCircuitBreakerState()), + boolOrFalse(p.getFallbackTriggered()), + boolOrFalse(p.getFilterMatched()), + boolOrFalse(p.getDuplicateMessage()) + }).toList()); + } + + private static String nullToEmpty(String value) { + return value != null ? 
value : ""; + } + + private static boolean boolOrFalse(Boolean value) { + return value != null && value; + } + + private static Timestamp computeEndTime(Instant startTime, long durationMs) { + if (startTime != null && durationMs > 0) { + return Timestamp.from(startTime.plusMillis(durationMs)); + } + return null; + } + + private String mapToJson(Map map) { + if (map == null || map.isEmpty()) return ""; + try { + return objectMapper.writeValueAsString(map); + } catch (JsonProcessingException e) { + return ""; + } + } +} diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index abab463e..55cd27e1 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -50,6 +50,7 @@ cameleer: retention-days: ${CAMELEER_RETENTION_DAYS:30} storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} + search: ${CAMELEER_STORAGE_SEARCH:opensearch} security: access-token-expiry-ms: 3600000 diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql new file mode 100644 index 00000000..e9b31f9e --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql @@ -0,0 +1,48 @@ +CREATE TABLE IF NOT EXISTS executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + start_time DateTime64(3), + _version UInt64 DEFAULT 1, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + status LowCardinality(String), + correlation_id String DEFAULT '', + exchange_id String DEFAULT '', + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT 
'', + root_cause_message String DEFAULT '', + diagram_content_hash String DEFAULT '', + engine_level LowCardinality(String) DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + trace_id String DEFAULT '', + span_id String DEFAULT '', + has_trace_data Bool DEFAULT false, + is_replay Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, + ' ', output_headers, ' ', root_cause_message), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_status status TYPE set(10) GRANULARITY 1, + INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(_version) +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) +TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql b/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql new file mode 100644 index 00000000..2f4ea1ec --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql @@ -0,0 +1,45 @@ +CREATE TABLE IF NOT EXISTS processor_executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + seq UInt32, + parent_seq Nullable(UInt32), + parent_processor_id String DEFAULT '', + processor_id String, + processor_type LowCardinality(String), 
+ start_time DateTime64(3), + route_id LowCardinality(String), + application_name LowCardinality(String), + iteration Nullable(Int32), + iteration_size Nullable(Int32), + status LowCardinality(String), + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + resolved_endpoint_uri String DEFAULT '', + circuit_breaker_state LowCardinality(String) DEFAULT '', + fallback_triggered Bool DEFAULT false, + filter_matched Bool DEFAULT false, + duplicate_message Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq) +TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java new file mode 100644 index 00000000..4cb2de53 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -0,0 +1,316 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import 
com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseSearchIndexIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseSearchIndex searchIndex; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc); + searchIndex = new 
ClickHouseSearchIndex(jdbc); + + // Seed test data + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + + // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes + MergedExecution exec1 = new MergedExecution( + "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", + "COMPLETED", "corr-1", "exchange-1", + baseTime, + baseTime.plusMillis(500), + 500L, + "", "", "", "", "", "", + "hash-abc", "FULL", + "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", + "", "", + false, false + ); + + // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error + MergedExecution exec2 = new MergedExecution( + "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", + "FAILED", "corr-2", "exchange-2", + baseTime.plusSeconds(1), + baseTime.plusSeconds(1).plusMillis(200), + 200L, + "NullPointerException at line 42", + "java.lang.NPE\n at Foo.bar(Foo.java:42)", + "NullPointerException", "RUNTIME", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error + MergedExecution exec3 = new MergedExecution( + "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", + "COMPLETED", "", "exchange-3", + baseTime.plusSeconds(2), + baseTime.plusSeconds(2).plusMillis(100), + 100L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(exec1, exec2, exec3)); + + // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token + FlatProcessorRecord proc1 = new FlatProcessorRecord(1, "proc-1", "to"); + proc1.setStatus(ExecutionStatus.COMPLETED); + proc1.setStartTime(baseTime); + proc1.setDurationMs(50L); + proc1.setInputBody("Hello World request body"); + proc1.setOutputBody(""); + proc1.setInputHeaders(Map.of("Authorization", "Bearer secret-token")); + + store.insertProcessorBatch("default", "exec-1", 
"route-timer", "my-app", baseTime, List.of(proc1)); + } + + @Test + void search_withNoFilters_returnsAllExecutions() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(3); + } + + @Test + void search_byStatus_filtersCorrectly() { + SearchRequest request = new SearchRequest( + "FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data()).hasSize(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_byTimeRange_filtersCorrectly() { + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + // Time window covering exec-1 and exec-2 but not exec-3 + SearchRequest request = new SearchRequest( + null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(2); + assertThat(result.data()).extracting(ExecutionSummary::executionId) + .containsExactlyInAnyOrder("exec-1", "exec-2"); + } + + @Test + void search_fullTextSearch_findsInErrorMessage() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_fullTextSearch_findsInInputBody() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, 
"12345", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInBody_searchesProcessorBodies() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, "Hello World", null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInHeaders_searchesProcessorHeaders() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, "secret-token", null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInErrors_searchesErrorFields() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, "Foo.bar", + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_withHighlight_returnsSnippet() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).highlight()).contains("NullPointerException"); + } + + @Test + void search_pagination_works() { + SearchRequest request = new SearchRequest( + null, null, null, 
null, null, null, null, null, null, null, + null, null, null, null, null, 0, 2, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(2); + assertThat(result.offset()).isEqualTo(0); + assertThat(result.limit()).isEqualTo(2); + } + + @Test + void search_byApplication_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, "other-app", null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void search_byAgentIds_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, List.of("agent-b"), 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void count_returnsMatchingCount() { + SearchRequest request = new SearchRequest( + "COMPLETED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + long count = searchIndex.count(request); + + assertThat(count).isEqualTo(2); + } + + @Test + void search_multipleStatusFilter_works() { + SearchRequest request = new SearchRequest( + "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + } + + @Test + void search_byCorrelationId_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, "corr-1", null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult 
result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_byDurationRange_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, 300L, 600L, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java new file mode 100644 index 00000000..2227ad7f --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -0,0 +1,193 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import 
java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseChunkPipelineIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore executionStore; + private ClickHouseSearchIndex searchIndex; + private ChunkAccumulator accumulator; + private List executionBuffer; + private List processorBuffer; + + @BeforeEach + void setUp() throws IOException { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + jdbc = new JdbcTemplate(ds); + + String execDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); + String procDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); + jdbc.execute(execDdl); + jdbc.execute(procDdl); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + executionStore = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + executionBuffer = new ArrayList<>(); + processorBuffer = new ArrayList<>(); + accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5)); + } + + @Test + void fullPipeline_chunkedIngestion_thenSearch() { + Instant start = Instant.parse("2026-03-31T12:00:00Z"); + + // Chunk 0: RUNNING with initial processors + ExecutionChunk chunk0 = new ExecutionChunk(); + chunk0.setExchangeId("pipeline-1"); + chunk0.setApplicationName("order-service"); + chunk0.setAgentId("pod-1"); + chunk0.setRouteId("order-route"); + chunk0.setCorrelationId("corr-1"); + chunk0.setStatus(ExecutionStatus.RUNNING); + 
chunk0.setStartTime(start); + chunk0.setEngineLevel("DEEP"); + chunk0.setAttributes(Map.of("orderId", "ORD-123")); + chunk0.setChunkSeq(0); + chunk0.setFinal(false); + + FlatProcessorRecord p1 = new FlatProcessorRecord(1, "log1", "log"); + p1.setStatus(ExecutionStatus.COMPLETED); + p1.setStartTime(start); + p1.setDurationMs(2L); + + FlatProcessorRecord p2 = new FlatProcessorRecord(2, "split1", "split"); + p2.setIterationSize(3); + p2.setStatus(ExecutionStatus.COMPLETED); + p2.setStartTime(start.plusMillis(2)); + p2.setDurationMs(100L); + + FlatProcessorRecord p3 = new FlatProcessorRecord(3, "to1", "to"); + p3.setParentSeq(2); + p3.setParentProcessorId("split1"); + p3.setIteration(0); + p3.setStatus(ExecutionStatus.COMPLETED); + p3.setStartTime(start.plusMillis(5)); + p3.setDurationMs(30L); + p3.setResolvedEndpointUri("http://inventory/api"); + p3.setInputBody("order ABC-123 check stock"); + p3.setOutputBody("stock available"); + + chunk0.setProcessors(List.of(p1, p2, p3)); + accumulator.onChunk(chunk0); + + // Processors should be buffered immediately + assertThat(processorBuffer).hasSize(1); + assertThat(executionBuffer).isEmpty(); + + // Chunk 1: COMPLETED (final) + ExecutionChunk chunk1 = new ExecutionChunk(); + chunk1.setExchangeId("pipeline-1"); + chunk1.setApplicationName("order-service"); + chunk1.setAgentId("pod-1"); + chunk1.setRouteId("order-route"); + chunk1.setCorrelationId("corr-1"); + chunk1.setStatus(ExecutionStatus.COMPLETED); + chunk1.setStartTime(start); + chunk1.setEndTime(start.plusMillis(750)); + chunk1.setDurationMs(750L); + chunk1.setEngineLevel("DEEP"); + chunk1.setChunkSeq(1); + chunk1.setFinal(true); + + FlatProcessorRecord p4 = new FlatProcessorRecord(4, "to1", "to"); + p4.setParentSeq(2); + p4.setParentProcessorId("split1"); + p4.setIteration(1); + p4.setStatus(ExecutionStatus.COMPLETED); + p4.setStartTime(start.plusMillis(40)); + p4.setDurationMs(25L); + p4.setResolvedEndpointUri("http://inventory/api"); + p4.setInputBody("order DEF-456 
check stock"); + p4.setOutputBody("stock available"); + + chunk1.setProcessors(List.of(p4)); + accumulator.onChunk(chunk1); + + assertThat(executionBuffer).hasSize(1); + assertThat(processorBuffer).hasSize(2); + + // Flush to ClickHouse (simulating ExecutionFlushScheduler) + executionStore.insertExecutionBatch(executionBuffer); + for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) { + executionStore.insertProcessorBatch( + batch.tenantId(), batch.executionId(), + batch.routeId(), batch.applicationName(), + batch.execStartTime(), batch.processors()); + } + + // Search by order ID in attributes (via _search_text on executions) + SearchResult result = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + "ORD-123", null, null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); + assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); + assertThat(result.data().get(0).durationMs()).isEqualTo(750L); + + // Search in processor body + SearchResult bodyResult = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + null, "ABC-123", null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(bodyResult.total()).isEqualTo(1); + + // Verify iteration data in processor_executions + Integer iterSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2", + Integer.class); + assertThat(iterSize).isEqualTo(3); + + Integer iter0 = jdbc.queryForObject( + "SELECT iteration FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3", + Integer.class); + assertThat(iter0).isEqualTo(0); + + // Verify total processor count + Integer procCount = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'pipeline-1'", + Integer.class); + assertThat(procCount).isEqualTo(4); 
+ } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java new file mode 100644 index 00000000..8b8ede77 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -0,0 +1,226 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseExecutionStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + 
.getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore(jdbc); + } + + @Test + void insertExecutionBatch_writesToClickHouse() { + MergedExecution exec = new MergedExecution( + "default", 1L, "exec-1", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), + 1000L, + "some error", "stack trace", "IOException", "IO", + "FileNotFoundException", "file not found", + "hash-abc", "FULL", + "{\"key\":\"val\"}", "{\"out\":\"val\"}", + "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", + "{\"attr\":\"val\"}", + "trace-123", "span-456", + true, false + ); + + store.insertExecutionBatch(List.of(exec)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertProcessorBatch_writesToClickHouse() { + FlatProcessorRecord proc = new FlatProcessorRecord(1, "proc-1", "to"); + proc.setStatus(ExecutionStatus.COMPLETED); + proc.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + proc.setDurationMs(50L); + proc.setResolvedEndpointUri("http://example.com"); + proc.setInputBody("input body"); + proc.setOutputBody("output body"); + proc.setInputHeaders(Map.of("h1", "v1")); + proc.setOutputHeaders(Map.of("h2", "v2")); + proc.setAttributes(Map.of("a1", "v1")); + + store.insertProcessorBatch( + "default", "exec-1", "route-a", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(proc)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + + // Verify seq is stored + Integer seq = jdbc.queryForObject( + "SELECT seq FROM processor_executions WHERE execution_id = 
'exec-1'", + Integer.class); + assertThat(seq).isEqualTo(1); + } + + @Test + void insertProcessorBatch_withIterations() { + FlatProcessorRecord splitContainer = new FlatProcessorRecord(1, "split-1", "split"); + splitContainer.setIterationSize(3); + splitContainer.setStatus(ExecutionStatus.COMPLETED); + splitContainer.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + splitContainer.setDurationMs(300L); + + FlatProcessorRecord child0 = new FlatProcessorRecord(2, "child-proc", "to"); + child0.setParentSeq(1); + child0.setParentProcessorId("split-1"); + child0.setIteration(0); + child0.setStatus(ExecutionStatus.COMPLETED); + child0.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + child0.setDurationMs(80L); + child0.setResolvedEndpointUri("http://svc-a"); + child0.setInputBody("body0"); + child0.setOutputBody("out0"); + + FlatProcessorRecord child1 = new FlatProcessorRecord(3, "child-proc", "to"); + child1.setParentSeq(1); + child1.setParentProcessorId("split-1"); + child1.setIteration(1); + child1.setStatus(ExecutionStatus.COMPLETED); + child1.setStartTime(Instant.parse("2026-03-31T10:00:00.200Z")); + child1.setDurationMs(90L); + child1.setResolvedEndpointUri("http://svc-a"); + child1.setInputBody("body1"); + child1.setOutputBody("out1"); + + FlatProcessorRecord child2 = new FlatProcessorRecord(4, "child-proc", "to"); + child2.setParentSeq(1); + child2.setParentProcessorId("split-1"); + child2.setIteration(2); + child2.setStatus(ExecutionStatus.COMPLETED); + child2.setStartTime(Instant.parse("2026-03-31T10:00:00.300Z")); + child2.setDurationMs(100L); + child2.setResolvedEndpointUri("http://svc-a"); + child2.setInputBody("body2"); + child2.setOutputBody("out2"); + + store.insertProcessorBatch( + "default", "exec-2", "route-b", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), + List.of(splitContainer, child0, child1, child2)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-2'", + 
Integer.class); + assertThat(count).isEqualTo(4); + + // Verify iteration data on the split container + Integer iterationSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 1", + Integer.class); + assertThat(iterationSize).isEqualTo(3); + + // Verify iteration index on a child + Integer iteration = jdbc.queryForObject( + "SELECT iteration FROM processor_executions " + + "WHERE execution_id = 'exec-2' AND seq = 3", + Integer.class); + assertThat(iteration).isEqualTo(1); + } + + @Test + void insertExecutionBatch_emptyList_doesNothing() { + store.insertExecutionBatch(List.of()); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { + MergedExecution v1 = new MergedExecution( + "default", 1L, "exec-r", "route-a", "agent-1", "my-app", + "RUNNING", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + MergedExecution v2 = new MergedExecution( + "default", 2L, "exec-r", "route-a", "agent-1", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:05Z"), + 5000L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(v1)); + store.insertExecutionBatch(List.of(v2)); + + // Force merge to apply ReplacingMergeTree deduplication + jdbc.execute("OPTIMIZE TABLE executions FINAL"); + + String status = jdbc.queryForObject( + "SELECT status FROM executions " + + "WHERE execution_id = 'exec-r'", + String.class); + assertThat(status).isEqualTo("COMPLETED"); + } +} diff --git a/cameleer3-server-core/pom.xml b/cameleer3-server-core/pom.xml index 5e2e517c..fcc2542d 100644 --- 
a/cameleer3-server-core/pom.xml +++ b/cameleer3-server-core/pom.xml @@ -37,6 +37,11 @@ spring-security-core provided + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + test + org.junit.jupiter junit-jupiter diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java new file mode 100644 index 00000000..dcc7d486 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -0,0 +1,205 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * Accumulates {@link ExecutionChunk} documents and produces: + *
+ * <ul>
+ *   <li>{@link ProcessorBatch} — pushed immediately for each chunk (append-only)</li>
+ *   <li>{@link MergedExecution} — pushed when the final chunk arrives or on stale sweep</li>
+ * </ul>
+ */ +public class ChunkAccumulator { + + private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class); + private static final String DEFAULT_TENANT = "default"; + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final Consumer<MergedExecution> executionSink; + private final Consumer<ProcessorBatch> processorSink; + private final Duration staleThreshold; + private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>(); + + public ChunkAccumulator(Consumer<MergedExecution> executionSink, + Consumer<ProcessorBatch> processorSink, + Duration staleThreshold) { + this.executionSink = executionSink; + this.processorSink = processorSink; + this.staleThreshold = staleThreshold; + } + + /** + * Process an incoming chunk: push processors immediately, + * buffer/merge the envelope, and emit when final. + */ + public void onChunk(ExecutionChunk chunk) { + // 1. Push processor records immediately (append-only) + if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { + processorSink.accept(new ProcessorBatch( + DEFAULT_TENANT, + chunk.getExchangeId(), + chunk.getRouteId(), + chunk.getApplicationName(), + chunk.getStartTime(), + chunk.getProcessors())); + } + + // 2. Buffer/merge the exchange envelope + if (chunk.isFinal()) { + // Merge with any pending envelope, then emit + PendingExchange existing = pending.remove(chunk.getExchangeId()); + ExecutionChunk merged = existing != null + ? mergeEnvelopes(existing.envelope(), chunk) + : chunk; + executionSink.accept(toMergedExecution(merged)); + } else { + // Buffer the envelope for later merging + pending.merge(chunk.getExchangeId(), + new PendingExchange(chunk, Instant.now()), + (old, incoming) -> new PendingExchange( + mergeEnvelopes(old.envelope(), incoming.envelope()), + old.receivedAt())); + } + } + + /** + * Flush exchanges that have been pending longer than the stale threshold. + * Called periodically by a scheduled task. 
+ */ + public void sweepStale() { + Instant cutoff = Instant.now().minus(staleThreshold); + pending.forEach((exchangeId, pe) -> { + if (pe.receivedAt().isBefore(cutoff)) { + PendingExchange removed = pending.remove(exchangeId); + if (removed != null) { + log.info("Flushing stale exchange {} (pending since {})", + exchangeId, removed.receivedAt()); + executionSink.accept(toMergedExecution(removed.envelope())); + } + } + }); + } + + /** Number of exchanges awaiting a final chunk. */ + public int getPendingCount() { + return pending.size(); + } + + // ---- Merge logic ---- + + /** + * COALESCE merge: for each field, prefer the newer value if non-null, else keep older. + * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs. + */ + private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) { + ExecutionChunk merged = new ExecutionChunk(); + merged.setExchangeId(coalesce(newer.getExchangeId(), older.getExchangeId())); + merged.setApplicationName(coalesce(newer.getApplicationName(), older.getApplicationName())); + merged.setAgentId(coalesce(newer.getAgentId(), older.getAgentId())); + merged.setRouteId(coalesce(newer.getRouteId(), older.getRouteId())); + merged.setCorrelationId(coalesce(newer.getCorrelationId(), older.getCorrelationId())); + merged.setStatus(coalesce(newer.getStatus(), older.getStatus())); + merged.setStartTime(coalesce(older.getStartTime(), newer.getStartTime())); // prefer earliest startTime + merged.setEndTime(coalesce(newer.getEndTime(), older.getEndTime())); + merged.setDurationMs(coalesce(newer.getDurationMs(), older.getDurationMs())); + merged.setEngineLevel(coalesce(newer.getEngineLevel(), older.getEngineLevel())); + merged.setErrorMessage(coalesce(newer.getErrorMessage(), older.getErrorMessage())); + merged.setErrorStackTrace(coalesce(newer.getErrorStackTrace(), older.getErrorStackTrace())); + merged.setErrorType(coalesce(newer.getErrorType(), older.getErrorType())); + 
merged.setErrorCategory(coalesce(newer.getErrorCategory(), older.getErrorCategory())); + merged.setRootCauseType(coalesce(newer.getRootCauseType(), older.getRootCauseType())); + merged.setRootCauseMessage(coalesce(newer.getRootCauseMessage(), older.getRootCauseMessage())); + merged.setAttributes(coalesce(newer.getAttributes(), older.getAttributes())); + merged.setTraceId(coalesce(newer.getTraceId(), older.getTraceId())); + merged.setSpanId(coalesce(newer.getSpanId(), older.getSpanId())); + merged.setOriginalExchangeId(coalesce(newer.getOriginalExchangeId(), older.getOriginalExchangeId())); + merged.setReplayExchangeId(coalesce(newer.getReplayExchangeId(), older.getReplayExchangeId())); + merged.setChunkSeq(Math.max(newer.getChunkSeq(), older.getChunkSeq())); + merged.setFinal(newer.isFinal() || older.isFinal()); + merged.setProcessors(List.of()); // processors are handled separately + return merged; + } + + private static T coalesce(T a, T b) { + return a != null ? a : b; + } + + // ---- Conversion to MergedExecution ---- + + private static MergedExecution toMergedExecution(ExecutionChunk envelope) { + return new MergedExecution( + DEFAULT_TENANT, + 1L, + envelope.getExchangeId(), + envelope.getRouteId(), + envelope.getAgentId(), + envelope.getApplicationName(), + envelope.getStatus() != null ? 
envelope.getStatus().name() : "RUNNING", + envelope.getCorrelationId(), + envelope.getExchangeId(), + envelope.getStartTime(), + envelope.getEndTime(), + envelope.getDurationMs(), + envelope.getErrorMessage(), + envelope.getErrorStackTrace(), + envelope.getErrorType(), + envelope.getErrorCategory(), + envelope.getRootCauseType(), + envelope.getRootCauseMessage(), + "", // diagramContentHash — server-side lookup, not in chunk + envelope.getEngineLevel(), + "", // inputBody — on processor records now + "", // outputBody + "", // inputHeaders + "", // outputHeaders + serializeAttributes(envelope.getAttributes()), + envelope.getTraceId(), + envelope.getSpanId(), + false, // hasTraceData — not tracked at envelope level + envelope.getReplayExchangeId() != null // isReplay + ); + } + + private static String serializeAttributes(Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return "{}"; + } + try { + return MAPPER.writeValueAsString(attributes); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize attributes, falling back to empty object", e); + return "{}"; + } + } + + // ---- Inner types ---- + + /** + * A batch of processor records from a single chunk, ready for ClickHouse insertion. + */ + public record ProcessorBatch( + String tenantId, + String executionId, + String routeId, + String applicationName, + Instant execStartTime, + List processors + ) {} + + /** + * Envelope buffered while waiting for the final chunk. 
+ */ + private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {} +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java new file mode 100644 index 00000000..d5227ab8 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -0,0 +1,39 @@ +package com.cameleer3.server.core.ingestion; + +import java.time.Instant; + +/** + * A merged execution envelope ready for ClickHouse insertion. + * Produced by ChunkAccumulator after receiving the final chunk. + */ +public record MergedExecution( + String tenantId, + long version, + String executionId, + String routeId, + String agentId, + String applicationName, + String status, + String correlationId, + String exchangeId, + Instant startTime, + Instant endTime, + Long durationMs, + String errorMessage, + String errorStacktrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + String diagramContentHash, + String engineLevel, + String inputBody, + String outputBody, + String inputHeaders, + String outputHeaders, + String attributes, + String traceId, + String spanId, + boolean hasTraceData, + boolean isReplay +) {} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java new file mode 100644 index 00000000..75771697 --- /dev/null +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java @@ -0,0 +1,225 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.assertj.core.api.Assertions.assertThat; + +class ChunkAccumulatorTest { + + private CopyOnWriteArrayList executionSink; + private CopyOnWriteArrayList processorSink; + private ChunkAccumulator accumulator; + + @BeforeEach + void setUp() { + executionSink = new CopyOnWriteArrayList<>(); + processorSink = new CopyOnWriteArrayList<>(); + accumulator = new ChunkAccumulator( + executionSink::add, processorSink::add, Duration.ofMinutes(5)); + } + + @Test + void singleFinalChunk_producesExecutionAndProcessors() { + ExecutionChunk chunk = chunk("ex-1", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + 0, true, + List.of(proc(1, null, "log1", "log", "COMPLETED", 5L))); + chunk.setCorrelationId("corr-1"); + chunk.setAttributes(Map.of("orderId", "ORD-1")); + chunk.setTraceId("trace-1"); + chunk.setSpanId("span-1"); + + accumulator.onChunk(chunk); + + // Processor sink should receive 1 batch with 1 record + assertThat(processorSink).hasSize(1); + ChunkAccumulator.ProcessorBatch batch = processorSink.get(0); + assertThat(batch.tenantId()).isEqualTo("default"); + assertThat(batch.executionId()).isEqualTo("ex-1"); + assertThat(batch.routeId()).isEqualTo("route-1"); + assertThat(batch.applicationName()).isEqualTo("order-service"); + assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z")); + assertThat(batch.processors()).hasSize(1); + + // Execution sink should receive 1 merged execution + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.tenantId()).isEqualTo("default"); + assertThat(exec.version()).isEqualTo(1L); + assertThat(exec.executionId()).isEqualTo("ex-1"); + assertThat(exec.routeId()).isEqualTo("route-1"); + 
assertThat(exec.status()).isEqualTo("COMPLETED"); + assertThat(exec.durationMs()).isEqualTo(1000L); + assertThat(exec.traceId()).isEqualTo("trace-1"); + assertThat(exec.spanId()).isEqualTo("span-1"); + assertThat(exec.attributes()).contains("orderId"); + } + + @Test + void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { + ExecutionChunk chunk0 = chunk("ex-2", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + 0, false, + List.of( + proc(1, null, "log1", "log", "COMPLETED", 5L), + proc(2, null, "log2", "log", "COMPLETED", 3L))); + chunk0.setCorrelationId("ex-2"); + + accumulator.onChunk(chunk0); + + // Processors pushed immediately on chunk 0 + assertThat(processorSink).hasSize(1); + assertThat(processorSink.get(0).processors()).hasSize(2); + + // No execution yet (not final) + assertThat(executionSink).isEmpty(); + + ExecutionChunk chunk1 = chunk("ex-2", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:02Z"), 2000L, + 1, true, + List.of(proc(3, null, "log3", "log", "COMPLETED", 7L))); + chunk1.setCorrelationId("ex-2"); + + accumulator.onChunk(chunk1); + + // Processors from chunk 1 also pushed + assertThat(processorSink).hasSize(2); + assertThat(processorSink.get(1).processors()).hasSize(1); + + // Now execution is emitted + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("COMPLETED"); + assertThat(exec.durationMs()).isEqualTo(2000L); + } + + @Test + void staleExchange_flushedBySweep() throws Exception { + ChunkAccumulator staleAccumulator = new ChunkAccumulator( + executionSink::add, processorSink::add, Duration.ofMillis(1)); + + ExecutionChunk c = chunk("ex-3", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + 0, false, + List.of()); + c.setCorrelationId("ex-3"); + + staleAccumulator.onChunk(c); + assertThat(executionSink).isEmpty(); + + Thread.sleep(5); + staleAccumulator.sweepStale(); + + 
assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("RUNNING"); + assertThat(exec.executionId()).isEqualTo("ex-3"); + } + + @Test + void finalChunkWithErrors_populatesErrorFields() { + ExecutionChunk c = chunk("ex-4", "FAILED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + 0, true, + List.of()); + c.setCorrelationId("ex-4"); + c.setErrorMessage("NullPointerException"); + c.setErrorStackTrace("at com.foo.Bar.baz(Bar.java:42)"); + c.setErrorType("NullPointerException"); + c.setErrorCategory("RUNTIME"); + c.setRootCauseType("NullPointerException"); + c.setRootCauseMessage("null value at index 0"); + + accumulator.onChunk(c); + + assertThat(executionSink).hasSize(1); + MergedExecution exec = executionSink.get(0); + assertThat(exec.status()).isEqualTo("FAILED"); + assertThat(exec.errorMessage()).isEqualTo("NullPointerException"); + assertThat(exec.errorStacktrace()).isEqualTo("at com.foo.Bar.baz(Bar.java:42)"); + assertThat(exec.errorType()).isEqualTo("NullPointerException"); + assertThat(exec.errorCategory()).isEqualTo("RUNTIME"); + assertThat(exec.rootCauseType()).isEqualTo("NullPointerException"); + assertThat(exec.rootCauseMessage()).isEqualTo("null value at index 0"); + } + + @Test + void getPendingCount_tracksBufferedExchanges() { + ExecutionChunk running1 = chunk("ex-5", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + 0, false, + List.of()); + running1.setCorrelationId("ex-5"); + + ExecutionChunk running2 = chunk("ex-6", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, + 0, false, + List.of()); + running2.setCorrelationId("ex-6"); + running2.setRouteId("route-2"); + + accumulator.onChunk(running1); + accumulator.onChunk(running2); + assertThat(accumulator.getPendingCount()).isEqualTo(2); + + // Send final for ex-5 + ExecutionChunk final5 = chunk("ex-5", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + 
Instant.parse("2026-03-31T10:00:01Z"), 1000L, + 1, true, + List.of()); + final5.setCorrelationId("ex-5"); + + accumulator.onChunk(final5); + assertThat(accumulator.getPendingCount()).isEqualTo(1); + } + + /** Helper to create an ExecutionChunk with common fields. */ + private static ExecutionChunk chunk(String exchangeId, String status, Instant start, Instant end, Long duration, + int chunkSeq, boolean isFinal, List processors) { + ExecutionChunk c = new ExecutionChunk(); + c.setExchangeId(exchangeId); + c.setApplicationName(exchangeId.equals("ex-1") ? "order-service" : "app"); + c.setAgentId("agent-1"); + c.setRouteId("route-1"); + c.setCorrelationId(null); + c.setStatus(ExecutionStatus.valueOf(status)); + c.setStartTime(start); + c.setEndTime(end); + c.setDurationMs(duration); + c.setEngineLevel("REGULAR"); + c.setChunkSeq(chunkSeq); + c.setFinal(isFinal); + c.setProcessors(processors); + return c; + } + + /** Helper to create a FlatProcessorRecord with minimal fields. */ + private static FlatProcessorRecord proc(int seq, Integer parentSeq, + String processorId, String processorType, + String status, long durationMs) { + FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType); + p.setParentSeq(parentSeq); + p.setStatus(ExecutionStatus.valueOf(status)); + p.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + p.setDurationMs(durationMs); + return p; + } +} diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 79228066..06c131a3 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -91,6 +91,8 @@ spec: key: CLICKHOUSE_PASSWORD - name: CAMELEER_STORAGE_METRICS value: "postgres" + - name: CAMELEER_STORAGE_SEARCH + value: "opensearch" resources: requests: