alerting(it): AlertingFullLifecycleIT — exactly-once across ticks, ack isolation
End-to-end lifecycle test: 5 FAILED exchanges across 2 ticks produces exactly 5 FIRING instances + 5 PENDING notifications. Tick 3 with no new exchanges produces zero new instances or notifications. Ack on one instance leaves the other four untouched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,8 +7,10 @@ import com.cameleer.server.app.alerting.eval.AlertEvaluatorJob;
|
||||
import com.cameleer.server.app.alerting.notify.NotificationDispatchJob;
|
||||
import com.cameleer.server.app.outbound.crypto.SecretCipher;
|
||||
import com.cameleer.server.app.search.ClickHouseLogStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.cameleer.server.core.ingestion.BufferedLogEntry;
|
||||
import com.cameleer.server.core.ingestion.MergedExecution;
|
||||
import com.cameleer.server.core.outbound.OutboundConnectionRepository;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@@ -62,6 +64,7 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
|
||||
@Autowired private AlertSilenceRepository silenceRepo;
|
||||
@Autowired private OutboundConnectionRepository outboundRepo;
|
||||
@Autowired private ClickHouseLogStore logStore;
|
||||
@Autowired private ClickHouseExecutionStore executionStore;
|
||||
@Autowired private SecretCipher secretCipher;
|
||||
@Autowired private TestRestTemplate restTemplate;
|
||||
@Autowired private TestSecurityHelper securityHelper;
|
||||
@@ -399,6 +402,102 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
|
||||
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", reNotifyRuleId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exactly-once-per-exchange end-to-end lifecycle.
|
||||
* <p>
|
||||
* 5 FAILED exchanges across 2 evaluator ticks must produce exactly
|
||||
* 5 FIRING instances + 5 PENDING notifications (one per exchange, one webhook).
|
||||
* A third tick with no new exchanges must be a no-op. Acking one instance
|
||||
* must leave the other four untouched.
|
||||
* <p>
|
||||
* Exercises the full Phase-1+2+3 stack: evaluator cursor persistence across
|
||||
* ticks, per-tick rollback isolation, and the ack-doesn't-cascade invariant.
|
||||
* See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md
|
||||
*/
|
||||
@Test
|
||||
@Order(7)
|
||||
void perExchange_5FailuresAcross2Ticks_exactlyOncePerExchange() {
|
||||
// Relative-to-now timestamps so they fall inside the evaluator's
|
||||
// [rule.createdAt .. ctx.now()] window. Using Instant.parse(...) would
|
||||
// require reconciling with the mocked alertingClock AND rule.createdAt,
|
||||
// which is wall-clock in createPerExchangeRuleWithWebhook.
|
||||
Instant base = Instant.now().minusSeconds(30);
|
||||
|
||||
// Pin the mocked alertingClock to current wall time so ctx.now() is >
|
||||
// every seeded execution timestamp (base + 0..4s) AND > rule.createdAt
|
||||
// (now - 60s). Prior tests may have set simulatedNow far in the past
|
||||
// (step1 used wall time but step6 advanced by 61s — test ordering means
|
||||
// the last value lingers). Re-pinning here makes the window deterministic.
|
||||
setSimulatedNow(Instant.now());
|
||||
|
||||
UUID perExRuleId = createPerExchangeRuleWithWebhook();
|
||||
|
||||
// ── Tick 1 — seed 3, tick ────────────────────────────────────────────
|
||||
seedFailedExecution("ex1-exec-1", base);
|
||||
seedFailedExecution("ex1-exec-2", base.plusSeconds(1));
|
||||
seedFailedExecution("ex1-exec-3", base.plusSeconds(2));
|
||||
evaluatorJob.tick();
|
||||
|
||||
// ── Tick 2 — seed 2 more, tick ───────────────────────────────────────
|
||||
seedFailedExecution("ex1-exec-4", base.plusSeconds(3));
|
||||
seedFailedExecution("ex1-exec-5", base.plusSeconds(4));
|
||||
// Re-open the rule claim so it's due for tick 2.
|
||||
jdbcTemplate.update(
|
||||
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
|
||||
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
|
||||
evaluatorJob.tick();
|
||||
|
||||
// Assert: 5 instances, 5 PENDING notifications.
|
||||
List<UUID> instanceIds = instanceIdsForRule(perExRuleId);
|
||||
assertThat(instanceIds)
|
||||
.as("5 FAILED exchanges across 2 ticks must produce exactly 5 FIRING instances")
|
||||
.hasSize(5);
|
||||
List<AlertNotification> allNotifs = notificationsForRule(perExRuleId);
|
||||
assertThat(allNotifs)
|
||||
.as("5 instances × 1 webhook must produce exactly 5 notifications")
|
||||
.hasSize(5);
|
||||
assertThat(allNotifs.stream().allMatch(n -> n.status() == NotificationStatus.PENDING))
|
||||
.as("all notifications must be PENDING before dispatch")
|
||||
.isTrue();
|
||||
|
||||
// ── Dispatch all pending, then tick 3 — expect no change ────────────
|
||||
dispatchAllPending();
|
||||
// Re-open the rule claim so it's due for tick 3.
|
||||
jdbcTemplate.update(
|
||||
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
|
||||
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
|
||||
evaluatorJob.tick();
|
||||
|
||||
assertThat(instanceIdsForRule(perExRuleId))
|
||||
.as("tick 3 with no new exchanges must not create new instances")
|
||||
.hasSize(5);
|
||||
long pending = notificationsForRule(perExRuleId).stream()
|
||||
.filter(n -> n.status() == NotificationStatus.PENDING)
|
||||
.count();
|
||||
assertThat(pending)
|
||||
.as("tick 3 must not re-enqueue notifications — all prior were dispatched")
|
||||
.isZero();
|
||||
|
||||
// ── Ack one — others unchanged ──────────────────────────────────────
|
||||
UUID firstInstanceId = instanceIds.get(0);
|
||||
instanceRepo.ack(firstInstanceId, "test-operator", Instant.now());
|
||||
|
||||
List<AlertInstance> all = instanceIds.stream()
|
||||
.map(id -> instanceRepo.findById(id).orElseThrow())
|
||||
.toList();
|
||||
long ackedCount = all.stream().filter(i -> i.ackedBy() != null).count();
|
||||
assertThat(ackedCount)
|
||||
.as("ack on one instance must not cascade to peers")
|
||||
.isEqualTo(1);
|
||||
|
||||
// Cleanup — the @AfterAll cleans by envId which covers us, but be explicit.
|
||||
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
|
||||
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
|
||||
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
|
||||
jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", perExRuleId);
|
||||
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
/** POST the main lifecycle rule via REST API. Returns the created rule ID. */
|
||||
@@ -513,4 +612,96 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
|
||||
logStore.insertBufferedBatch(List.of(
|
||||
new BufferedLogEntry(tenantId, envSlug, "lc-agent-01", "lc-app", entry)));
|
||||
}
|
||||
|
||||
// ── Helpers for perExchange exactly-once test ────────────────────────────
|
||||
|
||||
private static final String PER_EX_APP_SLUG = "per-ex-app";
|
||||
|
||||
/**
|
||||
* Create a PER_EXCHANGE rule bound to {@link #PER_EX_APP_SLUG} that fires on
|
||||
* {@code status=FAILED} and enqueues one notification per match via the
|
||||
* pre-seeded webhook connection ({@link #connId}). Returns the new rule id.
|
||||
* <p>
|
||||
* Replicates the pattern from {@code AlertEvaluatorJobIT#createPerExchangeRuleWithWebhook}
|
||||
* but reuses this test's env + outbound connection.
|
||||
*/
|
||||
private UUID createPerExchangeRuleWithWebhook() {
|
||||
UUID rid = UUID.randomUUID();
|
||||
Instant now = Instant.now();
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope(PER_EX_APP_SLUG, null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.PER_EXCHANGE, null, null);
|
||||
var webhook = new WebhookBinding(connId, null, null, Map.of());
|
||||
var rule = new AlertRule(
|
||||
rid, envId, "per-ex-lc-rule-" + rid, null,
|
||||
AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
|
||||
60, 0, 60,
|
||||
"Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
|
||||
List.of(webhook), List.of(),
|
||||
now.minusSeconds(5), // due now
|
||||
null, null, Map.of(),
|
||||
now.minusSeconds(60), "test-operator", // createdAt bounds first-run cursor
|
||||
now.minusSeconds(60), "test-operator");
|
||||
ruleRepo.save(rule);
|
||||
return rid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Seed one FAILED execution into ClickHouse, scoped to this test's tenant/env/app
|
||||
* so it's picked up by a PER_EXCHANGE rule targeting {@link #PER_EX_APP_SLUG}.
|
||||
*/
|
||||
private void seedFailedExecution(String executionId, Instant startTime) {
|
||||
executionStore.insertExecutionBatch(List.of(new MergedExecution(
|
||||
tenantId, 1L, executionId, "route-a", "inst-1", PER_EX_APP_SLUG, envSlug,
|
||||
"FAILED", "", "exchange-" + executionId,
|
||||
startTime, startTime.plusMillis(100), 100L,
|
||||
"", "", "", "", "", "", // error fields
|
||||
"", "FULL", // diagramContentHash, engineLevel
|
||||
"", "", "", "", "", "", // bodies / headers / properties
|
||||
"{}", // attributes (JSON)
|
||||
"", "", // traceId, spanId
|
||||
false, false,
|
||||
null, null
|
||||
)));
|
||||
}
|
||||
|
||||
/** All instance ids for a rule, ordered by fired_at ascending (deterministic). */
|
||||
private List<UUID> instanceIdsForRule(UUID rid) {
|
||||
return jdbcTemplate.queryForList(
|
||||
"SELECT id FROM alert_instances WHERE rule_id = ? ORDER BY fired_at ASC",
|
||||
UUID.class, rid);
|
||||
}
|
||||
|
||||
/** All notifications across every instance of a rule. */
|
||||
private List<AlertNotification> notificationsForRule(UUID rid) {
|
||||
List<UUID> ids = instanceIdsForRule(rid);
|
||||
List<AlertNotification> out = new java.util.ArrayList<>();
|
||||
for (UUID iid : ids) {
|
||||
out.addAll(notificationRepo.listForInstance(iid));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulate a dispatch pass without hitting the real webhook — marks every
|
||||
* PENDING notification for this rule as DELIVERED. Using
|
||||
* {@code dispatchJob.tick()} would round-trip through WireMock and require
|
||||
* extra plumbing; the exactly-once contract under test is about the
|
||||
* evaluator re-enqueueing behaviour, not webhook delivery.
|
||||
*/
|
||||
private void dispatchAllPending() {
|
||||
Instant now = Instant.now();
|
||||
// Drain PENDING notifications across the whole env (safe because the
|
||||
// ackedBy-scoped assertions further down look at this rule only).
|
||||
List<UUID> pendingIds = jdbcTemplate.queryForList(
|
||||
"SELECT n.id FROM alert_notifications n " +
|
||||
"JOIN alert_instances i ON n.alert_instance_id = i.id " +
|
||||
"WHERE i.environment_id = ? " +
|
||||
"AND n.status = 'PENDING'::notification_status_enum",
|
||||
UUID.class, envId);
|
||||
for (UUID nid : pendingIds) {
|
||||
notificationRepo.markDelivered(nid, 200, "OK", now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user