diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java new file mode 100644 index 00000000..6dbdd3e6 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java @@ -0,0 +1,304 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.model.ExecutionDocument; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +/** + * ClickHouse-backed implementation of {@link SearchIndex}. + *

+ * Queries the {@code executions} and {@code processor_executions} tables directly + * using SQL with ngram bloom-filter indexes for full-text search acceleration. + *

+ * The {@link #index} and {@link #delete} methods are no-ops because data is + * written by the accumulator/store pipeline, not the search index. + */ +public class ClickHouseSearchIndex implements SearchIndex { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSearchIndex.class); + private static final ObjectMapper JSON = new ObjectMapper(); + private static final TypeReference> STR_MAP = new TypeReference<>() {}; + private static final int HIGHLIGHT_CONTEXT_CHARS = 120; + + private static final Map SORT_FIELD_MAP = Map.of( + "startTime", "start_time", + "durationMs", "duration_ms", + "status", "status", + "agentId", "agent_id", + "routeId", "route_id", + "correlationId", "correlation_id", + "executionId", "execution_id", + "applicationName", "application_name" + ); + + private final JdbcTemplate jdbc; + + public ClickHouseSearchIndex(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void index(ExecutionDocument document) { + // No-op: data is written by ClickHouseExecutionStore + } + + @Override + public void delete(String executionId) { + // No-op: ClickHouse ReplacingMergeTree handles versioning + } + + @Override + public SearchResult search(SearchRequest request) { + try { + List params = new ArrayList<>(); + String whereClause = buildWhereClause(request, params); + String searchTerm = request.text(); + + // Count query + String countSql = "SELECT count() FROM executions FINAL WHERE " + whereClause; + Long total = jdbc.queryForObject(countSql, Long.class, params.toArray()); + if (total == null || total == 0) { + return SearchResult.empty(request.offset(), request.limit()); + } + + // Data query + String sortColumn = SORT_FIELD_MAP.getOrDefault(request.sortField(), "start_time"); + String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC"; + + String dataSql = "SELECT execution_id, route_id, agent_id, application_name, " + + "status, start_time, end_time, duration_ms, correlation_id, " + + "error_message, error_stacktrace, diagram_content_hash, attributes, " + + "has_trace_data, is_replay, " + + "input_body, output_body, input_headers, output_headers, root_cause_message " + + "FROM executions FINAL WHERE " + whereClause + + " ORDER BY " + sortColumn + " " + sortDir + + " LIMIT ? OFFSET ?"; + + List dataParams = new ArrayList<>(params); + dataParams.add(request.limit()); + dataParams.add(request.offset()); + + List data = jdbc.query( + dataSql, dataParams.toArray(), + (rs, rowNum) -> mapRow(rs, searchTerm)); + + return new SearchResult<>(data, total, request.offset(), request.limit()); + } catch (Exception e) { + log.error("ClickHouse search failed", e); + return SearchResult.empty(request.offset(), request.limit()); + } + } + + @Override + public long count(SearchRequest request) { + try { + List params = new ArrayList<>(); + String whereClause = buildWhereClause(request, params); + String sql = "SELECT count() FROM executions FINAL WHERE " + whereClause; + Long result = jdbc.queryForObject(sql, Long.class, params.toArray()); + return result != null ? result : 0L; + } catch (Exception e) { + log.error("ClickHouse count failed", e); + return 0L; + } + } + + private String buildWhereClause(SearchRequest request, List params) { + List conditions = new ArrayList<>(); + conditions.add("tenant_id = 'default'"); + + if (request.timeFrom() != null) { + conditions.add("start_time >= ?"); + params.add(Timestamp.from(request.timeFrom())); + } + if (request.timeTo() != null) { + conditions.add("start_time <= ?"); + params.add(Timestamp.from(request.timeTo())); + } + + if (request.status() != null && !request.status().isBlank()) { + String[] statuses = request.status().split(","); + if (statuses.length == 1) { + conditions.add("status = ?"); + params.add(statuses[0].trim()); + } else { + String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?")); + conditions.add("status IN (" + placeholders + ")"); + for (String s : statuses) { + params.add(s.trim()); + } + } + } + + if (request.routeId() != null) { + conditions.add("route_id = ?"); + params.add(request.routeId()); + } + + if (request.agentId() != null) { + conditions.add("agent_id = ?"); + params.add(request.agentId()); + } + + if (request.correlationId() != null) { + conditions.add("correlation_id = ?"); + params.add(request.correlationId()); + } + + if (request.application() != null && !request.application().isBlank()) { + conditions.add("application_name = ?"); + params.add(request.application()); + } + + if (request.agentIds() != null && !request.agentIds().isEmpty()) { + String placeholders = String.join(", ", Collections.nCopies(request.agentIds().size(), "?")); + conditions.add("agent_id IN (" + placeholders + ")"); + params.addAll(request.agentIds()); + } + + if (request.durationMin() != null) { + conditions.add("duration_ms >= ?"); + params.add(request.durationMin()); + } + + if (request.durationMax() != null) { + conditions.add("duration_ms <= ?"); + params.add(request.durationMax()); + } + + // Global full-text search: execution-level _search_text OR processor-level _search_text + if (request.text() != null && !request.text().isBlank()) { + String likeTerm = "%" + escapeLike(request.text()) + "%"; + conditions.add("(_search_text LIKE ? OR execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND _search_text LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped body search in processor_executions + if (request.textInBody() != null && !request.textInBody().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInBody()) + "%"; + conditions.add("execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped headers search in processor_executions + if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%"; + conditions.add("execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))"); + params.add(likeTerm); + params.add(likeTerm); + } + + // Scoped error search: execution-level + processor-level + if (request.textInErrors() != null && !request.textInErrors().isBlank()) { + String likeTerm = "%" + escapeLike(request.textInErrors()) + "%"; + conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN (" + + "SELECT DISTINCT execution_id FROM processor_executions " + + "WHERE tenant_id = 'default' AND (error_message LIKE ? OR error_stacktrace LIKE ?)))"); + params.add(likeTerm); + params.add(likeTerm); + params.add(likeTerm); + params.add(likeTerm); + } + + return String.join(" AND ", conditions); + } + + private ExecutionSummary mapRow(ResultSet rs, String searchTerm) throws SQLException { + String executionId = rs.getString("execution_id"); + String routeId = rs.getString("route_id"); + String agentId = rs.getString("agent_id"); + String applicationName = rs.getString("application_name"); + String status = rs.getString("status"); + + Timestamp startTs = rs.getTimestamp("start_time"); + Instant startTime = startTs != null ? startTs.toInstant() : null; + + Timestamp endTs = rs.getTimestamp("end_time"); + Instant endTime = endTs != null ? endTs.toInstant() : null; + + long durationMs = rs.getLong("duration_ms"); + String correlationId = rs.getString("correlation_id"); + String errorMessage = rs.getString("error_message"); + String errorStacktrace = rs.getString("error_stacktrace"); + String diagramContentHash = rs.getString("diagram_content_hash"); + String attributesJson = rs.getString("attributes"); + boolean hasTraceData = rs.getBoolean("has_trace_data"); + boolean isReplay = rs.getBoolean("is_replay"); + String inputBody = rs.getString("input_body"); + String outputBody = rs.getString("output_body"); + String inputHeaders = rs.getString("input_headers"); + String outputHeaders = rs.getString("output_headers"); + String rootCauseMessage = rs.getString("root_cause_message"); + + Map attributes = parseAttributesJson(attributesJson); + + // Application-side highlighting + String highlight = null; + if (searchTerm != null && !searchTerm.isBlank()) { + highlight = findHighlight(searchTerm, errorMessage, errorStacktrace, + inputBody, outputBody, inputHeaders, outputHeaders, attributesJson, rootCauseMessage); + } + + return new ExecutionSummary( + executionId, routeId, agentId, applicationName, status, + startTime, endTime, durationMs, + correlationId, errorMessage, diagramContentHash, + highlight, attributes, hasTraceData, isReplay + ); + } + + private String findHighlight(String searchTerm, String... fields) { + for (String field : fields) { + String snippet = extractSnippet(field, searchTerm, HIGHLIGHT_CONTEXT_CHARS); + if (snippet != null) { + return snippet; + } + } + return null; + } + + static String extractSnippet(String text, String searchTerm, int contextChars) { + if (text == null || text.isEmpty() || searchTerm == null) return null; + int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase()); + if (idx < 0) return null; + int start = Math.max(0, idx - contextChars / 2); + int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2); + return (start > 0 ? "..." : "") + text.substring(start, end) + (end < text.length() ? "..." : ""); + } + + private static String escapeLike(String term) { + return term.replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_"); + } + + private static Map parseAttributesJson(String json) { + if (json == null || json.isBlank()) return null; + try { + return JSON.readValue(json, STR_MAP); + } catch (Exception e) { + return null; + } + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java new file mode 100644 index 00000000..31a9580c --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -0,0 +1,319 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseSearchIndexIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseSearchIndex searchIndex; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + // Load DDL from classpath resources + String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql") + .getContentAsString(StandardCharsets.UTF_8); + + jdbc.execute(executionsDdl); + jdbc.execute(processorsDdl); + + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + // Seed test data + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + + // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes + MergedExecution exec1 = new MergedExecution( + "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", + "COMPLETED", "corr-1", "exchange-1", + baseTime, + baseTime.plusMillis(500), + 500L, + "", "", "", "", "", "", + "hash-abc", "FULL", + "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", + "", "", + false, false + ); + + // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error + MergedExecution exec2 = new MergedExecution( + "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", + "FAILED", "corr-2", "exchange-2", + baseTime.plusSeconds(1), + baseTime.plusSeconds(1).plusMillis(200), + 200L, + "NullPointerException at line 42", + "java.lang.NPE\n at Foo.bar(Foo.java:42)", + "NullPointerException", "RUNTIME", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error + MergedExecution exec3 = new MergedExecution( + "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", + "COMPLETED", "", "exchange-3", + baseTime.plusSeconds(2), + baseTime.plusSeconds(2).plusMillis(100), + 100L, + "", "", "", "", "", "", + "", "FULL", + "", "", "", "", "", + "", "", + false, false + ); + + store.insertExecutionBatch(List.of(exec1, exec2, exec3)); + + // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token + FlatProcessorRecord proc1 = new FlatProcessorRecord( + 1, null, null, + "proc-1", "to", null, null, + "COMPLETED", + baseTime, 50L, + null, + "Hello World request body", "", + Map.of("Authorization", "Bearer secret-token"), null, + null, null, null, null, null, null, + null, null, null, null, null + ); + + store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1)); + } + + @Test + void search_withNoFilters_returnsAllExecutions() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(3); + } + + @Test + void search_byStatus_filtersCorrectly() { + SearchRequest request = new SearchRequest( + "FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data()).hasSize(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_byTimeRange_filtersCorrectly() { + Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); + // Time window covering exec-1 and exec-2 but not exec-3 + SearchRequest request = new SearchRequest( + null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(2); + assertThat(result.data()).extracting(ExecutionSummary::executionId) + .containsExactlyInAnyOrder("exec-1", "exec-2"); + } + + @Test + void search_fullTextSearch_findsInErrorMessage() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_fullTextSearch_findsInInputBody() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "12345", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInBody_searchesProcessorBodies() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, "Hello World", null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInHeaders_searchesProcessorHeaders() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, "secret-token", null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_textInErrors_searchesErrorFields() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, "Foo.bar", + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-2"); + } + + @Test + void search_withHighlight_returnsSnippet() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, "NullPointerException", null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).highlight()).contains("NullPointerException"); + } + + @Test + void search_pagination_works() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 2, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + assertThat(result.data()).hasSize(2); + assertThat(result.offset()).isEqualTo(0); + assertThat(result.limit()).isEqualTo(2); + } + + @Test + void search_byApplication_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, "other-app", null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void search_byAgentIds_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, null, null, null, null, null, + null, null, null, null, List.of("agent-b"), 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-3"); + } + + @Test + void count_returnsMatchingCount() { + SearchRequest request = new SearchRequest( + "COMPLETED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + long count = searchIndex.count(request); + + assertThat(count).isEqualTo(2); + } + + @Test + void search_multipleStatusFilter_works() { + SearchRequest request = new SearchRequest( + "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(3); + } + + @Test + void search_byCorrelationId_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, null, null, "corr-1", null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } + + @Test + void search_byDurationRange_filtersCorrectly() { + SearchRequest request = new SearchRequest( + null, null, null, 300L, 600L, null, null, null, null, null, + null, null, null, null, null, 0, 50, null, null); + + SearchResult result = searchIndex.search(request); + + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("exec-1"); + } +}