diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java index cecaace8..1aa80861 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -112,6 +112,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { for (AlertRule rule : claimed) { Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds()); + Map nextEvalState = rule.evalState(); try { if (circuitBreaker.isOpen(rule.conditionKind())) { log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id()); @@ -119,14 +120,25 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { } EvalResult result = metrics.evalDuration(rule.conditionKind()) .recordCallable(() -> evaluateSafely(rule, ctx)); - applyResult(rule, result); + if (result instanceof EvalResult.Batch b) { + applyResult(rule, b); + // Guard: only advance cursor when the batch returned one. + // Empty ticks (no matches) return Map.of() — overwriting would + // wipe the persisted cursor and force the next tick to re-scan + // from rule.createdAt, re-alerting on every historical exchange. + if (!b.nextEvalState().isEmpty()) { + nextEvalState = b.nextEvalState(); + } + } else { + applyResult(rule, result); + } circuitBreaker.recordSuccess(rule.conditionKind()); } catch (Exception e) { metrics.evalError(rule.conditionKind(), rule.id()); circuitBreaker.recordFailure(rule.conditionKind()); log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString()); } finally { - reschedule(rule, nextRun); + reschedule(rule.withEvalState(nextEvalState), nextRun); } }