feat: implement OpenSearchIndex with full-text and wildcard search

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-16 18:25:55 +01:00
parent c48e0bdfde
commit f7d7302694
3 changed files with 420 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
package com.cameleer3.server.app.config;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OpenSearchConfig {
@Value("${opensearch.url:http://localhost:9200}")
private String opensearchUrl;
@Bean
public OpenSearchClient openSearchClient() {
HttpHost host = HttpHost.create(opensearchUrl);
var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
return new OpenSearchClient(transport);
}
}

View File

@@ -0,0 +1,310 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@Repository
public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
private static final String INDEX_PREFIX = "executions-";
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private final OpenSearchClient client;
public OpenSearchIndex(OpenSearchClient client) {
this.client = client;
}
@PostConstruct
void ensureIndexTemplate() {
// Full template with ngram analyzer for infix wildcard search.
// The template JSON matches the spec's OpenSearch index template definition.
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name("executions-template")
.indexPatterns(List.of("executions-*"))
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
.analysis(a -> a
.analyzer("ngram_analyzer", an -> an
.custom(c -> c
.tokenizer("ngram_tokenizer")
.filter("lowercase")))
.tokenizer("ngram_tokenizer", tk -> tk
.definition(d -> d
.ngram(ng -> ng
.minGram(3)
.maxGram(4)
.tokenChars(TokenChar.Letter,
TokenChar.Digit,
TokenChar.Punctuation,
TokenChar.Symbol)))))))));
log.info("OpenSearch index template created with ngram analyzer");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
}
}
@Override
public void index(ExecutionDocument doc) {
String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
try {
client.index(IndexRequest.of(b -> b
.index(indexName)
.id(doc.executionId())
.document(toMap(doc))));
} catch (IOException e) {
log.error("Failed to index execution {}", doc.executionId(), e);
}
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
try {
var searchReq = buildSearchRequest(request, request.limit());
var response = client.search(searchReq, Map.class);
List<ExecutionSummary> items = response.hits().hits().stream()
.map(this::hitToSummary)
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
return new SearchResult<>(items, total);
} catch (IOException e) {
log.error("Search failed", e);
return new SearchResult<>(List.of(), 0);
}
}
@Override
public long count(SearchRequest request) {
try {
var countReq = CountRequest.of(b -> b
.index(INDEX_PREFIX + "*")
.query(buildQuery(request)));
return client.count(countReq).count();
} catch (IOException e) {
log.error("Count failed", e);
return 0;
}
}
@Override
public void delete(String executionId) {
try {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(INDEX_PREFIX + "*"))
.query(Query.of(q -> q.term(t -> t
.field("execution_id").value(executionId))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
}
private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
SearchRequest request, int size) {
return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
b.index(INDEX_PREFIX + "*")
.query(buildQuery(request))
.size(size)
.from(request.offset())
.sort(s -> s.field(f -> f
.field(request.sortColumn())
.order("asc".equalsIgnoreCase(request.sortDir())
? SortOrder.Asc : SortOrder.Desc)));
return b;
});
}
private Query buildQuery(SearchRequest request) {
List<Query> must = new ArrayList<>();
List<Query> filter = new ArrayList<>();
// Time range
if (request.timeFrom() != null || request.timeTo() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
r.gte(jakarta.json.Json.createValue(request.timeFrom().toString()));
if (request.timeTo() != null)
r.lte(jakarta.json.Json.createValue(request.timeTo().toString()));
return r;
})));
}
// Keyword filters
if (request.status() != null)
filter.add(termQuery("status", request.status()));
if (request.routeId() != null)
filter.add(termQuery("route_id", request.routeId()));
if (request.agentId() != null)
filter.add(termQuery("agent_id", request.agentId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id", request.correlationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
String text = request.text();
List<Query> textQueries = new ArrayList<>();
// Search top-level text fields
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))));
// Search nested processor fields
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace",
"processors.input_body.ngram", "processors.output_body.ngram",
"processors.input_headers.ngram", "processors.output_headers.ngram",
"processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
// Scoped text searches
if (request.textInBody() != null && !request.textInBody().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
.fields("processors.input_body", "processors.output_body",
"processors.input_body.ngram", "processors.output_body.ngram"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
.fields("processors.input_headers", "processors.output_headers",
"processors.input_headers.ngram", "processors.output_headers.ngram"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace",
"processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
).minimumShouldMatch("1"))));
}
// Duration range
if (request.durationMin() != null || request.durationMax() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
r.gte(jakarta.json.Json.createValue(request.durationMin()));
if (request.durationMax() != null)
r.lte(jakarta.json.Json.createValue(request.durationMax()));
return r;
})));
}
return Query.of(q -> q.bool(b -> {
if (!must.isEmpty()) b.must(must);
if (!filter.isEmpty()) b.filter(filter);
if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
return b;
}));
}
private Query termQuery(String field, String value) {
return Query.of(q -> q.term(t -> t.field(field).value(value)));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());
map.put("route_id", doc.routeId());
map.put("agent_id", doc.agentId());
map.put("group_name", doc.groupName());
map.put("status", doc.status());
map.put("correlation_id", doc.correlationId());
map.put("exchange_id", doc.exchangeId());
map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
map.put("duration_ms", doc.durationMs());
map.put("error_message", doc.errorMessage());
map.put("error_stacktrace", doc.errorStacktrace());
if (doc.processors() != null) {
map.put("processors", doc.processors().stream().map(p -> {
Map<String, Object> pm = new LinkedHashMap<>();
pm.put("processor_id", p.processorId());
pm.put("processor_type", p.processorType());
pm.put("status", p.status());
pm.put("error_message", p.errorMessage());
pm.put("error_stacktrace", p.errorStacktrace());
pm.put("input_body", p.inputBody());
pm.put("output_body", p.outputBody());
pm.put("input_headers", p.inputHeaders());
pm.put("output_headers", p.outputHeaders());
return pm;
}).toList());
}
return map;
}
@SuppressWarnings("unchecked")
private ExecutionSummary hitToSummary(Hit<Map> hit) {
Map<String, Object> src = hit.source();
if (src == null) return null;
return new ExecutionSummary(
(String) src.get("execution_id"),
(String) src.get("route_id"),
(String) src.get("agent_id"),
(String) src.get("status"),
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
(String) src.get("error_message"));
}
}