diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java index 9706fc08..6d82579f 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java @@ -13,15 +13,22 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; import org.springframework.jdbc.core.JdbcTemplate; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.when; /** @@ -33,6 +40,7 @@ import static org.mockito.Mockito.when; * {@code AgentRegistryService} is mocked so tests can control which agents * are DEAD without depending on in-memory timing. 
*/ +@Import(AlertEvaluatorJobIT.FaultInjectingNotificationRepoConfig.class) class AlertEvaluatorJobIT extends AbstractPostgresIT { @MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore; @@ -42,6 +50,7 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { @Autowired private AlertRuleRepository ruleRepo; @Autowired private AlertInstanceRepository instanceRepo; @Autowired private AlertNotificationRepository notificationRepo; + @Autowired private FaultInjectingNotificationRepository faultInjectingNotificationRepo; @Autowired private ClickHouseExecutionStore executionStore; @Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc; @@ -92,6 +101,10 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { @AfterEach void cleanup() { + // Always reset the fault-injector — a prior @Test may have left it armed, + // and Spring reuses the same context (and thus the same decorator bean) + // across tests in this class. + faultInjectingNotificationRepo.clearFault(); jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " + "(SELECT id FROM alert_instances WHERE environment_id = ?)", envId); jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId); @@ -389,4 +402,183 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId); jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId); } + + // ------------------------------------------------------------------------- + // Tick atomicity regression pin — a crash mid-batch must roll back every + // instance + notification write AND leave the cursor unchanged so the + // next tick re-processes the entire batch exactly once. 
+    // See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md (Task 2.1)
+    // -------------------------------------------------------------------------
+
+    /**
+     * Arms the notification-repo fault injector so the 2nd notification save throws,
+     * runs one tick, and asserts that no instance/notification rows exist for the rule
+     * and that the rule's cursor and nextEvaluationAt are unchanged. Then disarms the
+     * injector, reopens the claim, ticks again, and asserts all 3 firings land.
+     */
+    @Test
+    void tickRollback_faultOnSecondNotificationInsert_leavesCursorUnchanged() {
+        // Seed 3 FAILED executions so the rule's PER_EXCHANGE batch has 3 firings.
+        // Relative-to-now timestamps so they fall inside [rule.createdAt .. ctx.now()].
+        Instant t0 = Instant.now().minusSeconds(30);
+        seedFailedExecution("exec-1", t0);
+        seedFailedExecution("exec-2", t0.plusSeconds(1));
+        seedFailedExecution("exec-3", t0.plusSeconds(2));
+        UUID perExRuleId = createPerExchangeRuleWithWebhook();
+
+        // Everything after rule creation runs inside try/finally: the assertions
+        // below are expected to fail until Phase 2 lands, and a failed assertion
+        // must not skip the deletes for the extra rule created above.
+        try {
+            var ruleBefore = ruleRepo.findById(perExRuleId).orElseThrow();
+            Object cursorBefore = ruleBefore.evalState().get("lastExchangeCursor"); // null on first run
+            Instant nextRunBefore = ruleBefore.nextEvaluationAt();
+
+            // Arm the fault injector: the 2nd notification save() throws.
+            // (Instance saves are NOT counted — the injector is scoped to notification saves.)
+            faultInjectingNotificationRepo.failOnSave(2);
+
+            // Today (Phase 1, non-transactional): the evaluator catches the exception
+            // per-rule and logs a warning — see AlertEvaluatorJob#tick's try/catch
+            // around applyResult. So tick() itself does NOT rethrow. That is exactly
+            // why this IT is RED pre-Phase-2: post-rollback asserts expect 0 instances
+            // and 0 notifications, but the current code will have persisted
+            // firing #1 (instance + notification) and firing #2's instance before the
+            // fault on firing #2's notification. Phase 2 wraps the per-rule body in
+            // @Transactional so the single-rule failure rolls back all of its writes.
+            try {
+                job.tick();
+            } catch (RuntimeException expectedAfterPhase2) {
+                // Phase 2 may choose to rethrow; either way the rollback assertions
+                // below are what pin the contract.
+            }
+
+            // Post-rollback: zero instances, zero notifications, cursor unchanged,
+            // nextRunAt unchanged (Phase 2 will hold the claim so the next tick retries).
+            assertThat(countInstancesByRule(perExRuleId))
+                    .as("Phase 2 contract: mid-batch fault rolls back every instance write")
+                    .isZero();
+            assertThat(listNotificationsByRule(perExRuleId))
+                    .as("Phase 2 contract: mid-batch fault rolls back every notification write")
+                    .isEmpty();
+            var ruleAfter = ruleRepo.findById(perExRuleId).orElseThrow();
+            assertThat(ruleAfter.evalState().get("lastExchangeCursor"))
+                    .as("Phase 2 contract: cursor MUST NOT advance when the tick fails")
+                    .isEqualTo(cursorBefore);
+            assertThat(ruleAfter.nextEvaluationAt())
+                    .as("Phase 2 contract: nextEvaluationAt MUST NOT advance when the tick fails")
+                    .isEqualTo(nextRunBefore);
+
+            // Recover: clear the fault, reopen the claim, tick again.
+            // All 3 firings must land on the second tick — exactly-once-per-exchange.
+            faultInjectingNotificationRepo.clearFault();
+            jdbcTemplate.update(
+                    "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
+                    "claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
+
+            job.tick();
+
+            assertThat(countInstancesByRule(perExRuleId))
+                    .as("after recovery: all 3 exchanges produce exactly one instance each")
+                    .isEqualTo(3);
+            assertThat(listNotificationsByRule(perExRuleId))
+                    .as("after recovery: all 3 instances produce exactly one notification each")
+                    .hasSize(3);
+            assertThat(ruleRepo.findById(perExRuleId).orElseThrow()
+                    .evalState().get("lastExchangeCursor"))
+                    .as("after recovery: cursor advanced past exec-3")
+                    .isNotEqualTo(cursorBefore);
+        } finally {
+            // Clean up the extra rule (setup-created rule is handled by @AfterEach).
+            jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
+                    "(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
+            jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
+            jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Fault-injecting AlertNotificationRepository decorator
+    //
+    // Delegates all calls to the real Postgres-backed repository except
+    // {@link #save(AlertNotification)} — that method increments a counter and
+    // throws a RuntimeException when the configured trigger-count is reached.
+    // Only notification saves are counted; instance saves go through a separate
+    // repo and are unaffected.
+    // -------------------------------------------------------------------------
+
+    static class FaultInjectingNotificationRepository implements AlertNotificationRepository {
+        private final AlertNotificationRepository delegate;
+        private final AtomicInteger saveCount = new AtomicInteger(0);
+        private volatile int failOnNthSave = -1; // -1 = disabled
+
+        FaultInjectingNotificationRepository(AlertNotificationRepository delegate) {
+            this.delegate = delegate;
+        }
+
+        /**
+         * Arms the fault: the {@code n}-th call to {@link #save} (1-indexed) throws.
+         * Resets the save counter, so counting starts from the next {@code save}.
+         *
+         * @param n 1-indexed position of the save call that should fail
+         * @throws IllegalArgumentException if {@code n < 1}; such a value could never
+         *         match the counter and would silently leave the injector disarmed
+         */
+        void failOnSave(int n) {
+            if (n < 1) {
+                throw new IllegalArgumentException("n must be >= 1 (1-indexed), got " + n);
+            }
+            saveCount.set(0);
+            failOnNthSave = n;
+        }
+
+        /** Disarms the fault and resets the counter. */
+        void clearFault() {
+            failOnNthSave = -1;
+            saveCount.set(0);
+        }
+
+        /**
+         * Counts the call and either throws the injected failure (when armed and the
+         * counter equals the configured trigger) or forwards to the delegate.
+         * The counter is incremented even when the injector is disarmed.
+         */
+        @Override
+        public AlertNotification save(AlertNotification n) {
+            int current = saveCount.incrementAndGet();
+            if (failOnNthSave > 0 && current == failOnNthSave) {
+                throw new RuntimeException(
+                        "FaultInjectingNotificationRepository: injected failure on save #" + current);
+            }
+            return delegate.save(n);
+        }
+
+        // --- Pure pass-through methods: no counting, no fault injection. ---
+
+        @Override
+        public Optional<AlertNotification> findById(UUID id) { return delegate.findById(id); }
+
+        @Override
+        public List<AlertNotification> listForInstance(UUID alertInstanceId) {
+            return delegate.listForInstance(alertInstanceId);
+        }
+
+        @Override
+        public List<AlertNotification> claimDueNotifications(String instanceId, int batchSize, int claimTtlSeconds) {
+            return delegate.claimDueNotifications(instanceId, batchSize, claimTtlSeconds);
+        }
+
+        @Override
+        public void markDelivered(UUID id, int status, String snippet, Instant when) {
+            delegate.markDelivered(id, status, snippet, when);
+        }
+
+        @Override
+        public void scheduleRetry(UUID id, Instant nextAttemptAt, int status, String snippet) {
+            delegate.scheduleRetry(id, nextAttemptAt, status, snippet);
+        }
+
+        @Override
+        public void markFailed(UUID id, int status, String snippet) {
+            delegate.markFailed(id, status, snippet);
+        }
+
+        @Override
+        public void resetForRetry(UUID id, Instant nextAttemptAt) {
+            delegate.resetForRetry(id, nextAttemptAt);
+        }
+
+        @Override
+        public void deleteSettledBefore(Instant cutoff) {
+            delegate.deleteSettledBefore(cutoff);
+        }
+    }
+
+    /**
+     * {@link TestConfiguration} that installs the fault-injecting decorator as the
+     * {@code @Primary} {@link AlertNotificationRepository}. The real Postgres repo is
+     * still registered (via {@code AlertingBeanConfig}) and is injected into the
+     * decorator as the delegate, so every non-instrumented call path exercises real SQL.
+     */
+    @TestConfiguration
+    static class FaultInjectingNotificationRepoConfig {
+        @Bean
+        @Primary
+        FaultInjectingNotificationRepository faultInjectingNotificationRepository(
+                @Qualifier("alertNotificationRepository") AlertNotificationRepository realRepo) {
+            return new FaultInjectingNotificationRepository(realRepo);
+        }
+    }
 }