alerting(eval): persist advanced cursor via releaseClaim — Phase 1 close

Fixes the notification-bleed regression pinned by
AlertEvaluatorJobIT#tick2_noNewExchanges_enqueuesZeroAdditionalNotifications.
This commit is contained in:
hsiegeln
2026-04-22 16:36:01 +02:00
parent b8d4b59f40
commit 5bd0e09df3

View File

@@ -2,15 +2,19 @@ package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.AbstractPostgresIT; import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.app.search.ClickHouseLogStore; import com.cameleer.server.app.search.ClickHouseLogStore;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.agent.AgentInfo; import com.cameleer.server.core.agent.AgentInfo;
import com.cameleer.server.core.agent.AgentRegistryService; import com.cameleer.server.core.agent.AgentRegistryService;
import com.cameleer.server.core.agent.AgentState; import com.cameleer.server.core.agent.AgentState;
import com.cameleer.server.core.alerting.*; import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.ingestion.MergedExecution;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
@@ -37,12 +41,16 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
@Autowired private AlertEvaluatorJob job; @Autowired private AlertEvaluatorJob job;
@Autowired private AlertRuleRepository ruleRepo; @Autowired private AlertRuleRepository ruleRepo;
@Autowired private AlertInstanceRepository instanceRepo; @Autowired private AlertInstanceRepository instanceRepo;
@Autowired private AlertNotificationRepository notificationRepo;
@Autowired private ClickHouseExecutionStore executionStore;
@Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc;
private UUID envId; private UUID envId;
private UUID ruleId; private UUID ruleId;
private static final String SYS_USER = "sys-eval-it"; private static final String SYS_USER = "sys-eval-it";
private static final String APP_SLUG = "orders"; private static final String APP_SLUG = "orders";
private static final String AGENT_ID = "test-agent-01"; private static final String AGENT_ID = "test-agent-01";
private String envSlug;
@BeforeEach @BeforeEach
void setup() { void setup() {
@@ -51,10 +59,11 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
envId = UUID.randomUUID(); envId = UUID.randomUUID();
ruleId = UUID.randomUUID(); ruleId = UUID.randomUUID();
envSlug = "eval-it-env-" + envId;
jdbcTemplate.update( jdbcTemplate.update(
"INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)",
envId, "eval-it-env-" + envId, "Eval IT Env"); envId, envSlug, "Eval IT Env");
jdbcTemplate.update( jdbcTemplate.update(
"INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING", "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING",
SYS_USER, SYS_USER + "@test.example.com"); SYS_USER, SYS_USER + "@test.example.com");
@@ -82,6 +91,13 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER); jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER);
// ClickHouse — purge any executions seeded into this env. The shared CH instance
// persists across tests in the suite; scope the wipe to this test's env slug.
try {
if (envSlug != null) {
clickHouseJdbc.execute("ALTER TABLE executions DELETE WHERE environment = '" + envSlug + "'");
}
} catch (Exception ignored) { /* best-effort; next setup uses a fresh env slug */ }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@@ -94,6 +110,77 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null); AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null);
} }
/**
* Seed one FAILED execution row into the ClickHouse {@code executions} table,
* scoped to this test's tenant/env/app so it's picked up by a PER_EXCHANGE rule
* targeting {@code APP_SLUG}. Executions older than {@code rule.createdAt} are
* filtered out by the evaluator; callers must pick {@code startTime} accordingly.
*/
private void seedFailedExecution(String executionId, Instant startTime) {
executionStore.insertExecutionBatch(List.of(new MergedExecution(
"default", 1L, executionId, "route-a", "inst-1", APP_SLUG, envSlug,
"FAILED", "", "exchange-" + executionId,
startTime, startTime.plusMillis(100), 100L,
"", "", "", "", "", "", // errorMessage..rootCauseMessage
"", "FULL", // diagramContentHash, engineLevel
"", "", "", "", "", "", // bodies / headers / properties
"{}", // attributes (JSON)
"", "", // traceId, spanId
false, false,
null, null
)));
}
/**
* Create a PER_EXCHANGE rule targeting {@code APP_SLUG} + status=FAILED with a
* single webhook binding. Returns the persisted rule id.
* <p>
* {@code createdAt} is set {@code 60s} before {@code Instant.now()} so the
* evaluator's first-run lower bound ({@code timeFrom = rule.createdAt}) picks
* up the seeded executions.
*/
private UUID createPerExchangeRuleWithWebhook() {
UUID ruleId2 = UUID.randomUUID();
Instant now = Instant.now();
var condition = new ExchangeMatchCondition(
new AlertScope(APP_SLUG, null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60);
var webhook = new WebhookBinding(UUID.randomUUID(), null, null, Map.of());
var rule = new AlertRule(
ruleId2, envId, "per-exchange-rule-" + ruleId2, null,
AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
60, 0, 60,
"Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
List.of(webhook), List.of(),
now.minusSeconds(5), // due now
null, null, Map.of(),
now.minusSeconds(60), SYS_USER, // createdAt — bounds first-run cursor
now.minusSeconds(60), SYS_USER);
ruleRepo.save(rule);
return ruleId2;
}
/** List all notifications enqueued for any instance of {@code ruleId}. */
private List<AlertNotification> listNotificationsByRule(UUID ruleId) {
List<UUID> instanceIds = jdbcTemplate.queryForList(
"SELECT id FROM alert_instances WHERE rule_id = ?",
UUID.class, ruleId);
List<AlertNotification> out = new java.util.ArrayList<>();
for (UUID iid : instanceIds) {
out.addAll(notificationRepo.listForInstance(iid));
}
return out;
}
/** List all instances for {@code ruleId} (open or resolved). */
private int countInstancesByRule(UUID ruleId) {
Long c = jdbcTemplate.queryForObject(
"SELECT count(*) FROM alert_instances WHERE rule_id = ?",
Long.class, ruleId);
return c == null ? 0 : c.intValue();
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Tests // Tests
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@@ -238,4 +325,67 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
assertThat(snapshotAfter).contains("\"name\": \"dead-agent-rule\""); assertThat(snapshotAfter).contains("\"name\": \"dead-agent-rule\"");
assertThat(snapshotAfter).contains("\"severity\": \"WARNING\""); assertThat(snapshotAfter).contains("\"severity\": \"WARNING\"");
} }
// -------------------------------------------------------------------------
// PER_EXCHANGE regression pin — notifications must not re-enqueue for
// already-matched exchanges across tick boundaries (cursor must be persisted
// via releaseClaim, then read back on the next tick).
// See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md
// -------------------------------------------------------------------------
@Test
void tick2_noNewExchanges_enqueuesZeroAdditionalNotifications() {
// Arrange: 2 FAILED executions in ClickHouse, 1 PER_EXCHANGE rule with 1 webhook.
// Use relative-to-now timestamps so they sort within the evaluator's
// [rule.createdAt .. ctx.now()] window.
Instant t0 = Instant.now().minusSeconds(30);
seedFailedExecution("exec-1", t0);
seedFailedExecution("exec-2", t0.plusSeconds(1));
UUID perExRuleId = createPerExchangeRuleWithWebhook();
// Tick 1 — expect 2 instances, 2 notifications.
job.tick();
assertThat(countInstancesByRule(perExRuleId))
.as("tick 1 must create one FIRING instance per matched exchange")
.isEqualTo(2);
List<AlertNotification> afterTick1 = listNotificationsByRule(perExRuleId);
assertThat(afterTick1)
.as("tick 1 must enqueue one notification per instance (1 webhook × 2 instances)")
.hasSize(2);
// Simulate NotificationDispatchJob draining the queue.
Instant now = Instant.now();
afterTick1.forEach(n -> notificationRepo.markDelivered(n.id(), 200, "OK", now));
// Reopen the claim so the rule is due for Tick 2.
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
// Tick 2 — no new ClickHouse rows; cursor should have advanced past exec-2
// during tick 1 and persisted via releaseClaim. Therefore: no new firings,
// no new notifications.
job.tick();
// Instance count unchanged.
assertThat(countInstancesByRule(perExRuleId))
.as("tick 2 must not create new instances — cursor persisted past exec-2")
.isEqualTo(2);
// THE BLEED: any new PENDING notification after tick 2 indicates the
// evaluator re-matched already-processed exchanges (cursor not persisted
// across ticks). Must be zero after the Phase 1 fix.
long pending = listNotificationsByRule(perExRuleId).stream()
.filter(n -> n.status() == NotificationStatus.PENDING)
.count();
assertThat(pending)
.as("tick 2 must NOT re-enqueue notifications for already-matched exchanges")
.isZero();
// Clean up the extra rule (setup-created rule is handled by @AfterEach).
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
}
} }