alerting(eval): atomic per-rule batch commit via @Transactional — Phase 2 close

Wraps instance writes, notification enqueues, and cursor advance in one
transactional boundary per rule tick. Rollback leaves the rule replayable
on next tick. Turns the Phase 2 atomicity IT green (see AlertEvaluatorJobIT
#tickRollback_faultOnSecondNotificationInsert_leavesCursorUnchanged).
This commit is contained in:
hsiegeln
2026-04-22 17:03:07 +02:00
parent 989dde23eb
commit ba4e2bb68f
3 changed files with 215 additions and 57 deletions

View File

@@ -47,6 +47,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
private final NotificationContextBuilder contextBuilder;
private final EnvironmentRepository environmentRepo;
private final ObjectMapper objectMapper;
private final BatchResultApplier batchResultApplier;
private final String instanceId;
private final String tenantId;
private final Clock clock;
@@ -64,6 +65,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
NotificationContextBuilder contextBuilder,
EnvironmentRepository environmentRepo,
ObjectMapper objectMapper,
BatchResultApplier batchResultApplier,
@Qualifier("alertingInstanceId") String instanceId,
@Value("${cameleer.server.tenant.id:default}") String tenantId,
Clock alertingClock,
@@ -80,6 +82,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.batchResultApplier = batchResultApplier;
this.instanceId = instanceId;
this.tenantId = tenantId;
this.clock = alertingClock;
@@ -112,33 +115,61 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
for (AlertRule rule : claimed) {
Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds());
Map<String, Object> nextEvalState = rule.evalState();
try {
if (circuitBreaker.isOpen(rule.conditionKind())) {
log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id());
reschedule(rule, nextRun);
continue;
}
EvalResult result = metrics.evalDuration(rule.conditionKind())
EvalResult result;
try {
result = metrics.evalDuration(rule.conditionKind())
.recordCallable(() -> evaluateSafely(rule, ctx));
if (result instanceof EvalResult.Batch b) {
applyResult(rule, b);
// Guard: only advance cursor when the batch returned one.
// Empty ticks (no matches) return Map.of() — overwriting would
// wipe the persisted cursor and force the next tick to re-scan
// from rule.createdAt, re-alerting on every historical exchange.
if (!b.nextEvalState().isEmpty()) {
nextEvalState = b.nextEvalState();
}
} else {
applyResult(rule, result);
}
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString());
// Evaluation itself failed — release the claim so the rule can be
// retried on the next tick. Cursor stays put.
reschedule(rule, nextRun);
continue;
}
if (result instanceof EvalResult.Batch b) {
// Phase 2: the Batch path is atomic. The @Transactional apply() on
// BatchResultApplier wraps instance writes, notification enqueues,
// AND the cursor advance + releaseClaim into a single tx. A
// mid-batch fault rolls everything back — including the cursor —
// so the next tick replays the whole batch exactly once.
try {
batchResultApplier.apply(rule, b, nextRun);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Batch apply failed for rule {} ({}): {} — rolling back; next tick will retry",
rule.id(), rule.conditionKind(), e.toString());
// The transaction rolled back. Do NOT call reschedule here —
// leaving claim + next_evaluation_at as they were means the
// claim TTL takes over and the rule becomes due on its own.
// Rethrowing is unnecessary for correctness — the cursor
// stayed put, so exactly-once-per-exchange is preserved.
}
} else {
// Non-Batch path (FIRING / Clear / Error): classic apply + rule
// reschedule. Not wrapped in a single tx — semantics unchanged
// from pre-Phase-2.
try {
applyResult(rule, result);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("applyResult failed for rule {} ({}): {}",
rule.id(), rule.conditionKind(), e.toString());
} finally {
reschedule(rule.withEvalState(nextEvalState), nextRun);
reschedule(rule, nextRun);
}
}
}
@@ -183,14 +214,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
// -------------------------------------------------------------------------
private void applyResult(AlertRule rule, EvalResult result) {
if (result instanceof EvalResult.Batch b) {
// PER_EXCHANGE mode: each Firing in the batch creates its own AlertInstance
for (EvalResult.Firing f : b.firings()) {
applyBatchFiring(rule, f);
}
return;
}
// Note: the Batch path is handled by BatchResultApplier (transactional) —
// tick() routes Batch results there directly and never calls applyResult
// for them. This method only handles FIRING / Clear / Error state-machine
// transitions for the classic (non-PER_EXCHANGE) path.
AlertInstance current = instanceRepo.findOpenForRule(rule.id()).orElse(null);
Instant now = Instant.now(clock);
@@ -211,19 +238,6 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
});
}
/**
* Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry.
* No forDuration check — each exchange is its own event.
*/
private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) {
Instant now = Instant.now(clock);
AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now)
.withRuleSnapshot(snapshotRule(rule));
AlertInstance enriched = enrichTitleMessage(rule, instance);
AlertInstance persisted = instanceRepo.save(enriched);
enqueueNotifications(rule, persisted, now);
}
// -------------------------------------------------------------------------
// Title / message rendering
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,144 @@
package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.alerting.notify.MustacheRenderer;
import com.cameleer.server.app.alerting.notify.NotificationContextBuilder;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.runtime.Environment;
import com.cameleer.server.core.runtime.EnvironmentRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Clock;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
/**
* Applies a {@link EvalResult.Batch} result to persistent state inside a single
* transaction: instance writes, notification enqueues, and the rule's cursor
* advance + {@code releaseClaim} either all commit or all roll back together.
* <p>
* Lives in its own bean so the {@code @Transactional} annotation engages via the
* Spring proxy when invoked from {@link AlertEvaluatorJob#tick()}; calling it as
* {@code this.apply(...)} from {@code AlertEvaluatorJob} (a bean calling its own
* method) would bypass the proxy and silently disable the transaction.
* <p>
* Phase 2 of the per-exchange exactly-once plan (see
* {@code docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md}).
*/
@Component
public class BatchResultApplier {
private static final Logger log = LoggerFactory.getLogger(BatchResultApplier.class);
private final AlertRuleRepository ruleRepo;
private final AlertInstanceRepository instanceRepo;
private final AlertNotificationRepository notificationRepo;
private final MustacheRenderer renderer;
private final NotificationContextBuilder contextBuilder;
private final EnvironmentRepository environmentRepo;
private final ObjectMapper objectMapper;
private final Clock clock;
public BatchResultApplier(
AlertRuleRepository ruleRepo,
AlertInstanceRepository instanceRepo,
AlertNotificationRepository notificationRepo,
MustacheRenderer renderer,
NotificationContextBuilder contextBuilder,
EnvironmentRepository environmentRepo,
ObjectMapper objectMapper,
Clock alertingClock) {
this.ruleRepo = ruleRepo;
this.instanceRepo = instanceRepo;
this.notificationRepo = notificationRepo;
this.renderer = renderer;
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.clock = alertingClock;
}
/**
* Atomically apply a Batch result for a single rule:
* <ol>
* <li>persist a FIRING instance per firing + enqueue its notifications</li>
* <li>advance the rule's cursor ({@code evalState}) iff the batch supplied one</li>
* <li>release the claim with the new {@code nextRun} + {@code evalState}</li>
* </ol>
* Any exception thrown from the repo calls rolls back every write — including
* the cursor advance — so the rule is replayable on the next tick.
*/
@Transactional
public void apply(AlertRule rule, EvalResult.Batch batch, Instant nextRun) {
for (EvalResult.Firing f : batch.firings()) {
applyBatchFiring(rule, f);
}
Map<String, Object> nextEvalState =
batch.nextEvalState().isEmpty() ? rule.evalState() : batch.nextEvalState();
ruleRepo.releaseClaim(rule.id(), nextRun, nextEvalState);
}
/**
* Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry.
* No forDuration check — each exchange is its own event.
*/
private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) {
Instant now = Instant.now(clock);
AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now)
.withRuleSnapshot(snapshotRule(rule));
AlertInstance enriched = enrichTitleMessage(rule, instance);
AlertInstance persisted = instanceRepo.save(enriched);
enqueueNotifications(rule, persisted, now);
}
private AlertInstance enrichTitleMessage(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
Map<String, Object> ctx = contextBuilder.build(rule, instance, env, null);
String title = renderer.render(rule.notificationTitleTmpl(), ctx);
String message = renderer.render(rule.notificationMessageTmpl(), ctx);
return instance.withTitleMessage(title, message);
}
private void enqueueNotifications(AlertRule rule, AlertInstance instance, Instant now) {
for (WebhookBinding w : rule.webhooks()) {
Map<String, Object> payload = buildPayload(rule, instance);
notificationRepo.save(new AlertNotification(
UUID.randomUUID(),
instance.id(),
w.id(),
w.outboundConnectionId(),
NotificationStatus.PENDING,
0,
now,
null, null, null, null,
payload,
null,
now));
}
}
private Map<String, Object> buildPayload(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
return contextBuilder.build(rule, instance, env, null);
}
@SuppressWarnings("unchecked")
private Map<String, Object> snapshotRule(AlertRule rule) {
try {
Map<String, Object> raw = objectMapper.convertValue(rule, Map.class);
// Map.copyOf (used in AlertInstance compact ctor) rejects null values —
// strip them so the snapshot is safe to store.
Map<String, Object> safe = new LinkedHashMap<>();
raw.forEach((k, v) -> { if (v != null) safe.put(k, v); });
return safe;
} catch (Exception e) {
log.warn("Failed to snapshot rule {}: {}", rule.id(), e.getMessage());
return Map.of("id", rule.id().toString(), "name", rule.name());
}
}
}

View File

@@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.when;
/**
@@ -441,6 +440,7 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
} catch (RuntimeException expectedAfterPhase2) {
// Phase 2 may choose to rethrow; either way the rollback assertions
// below are what pin the contract.
// intentionally empty — fault-injection swallow/rethrow tolerance; see comment above
}
// Post-rollback: zero instances, zero notifications, cursor unchanged,