feat: implement debounced SearchIndexer for async OpenSearch indexing
This commit is contained in:
@@ -0,0 +1,79 @@
|
||||
package com.cameleer3.server.core.indexing;
|
||||
|
||||
import com.cameleer3.server.core.storage.ExecutionStore;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.cameleer3.server.core.storage.SearchIndex;
|
||||
import com.cameleer3.server.core.storage.model.ExecutionDocument;
|
||||
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public class SearchIndexer {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);
|
||||
|
||||
private final ExecutionStore executionStore;
|
||||
private final SearchIndex searchIndex;
|
||||
private final long debounceMs;
|
||||
private final int queueCapacity;
|
||||
|
||||
private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
|
||||
r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });
|
||||
|
||||
public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
|
||||
long debounceMs, int queueCapacity) {
|
||||
this.executionStore = executionStore;
|
||||
this.searchIndex = searchIndex;
|
||||
this.debounceMs = debounceMs;
|
||||
this.queueCapacity = queueCapacity;
|
||||
}
|
||||
|
||||
public void onExecutionUpdated(ExecutionUpdatedEvent event) {
|
||||
if (pending.size() >= queueCapacity) {
|
||||
log.warn("Search indexer queue full, dropping event for {}", event.executionId());
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledFuture<?> existing = pending.put(event.executionId(),
|
||||
scheduler.schedule(() -> indexExecution(event.executionId()),
|
||||
debounceMs, TimeUnit.MILLISECONDS));
|
||||
if (existing != null) {
|
||||
existing.cancel(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void indexExecution(String executionId) {
|
||||
pending.remove(executionId);
|
||||
try {
|
||||
ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
|
||||
if (exec == null) return;
|
||||
|
||||
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
|
||||
List<ProcessorDoc> processorDocs = processors.stream()
|
||||
.map(p -> new ProcessorDoc(
|
||||
p.processorId(), p.processorType(), p.status(),
|
||||
p.errorMessage(), p.errorStacktrace(),
|
||||
p.inputBody(), p.outputBody(),
|
||||
p.inputHeaders(), p.outputHeaders()))
|
||||
.toList();
|
||||
|
||||
searchIndex.index(new ExecutionDocument(
|
||||
exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
|
||||
exec.status(), exec.correlationId(), exec.exchangeId(),
|
||||
exec.startTime(), exec.endTime(), exec.durationMs(),
|
||||
exec.errorMessage(), exec.errorStacktrace(), processorDocs));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to index execution {}", executionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user