diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java index 1aa80861..f602f034 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -47,6 +47,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { private final NotificationContextBuilder contextBuilder; private final EnvironmentRepository environmentRepo; private final ObjectMapper objectMapper; + private final BatchResultApplier batchResultApplier; private final String instanceId; private final String tenantId; private final Clock clock; @@ -64,26 +65,28 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { NotificationContextBuilder contextBuilder, EnvironmentRepository environmentRepo, ObjectMapper objectMapper, + BatchResultApplier batchResultApplier, @Qualifier("alertingInstanceId") String instanceId, @Value("${cameleer.server.tenant.id:default}") String tenantId, Clock alertingClock, AlertingMetrics metrics) { - this.props = props; - this.ruleRepo = ruleRepo; - this.instanceRepo = instanceRepo; - this.notificationRepo = notificationRepo; - this.evaluators = evaluatorList.stream() + this.props = props; + this.ruleRepo = ruleRepo; + this.instanceRepo = instanceRepo; + this.notificationRepo = notificationRepo; + this.evaluators = evaluatorList.stream() .collect(Collectors.toMap(ConditionEvaluator::kind, e -> e)); - this.circuitBreaker = circuitBreaker; - this.renderer = renderer; - this.contextBuilder = contextBuilder; - this.environmentRepo = environmentRepo; - this.objectMapper = objectMapper; - this.instanceId = instanceId; - this.tenantId = tenantId; - this.clock = alertingClock; - this.metrics = metrics; + this.circuitBreaker = circuitBreaker; + this.renderer = renderer; + this.contextBuilder = contextBuilder; + this.environmentRepo = environmentRepo; + this.objectMapper = objectMapper; + this.batchResultApplier = batchResultApplier; + this.instanceId = instanceId; + this.tenantId = tenantId; + this.clock = alertingClock; + this.metrics = metrics; } // ------------------------------------------------------------------------- @@ -112,33 +115,61 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { for (AlertRule rule : claimed) { Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds()); - Map nextEvalState = rule.evalState(); + if (circuitBreaker.isOpen(rule.conditionKind())) { + log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id()); + reschedule(rule, nextRun); + continue; + } + + EvalResult result; try { - if (circuitBreaker.isOpen(rule.conditionKind())) { - log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id()); - continue; - } - EvalResult result = metrics.evalDuration(rule.conditionKind()) + result = metrics.evalDuration(rule.conditionKind()) .recordCallable(() -> evaluateSafely(rule, ctx)); - if (result instanceof EvalResult.Batch b) { - applyResult(rule, b); - // Guard: only advance cursor when the batch returned one. - // Empty ticks (no matches) return Map.of() — overwriting would - // wipe the persisted cursor and force the next tick to re-scan - // from rule.createdAt, re-alerting on every historical exchange. - if (!b.nextEvalState().isEmpty()) { - nextEvalState = b.nextEvalState(); - } - } else { - applyResult(rule, result); - } - circuitBreaker.recordSuccess(rule.conditionKind()); } catch (Exception e) { metrics.evalError(rule.conditionKind(), rule.id()); circuitBreaker.recordFailure(rule.conditionKind()); log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString()); - } finally { - reschedule(rule.withEvalState(nextEvalState), nextRun); + // Evaluation itself failed — release the claim so the rule can be + // retried on the next tick. Cursor stays put. + reschedule(rule, nextRun); + continue; + } + + if (result instanceof EvalResult.Batch b) { + // Phase 2: the Batch path is atomic. The @Transactional apply() on + // BatchResultApplier wraps instance writes, notification enqueues, + // AND the cursor advance + releaseClaim into a single tx. A + // mid-batch fault rolls everything back — including the cursor — + // so the next tick replays the whole batch exactly once. + try { + batchResultApplier.apply(rule, b, nextRun); + circuitBreaker.recordSuccess(rule.conditionKind()); + } catch (Exception e) { + metrics.evalError(rule.conditionKind(), rule.id()); + circuitBreaker.recordFailure(rule.conditionKind()); + log.warn("Batch apply failed for rule {} ({}): {} — rolling back; next tick will retry", + rule.id(), rule.conditionKind(), e.toString()); + // The transaction rolled back. Do NOT call reschedule here — + // leaving claim + next_evaluation_at as they were means the + // claim TTL takes over and the rule becomes due on its own. + // Rethrowing is unnecessary for correctness — the cursor + // stayed put, so exactly-once-per-exchange is preserved. + } + } else { + // Non-Batch path (FIRING / Clear / Error): classic apply + rule + // reschedule. Not wrapped in a single tx — semantics unchanged + // from pre-Phase-2. + try { + applyResult(rule, result); + circuitBreaker.recordSuccess(rule.conditionKind()); + } catch (Exception e) { + metrics.evalError(rule.conditionKind(), rule.id()); + circuitBreaker.recordFailure(rule.conditionKind()); + log.warn("applyResult failed for rule {} ({}): {}", + rule.id(), rule.conditionKind(), e.toString()); + } finally { + reschedule(rule, nextRun); + } } } @@ -183,14 +214,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { // ------------------------------------------------------------------------- private void applyResult(AlertRule rule, EvalResult result) { - if (result instanceof EvalResult.Batch b) { - // PER_EXCHANGE mode: each Firing in the batch creates its own AlertInstance - for (EvalResult.Firing f : b.firings()) { - applyBatchFiring(rule, f); - } - return; - } - + // Note: the Batch path is handled by BatchResultApplier (transactional) — + // tick() routes Batch results there directly and never calls applyResult + // for them. This method only handles FIRING / Clear / Error state-machine + // transitions for the classic (non-PER_EXCHANGE) path. AlertInstance current = instanceRepo.findOpenForRule(rule.id()).orElse(null); Instant now = Instant.now(clock); @@ -211,19 +238,6 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { }); } - /** - * Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry. - * No forDuration check — each exchange is its own event. - */ - private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) { - Instant now = Instant.now(clock); - AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now) - .withRuleSnapshot(snapshotRule(rule)); - AlertInstance enriched = enrichTitleMessage(rule, instance); - AlertInstance persisted = instanceRepo.save(enriched); - enqueueNotifications(rule, persisted, now); - } - // ------------------------------------------------------------------------- // Title / message rendering // ------------------------------------------------------------------------- diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/BatchResultApplier.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/BatchResultApplier.java new file mode 100644 index 00000000..5b44b06c --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/BatchResultApplier.java @@ -0,0 +1,144 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.alerting.notify.MustacheRenderer; +import com.cameleer.server.app.alerting.notify.NotificationContextBuilder; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Clock; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; + +/** + * Applies a {@link EvalResult.Batch} result to persistent state inside a single + * transaction: instance writes, notification enqueues, and the rule's cursor + * advance + {@code releaseClaim} either all commit or all roll back together. + *

+ * Lives in its own bean so the {@code @Transactional} annotation engages via the + * Spring proxy when invoked from {@link AlertEvaluatorJob#tick()}; calling it as + * {@code this.apply(...)} from {@code AlertEvaluatorJob} (a bean calling its own + * method) would bypass the proxy and silently disable the transaction. + *

+ * Phase 2 of the per-exchange exactly-once plan (see + * {@code docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md}). + */ +@Component +public class BatchResultApplier { + + private static final Logger log = LoggerFactory.getLogger(BatchResultApplier.class); + + private final AlertRuleRepository ruleRepo; + private final AlertInstanceRepository instanceRepo; + private final AlertNotificationRepository notificationRepo; + private final MustacheRenderer renderer; + private final NotificationContextBuilder contextBuilder; + private final EnvironmentRepository environmentRepo; + private final ObjectMapper objectMapper; + private final Clock clock; + + public BatchResultApplier( + AlertRuleRepository ruleRepo, + AlertInstanceRepository instanceRepo, + AlertNotificationRepository notificationRepo, + MustacheRenderer renderer, + NotificationContextBuilder contextBuilder, + EnvironmentRepository environmentRepo, + ObjectMapper objectMapper, + Clock alertingClock) { + this.ruleRepo = ruleRepo; + this.instanceRepo = instanceRepo; + this.notificationRepo = notificationRepo; + this.renderer = renderer; + this.contextBuilder = contextBuilder; + this.environmentRepo = environmentRepo; + this.objectMapper = objectMapper; + this.clock = alertingClock; + } + + /** + * Atomically apply a Batch result for a single rule: + *

    + *
  1. persist a FIRING instance per firing + enqueue its notifications
  2. + *
  3. advance the rule's cursor ({@code evalState}) iff the batch supplied one
  4. + *
  5. release the claim with the new {@code nextRun} + {@code evalState}
  6. + *
+ * Any exception thrown from the repo calls rolls back every write — including + * the cursor advance — so the rule is replayable on the next tick. + */ + @Transactional + public void apply(AlertRule rule, EvalResult.Batch batch, Instant nextRun) { + for (EvalResult.Firing f : batch.firings()) { + applyBatchFiring(rule, f); + } + Map nextEvalState = + batch.nextEvalState().isEmpty() ? rule.evalState() : batch.nextEvalState(); + ruleRepo.releaseClaim(rule.id(), nextRun, nextEvalState); + } + + /** + * Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry. + * No forDuration check — each exchange is its own event. + */ + private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) { + Instant now = Instant.now(clock); + AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now) + .withRuleSnapshot(snapshotRule(rule)); + AlertInstance enriched = enrichTitleMessage(rule, instance); + AlertInstance persisted = instanceRepo.save(enriched); + enqueueNotifications(rule, persisted, now); + } + + private AlertInstance enrichTitleMessage(AlertRule rule, AlertInstance instance) { + Environment env = environmentRepo.findById(rule.environmentId()).orElse(null); + Map ctx = contextBuilder.build(rule, instance, env, null); + String title = renderer.render(rule.notificationTitleTmpl(), ctx); + String message = renderer.render(rule.notificationMessageTmpl(), ctx); + return instance.withTitleMessage(title, message); + } + + private void enqueueNotifications(AlertRule rule, AlertInstance instance, Instant now) { + for (WebhookBinding w : rule.webhooks()) { + Map payload = buildPayload(rule, instance); + notificationRepo.save(new AlertNotification( + UUID.randomUUID(), + instance.id(), + w.id(), + w.outboundConnectionId(), + NotificationStatus.PENDING, + 0, + now, + null, null, null, null, + payload, + null, + now)); + } + } + + private Map buildPayload(AlertRule rule, AlertInstance instance) { + Environment env = environmentRepo.findById(rule.environmentId()).orElse(null); + return contextBuilder.build(rule, instance, env, null); + } + + @SuppressWarnings("unchecked") + private Map snapshotRule(AlertRule rule) { + try { + Map raw = objectMapper.convertValue(rule, Map.class); + // Map.copyOf (used in AlertInstance compact ctor) rejects null values — + // strip them so the snapshot is safe to store. + Map safe = new LinkedHashMap<>(); + raw.forEach((k, v) -> { if (v != null) safe.put(k, v); }); + return safe; + } catch (Exception e) { + log.warn("Failed to snapshot rule {}: {}", rule.id(), e.getMessage()); + return Map.of("id", rule.id().toString(), "name", rule.name()); + } + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java index 6d82579f..550b4684 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java @@ -28,7 +28,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.when; /** @@ -441,6 +440,7 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { } catch (RuntimeException expectedAfterPhase2) { // Phase 2 may choose to rethrow; either way the rollback assertions // below are what pin the contract. + // intentionally empty — fault-injection swallow/rethrow tolerance; see comment above } // Post-rollback: zero instances, zero notifications, cursor unchanged,