From 4d33592015c0e43290bd92f7e1ca829ee214f16c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:43:16 +0100 Subject: [PATCH] feat: add ThresholdConfig, ThresholdRepository, SearchIndexerStats, and instrument SearchIndexer Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/core/admin/ThresholdConfig.java | 36 +++++++++++ .../core/admin/ThresholdRepository.java | 8 +++ .../server/core/indexing/SearchIndexer.java | 64 ++++++++++++++++++- .../core/indexing/SearchIndexerStats.java | 14 ++++ 4 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdRepository.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexerStats.java diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java new file mode 100644 index 00000000..58714c28 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java @@ -0,0 +1,36 @@ +package com.cameleer3.server.core.admin; + +public record ThresholdConfig( + DatabaseThresholds database, + OpenSearchThresholds opensearch +) { + public record DatabaseThresholds( + int connectionPoolWarning, + int connectionPoolCritical, + double queryDurationWarning, + double queryDurationCritical + ) { + public static DatabaseThresholds defaults() { + return new DatabaseThresholds(80, 95, 1.0, 10.0); + } + } + + public record OpenSearchThresholds( + String clusterHealthWarning, + String clusterHealthCritical, + int queueDepthWarning, + int queueDepthCritical, + int jvmHeapWarning, + int jvmHeapCritical, + int failedDocsWarning, + int failedDocsCritical + ) { + public static OpenSearchThresholds defaults() { + return new OpenSearchThresholds("YELLOW", "RED", 100, 500, 75, 90, 1, 10); + } + } + + public static ThresholdConfig defaults() { + return new ThresholdConfig(DatabaseThresholds.defaults(), OpenSearchThresholds.defaults()); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdRepository.java new file mode 100644 index 00000000..2e9a02f9 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdRepository.java @@ -0,0 +1,8 @@ +package com.cameleer3.server.core.admin; + +import java.util.Optional; + +public interface ThresholdRepository { + Optional find(); + void save(ThresholdConfig config, String updatedBy); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java index 6cff9e8d..f616e35d 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java @@ -9,11 +9,13 @@ import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; -public class SearchIndexer { +public class SearchIndexer implements SearchIndexerStats { private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class); @@ -26,6 +28,14 @@ public class SearchIndexer { private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; }); + private final AtomicLong failedCount = new AtomicLong(); + private final AtomicLong indexedCount = new AtomicLong(); + private volatile Instant lastIndexedAt; + + private final AtomicLong rateWindowStartMs = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong rateWindowCount = new AtomicLong(); + private volatile double lastRate; + public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex, long debounceMs, int queueCapacity) { this.executionStore = executionStore; @@ -68,11 +78,63 @@ public class SearchIndexer { exec.status(), exec.correlationId(), exec.exchangeId(), exec.startTime(), exec.endTime(), exec.durationMs(), exec.errorMessage(), exec.errorStacktrace(), processorDocs)); + + indexedCount.incrementAndGet(); + lastIndexedAt = Instant.now(); + updateRate(); } catch (Exception e) { + failedCount.incrementAndGet(); log.error("Failed to index execution {}", executionId, e); } } + private void updateRate() { + long now = System.currentTimeMillis(); + long windowStart = rateWindowStartMs.get(); + long count = rateWindowCount.incrementAndGet(); + long elapsed = now - windowStart; + if (elapsed >= 15_000) { // 15-second window + lastRate = count / (elapsed / 1000.0); + rateWindowStartMs.set(now); + rateWindowCount.set(0); + } + } + + @Override + public int getQueueDepth() { + return pending.size(); + } + + @Override + public int getMaxQueueSize() { + return queueCapacity; + } + + @Override + public long getFailedCount() { + return failedCount.get(); + } + + @Override + public long getIndexedCount() { + return indexedCount.get(); + } + + @Override + public Instant getLastIndexedAt() { + return lastIndexedAt; + } + + @Override + public long getDebounceMs() { + return debounceMs; + } + + @Override + public double getIndexingRate() { + return lastRate; + } + public void shutdown() { scheduler.shutdown(); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexerStats.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexerStats.java new file mode 100644 index 00000000..e743fe9d --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexerStats.java @@ -0,0 +1,14 @@ +package com.cameleer3.server.core.indexing; + +import java.time.Instant; + +public interface SearchIndexerStats { + int getQueueDepth(); + int getMaxQueueSize(); + long getFailedCount(); + long getIndexedCount(); + Instant getLastIndexedAt(); + long getDebounceMs(); + /** Approximate indexing rate in docs/sec over last measurement window */ + double getIndexingRate(); +}