alerting(eval): AlertEvaluatorJob persists advanced cursor via withEvalState

Thread EvalResult.Batch.nextEvalState into releaseClaim so the composite
cursor from Task 1.5 actually lands in rule.evalState across tick boundaries.
Guards against empty-batch wipe (would regress to first-run scan).
This commit is contained in:
hsiegeln
2026-04-22 16:24:27 +02:00
parent 850c030642
commit b8d4b59f40

View File

@@ -112,6 +112,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
for (AlertRule rule : claimed) {
Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds());
Map<String, Object> nextEvalState = rule.evalState();
try {
if (circuitBreaker.isOpen(rule.conditionKind())) {
log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id());
@@ -119,14 +120,25 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
}
EvalResult result = metrics.evalDuration(rule.conditionKind())
.recordCallable(() -> evaluateSafely(rule, ctx));
applyResult(rule, result);
if (result instanceof EvalResult.Batch b) {
applyResult(rule, b);
// Guard: only advance cursor when the batch returned one.
// Empty ticks (no matches) return Map.of() — overwriting would
// wipe the persisted cursor and force the next tick to re-scan
// from rule.createdAt, re-alerting on every historical exchange.
if (!b.nextEvalState().isEmpty()) {
nextEvalState = b.nextEvalState();
}
} else {
applyResult(rule, result);
}
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString());
} finally {
reschedule(rule, nextRun);
reschedule(rule.withEvalState(nextEvalState), nextRun);
}
}