From b8d4b59f4014880acbddecb00d94db614033230c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:24:27 +0200 Subject: [PATCH] alerting(eval): AlertEvaluatorJob persists advanced cursor via withEvalState Thread EvalResult.Batch.nextEvalState into releaseClaim so the composite cursor from Task 1.5 actually lands in rule.evalState across tick boundaries. Guards against empty-batch wipe (would regress to first-run scan). --- .../app/alerting/eval/AlertEvaluatorJob.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java index cecaace8..1aa80861 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -112,6 +112,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { for (AlertRule rule : claimed) { Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds()); + Map nextEvalState = rule.evalState(); try { if (circuitBreaker.isOpen(rule.conditionKind())) { log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id()); @@ -119,14 +120,25 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { } EvalResult result = metrics.evalDuration(rule.conditionKind()) .recordCallable(() -> evaluateSafely(rule, ctx)); - applyResult(rule, result); + if (result instanceof EvalResult.Batch b) { + applyResult(rule, b); + // Guard: only advance cursor when the batch returned one. + // Empty ticks (no matches) return Map.of() — overwriting would + // wipe the persisted cursor and force the next tick to re-scan + // from rule.createdAt, re-alerting on every historical exchange. + if (!b.nextEvalState().isEmpty()) { + nextEvalState = b.nextEvalState(); + } + } else { + applyResult(rule, result); + } circuitBreaker.recordSuccess(rule.conditionKind()); } catch (Exception e) { metrics.evalError(rule.conditionKind(), rule.id()); circuitBreaker.recordFailure(rule.conditionKind()); log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString()); } finally { - reschedule(rule, nextRun); + reschedule(rule.withEvalState(nextEvalState), nextRun); } }