diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/AlertingFullLifecycleIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/AlertingFullLifecycleIT.java
index 77e73b5e..6043cca6 100644
--- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/AlertingFullLifecycleIT.java
+++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/AlertingFullLifecycleIT.java
@@ -7,8 +7,10 @@ import com.cameleer.server.app.alerting.eval.AlertEvaluatorJob;
 import com.cameleer.server.app.alerting.notify.NotificationDispatchJob;
 import com.cameleer.server.app.outbound.crypto.SecretCipher;
 import com.cameleer.server.app.search.ClickHouseLogStore;
+import com.cameleer.server.app.storage.ClickHouseExecutionStore;
 import com.cameleer.server.core.alerting.*;
 import com.cameleer.server.core.ingestion.BufferedLogEntry;
+import com.cameleer.server.core.ingestion.MergedExecution;
 import com.cameleer.server.core.outbound.OutboundConnectionRepository;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,6 +64,7 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
     @Autowired private AlertSilenceRepository silenceRepo;
     @Autowired private OutboundConnectionRepository outboundRepo;
     @Autowired private ClickHouseLogStore logStore;
+    @Autowired private ClickHouseExecutionStore executionStore;
     @Autowired private SecretCipher secretCipher;
     @Autowired private TestRestTemplate restTemplate;
     @Autowired private TestSecurityHelper securityHelper;
@@ -399,6 +402,102 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
         jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", reNotifyRuleId);
     }
 
+    /**
+     * Exactly-once-per-exchange end-to-end lifecycle.
+     * <p>
+     * 5 FAILED exchanges across 2 evaluator ticks must produce exactly
+     * 5 FIRING instances + 5 PENDING notifications (one per exchange, one webhook).
+     * A third tick with no new exchanges must be a no-op. Acking one instance
+     * must leave the other four untouched.
+     * <p>
+     * Exercises the full Phase-1+2+3 stack: evaluator cursor persistence across
+     * ticks, per-tick rollback isolation, and the ack-doesn't-cascade invariant.
+     * See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md
+     */
+    @Test
+    @Order(7)
+    void perExchange_5FailuresAcross2Ticks_exactlyOncePerExchange() {
+        // Relative-to-now timestamps so they fall inside the evaluator's
+        // [rule.createdAt .. ctx.now()] window. Using Instant.parse(...) would
+        // require reconciling with the mocked alertingClock AND rule.createdAt,
+        // which is wall-clock in createPerExchangeRuleWithWebhook.
+        Instant base = Instant.now().minusSeconds(30);
+
+        // Pin the mocked alertingClock to current wall time so ctx.now() is >
+        // every seeded execution timestamp (base + 0..4s) AND > rule.createdAt
+        // (now - 60s). Prior tests may have set simulatedNow far in the past
+        // (step1 used wall time but step6 advanced by 61s — test ordering means
+        // the last value lingers). Re-pinning here makes the window deterministic.
+        setSimulatedNow(Instant.now());
+
+        UUID perExRuleId = createPerExchangeRuleWithWebhook();
+
+        // ── Tick 1 — seed 3, tick ────────────────────────────────────────────
+        seedFailedExecution("ex1-exec-1", base);
+        seedFailedExecution("ex1-exec-2", base.plusSeconds(1));
+        seedFailedExecution("ex1-exec-3", base.plusSeconds(2));
+        evaluatorJob.tick();
+
+        // ── Tick 2 — seed 2 more, tick ───────────────────────────────────────
+        seedFailedExecution("ex1-exec-4", base.plusSeconds(3));
+        seedFailedExecution("ex1-exec-5", base.plusSeconds(4));
+        // Re-open the rule claim so it's due for tick 2.
+        jdbcTemplate.update(
+                "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
+                "claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
+        evaluatorJob.tick();
+
+        // Assert: 5 instances, 5 PENDING notifications.
+        List<UUID> instanceIds = instanceIdsForRule(perExRuleId);
+        assertThat(instanceIds)
+                .as("5 FAILED exchanges across 2 ticks must produce exactly 5 FIRING instances")
+                .hasSize(5);
+        List<AlertNotification> allNotifs = notificationsForRule(perExRuleId);
+        assertThat(allNotifs)
+                .as("5 instances × 1 webhook must produce exactly 5 notifications")
+                .hasSize(5);
+        assertThat(allNotifs)
+                .as("all notifications must be PENDING before dispatch")
+                .allMatch(n -> n.status() == NotificationStatus.PENDING);
+
+        // ── Dispatch all pending, then tick 3 — expect no change ────────────
+        dispatchAllPending();
+        // Re-open the rule claim so it's due for tick 3.
+        jdbcTemplate.update(
+                "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
+                "claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
+        evaluatorJob.tick();
+
+        assertThat(instanceIdsForRule(perExRuleId))
+                .as("tick 3 with no new exchanges must not create new instances")
+                .hasSize(5);
+        long pending = notificationsForRule(perExRuleId).stream()
+                .filter(n -> n.status() == NotificationStatus.PENDING)
+                .count();
+        assertThat(pending)
+                .as("tick 3 must not re-enqueue notifications — all prior were dispatched")
+                .isZero();
+
+        // ── Ack one — others unchanged ──────────────────────────────────────
+        UUID firstInstanceId = instanceIds.get(0);
+        instanceRepo.ack(firstInstanceId, "test-operator", Instant.now());
+
+        List<AlertInstance> all = instanceIds.stream()
+                .map(id -> instanceRepo.findById(id).orElseThrow())
+                .toList();
+        long ackedCount = all.stream().filter(i -> i.ackedBy() != null).count();
+        assertThat(ackedCount)
+                .as("ack on one instance must not cascade to peers")
+                .isEqualTo(1);
+
+        // Cleanup — the @AfterAll cleans by envId which covers us, but be explicit.
+        jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
+                "(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
+        jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
+        jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", perExRuleId);
+        jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
+    }
+
     // ── Helpers ───────────────────────────────────────────────────────────────
 
     /** POST the main lifecycle rule via REST API. Returns the created rule ID. */
@@ -513,4 +612,96 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
         logStore.insertBufferedBatch(List.of(
                 new BufferedLogEntry(tenantId, envSlug, "lc-agent-01", "lc-app", entry)));
     }
+
+    // ── Helpers for perExchange exactly-once test ────────────────────────────
+
+    private static final String PER_EX_APP_SLUG = "per-ex-app";
+
+    /**
+     * Create a PER_EXCHANGE rule bound to {@link #PER_EX_APP_SLUG} that fires on
+     * {@code status=FAILED} and enqueues one notification per match via the
+     * pre-seeded webhook connection ({@link #connId}). Returns the new rule id.
+     * <p>
+     * Replicates the pattern from {@code AlertEvaluatorJobIT#createPerExchangeRuleWithWebhook}
+     * but reuses this test's env + outbound connection.
+     */
+    private UUID createPerExchangeRuleWithWebhook() {
+        UUID rid = UUID.randomUUID();
+        Instant now = Instant.now();
+        var condition = new ExchangeMatchCondition(
+                new AlertScope(PER_EX_APP_SLUG, null, null),
+                new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
+                FireMode.PER_EXCHANGE, null, null);
+        var webhook = new WebhookBinding(connId, null, null, Map.of());
+        var rule = new AlertRule(
+                rid, envId, "per-ex-lc-rule-" + rid, null,
+                AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
+                60, 0, 60,
+                "Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
+                List.of(webhook), List.of(),
+                now.minusSeconds(5), // due now
+                null, null, Map.of(),
+                now.minusSeconds(60), "test-operator", // createdAt bounds first-run cursor
+                now.minusSeconds(60), "test-operator");
+        ruleRepo.save(rule);
+        return rid;
+    }
+
+    /**
+     * Seed one FAILED execution into ClickHouse, scoped to this test's tenant/env/app
+     * so it's picked up by a PER_EXCHANGE rule targeting {@link #PER_EX_APP_SLUG}.
+     */
+    private void seedFailedExecution(String executionId, Instant startTime) {
+        executionStore.insertExecutionBatch(List.of(new MergedExecution(
+                tenantId, 1L, executionId, "route-a", "inst-1", PER_EX_APP_SLUG, envSlug,
+                "FAILED", "", "exchange-" + executionId,
+                startTime, startTime.plusMillis(100), 100L,
+                "", "", "", "", "", "", // error fields
+                "", "FULL", // diagramContentHash, engineLevel
+                "", "", "", "", "", "", // bodies / headers / properties
+                "{}", // attributes (JSON)
+                "", "", // traceId, spanId
+                false, false,
+                null, null
+        )));
+    }
+
+    /** All instance ids for a rule, ordered by fired_at then id so ties stay deterministic. */
+    private List<UUID> instanceIdsForRule(UUID rid) {
+        return jdbcTemplate.queryForList(
+                "SELECT id FROM alert_instances WHERE rule_id = ? ORDER BY fired_at ASC, id ASC",
+                UUID.class, rid);
+    }
+
+    /** All notifications across every instance of a rule. */
+    private List<AlertNotification> notificationsForRule(UUID rid) {
+        List<UUID> ids = instanceIdsForRule(rid);
+        List<AlertNotification> out = new java.util.ArrayList<>();
+        for (UUID iid : ids) {
+            out.addAll(notificationRepo.listForInstance(iid));
+        }
+        return out;
+    }
+
+    /**
+     * Simulate a dispatch pass without hitting the real webhook — marks every
+     * PENDING notification in this test's environment as DELIVERED. Using
+     * {@code dispatchJob.tick()} would round-trip through WireMock and require
+     * extra plumbing; the exactly-once contract under test is about the
+     * evaluator re-enqueueing behaviour, not webhook delivery.
+     */
+    private void dispatchAllPending() {
+        Instant now = Instant.now();
+        // Drain PENDING notifications across the whole env (safe because the
+        // ackedBy-scoped assertions further down look at this rule only).
+        List<UUID> pendingIds = jdbcTemplate.queryForList(
+                "SELECT n.id FROM alert_notifications n " +
+                "JOIN alert_instances i ON n.alert_instance_id = i.id " +
+                "WHERE i.environment_id = ? " +
+                "AND n.status = 'PENDING'::notification_status_enum",
+                UUID.class, envId);
+        for (UUID nid : pendingIds) {
+            notificationRepo.markDelivered(nid, 200, "OK", now);
+        }
+    }
 }