feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:18:01 +02:00
parent 776f2ce90d
commit 6052407c82
2 changed files with 623 additions and 0 deletions

View File

@@ -0,0 +1,304 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;
/**
* ClickHouse-backed implementation of {@link SearchIndex}.
* <p>
* Queries the {@code executions} and {@code processor_executions} tables directly
* using SQL with ngram bloom-filter indexes for full-text search acceleration.
* <p>
* The {@link #index} and {@link #delete} methods are no-ops because data is
* written by the accumulator/store pipeline, not the search index.
*/
public class ClickHouseSearchIndex implements SearchIndex {
/** Logger used to report failed search and count queries. */
private static final Logger log = LoggerFactory.getLogger(ClickHouseSearchIndex.class);

/** Shared JSON mapper used to decode the {@code attributes} column. */
private static final ObjectMapper JSON = new ObjectMapper();

/** Target type for decoding the {@code attributes} JSON into a string-to-string map. */
private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};

/** Context-size argument handed to {@link #extractSnippet} when building highlights. */
private static final int HIGHLIGHT_CONTEXT_CHARS = 120;

/**
 * Whitelist translating API sort-field names into column names. Because the
 * column name is concatenated into the ORDER BY clause, only values from this
 * map ever reach the SQL text.
 */
private static final Map<String, String> SORT_FIELD_MAP = Map.ofEntries(
    Map.entry("startTime", "start_time"),
    Map.entry("durationMs", "duration_ms"),
    Map.entry("status", "status"),
    Map.entry("agentId", "agent_id"),
    Map.entry("routeId", "route_id"),
    Map.entry("correlationId", "correlation_id"),
    Map.entry("executionId", "execution_id"),
    Map.entry("applicationName", "application_name")
);

/** JDBC handle through which every query of this class is issued. */
private final JdbcTemplate jdbc;
/**
 * Creates a search index that issues its queries through the given template.
 *
 * @param jdbc JDBC template used for all queries; must not be {@code null}
 * @throws NullPointerException if {@code jdbc} is {@code null}
 */
public ClickHouseSearchIndex(JdbcTemplate jdbc) {
    // Fail fast: search() and count() catch every exception, so a null template
    // would otherwise surface only as permanently empty results.
    this.jdbc = Objects.requireNonNull(jdbc, "jdbc");
}
/**
 * Intentionally does nothing; this implementation never writes the document.
 *
 * @param document the execution document; ignored
 */
@Override
public void index(ExecutionDocument document) {
// No-op: data is written by ClickHouseExecutionStore
}
/**
 * Intentionally does nothing; no delete statement is issued.
 *
 * @param executionId the execution id; ignored
 */
@Override
public void delete(String executionId) {
// No-op: ClickHouse ReplacingMergeTree handles versioning
}
/**
 * Runs a filtered, sorted and paged search over the {@code executions} table.
 * <p>
 * A count query is issued first; when it reports no matches the data query is
 * skipped. Any failure is logged and reported as an empty result rather than
 * propagated to the caller.
 *
 * @param request filter, sort and paging criteria
 * @return the matching page together with the total match count, or an empty
 *         result when nothing matches or the query fails
 */
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
    try {
        List<Object> params = new ArrayList<>();
        String whereClause = buildWhereClause(request, params);
        String searchTerm = request.text();

        // Count query
        String countSql = "SELECT count() FROM executions FINAL WHERE " + whereClause;
        Long total = jdbc.queryForObject(countSql, Long.class, params.toArray());
        if (total == null || total == 0) {
            return SearchResult.empty(request.offset(), request.limit());
        }

        // Data query. Map.of(...) maps throw NullPointerException on a null key,
        // so an absent sort field is resolved to the default before the lookup.
        String requestedSort = request.sortField();
        String sortColumn = requestedSort == null
                ? "start_time"
                : SORT_FIELD_MAP.getOrDefault(requestedSort, "start_time");
        String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";

        // sortColumn and sortDir come from fixed whitelists, never from raw input,
        // which is what makes concatenating them into the SQL text safe.
        String dataSql = "SELECT execution_id, route_id, agent_id, application_name, "
                + "status, start_time, end_time, duration_ms, correlation_id, "
                + "error_message, error_stacktrace, diagram_content_hash, attributes, "
                + "has_trace_data, is_replay, "
                + "input_body, output_body, input_headers, output_headers, root_cause_message "
                + "FROM executions FINAL WHERE " + whereClause
                + " ORDER BY " + sortColumn + " " + sortDir
                + " LIMIT ? OFFSET ?";

        List<Object> dataParams = new ArrayList<>(params);
        dataParams.add(request.limit());
        dataParams.add(request.offset());

        // Varargs overload: query(String, Object[], RowMapper) is deprecated.
        List<ExecutionSummary> data = jdbc.query(
                dataSql,
                (rs, rowNum) -> mapRow(rs, searchTerm),
                dataParams.toArray());
        return new SearchResult<>(data, total, request.offset(), request.limit());
    } catch (Exception e) {
        // Deliberate best effort: a failing search degrades to "no results".
        log.error("ClickHouse search failed", e);
        return SearchResult.empty(request.offset(), request.limit());
    }
}
/**
 * Counts the executions matching the request's filters.
 * <p>
 * Failures are logged and reported as zero instead of being propagated.
 *
 * @param request filter criteria
 * @return the number of matching executions, or {@code 0} when the query
 *         yields no value or fails
 */
@Override
public long count(SearchRequest request) {
    try {
        List<Object> bindValues = new ArrayList<>();
        String filter = buildWhereClause(request, bindValues);
        Long matches = jdbc.queryForObject(
                "SELECT count() FROM executions FINAL WHERE " + filter,
                Long.class,
                bindValues.toArray());
        if (matches == null) {
            return 0L;
        }
        return matches;
    } catch (Exception e) {
        log.error("ClickHouse count failed", e);
        return 0L;
    }
}
/**
 * Builds the SQL {@code WHERE} clause (without the keyword) for a request and
 * appends the matching bind values to {@code params} in placeholder order.
 * <p>
 * The clause always starts with the fixed {@code tenant_id = 'default'}
 * condition; every further condition is added only when the corresponding
 * request field is set. All conditions are combined with {@code AND}.
 *
 * @param request filter criteria
 * @param params  mutable list receiving one bind value per {@code ?} placeholder
 * @return the conditions joined with {@code " AND "}; never empty
 */
private String buildWhereClause(SearchRequest request, List<Object> params) {
    List<String> conditions = new ArrayList<>();
    conditions.add("tenant_id = 'default'");

    if (request.timeFrom() != null) {
        conditions.add("start_time >= ?");
        params.add(Timestamp.from(request.timeFrom()));
    }
    if (request.timeTo() != null) {
        conditions.add("start_time <= ?");
        params.add(Timestamp.from(request.timeTo()));
    }

    if (request.status() != null && !request.status().isBlank()) {
        // Comma-separated list. Blank tokens are dropped so that inputs such as
        // "," or "A,,B" can neither produce the invalid SQL "status IN ()" nor
        // bind an empty status value.
        List<String> statuses = new ArrayList<>();
        for (String token : request.status().split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                statuses.add(trimmed);
            }
        }
        if (statuses.size() == 1) {
            conditions.add("status = ?");
            params.add(statuses.get(0));
        } else if (!statuses.isEmpty()) {
            String placeholders = String.join(", ", Collections.nCopies(statuses.size(), "?"));
            conditions.add("status IN (" + placeholders + ")");
            params.addAll(statuses);
        }
    }

    if (request.routeId() != null) {
        conditions.add("route_id = ?");
        params.add(request.routeId());
    }
    if (request.agentId() != null) {
        conditions.add("agent_id = ?");
        params.add(request.agentId());
    }
    if (request.correlationId() != null) {
        conditions.add("correlation_id = ?");
        params.add(request.correlationId());
    }
    if (request.application() != null && !request.application().isBlank()) {
        conditions.add("application_name = ?");
        params.add(request.application());
    }
    if (request.agentIds() != null && !request.agentIds().isEmpty()) {
        String placeholders = String.join(", ", Collections.nCopies(request.agentIds().size(), "?"));
        conditions.add("agent_id IN (" + placeholders + ")");
        params.addAll(request.agentIds());
    }
    if (request.durationMin() != null) {
        conditions.add("duration_ms >= ?");
        params.add(request.durationMin());
    }
    if (request.durationMax() != null) {
        conditions.add("duration_ms <= ?");
        params.add(request.durationMax());
    }

    // Global full-text search: execution-level _search_text OR processor-level _search_text
    if (request.text() != null && !request.text().isBlank()) {
        String likeTerm = "%" + escapeLike(request.text()) + "%";
        conditions.add("(_search_text LIKE ? OR execution_id IN ("
                + "SELECT DISTINCT execution_id FROM processor_executions "
                + "WHERE tenant_id = 'default' AND _search_text LIKE ?))");
        params.add(likeTerm);
        params.add(likeTerm);
    }

    // Scoped body search in processor_executions
    if (request.textInBody() != null && !request.textInBody().isBlank()) {
        String likeTerm = "%" + escapeLike(request.textInBody()) + "%";
        conditions.add("execution_id IN ("
                + "SELECT DISTINCT execution_id FROM processor_executions "
                + "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))");
        params.add(likeTerm);
        params.add(likeTerm);
    }

    // Scoped headers search in processor_executions
    if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
        String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%";
        conditions.add("execution_id IN ("
                + "SELECT DISTINCT execution_id FROM processor_executions "
                + "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))");
        params.add(likeTerm);
        params.add(likeTerm);
    }

    // Scoped error search: execution-level + processor-level
    if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
        String likeTerm = "%" + escapeLike(request.textInErrors()) + "%";
        conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN ("
                + "SELECT DISTINCT execution_id FROM processor_executions "
                + "WHERE tenant_id = 'default' AND (error_message LIKE ? OR error_stacktrace LIKE ?)))");
        // One bind value per placeholder: two execution-level, two processor-level.
        params.add(likeTerm);
        params.add(likeTerm);
        params.add(likeTerm);
        params.add(likeTerm);
    }

    return String.join(" AND ", conditions);
}
/**
 * Converts the current result-set row into an {@link ExecutionSummary}.
 * <p>
 * When a non-blank search term is supplied, a highlight snippet is taken from
 * the first of the row's text columns that contains the term.
 *
 * @param rs         result set positioned on the row to convert
 * @param searchTerm full-text term of the request; may be {@code null}
 * @return the summary for the current row
 * @throws SQLException if a column cannot be read
 */
private ExecutionSummary mapRow(ResultSet rs, String searchTerm) throws SQLException {
    // Identity and routing columns
    String id = rs.getString("execution_id");
    String route = rs.getString("route_id");
    String agent = rs.getString("agent_id");
    String application = rs.getString("application_name");
    String state = rs.getString("status");

    // Timing columns; a null timestamp maps to a null Instant
    Instant startedAt = instantOrNull(rs.getTimestamp("start_time"));
    Instant endedAt = instantOrNull(rs.getTimestamp("end_time"));
    long elapsedMs = rs.getLong("duration_ms");

    String correlation = rs.getString("correlation_id");
    String errorText = rs.getString("error_message");
    String stacktrace = rs.getString("error_stacktrace");
    String diagramHash = rs.getString("diagram_content_hash");
    String rawAttributes = rs.getString("attributes");
    boolean traced = rs.getBoolean("has_trace_data");
    boolean replayed = rs.getBoolean("is_replay");

    // Payload columns are read only to serve as highlight candidates
    String bodyIn = rs.getString("input_body");
    String bodyOut = rs.getString("output_body");
    String headersIn = rs.getString("input_headers");
    String headersOut = rs.getString("output_headers");
    String rootCause = rs.getString("root_cause_message");

    Map<String, String> attributeMap = parseAttributesJson(rawAttributes);

    // Application-side highlighting
    boolean wantsHighlight = searchTerm != null && !searchTerm.isBlank();
    String snippet = wantsHighlight
            ? findHighlight(searchTerm, errorText, stacktrace,
                    bodyIn, bodyOut, headersIn, headersOut, rawAttributes, rootCause)
            : null;

    return new ExecutionSummary(
            id, route, agent, application, state,
            startedAt, endedAt, elapsedMs,
            correlation, errorText, diagramHash,
            snippet, attributeMap, traced, replayed);
}

/** Converts a SQL timestamp to an {@link Instant}, passing {@code null} through. */
private static Instant instantOrNull(Timestamp ts) {
    if (ts == null) {
        return null;
    }
    return ts.toInstant();
}
/**
 * Returns the snippet of the first candidate field that contains the term.
 *
 * @param searchTerm term to look for
 * @param fields     candidate texts, checked in the order given
 * @return the first snippet found, or {@code null} when no field contains the term
 */
private String findHighlight(String searchTerm, String... fields) {
    // The stream is lazy, so fields are examined in order and evaluation
    // stops at the first one that yields a snippet.
    return Arrays.stream(fields)
            .map(candidate -> extractSnippet(candidate, searchTerm, HIGHLIGHT_CONTEXT_CHARS))
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(null);
}
/**
 * Extracts a snippet of {@code text} around the first case-insensitive
 * occurrence of {@code searchTerm}.
 * <p>
 * The snippet spans up to {@code contextChars / 2} characters on each side of
 * the match and is marked with {@code "..."} wherever it was cut off.
 *
 * @param text         text to search; may be {@code null}
 * @param searchTerm   term to locate, compared ignoring case; may be {@code null}
 * @param contextChars total number of context characters, split evenly before and after the match
 * @return the snippet, or {@code null} when {@code text} is {@code null} or empty,
 *         {@code searchTerm} is {@code null}, or the term does not occur
 */
static String extractSnippet(String text, String searchTerm, int contextChars) {
    if (text == null || text.isEmpty() || searchTerm == null) return null;
    // The match is located on the original text. Searching a lower-cased copy is
    // unsafe: lower-casing can change the string length (U+0130 becomes two
    // chars), which shifts the index away from the original, and it also
    // depends on the default locale.
    int idx = indexOfIgnoreCase(text, searchTerm);
    if (idx < 0) return null;
    int start = Math.max(0, idx - contextChars / 2);
    int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
    return (start > 0 ? "..." : "") + text.substring(start, end) + (end < text.length() ? "..." : "");
}

/** Returns the index of the first case-insensitive occurrence of {@code term} in {@code text}, or -1. */
private static int indexOfIgnoreCase(String text, String term) {
    int termLength = term.length();
    int lastStart = text.length() - termLength;
    for (int i = 0; i <= lastStart; i++) {
        if (text.regionMatches(true, i, term, 0, termLength)) {
            return i;
        }
    }
    return -1;
}
/**
 * Escapes the characters that are special inside a SQL {@code LIKE} pattern.
 * <p>
 * Each backslash, {@code %} and {@code _} is prefixed with a backslash so the
 * term is matched literally.
 *
 * @param term raw search term; must not be {@code null}
 * @return the term with every special character backslash-escaped
 */
private static String escapeLike(String term) {
    StringBuilder escaped = new StringBuilder(term.length() + 8);
    for (int i = 0; i < term.length(); i++) {
        char c = term.charAt(i);
        if (c == '\\' || c == '%' || c == '_') {
            escaped.append('\\');
        }
        escaped.append(c);
    }
    return escaped.toString();
}
/**
 * Decodes the JSON text of the {@code attributes} column into a map.
 *
 * @param json JSON text; may be {@code null} or blank
 * @return the decoded map, or {@code null} when the input is {@code null},
 *         blank, or cannot be parsed
 */
private static Map<String, String> parseAttributesJson(String json) {
    boolean hasContent = json != null && !json.isBlank();
    if (!hasContent) {
        return null;
    }
    Map<String, String> decoded;
    try {
        decoded = JSON.readValue(json, STR_MAP);
    } catch (Exception e) {
        // Unparseable attributes are treated the same as absent ones.
        decoded = null;
    }
    return decoded;
}
}

View File

@@ -0,0 +1,319 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseSearchIndexIT {
// ClickHouse server container (image 24.12) declared static, so one instance is shared by the class.
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
// JDBC template connected to the container; assigned in setUp().
private JdbcTemplate jdbc;
// Search index under test; assigned in setUp().
private ClickHouseSearchIndex searchIndex;
// Connection pool opened by setUp(); kept in a field so tearDown() can close it.
private HikariDataSource dataSource;

/**
 * Connects to the ClickHouse container, applies the table DDL, empties both
 * tables and seeds three executions plus one processor record for exec-1.
 *
 * @throws Exception if a DDL resource cannot be read
 */
@BeforeEach
void setUp() throws Exception {
    dataSource = new HikariDataSource();
    dataSource.setJdbcUrl(clickhouse.getJdbcUrl());
    dataSource.setUsername(clickhouse.getUsername());
    dataSource.setPassword(clickhouse.getPassword());
    jdbc = new JdbcTemplate(dataSource);

    // Load DDL from classpath resources
    String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
            .getContentAsString(StandardCharsets.UTF_8);
    String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
            .getContentAsString(StandardCharsets.UTF_8);
    jdbc.execute(executionsDdl);
    jdbc.execute(processorsDdl);
    jdbc.execute("TRUNCATE TABLE executions");
    jdbc.execute("TRUNCATE TABLE processor_executions");

    ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc);
    searchIndex = new ClickHouseSearchIndex(jdbc);

    // Seed test data
    Instant baseTime = Instant.parse("2026-03-31T10:00:00Z");

    // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes
    MergedExecution exec1 = new MergedExecution(
            "default", 1L, "exec-1", "route-timer", "agent-a", "my-app",
            "COMPLETED", "corr-1", "exchange-1",
            baseTime,
            baseTime.plusMillis(500),
            500L,
            "", "", "", "", "", "",
            "hash-abc", "FULL",
            "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}",
            "", "",
            false, false
    );

    // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error
    MergedExecution exec2 = new MergedExecution(
            "default", 1L, "exec-2", "route-timer", "agent-a", "my-app",
            "FAILED", "corr-2", "exchange-2",
            baseTime.plusSeconds(1),
            baseTime.plusSeconds(1).plusMillis(200),
            200L,
            "NullPointerException at line 42",
            "java.lang.NPE\n at Foo.bar(Foo.java:42)",
            "NullPointerException", "RUNTIME", "", "",
            "", "FULL",
            "", "", "", "", "",
            "", "",
            false, false
    );

    // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error
    MergedExecution exec3 = new MergedExecution(
            "default", 1L, "exec-3", "route-rest", "agent-b", "other-app",
            "COMPLETED", "", "exchange-3",
            baseTime.plusSeconds(2),
            baseTime.plusSeconds(2).plusMillis(100),
            100L,
            "", "", "", "", "", "",
            "", "FULL",
            "", "", "", "", "",
            "", "",
            false, false
    );
    store.insertExecutionBatch(List.of(exec1, exec2, exec3));

    // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token
    FlatProcessorRecord proc1 = new FlatProcessorRecord(
            1, null, null,
            "proc-1", "to", null, null,
            "COMPLETED",
            baseTime, 50L,
            null,
            "Hello World request body", "",
            Map.of("Authorization", "Bearer secret-token"), null,
            null, null, null, null, null, null,
            null, null, null, null, null
    );
    store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1));
}

/** Closes the connection pool opened by {@link #setUp()} so pools do not accumulate across tests. */
@AfterEach
void tearDown() {
    if (dataSource != null) {
        dataSource.close();
    }
}
/** With every filter left null, all three seeded executions are returned. */
@Test
void search_withNoFilters_returnsAllExecutions() {
    SearchRequest unfiltered = new SearchRequest(
            null, null, null, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(unfiltered);

    assertThat(page.total()).isEqualTo(3);
    assertThat(page.data()).hasSize(3);
}
/** Passing "FAILED" as the first request argument returns only exec-2. */
@Test
void search_byStatus_filtersCorrectly() {
    SearchRequest failedOnly = new SearchRequest(
            "FAILED", null, null, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(failedOnly);

    assertThat(page.total()).isEqualTo(1);
    assertThat(page.data()).hasSize(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-2");
}
/** A 1.5 second window from the seed base time returns exec-1 and exec-2. */
@Test
void search_byTimeRange_filtersCorrectly() {
    Instant windowStart = Instant.parse("2026-03-31T10:00:00Z");
    // Time window covering exec-1 and exec-2 but not exec-3
    Instant windowEnd = windowStart.plusMillis(1500);
    SearchRequest windowed = new SearchRequest(
            null, windowStart, windowEnd, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(windowed);

    assertThat(page.total()).isEqualTo(2);
    assertThat(page.data()).extracting(ExecutionSummary::executionId)
            .containsExactlyInAnyOrder("exec-1", "exec-2");
}
/** A search for "NullPointerException" matches only exec-2. */
@Test
void search_fullTextSearch_findsInErrorMessage() {
    SearchRequest byErrorText = new SearchRequest(
            null, null, null, null, null, null, "NullPointerException", null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byErrorText);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-2");
}
/** A search for "12345" matches only exec-1. */
@Test
void search_fullTextSearch_findsInInputBody() {
    SearchRequest byOrderNumber = new SearchRequest(
            null, null, null, null, null, null, "12345", null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byOrderNumber);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-1");
}
/** A search for "Hello World", text seeded only on exec-1's processor record, returns exec-1. */
@Test
void search_textInBody_searchesProcessorBodies() {
    SearchRequest byBodyText = new SearchRequest(
            null, null, null, null, null, null, null, "Hello World", null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byBodyText);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-1");
}
/** A search for "secret-token", seeded only in exec-1's processor headers, returns exec-1. */
@Test
void search_textInHeaders_searchesProcessorHeaders() {
    SearchRequest byHeaderText = new SearchRequest(
            null, null, null, null, null, null, null, null, "secret-token", null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byHeaderText);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-1");
}
/** A search for "Foo.bar", present only in exec-2's seeded stack trace, returns exec-2. */
@Test
void search_textInErrors_searchesErrorFields() {
    SearchRequest byStackFrame = new SearchRequest(
            null, null, null, null, null, null, null, null, null, "Foo.bar",
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byStackFrame);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-2");
}
/** The single hit for "NullPointerException" carries a highlight containing the term. */
@Test
void search_withHighlight_returnsSnippet() {
    SearchRequest byErrorText = new SearchRequest(
            null, null, null, null, null, null, "NullPointerException", null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byErrorText);

    assertThat(page.total()).isEqualTo(1);
    String snippet = page.data().get(0).highlight();
    assertThat(snippet).contains("NullPointerException");
}
/** With the limit set to 2, two rows come back while the total still reports three. */
@Test
void search_pagination_works() {
    SearchRequest firstPage = new SearchRequest(
            null, null, null, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 2, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(firstPage);

    // Total reflects all matches; data is capped by the limit.
    assertThat(page.total()).isEqualTo(3);
    assertThat(page.data()).hasSize(2);
    // Paging values are echoed back on the result.
    assertThat(page.offset()).isEqualTo(0);
    assertThat(page.limit()).isEqualTo(2);
}
/** Filtering with "other-app" returns only exec-3, the one execution seeded under that application. */
@Test
void search_byApplication_filtersCorrectly() {
    SearchRequest byApplication = new SearchRequest(
            null, null, null, null, null, null, null, null, null, null,
            null, null, null, "other-app", null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byApplication);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-3");
}
/** Filtering with the list ["agent-b"] returns only exec-3, the one execution seeded with that agent. */
@Test
void search_byAgentIds_filtersCorrectly() {
    List<String> agents = List.of("agent-b");
    SearchRequest byAgents = new SearchRequest(
            null, null, null, null, null, null, null, null, null, null,
            null, null, null, null, agents, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byAgents);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-3");
}
/** Counting with "COMPLETED" as the first request argument yields 2 (exec-1 and exec-3). */
@Test
void count_returnsMatchingCount() {
    SearchRequest completedOnly = new SearchRequest(
            "COMPLETED", null, null, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    long matches = searchIndex.count(completedOnly);

    assertThat(matches).isEqualTo(2);
}
/** The comma-separated value "COMPLETED,FAILED" matches all three seeded executions. */
@Test
void search_multipleStatusFilter_works() {
    SearchRequest eitherStatus = new SearchRequest(
            "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(eitherStatus);

    assertThat(page.total()).isEqualTo(3);
}
/** Filtering with "corr-1" returns only exec-1, the execution seeded with that correlation id. */
@Test
void search_byCorrelationId_filtersCorrectly() {
    SearchRequest byCorrelation = new SearchRequest(
            null, null, null, null, null, "corr-1", null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byCorrelation);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-1");
}
/** Bounds of 300 and 600 keep only exec-1, the execution seeded with a 500 ms duration. */
@Test
void search_byDurationRange_filtersCorrectly() {
    SearchRequest byDuration = new SearchRequest(
            null, null, null, 300L, 600L, null, null, null, null, null,
            null, null, null, null, null, 0, 50, null, null);

    SearchResult<ExecutionSummary> page = searchIndex.search(byDuration);

    assertThat(page.total()).isEqualTo(1);
    ExecutionSummary firstHit = page.data().get(0);
    assertThat(firstHit.executionId()).isEqualTo("exec-1");
}
}