feat: add ThresholdConfig, ThresholdRepository, SearchIndexerStats, and instrument SearchIndexer
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
package com.cameleer3.server.core.admin;
|
||||
|
||||
public record ThresholdConfig(
|
||||
DatabaseThresholds database,
|
||||
OpenSearchThresholds opensearch
|
||||
) {
|
||||
public record DatabaseThresholds(
|
||||
int connectionPoolWarning,
|
||||
int connectionPoolCritical,
|
||||
double queryDurationWarning,
|
||||
double queryDurationCritical
|
||||
) {
|
||||
public static DatabaseThresholds defaults() {
|
||||
return new DatabaseThresholds(80, 95, 1.0, 10.0);
|
||||
}
|
||||
}
|
||||
|
||||
public record OpenSearchThresholds(
|
||||
String clusterHealthWarning,
|
||||
String clusterHealthCritical,
|
||||
int queueDepthWarning,
|
||||
int queueDepthCritical,
|
||||
int jvmHeapWarning,
|
||||
int jvmHeapCritical,
|
||||
int failedDocsWarning,
|
||||
int failedDocsCritical
|
||||
) {
|
||||
public static OpenSearchThresholds defaults() {
|
||||
return new OpenSearchThresholds("YELLOW", "RED", 100, 500, 75, 90, 1, 10);
|
||||
}
|
||||
}
|
||||
|
||||
public static ThresholdConfig defaults() {
|
||||
return new ThresholdConfig(DatabaseThresholds.defaults(), OpenSearchThresholds.defaults());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.cameleer3.server.core.admin;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface ThresholdRepository {
|
||||
Optional<ThresholdConfig> find();
|
||||
void save(ThresholdConfig config, String updatedBy);
|
||||
}
|
||||
@@ -9,11 +9,13 @@ import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class SearchIndexer {
|
||||
public class SearchIndexer implements SearchIndexerStats {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);
|
||||
|
||||
@@ -26,6 +28,14 @@ public class SearchIndexer {
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
|
||||
r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });
|
||||
|
||||
private final AtomicLong failedCount = new AtomicLong();
|
||||
private final AtomicLong indexedCount = new AtomicLong();
|
||||
private volatile Instant lastIndexedAt;
|
||||
|
||||
private final AtomicLong rateWindowStartMs = new AtomicLong(System.currentTimeMillis());
|
||||
private final AtomicLong rateWindowCount = new AtomicLong();
|
||||
private volatile double lastRate;
|
||||
|
||||
public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
|
||||
long debounceMs, int queueCapacity) {
|
||||
this.executionStore = executionStore;
|
||||
@@ -68,11 +78,63 @@ public class SearchIndexer {
|
||||
exec.status(), exec.correlationId(), exec.exchangeId(),
|
||||
exec.startTime(), exec.endTime(), exec.durationMs(),
|
||||
exec.errorMessage(), exec.errorStacktrace(), processorDocs));
|
||||
|
||||
indexedCount.incrementAndGet();
|
||||
lastIndexedAt = Instant.now();
|
||||
updateRate();
|
||||
} catch (Exception e) {
|
||||
failedCount.incrementAndGet();
|
||||
log.error("Failed to index execution {}", executionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateRate() {
|
||||
long now = System.currentTimeMillis();
|
||||
long windowStart = rateWindowStartMs.get();
|
||||
long count = rateWindowCount.incrementAndGet();
|
||||
long elapsed = now - windowStart;
|
||||
if (elapsed >= 15_000) { // 15-second window
|
||||
lastRate = count / (elapsed / 1000.0);
|
||||
rateWindowStartMs.set(now);
|
||||
rateWindowCount.set(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueDepth() {
|
||||
return pending.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxQueueSize() {
|
||||
return queueCapacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFailedCount() {
|
||||
return failedCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIndexedCount() {
|
||||
return indexedCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getLastIndexedAt() {
|
||||
return lastIndexedAt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDebounceMs() {
|
||||
return debounceMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getIndexingRate() {
|
||||
return lastRate;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.cameleer3.server.core.indexing;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public interface SearchIndexerStats {
|
||||
int getQueueDepth();
|
||||
int getMaxQueueSize();
|
||||
long getFailedCount();
|
||||
long getIndexedCount();
|
||||
Instant getLastIndexedAt();
|
||||
long getDebounceMs();
|
||||
/** Approximate indexing rate in docs/sec over last measurement window */
|
||||
double getIndexingRate();
|
||||
}
|
||||
Reference in New Issue
Block a user