refactor(search): drop dead SearchIndexer subsystem
After the ExecutionController removal (0f635576), SearchIndexer
subscribed to ExecutionUpdatedEvent but nothing publishes that event.
Every SearchIndexerStats metric returned always-zero, and the admin
/api/v1/admin/clickhouse/pipeline endpoint that surfaced those stats
carried no signal.
Backend removed:
- core: SearchIndexer, SearchIndexerStats, ExecutionUpdatedEvent
- app: IndexerPipelineResponse DTO, /pipeline endpoint on
ClickHouseAdminController (field + ctor param)
- StorageBeanConfig.searchIndexer bean
UI removed:
- IndexerPipeline type + useIndexerPipeline hook in
api/queries/admin/clickhouse.ts
- Indexer Pipeline card in ClickHouseAdminPage.tsx (plus ProgressBar
import and pipeline* CSS classes)
OpenAPI schema.d.ts + openapi.json regenerated (stale /pipeline path
and IndexerPipelineResponse schema removed).
SearchIndex interface + ClickHouseSearchIndex impl kept — those are
live and used by SearchService + ExchangeMatchEvaluator.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +0,0 @@
|
||||
package com.cameleer.server.core.indexing;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
|
||||
@@ -1,143 +0,0 @@
|
||||
package com.cameleer.server.core.indexing;
|
||||
|
||||
import com.cameleer.server.core.storage.ExecutionStore;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.cameleer.server.core.storage.SearchIndex;
|
||||
import com.cameleer.server.core.storage.model.ExecutionDocument;
|
||||
import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class SearchIndexer implements SearchIndexerStats {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);
|
||||
|
||||
private final ExecutionStore executionStore;
|
||||
private final SearchIndex searchIndex;
|
||||
private final long debounceMs;
|
||||
private final int queueCapacity;
|
||||
|
||||
private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
|
||||
r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });
|
||||
|
||||
private final AtomicLong failedCount = new AtomicLong();
|
||||
private final AtomicLong indexedCount = new AtomicLong();
|
||||
private volatile Instant lastIndexedAt;
|
||||
|
||||
private final AtomicLong rateWindowStartMs = new AtomicLong(System.currentTimeMillis());
|
||||
private final AtomicLong rateWindowCount = new AtomicLong();
|
||||
private volatile double lastRate;
|
||||
|
||||
public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
|
||||
long debounceMs, int queueCapacity) {
|
||||
this.executionStore = executionStore;
|
||||
this.searchIndex = searchIndex;
|
||||
this.debounceMs = debounceMs;
|
||||
this.queueCapacity = queueCapacity;
|
||||
}
|
||||
|
||||
public void onExecutionUpdated(ExecutionUpdatedEvent event) {
|
||||
if (pending.size() >= queueCapacity) {
|
||||
log.warn("Search indexer queue full, dropping event for {}", event.executionId());
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledFuture<?> existing = pending.put(event.executionId(),
|
||||
scheduler.schedule(() -> indexExecution(event.executionId()),
|
||||
debounceMs, TimeUnit.MILLISECONDS));
|
||||
if (existing != null) {
|
||||
existing.cancel(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void indexExecution(String executionId) {
|
||||
pending.remove(executionId);
|
||||
try {
|
||||
ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
|
||||
if (exec == null) return;
|
||||
|
||||
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
|
||||
List<ProcessorDoc> processorDocs = processors.stream()
|
||||
.map(p -> new ProcessorDoc(
|
||||
p.processorId(), p.processorType(), p.status(),
|
||||
p.errorMessage(), p.errorStacktrace(),
|
||||
p.inputBody(), p.outputBody(),
|
||||
p.inputHeaders(), p.outputHeaders(),
|
||||
p.attributes()))
|
||||
.toList();
|
||||
|
||||
searchIndex.index(new ExecutionDocument(
|
||||
exec.executionId(), exec.routeId(), exec.instanceId(), exec.applicationId(),
|
||||
exec.status(), exec.correlationId(), exec.exchangeId(),
|
||||
exec.startTime(), exec.endTime(), exec.durationMs(),
|
||||
exec.errorMessage(), exec.errorStacktrace(), processorDocs,
|
||||
exec.attributes(), exec.hasTraceData(), exec.isReplay()));
|
||||
|
||||
indexedCount.incrementAndGet();
|
||||
lastIndexedAt = Instant.now();
|
||||
updateRate();
|
||||
} catch (Exception e) {
|
||||
failedCount.incrementAndGet();
|
||||
log.error("Failed to index execution {}", executionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateRate() {
|
||||
long now = System.currentTimeMillis();
|
||||
long windowStart = rateWindowStartMs.get();
|
||||
long count = rateWindowCount.incrementAndGet();
|
||||
long elapsed = now - windowStart;
|
||||
if (elapsed >= 15_000) { // 15-second window
|
||||
lastRate = count / (elapsed / 1000.0);
|
||||
rateWindowStartMs.set(now);
|
||||
rateWindowCount.set(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueDepth() {
|
||||
return pending.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxQueueSize() {
|
||||
return queueCapacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFailedCount() {
|
||||
return failedCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIndexedCount() {
|
||||
return indexedCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getLastIndexedAt() {
|
||||
return lastIndexedAt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDebounceMs() {
|
||||
return debounceMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getIndexingRate() {
|
||||
return lastRate;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package com.cameleer.server.core.indexing;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public interface SearchIndexerStats {
|
||||
int getQueueDepth();
|
||||
int getMaxQueueSize();
|
||||
long getFailedCount();
|
||||
long getIndexedCount();
|
||||
Instant getLastIndexedAt();
|
||||
long getDebounceMs();
|
||||
/** Approximate indexing rate in docs/sec over last measurement window */
|
||||
double getIndexingRate();
|
||||
}
|
||||
Reference in New Issue
Block a user