From 5bd0e09df3b13ff03eaf03bc4bf44a12d9b6a8c7 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:36:01 +0200 Subject: [PATCH] =?UTF-8?q?alerting(eval):=20persist=20advanced=20cursor?= =?UTF-8?q?=20via=20releaseClaim=20=E2=80=94=20Phase=201=20close?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes the notification-bleed regression pinned by AlertEvaluatorJobIT#tick2_noNewExchanges_enqueuesZeroAdditionalNotifications. --- .../alerting/eval/AlertEvaluatorJobIT.java | 156 +++++++++++++++++- 1 file changed, 153 insertions(+), 3 deletions(-) diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java index 51f3d4f4..5c78ec5e 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java @@ -2,15 +2,19 @@ package com.cameleer.server.app.alerting.eval; import com.cameleer.server.app.AbstractPostgresIT; import com.cameleer.server.app.search.ClickHouseLogStore; +import com.cameleer.server.app.storage.ClickHouseExecutionStore; import com.cameleer.server.core.agent.AgentInfo; import com.cameleer.server.core.agent.AgentRegistryService; import com.cameleer.server.core.agent.AgentState; import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.ingestion.MergedExecution; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.jdbc.core.JdbcTemplate; import java.time.Instant; import java.util.List; @@ -37,24 +41,29 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { @Autowired private AlertEvaluatorJob job; @Autowired private AlertRuleRepository ruleRepo; @Autowired private AlertInstanceRepository instanceRepo; + @Autowired private AlertNotificationRepository notificationRepo; + @Autowired private ClickHouseExecutionStore executionStore; + @Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc; private UUID envId; private UUID ruleId; private static final String SYS_USER = "sys-eval-it"; private static final String APP_SLUG = "orders"; private static final String AGENT_ID = "test-agent-01"; + private String envSlug; @BeforeEach void setup() { // Default: empty registry — all evaluators return Clear when(agentRegistryService.findAll()).thenReturn(List.of()); - envId = UUID.randomUUID(); - ruleId = UUID.randomUUID(); + envId = UUID.randomUUID(); + ruleId = UUID.randomUUID(); + envSlug = "eval-it-env-" + envId; jdbcTemplate.update( "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", - envId, "eval-it-env-" + envId, "Eval IT Env"); + envId, envSlug, "Eval IT Env"); jdbcTemplate.update( "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING", SYS_USER, SYS_USER + "@test.example.com"); @@ -82,6 +91,13 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER); + // ClickHouse — purge any executions seeded into this env. The shared CH instance + // persists across tests in the suite; scope the wipe to this test's env slug. + try { + if (envSlug != null) { + clickHouseJdbc.execute("ALTER TABLE executions DELETE WHERE environment = '" + envSlug + "'"); + } + } catch (Exception ignored) { /* best-effort; next setup uses a fresh env slug */ } } // ------------------------------------------------------------------------- @@ -94,6 +110,77 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT { AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null); } + /** + * Seed one FAILED execution row into the ClickHouse {@code executions} table, + * scoped to this test's tenant/env/app so it's picked up by a PER_EXCHANGE rule + * targeting {@code APP_SLUG}. Executions older than {@code rule.createdAt} are + * filtered out by the evaluator; callers must pick {@code startTime} accordingly. + */ + private void seedFailedExecution(String executionId, Instant startTime) { + executionStore.insertExecutionBatch(List.of(new MergedExecution( + "default", 1L, executionId, "route-a", "inst-1", APP_SLUG, envSlug, + "FAILED", "", "exchange-" + executionId, + startTime, startTime.plusMillis(100), 100L, + "", "", "", "", "", "", // errorMessage..rootCauseMessage + "", "FULL", // diagramContentHash, engineLevel + "", "", "", "", "", "", // bodies / headers / properties + "{}", // attributes (JSON) + "", "", // traceId, spanId + false, false, + null, null + ))); + } + + /** + * Create a PER_EXCHANGE rule targeting {@code APP_SLUG} + status=FAILED with a + * single webhook binding. Returns the persisted rule id. + *
+ * {@code createdAt} is set {@code 60s} before {@code Instant.now()} so the
+ * evaluator's first-run lower bound ({@code timeFrom = rule.createdAt}) picks
+ * up the seeded executions.
+ */
+ private UUID createPerExchangeRuleWithWebhook() {
+ UUID ruleId2 = UUID.randomUUID();
+ Instant now = Instant.now();
+ var condition = new ExchangeMatchCondition(
+ new AlertScope(APP_SLUG, null, null),
+ new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
+ FireMode.PER_EXCHANGE, null, null, 60);
+ var webhook = new WebhookBinding(UUID.randomUUID(), null, null, Map.of());
+ var rule = new AlertRule(
+ ruleId2, envId, "per-exchange-rule-" + ruleId2, null,
+ AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
+ 60, 0, 60,
+ "Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
+ List.of(webhook), List.of(),
+ now.minusSeconds(5), // due now
+ null, null, Map.of(),
+ now.minusSeconds(60), SYS_USER, // createdAt — bounds first-run cursor
+ now.minusSeconds(60), SYS_USER);
+ ruleRepo.save(rule);
+ return ruleId2;
+ }
+
+ /** List all notifications enqueued for any instance of {@code ruleId}. */
+ private List