From f7d73026941c05b64360933ce3c14a3928f43c93 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:25:55 +0100 Subject: [PATCH] feat: implement OpenSearchIndex with full-text and wildcard search Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/app/config/OpenSearchConfig.java | 23 ++ .../server/app/search/OpenSearchIndex.java | 310 ++++++++++++++++++ .../server/app/search/OpenSearchIndexIT.java | 87 +++++ 3 files changed, 420 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java new file mode 100644 index 00000000..0ed581ad --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java @@ -0,0 +1,23 @@ +package com.cameleer3.server.app.config; + +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.core5.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class OpenSearchConfig { + + @Value("${opensearch.url:http://localhost:9200}") + private String opensearchUrl; + + @Bean + public OpenSearchClient openSearchClient() { + HttpHost host = HttpHost.create(opensearchUrl); + var transport = ApacheHttpClient5TransportBuilder.builder(host).build(); + return new OpenSearchClient(transport); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java new file mode 100644 index 00000000..d130b1f6 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -0,0 +1,310 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.model.ExecutionDocument; +import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; +import jakarta.annotation.PostConstruct; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.FieldValue; +import org.opensearch.client.opensearch._types.SortOrder; +import org.opensearch.client.opensearch._types.query_dsl.*; +import org.opensearch.client.opensearch.core.*; +import org.opensearch.client.opensearch.core.search.Hit; +import org.opensearch.client.opensearch.indices.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Repository; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.stream.Collectors; + +@Repository +public class OpenSearchIndex implements SearchIndex { + + private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); + private static final String INDEX_PREFIX = "executions-"; + private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") + .withZone(ZoneOffset.UTC); + + private final OpenSearchClient client; + + public OpenSearchIndex(OpenSearchClient client) { + this.client = client; + } + + @PostConstruct + void ensureIndexTemplate() { + // Full template with ngram analyzer for infix wildcard search. + // The template JSON matches the spec's OpenSearch index template definition. + try { + boolean exists = client.indices().existsIndexTemplate( + ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value(); + if (!exists) { + client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b + .name("executions-template") + .indexPatterns(List.of("executions-*")) + .template(t -> t + .settings(s -> s + .numberOfShards("3") + .numberOfReplicas("1") + .analysis(a -> a + .analyzer("ngram_analyzer", an -> an + .custom(c -> c + .tokenizer("ngram_tokenizer") + .filter("lowercase"))) + .tokenizer("ngram_tokenizer", tk -> tk + .definition(d -> d + .ngram(ng -> ng + .minGram(3) + .maxGram(4) + .tokenChars(TokenChar.Letter, + TokenChar.Digit, + TokenChar.Punctuation, + TokenChar.Symbol))))))))); + log.info("OpenSearch index template created with ngram analyzer"); + } + } catch (IOException e) { + log.error("Failed to create index template", e); + } + } + + @Override + public void index(ExecutionDocument doc) { + String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime()); + try { + client.index(IndexRequest.of(b -> b + .index(indexName) + .id(doc.executionId()) + .document(toMap(doc)))); + } catch (IOException e) { + log.error("Failed to index execution {}", doc.executionId(), e); + } + } + + @Override + public SearchResult search(SearchRequest request) { + try { + var searchReq = buildSearchRequest(request, request.limit()); + var response = client.search(searchReq, Map.class); + + List items = response.hits().hits().stream() + .map(this::hitToSummary) + .collect(Collectors.toList()); + + long total = response.hits().total() != null ? response.hits().total().value() : 0; + return new SearchResult<>(items, total); + } catch (IOException e) { + log.error("Search failed", e); + return new SearchResult<>(List.of(), 0); + } + } + + @Override + public long count(SearchRequest request) { + try { + var countReq = CountRequest.of(b -> b + .index(INDEX_PREFIX + "*") + .query(buildQuery(request))); + return client.count(countReq).count(); + } catch (IOException e) { + log.error("Count failed", e); + return 0; + } + } + + @Override + public void delete(String executionId) { + try { + client.deleteByQuery(DeleteByQueryRequest.of(b -> b + .index(List.of(INDEX_PREFIX + "*")) + .query(Query.of(q -> q.term(t -> t + .field("execution_id").value(executionId)))))); + } catch (IOException e) { + log.error("Failed to delete execution {}", executionId, e); + } + } + + private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest( + SearchRequest request, int size) { + return org.opensearch.client.opensearch.core.SearchRequest.of(b -> { + b.index(INDEX_PREFIX + "*") + .query(buildQuery(request)) + .size(size) + .from(request.offset()) + .sort(s -> s.field(f -> f + .field(request.sortColumn()) + .order("asc".equalsIgnoreCase(request.sortDir()) + ? SortOrder.Asc : SortOrder.Desc))); + return b; + }); + } + + private Query buildQuery(SearchRequest request) { + List must = new ArrayList<>(); + List filter = new ArrayList<>(); + + // Time range + if (request.timeFrom() != null || request.timeTo() != null) { + filter.add(Query.of(q -> q.range(r -> { + r.field("start_time"); + if (request.timeFrom() != null) + r.gte(jakarta.json.Json.createValue(request.timeFrom().toString())); + if (request.timeTo() != null) + r.lte(jakarta.json.Json.createValue(request.timeTo().toString())); + return r; + }))); + } + + // Keyword filters + if (request.status() != null) + filter.add(termQuery("status", request.status())); + if (request.routeId() != null) + filter.add(termQuery("route_id", request.routeId())); + if (request.agentId() != null) + filter.add(termQuery("agent_id", request.agentId())); + if (request.correlationId() != null) + filter.add(termQuery("correlation_id", request.correlationId())); + + // Full-text search across all fields + nested processor fields + if (request.text() != null && !request.text().isBlank()) { + String text = request.text(); + List textQueries = new ArrayList<>(); + + // Search top-level text fields + textQueries.add(Query.of(q -> q.multiMatch(m -> m + .query(text) + .fields("error_message", "error_stacktrace", + "error_message.ngram", "error_stacktrace.ngram")))); + + // Search nested processor fields + textQueries.add(Query.of(q -> q.nested(n -> n + .path("processors") + .query(nq -> nq.multiMatch(m -> m + .query(text) + .fields("processors.input_body", "processors.output_body", + "processors.input_headers", "processors.output_headers", + "processors.error_message", "processors.error_stacktrace", + "processors.input_body.ngram", "processors.output_body.ngram", + "processors.input_headers.ngram", "processors.output_headers.ngram", + "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))); + + // Also try keyword fields for exact matches + textQueries.add(Query.of(q -> q.multiMatch(m -> m + .query(text) + .fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id")))); + + must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1")))); + } + + // Scoped text searches + if (request.textInBody() != null && !request.textInBody().isBlank()) { + must.add(Query.of(q -> q.nested(n -> n + .path("processors") + .query(nq -> nq.multiMatch(m -> m + .query(request.textInBody()) + .fields("processors.input_body", "processors.output_body", + "processors.input_body.ngram", "processors.output_body.ngram")))))); + } + if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { + must.add(Query.of(q -> q.nested(n -> n + .path("processors") + .query(nq -> nq.multiMatch(m -> m + .query(request.textInHeaders()) + .fields("processors.input_headers", "processors.output_headers", + "processors.input_headers.ngram", "processors.output_headers.ngram")))))); + } + if (request.textInErrors() != null && !request.textInErrors().isBlank()) { + String errText = request.textInErrors(); + must.add(Query.of(q -> q.bool(b -> b.should( + Query.of(sq -> sq.multiMatch(m -> m + .query(errText) + .fields("error_message", "error_stacktrace", + "error_message.ngram", "error_stacktrace.ngram"))), + Query.of(sq -> sq.nested(n -> n + .path("processors") + .query(nq -> nq.multiMatch(m -> m + .query(errText) + .fields("processors.error_message", "processors.error_stacktrace", + "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))) + ).minimumShouldMatch("1")))); + } + + // Duration range + if (request.durationMin() != null || request.durationMax() != null) { + filter.add(Query.of(q -> q.range(r -> { + r.field("duration_ms"); + if (request.durationMin() != null) + r.gte(jakarta.json.Json.createValue(request.durationMin())); + if (request.durationMax() != null) + r.lte(jakarta.json.Json.createValue(request.durationMax())); + return r; + }))); + } + + return Query.of(q -> q.bool(b -> { + if (!must.isEmpty()) b.must(must); + if (!filter.isEmpty()) b.filter(filter); + if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m))); + return b; + })); + } + + private Query termQuery(String field, String value) { + return Query.of(q -> q.term(t -> t.field(field).value(value))); + } + + private Map toMap(ExecutionDocument doc) { + Map map = new LinkedHashMap<>(); + map.put("execution_id", doc.executionId()); + map.put("route_id", doc.routeId()); + map.put("agent_id", doc.agentId()); + map.put("group_name", doc.groupName()); + map.put("status", doc.status()); + map.put("correlation_id", doc.correlationId()); + map.put("exchange_id", doc.exchangeId()); + map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null); + map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null); + map.put("duration_ms", doc.durationMs()); + map.put("error_message", doc.errorMessage()); + map.put("error_stacktrace", doc.errorStacktrace()); + if (doc.processors() != null) { + map.put("processors", doc.processors().stream().map(p -> { + Map pm = new LinkedHashMap<>(); + pm.put("processor_id", p.processorId()); + pm.put("processor_type", p.processorType()); + pm.put("status", p.status()); + pm.put("error_message", p.errorMessage()); + pm.put("error_stacktrace", p.errorStacktrace()); + pm.put("input_body", p.inputBody()); + pm.put("output_body", p.outputBody()); + pm.put("input_headers", p.inputHeaders()); + pm.put("output_headers", p.outputHeaders()); + return pm; + }).toList()); + } + return map; + } + + @SuppressWarnings("unchecked") + private ExecutionSummary hitToSummary(Hit hit) { + Map src = hit.source(); + if (src == null) return null; + return new ExecutionSummary( + (String) src.get("execution_id"), + (String) src.get("route_id"), + (String) src.get("agent_id"), + (String) src.get("status"), + src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null, + src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null, + src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L, + (String) src.get("correlation_id"), + (String) src.get("error_message")); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java new file mode 100644 index 00000000..24054006 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java @@ -0,0 +1,87 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.core.search.ExecutionSummary; +import com.cameleer3.server.core.search.SearchRequest; +import com.cameleer3.server.core.search.SearchResult; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.model.ExecutionDocument; +import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; +import org.junit.jupiter.api.Test; +import org.opensearch.testcontainers.OpensearchContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.junit.jupiter.Container; + +import java.time.Instant; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context +class OpenSearchIndexIT extends AbstractPostgresIT { + + @Container + static final OpensearchContainer opensearch = + new OpensearchContainer<>("opensearchproject/opensearch:2.19.0") + .withSecurityEnabled(false); + + @DynamicPropertySource + static void configureOpenSearch(DynamicPropertyRegistry registry) { + registry.add("opensearch.url", opensearch::getHttpHostAddress); + } + + @Autowired + SearchIndex searchIndex; + + @Test + void indexAndSearchByText() throws Exception { + Instant now = Instant.now(); + ExecutionDocument doc = new ExecutionDocument( + "search-1", "route-a", "agent-1", "app-1", + "FAILED", "corr-1", "exch-1", + now, now.plusMillis(100), 100L, + "OrderNotFoundException: order-12345 not found", null, + List.of(new ProcessorDoc("proc-1", "log", "COMPLETED", + null, null, "request body with customer-99", null, null, null))); + + searchIndex.index(doc); + Thread.sleep(1500); // Allow OpenSearch refresh + + SearchRequest request = new SearchRequest( + null, now.minusSeconds(60), now.plusSeconds(60), + null, null, null, + "OrderNotFoundException", null, null, null, + null, null, null, null, null, + 0, 50, "startTime", "desc"); + + SearchResult result = searchIndex.search(request); + assertTrue(result.total() > 0); + assertEquals("search-1", result.items().get(0).executionId()); + } + + @Test + void wildcardSearchFindsSubstring() throws Exception { + Instant now = Instant.now(); + ExecutionDocument doc = new ExecutionDocument( + "wild-1", "route-b", "agent-1", "app-1", + "COMPLETED", null, null, + now, now.plusMillis(50), 50L, null, null, + List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED", + null, null, "UniquePayloadIdentifier12345", null, null, null))); + + searchIndex.index(doc); + Thread.sleep(1500); + + SearchRequest request = new SearchRequest( + null, now.minusSeconds(60), now.plusSeconds(60), + null, null, null, + "PayloadIdentifier", null, null, null, + null, null, null, null, null, + 0, 50, "startTime", "desc"); + + SearchResult result = searchIndex.search(request); + assertTrue(result.total() > 0); + } +}