diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java new file mode 100644 index 00000000..6cff9e8d --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java @@ -0,0 +1,79 @@ +package com.cameleer3.server.core.indexing; + +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.model.ExecutionDocument; +import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +public class SearchIndexer { + + private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class); + + private final ExecutionStore executionStore; + private final SearchIndex searchIndex; + private final long debounceMs; + private final int queueCapacity; + + private final Map> pending = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; }); + + public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex, + long debounceMs, int queueCapacity) { + this.executionStore = executionStore; + this.searchIndex = searchIndex; + this.debounceMs = debounceMs; + this.queueCapacity = queueCapacity; + } + + public void onExecutionUpdated(ExecutionUpdatedEvent event) { + if (pending.size() >= queueCapacity) { + log.warn("Search indexer queue full, dropping event for {}", event.executionId()); + return; + } + + ScheduledFuture existing = pending.put(event.executionId(), + scheduler.schedule(() -> indexExecution(event.executionId()), + debounceMs, TimeUnit.MILLISECONDS)); + if (existing != null) { + existing.cancel(false); + } + } + + private void indexExecution(String executionId) { + pending.remove(executionId); + try { + ExecutionRecord exec = executionStore.findById(executionId).orElse(null); + if (exec == null) return; + + List processors = executionStore.findProcessors(executionId); + List processorDocs = processors.stream() + .map(p -> new ProcessorDoc( + p.processorId(), p.processorType(), p.status(), + p.errorMessage(), p.errorStacktrace(), + p.inputBody(), p.outputBody(), + p.inputHeaders(), p.outputHeaders())) + .toList(); + + searchIndex.index(new ExecutionDocument( + exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(), + exec.status(), exec.correlationId(), exec.exchangeId(), + exec.startTime(), exec.endTime(), exec.durationMs(), + exec.errorMessage(), exec.errorStacktrace(), processorDocs)); + } catch (Exception e) { + log.error("Failed to index execution {}", executionId, e); + } + } + + public void shutdown() { + scheduler.shutdown(); + } +}