diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java index 51f3d4f4..5c78ec5e 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java @@ -2,15 +2,19 @@ package com.cameleer.server.app.alerting.eval; import com.cameleer.server.app.AbstractPostgresIT; import com.cameleer.server.app.search.ClickHouseLogStore; +import com.cameleer.server.app.storage.ClickHouseExecutionStore; import com.cameleer.server.core.agent.AgentInfo; import com.cameleer.server.core.agent.AgentRegistryService; import com.cameleer.server.core.agent.AgentState; import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.ingestion.MergedExecution; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.jdbc.core.JdbcTemplate; import java.time.Instant; import java.util.List; @@ -37,24 +41,29 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { @Autowired private AlertEvaluatorJob job; @Autowired private AlertRuleRepository ruleRepo; @Autowired private AlertInstanceRepository instanceRepo; + @Autowired private AlertNotificationRepository notificationRepo; + @Autowired private ClickHouseExecutionStore executionStore; + @Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc; private UUID envId; private UUID ruleId; private static final String SYS_USER = "sys-eval-it"; private static final String APP_SLUG = "orders"; private static final String AGENT_ID = "test-agent-01"; + 
private String envSlug; @BeforeEach void setup() { // Default: empty registry — all evaluators return Clear when(agentRegistryService.findAll()).thenReturn(List.of()); - envId = UUID.randomUUID(); - ruleId = UUID.randomUUID(); + envId = UUID.randomUUID(); + ruleId = UUID.randomUUID(); + envSlug = "eval-it-env-" + envId; jdbcTemplate.update( "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", - envId, "eval-it-env-" + envId, "Eval IT Env"); + envId, envSlug, "Eval IT Env"); jdbcTemplate.update( "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING", SYS_USER, SYS_USER + "@test.example.com"); @@ -82,6 +91,13 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER); + // ClickHouse — purge any executions seeded into this env. The shared CH instance + // persists across tests in the suite; scope the wipe to this test's env slug. + try { + if (envSlug != null) { + clickHouseJdbc.execute("ALTER TABLE executions DELETE WHERE environment = '" + envSlug + "'"); + } + } catch (Exception ignored) { /* best-effort; next setup uses a fresh env slug */ } } // ------------------------------------------------------------------------- @@ -94,6 +110,77 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null); } + /** + * Seed one FAILED execution row into the ClickHouse {@code executions} table, + * scoped to this test's tenant/env/app so it's picked up by a PER_EXCHANGE rule + * targeting {@code APP_SLUG}. Executions older than {@code rule.createdAt} are + * filtered out by the evaluator; callers must pick {@code startTime} accordingly. 
+ */ + private void seedFailedExecution(String executionId, Instant startTime) { + executionStore.insertExecutionBatch(List.of(new MergedExecution( + "default", 1L, executionId, "route-a", "inst-1", APP_SLUG, envSlug, + "FAILED", "", "exchange-" + executionId, + startTime, startTime.plusMillis(100), 100L, + "", "", "", "", "", "", // errorMessage..rootCauseMessage + "", "FULL", // diagramContentHash, engineLevel + "", "", "", "", "", "", // bodies / headers / properties + "{}", // attributes (JSON) + "", "", // traceId, spanId + false, false, + null, null + ))); + } + + /** + * Create a PER_EXCHANGE rule targeting {@code APP_SLUG} + status=FAILED with a + * single webhook binding. Returns the persisted rule id. + *

+ * <p>{@code createdAt} is set {@code 60s} before {@code Instant.now()} so the + * evaluator's first-run lower bound ({@code timeFrom = rule.createdAt}) picks + * up the seeded executions. + */ + private UUID createPerExchangeRuleWithWebhook() { + UUID ruleId2 = UUID.randomUUID(); + Instant now = Instant.now(); + var condition = new ExchangeMatchCondition( + new AlertScope(APP_SLUG, null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.PER_EXCHANGE, null, null, 60); + var webhook = new WebhookBinding(UUID.randomUUID(), null, null, Map.of()); + var rule = new AlertRule( + ruleId2, envId, "per-exchange-rule-" + ruleId2, null, + AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition, + 60, 0, 60, + "Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}", + List.of(webhook), List.of(), + now.minusSeconds(5), // due now + null, null, Map.of(), + now.minusSeconds(60), SYS_USER, // createdAt — bounds first-run cursor + now.minusSeconds(60), SYS_USER); + ruleRepo.save(rule); + return ruleId2; + } + + /** List all notifications enqueued for any instance of {@code ruleId}. */ + private List<AlertNotification> listNotificationsByRule(UUID ruleId) { + List<UUID> instanceIds = jdbcTemplate.queryForList( + "SELECT id FROM alert_instances WHERE rule_id = ?", + UUID.class, ruleId); + List<AlertNotification> out = new java.util.ArrayList<>(); + for (UUID iid : instanceIds) { + out.addAll(notificationRepo.listForInstance(iid)); + } + return out; + } + + /** List all instances for {@code ruleId} (open or resolved). */ + private int countInstancesByRule(UUID ruleId) { + Long c = jdbcTemplate.queryForObject( + "SELECT count(*) FROM alert_instances WHERE rule_id = ?", + Long.class, ruleId); + return c == null ? 
0 : c.intValue(); + } + // ------------------------------------------------------------------------- // Tests // ------------------------------------------------------------------------- @@ -238,4 +325,67 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { assertThat(snapshotAfter).contains("\"name\": \"dead-agent-rule\""); assertThat(snapshotAfter).contains("\"severity\": \"WARNING\""); } + + // ------------------------------------------------------------------------- + // PER_EXCHANGE regression pin — notifications must not re-enqueue for + // already-matched exchanges across tick boundaries (cursor must be persisted + // via releaseClaim, then read back on the next tick). + // See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md + // ------------------------------------------------------------------------- + + @Test + void tick2_noNewExchanges_enqueuesZeroAdditionalNotifications() { + // Arrange: 2 FAILED executions in ClickHouse, 1 PER_EXCHANGE rule with 1 webhook. + // Use relative-to-now timestamps so they sort within the evaluator's + // [rule.createdAt .. ctx.now()] window. + Instant t0 = Instant.now().minusSeconds(30); + seedFailedExecution("exec-1", t0); + seedFailedExecution("exec-2", t0.plusSeconds(1)); + UUID perExRuleId = createPerExchangeRuleWithWebhook(); + + // Tick 1 — expect 2 instances, 2 notifications. + job.tick(); + assertThat(countInstancesByRule(perExRuleId)) + .as("tick 1 must create one FIRING instance per matched exchange") + .isEqualTo(2); + List<AlertNotification> afterTick1 = listNotificationsByRule(perExRuleId); + assertThat(afterTick1) + .as("tick 1 must enqueue one notification per instance (1 webhook × 2 instances)") + .hasSize(2); + + // Simulate NotificationDispatchJob draining the queue. + Instant now = Instant.now(); + afterTick1.forEach(n -> notificationRepo.markDelivered(n.id(), 200, "OK", now)); + + // Reopen the claim so the rule is due for Tick 2. 
+ jdbcTemplate.update( + "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " + + "claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId); + + // Tick 2 — no new ClickHouse rows; cursor should have advanced past exec-2 + // during tick 1 and persisted via releaseClaim. Therefore: no new firings, + // no new notifications. + job.tick(); + + // Instance count unchanged. + assertThat(countInstancesByRule(perExRuleId)) + .as("tick 2 must not create new instances — cursor persisted past exec-2") + .isEqualTo(2); + + // THE BLEED: any new PENDING notification after tick 2 indicates the + // evaluator re-matched already-processed exchanges (cursor not persisted + // across ticks). Must be zero after the Phase 1 fix. + long pending = listNotificationsByRule(perExRuleId).stream() + .filter(n -> n.status() == NotificationStatus.PENDING) + .count(); + assertThat(pending) + .as("tick 2 must NOT re-enqueue notifications for already-matched exchanges") + .isZero(); + + // Clean up the extra rule (setup-created rule is handled by @AfterEach). + jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " + + "(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId); + jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId); + jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId); + } }