alerting(it): RED test pinning Phase 2 tick-atomicity contract
Fault-injection IT asserts that a crash mid-batch rolls back every instance + notification write AND leaves the cursor unchanged. Fails against current (Phase 1 only) code — turns green when Task 2.2 wraps batch processing in @Transactional.
This commit is contained in:
@@ -13,15 +13,22 @@ import org.junit.jupiter.api.BeforeEach;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.boot.test.context.TestConfiguration;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -33,6 +40,7 @@ import static org.mockito.Mockito.when;
|
|||||||
* {@code AgentRegistryService} is mocked so tests can control which agents
|
* {@code AgentRegistryService} is mocked so tests can control which agents
|
||||||
* are DEAD without depending on in-memory timing.
|
* are DEAD without depending on in-memory timing.
|
||||||
*/
|
*/
|
||||||
|
@Import(AlertEvaluatorJobIT.FaultInjectingNotificationRepoConfig.class)
|
||||||
class AlertEvaluatorJobIT extends AbstractPostgresIT {
|
class AlertEvaluatorJobIT extends AbstractPostgresIT {
|
||||||
|
|
||||||
@MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
|
@MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
|
||||||
@@ -42,6 +50,7 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
|
|||||||
@Autowired private AlertRuleRepository ruleRepo;
|
@Autowired private AlertRuleRepository ruleRepo;
|
||||||
@Autowired private AlertInstanceRepository instanceRepo;
|
@Autowired private AlertInstanceRepository instanceRepo;
|
||||||
@Autowired private AlertNotificationRepository notificationRepo;
|
@Autowired private AlertNotificationRepository notificationRepo;
|
||||||
|
@Autowired private FaultInjectingNotificationRepository faultInjectingNotificationRepo;
|
||||||
@Autowired private ClickHouseExecutionStore executionStore;
|
@Autowired private ClickHouseExecutionStore executionStore;
|
||||||
@Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc;
|
@Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc;
|
||||||
|
|
||||||
@@ -92,6 +101,10 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
|
|||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
void cleanup() {
|
void cleanup() {
|
||||||
|
// Always reset the fault-injector — a prior @Test may have left it armed,
|
||||||
|
// and Spring reuses the same context (and thus the same decorator bean)
|
||||||
|
// across tests in this class.
|
||||||
|
faultInjectingNotificationRepo.clearFault();
|
||||||
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
|
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
|
||||||
"(SELECT id FROM alert_instances WHERE environment_id = ?)", envId);
|
"(SELECT id FROM alert_instances WHERE environment_id = ?)", envId);
|
||||||
jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
|
jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
|
||||||
@@ -389,4 +402,183 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
|
|||||||
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
|
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
|
||||||
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
|
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Tick atomicity regression pin — a crash mid-batch must roll back every
|
||||||
|
// instance + notification write AND leave the cursor unchanged so the
|
||||||
|
// next tick re-processes the entire batch exactly once.
|
||||||
|
// See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md (Task 2.1)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void tickRollback_faultOnSecondNotificationInsert_leavesCursorUnchanged() {
|
||||||
|
// Seed 3 FAILED executions so the rule's PER_EXCHANGE batch has 3 firings.
|
||||||
|
// Relative-to-now timestamps so they fall inside [rule.createdAt .. ctx.now()].
|
||||||
|
Instant t0 = Instant.now().minusSeconds(30);
|
||||||
|
seedFailedExecution("exec-1", t0);
|
||||||
|
seedFailedExecution("exec-2", t0.plusSeconds(1));
|
||||||
|
seedFailedExecution("exec-3", t0.plusSeconds(2));
|
||||||
|
UUID perExRuleId = createPerExchangeRuleWithWebhook();
|
||||||
|
|
||||||
|
var ruleBefore = ruleRepo.findById(perExRuleId).orElseThrow();
|
||||||
|
Object cursorBefore = ruleBefore.evalState().get("lastExchangeCursor"); // null on first run
|
||||||
|
Instant nextRunBefore = ruleBefore.nextEvaluationAt();
|
||||||
|
|
||||||
|
// Arm the fault injector: the 2nd notification save() throws.
|
||||||
|
// (Instance saves are NOT counted — the injector is scoped to notification saves.)
|
||||||
|
faultInjectingNotificationRepo.failOnSave(2);
|
||||||
|
|
||||||
|
// Today (Phase 1, non-transactional): the evaluator catches the exception
|
||||||
|
// per-rule and logs a warning — see AlertEvaluatorJob#tick's try/catch
|
||||||
|
// around applyResult. So tick() itself does NOT rethrow. That is exactly
|
||||||
|
// why this IT is RED pre-Phase-2: post-rollback asserts expect 0 instances
|
||||||
|
// and 0 notifications, but the current code will have persisted
|
||||||
|
// firing #1 (instance + notification) and firing #2's instance before the
|
||||||
|
// fault on firing #2's notification. Phase 2 wraps the per-rule body in
|
||||||
|
// @Transactional so the single-rule failure rolls back all of its writes.
|
||||||
|
try {
|
||||||
|
job.tick();
|
||||||
|
} catch (RuntimeException expectedAfterPhase2) {
|
||||||
|
// Phase 2 may choose to rethrow; either way the rollback assertions
|
||||||
|
// below are what pin the contract.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Post-rollback: zero instances, zero notifications, cursor unchanged,
|
||||||
|
// nextRunAt unchanged (Phase 2 will hold the claim so the next tick retries).
|
||||||
|
assertThat(countInstancesByRule(perExRuleId))
|
||||||
|
.as("Phase 2 contract: mid-batch fault rolls back every instance write")
|
||||||
|
.isZero();
|
||||||
|
assertThat(listNotificationsByRule(perExRuleId))
|
||||||
|
.as("Phase 2 contract: mid-batch fault rolls back every notification write")
|
||||||
|
.isEmpty();
|
||||||
|
var ruleAfter = ruleRepo.findById(perExRuleId).orElseThrow();
|
||||||
|
assertThat(ruleAfter.evalState().get("lastExchangeCursor"))
|
||||||
|
.as("Phase 2 contract: cursor MUST NOT advance when the tick fails")
|
||||||
|
.isEqualTo(cursorBefore);
|
||||||
|
assertThat(ruleAfter.nextEvaluationAt())
|
||||||
|
.as("Phase 2 contract: nextEvaluationAt MUST NOT advance when the tick fails")
|
||||||
|
.isEqualTo(nextRunBefore);
|
||||||
|
|
||||||
|
// Recover: clear the fault, reopen the claim, tick again.
|
||||||
|
// All 3 firings must land on the second tick — exactly-once-per-exchange.
|
||||||
|
faultInjectingNotificationRepo.clearFault();
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
|
||||||
|
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
assertThat(countInstancesByRule(perExRuleId))
|
||||||
|
.as("after recovery: all 3 exchanges produce exactly one instance each")
|
||||||
|
.isEqualTo(3);
|
||||||
|
assertThat(listNotificationsByRule(perExRuleId))
|
||||||
|
.as("after recovery: all 3 instances produce exactly one notification each")
|
||||||
|
.hasSize(3);
|
||||||
|
assertThat(ruleRepo.findById(perExRuleId).orElseThrow()
|
||||||
|
.evalState().get("lastExchangeCursor"))
|
||||||
|
.as("after recovery: cursor advanced past exec-3")
|
||||||
|
.isNotEqualTo(cursorBefore);
|
||||||
|
|
||||||
|
// Clean up the extra rule (setup-created rule is handled by @AfterEach).
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
|
||||||
|
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Fault-injecting AlertNotificationRepository decorator
|
||||||
|
//
|
||||||
|
// Delegates all calls to the real Postgres-backed repository except
|
||||||
|
// {@link #save(AlertNotification)} — that method increments a counter and
|
||||||
|
// throws a RuntimeException when the configured trigger-count is reached.
|
||||||
|
// Only notification saves are counted; instance saves go through a separate
|
||||||
|
// repo and are unaffected.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
static class FaultInjectingNotificationRepository implements AlertNotificationRepository {
|
||||||
|
private final AlertNotificationRepository delegate;
|
||||||
|
private final AtomicInteger saveCount = new AtomicInteger(0);
|
||||||
|
private volatile int failOnNthSave = -1; // -1 = disabled
|
||||||
|
|
||||||
|
FaultInjectingNotificationRepository(AlertNotificationRepository delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Arms the fault: the {@code n}-th call to {@link #save} (1-indexed) throws. */
|
||||||
|
void failOnSave(int n) {
|
||||||
|
saveCount.set(0);
|
||||||
|
failOnNthSave = n;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Disarms the fault and resets the counter. */
|
||||||
|
void clearFault() {
|
||||||
|
failOnNthSave = -1;
|
||||||
|
saveCount.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AlertNotification save(AlertNotification n) {
|
||||||
|
int current = saveCount.incrementAndGet();
|
||||||
|
if (failOnNthSave > 0 && current == failOnNthSave) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"FaultInjectingNotificationRepository: injected failure on save #" + current);
|
||||||
|
}
|
||||||
|
return delegate.save(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<AlertNotification> findById(UUID id) { return delegate.findById(id); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AlertNotification> listForInstance(UUID alertInstanceId) {
|
||||||
|
return delegate.listForInstance(alertInstanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AlertNotification> claimDueNotifications(String instanceId, int batchSize, int claimTtlSeconds) {
|
||||||
|
return delegate.claimDueNotifications(instanceId, batchSize, claimTtlSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void markDelivered(UUID id, int status, String snippet, Instant when) {
|
||||||
|
delegate.markDelivered(id, status, snippet, when);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void scheduleRetry(UUID id, Instant nextAttemptAt, int status, String snippet) {
|
||||||
|
delegate.scheduleRetry(id, nextAttemptAt, status, snippet);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void markFailed(UUID id, int status, String snippet) {
|
||||||
|
delegate.markFailed(id, status, snippet);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resetForRetry(UUID id, Instant nextAttemptAt) {
|
||||||
|
delegate.resetForRetry(id, nextAttemptAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteSettledBefore(Instant cutoff) {
|
||||||
|
delegate.deleteSettledBefore(cutoff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link TestConfiguration} that installs the fault-injecting decorator as the
|
||||||
|
* {@code @Primary} {@link AlertNotificationRepository}. The real Postgres repo is
|
||||||
|
* still registered (via {@code AlertingBeanConfig}) and is injected into the
|
||||||
|
* decorator as the delegate, so every non-instrumented call path exercises real SQL.
|
||||||
|
*/
|
||||||
|
@TestConfiguration
|
||||||
|
static class FaultInjectingNotificationRepoConfig {
|
||||||
|
@Bean
|
||||||
|
@Primary
|
||||||
|
FaultInjectingNotificationRepository faultInjectingNotificationRepository(
|
||||||
|
@Qualifier("alertNotificationRepository") AlertNotificationRepository realRepo) {
|
||||||
|
return new FaultInjectingNotificationRepository(realRepo);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user