From 6b48bc63bf6a0bef30d3d65bd4fffbb6c7c3b60b Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 19 Apr 2026 20:24:54 +0200 Subject: [PATCH] feat(alerting): NotificationDispatchJob outbox loop with silence + retry Claim-polling SchedulingConfigurer: claims due notifications, resolves instance/connection/rule, checks active silences, dispatches via WebhookDispatcher, classifies outcomes into DELIVERED/FAILED/retry. Guards null rule/env after deletion. 5 Testcontainers ITs: 200/503/404 outcomes, active silence suppression, deleted connection fast-fail. Co-Authored-By: Claude Sonnet 4.6 --- .../notify/NotificationDispatchJob.java | 174 ++++++++++++++ .../notify/NotificationDispatchJobIT.java | 223 ++++++++++++++++++ 2 files changed, 397 insertions(+) create mode 100644 cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java create mode 100644 cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJobIT.java diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java new file mode 100644 index 00000000..8ceef294 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java @@ -0,0 +1,174 @@ +package com.cameleer.server.app.alerting.notify; + +import com.cameleer.server.app.alerting.config.AlertingProperties; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.outbound.OutboundConnectionRepository; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * Claim-polling outbox loop that dispatches {@link AlertNotification} records. + *

+ * On each tick, claims a batch of due notifications, resolves the backing + * {@link AlertInstance} and {@link com.cameleer.server.core.outbound.OutboundConnection}, + * checks active silences, delegates to {@link WebhookDispatcher}, and persists the outcome. + *

+ * Retry backoff: {@code retryAfter × attempts} (30 s, 60 s, 90 s, …). + * After {@link AlertingProperties#effectiveWebhookMaxAttempts()} retries the notification + * is marked FAILED permanently. + */ +@Component +public class NotificationDispatchJob implements SchedulingConfigurer { + + private static final Logger log = LoggerFactory.getLogger(NotificationDispatchJob.class); + + private final AlertingProperties props; + private final AlertNotificationRepository notificationRepo; + private final AlertInstanceRepository instanceRepo; + private final AlertRuleRepository ruleRepo; + private final AlertSilenceRepository silenceRepo; + private final OutboundConnectionRepository outboundRepo; + private final EnvironmentRepository envRepo; + private final WebhookDispatcher dispatcher; + private final SilenceMatcherService silenceMatcher; + private final NotificationContextBuilder contextBuilder; + private final String instanceId; + private final String tenantId; + private final Clock clock; + private final String uiOrigin; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public NotificationDispatchJob( + AlertingProperties props, + AlertNotificationRepository notificationRepo, + AlertInstanceRepository instanceRepo, + AlertRuleRepository ruleRepo, + AlertSilenceRepository silenceRepo, + OutboundConnectionRepository outboundRepo, + EnvironmentRepository envRepo, + WebhookDispatcher dispatcher, + SilenceMatcherService silenceMatcher, + NotificationContextBuilder contextBuilder, + @Qualifier("alertingInstanceId") String instanceId, + @Value("${cameleer.server.tenant.id:default}") String tenantId, + Clock alertingClock, + @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) { + + this.props = props; + this.notificationRepo = notificationRepo; + this.instanceRepo = instanceRepo; + this.ruleRepo = ruleRepo; + this.silenceRepo = silenceRepo; + this.outboundRepo = outboundRepo; + this.envRepo = envRepo; + this.dispatcher = dispatcher; + this.silenceMatcher = silenceMatcher; + this.contextBuilder = contextBuilder; + this.instanceId = instanceId; + this.tenantId = tenantId; + this.clock = alertingClock; + this.uiOrigin = uiOrigin; + } + + // ------------------------------------------------------------------------- + // SchedulingConfigurer + // ------------------------------------------------------------------------- + + @Override + public void configureTasks(ScheduledTaskRegistrar registrar) { + registrar.addFixedDelayTask(this::tick, props.effectiveNotificationTickIntervalMs()); + } + + // ------------------------------------------------------------------------- + // Tick — package-private for tests + // ------------------------------------------------------------------------- + + void tick() { + List claimed = notificationRepo.claimDueNotifications( + instanceId, + props.effectiveNotificationBatchSize(), + props.effectiveClaimTtlSeconds()); + + for (AlertNotification n : claimed) { + try { + processOne(n); + } catch (Exception e) { + log.warn("Notification dispatch error for {}: {}", n.id(), e.toString()); + notificationRepo.scheduleRetry(n.id(), Instant.now(clock).plusSeconds(30), -1, e.getMessage()); + } + } + } + + // ------------------------------------------------------------------------- + // Per-notification processing + // ------------------------------------------------------------------------- + + private void processOne(AlertNotification n) { + // 1. Resolve alert instance + AlertInstance instance = instanceRepo.findById(n.alertInstanceId()).orElse(null); + if (instance == null) { + notificationRepo.markFailed(n.id(), 0, "instance deleted"); + return; + } + + // 2. Resolve outbound connection + var conn = outboundRepo.findById(tenantId, n.outboundConnectionId()).orElse(null); + if (conn == null) { + notificationRepo.markFailed(n.id(), 0, "outbound connection deleted"); + return; + } + + // 3. Resolve rule and environment (may be null after deletion) + AlertRule rule = instance.ruleId() == null ? null + : ruleRepo.findById(instance.ruleId()).orElse(null); + Environment env = envRepo.findById(instance.environmentId()).orElse(null); + + // 4. Build Mustache context (guard: rule or env may be null after deletion) + Map context = (rule != null && env != null) + ? contextBuilder.build(rule, instance, env, uiOrigin) + : Map.of(); + + // 5. Silence check + List activeSilences = silenceRepo.listActive(instance.environmentId(), Instant.now(clock)); + for (AlertSilence s : activeSilences) { + if (silenceMatcher.matches(s.matcher(), instance, rule)) { + instanceRepo.markSilenced(instance.id(), true); + notificationRepo.markFailed(n.id(), 0, "silenced"); + return; + } + } + + // 6. Dispatch + WebhookDispatcher.Outcome outcome = dispatcher.dispatch(n, rule, instance, conn, context); + + NotificationStatus outcomeStatus = outcome.status(); + if (outcomeStatus == NotificationStatus.DELIVERED) { + notificationRepo.markDelivered( + n.id(), outcome.httpStatus(), outcome.snippet(), Instant.now(clock)); + } else if (outcomeStatus == NotificationStatus.FAILED) { + notificationRepo.markFailed( + n.id(), outcome.httpStatus(), outcome.snippet()); + } else { + // null status = transient failure (5xx / network / timeout) → retry + int attempts = n.attempts() + 1; + if (attempts >= props.effectiveWebhookMaxAttempts()) { + notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet()); + } else { + Instant next = Instant.now(clock).plus(outcome.retryAfter().multipliedBy(attempts)); + notificationRepo.scheduleRetry(n.id(), next, outcome.httpStatus(), outcome.snippet()); + } + } + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJobIT.java new file mode 100644 index 00000000..985d4807 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJobIT.java @@ -0,0 +1,223 @@ +package com.cameleer.server.app.alerting.notify; + +import com.cameleer.server.app.AbstractPostgresIT; +import com.cameleer.server.app.search.ClickHouseLogStore; +import com.cameleer.server.app.search.ClickHouseSearchIndex; +import com.cameleer.server.core.agent.AgentRegistryService; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.http.TrustMode; +import com.cameleer.server.core.outbound.OutboundAuth; +import com.cameleer.server.core.outbound.OutboundConnection; +import com.cameleer.server.core.outbound.OutboundConnectionRepository; +import com.cameleer.server.core.outbound.OutboundMethod; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.MockBean; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * Integration test for {@link NotificationDispatchJob}. + *

+ * Uses real Postgres repositories (Testcontainers). {@link WebhookDispatcher} is mocked + * so network dispatch is controlled per test without spinning up a real HTTP server. + * Other Spring components that need HTTP (ClickHouse, AgentRegistry) are also mocked. + */ +class NotificationDispatchJobIT extends AbstractPostgresIT { + + @MockBean(name = "clickHouseSearchIndex") ClickHouseSearchIndex clickHouseSearchIndex; + @MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore; + @MockBean AgentRegistryService agentRegistryService; + + /** Mock the dispatcher — we control outcomes per test. */ + @MockBean WebhookDispatcher webhookDispatcher; + + @Autowired private NotificationDispatchJob job; + @Autowired private AlertNotificationRepository notificationRepo; + @Autowired private AlertInstanceRepository instanceRepo; + @Autowired private AlertRuleRepository ruleRepo; + @Autowired private AlertSilenceRepository silenceRepo; + @Autowired private OutboundConnectionRepository outboundRepo; + + @Value("${cameleer.server.tenant.id:default}") + private String tenantId; + + private UUID envId; + private UUID ruleId; + private UUID connId; + private UUID instanceId; + + private static final String SYS_USER = "sys-dispatch-it"; + private static final String CONN_NAME_PREFIX = "test-conn-dispatch-it-"; + + @BeforeEach + void setUp() { + when(agentRegistryService.findAll()).thenReturn(List.of()); + + envId = UUID.randomUUID(); + ruleId = UUID.randomUUID(); + connId = UUID.randomUUID(); + instanceId = UUID.randomUUID(); + + jdbcTemplate.update( + "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", + envId, "dispatch-it-env-" + envId, "Dispatch IT Env"); + jdbcTemplate.update( + "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING", + SYS_USER, SYS_USER + "@test.example.com"); + + // Use ruleRepo.save() so the condition column is properly serialized (AlertCondition JSON) + var condition = new AgentStateCondition(new AlertScope(null, null, null), "DEAD", 60); + ruleRepo.save(new AlertRule( + ruleId, envId, "dispatch-rule", null, + AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition, + 60, 0, 60, "title", "msg", + List.of(), List.of(), + Instant.now().minusSeconds(5), null, null, Map.of(), + Instant.now(), SYS_USER, Instant.now(), SYS_USER)); + + // Use instanceRepo.save() so all columns are correctly populated + instanceRepo.save(new AlertInstance( + instanceId, ruleId, Map.of(), envId, + AlertState.FIRING, AlertSeverity.WARNING, + Instant.now(), null, null, null, null, false, + null, null, Map.of(), "title", "msg", + List.of(), List.of(), List.of())); + + // Outbound connection (real row so findById works) + outboundRepo.save(new OutboundConnection( + connId, tenantId, CONN_NAME_PREFIX + connId, null, + "https://localhost:9999/webhook", OutboundMethod.POST, + Map.of(), null, TrustMode.SYSTEM_DEFAULT, List.of(), + null, new OutboundAuth.None(), List.of(), + Instant.now(), SYS_USER, Instant.now(), SYS_USER)); + } + + @AfterEach + void cleanup() { + jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id = ?", instanceId); + jdbcTemplate.update("DELETE FROM alert_silences WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); + // connection may already be deleted in some tests — ignore if absent + try { outboundRepo.delete(tenantId, connId); } catch (Exception ignored) {} + jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER); + } + + // ------------------------------------------------------------------------- + // Tests + // ------------------------------------------------------------------------- + + @Test + void twoHundred_marksDelivered() { + UUID notifId = seedNotification(); + when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())) + .thenReturn(new WebhookDispatcher.Outcome(NotificationStatus.DELIVERED, 200, "ok", null)); + + job.tick(); + + var row = notificationRepo.findById(notifId).orElseThrow(); + assertThat(row.status()).isEqualTo(NotificationStatus.DELIVERED); + assertThat(row.lastResponseStatus()).isEqualTo(200); + assertThat(row.deliveredAt()).isNotNull(); + } + + @Test + void fiveOhThree_scheduleRetry() { + UUID notifId = seedNotification(); + when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())) + .thenReturn(new WebhookDispatcher.Outcome(null, 503, "unavailable", Duration.ofSeconds(30))); + + job.tick(); + + var row = notificationRepo.findById(notifId).orElseThrow(); + assertThat(row.status()).isEqualTo(NotificationStatus.PENDING); + assertThat(row.attempts()).isEqualTo(1); + assertThat(row.nextAttemptAt()).isAfter(Instant.now()); + assertThat(row.lastResponseStatus()).isEqualTo(503); + } + + @Test + void fourOhFour_failsImmediately() { + UUID notifId = seedNotification(); + when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())) + .thenReturn(new WebhookDispatcher.Outcome(NotificationStatus.FAILED, 404, "not found", null)); + + job.tick(); + + var row = notificationRepo.findById(notifId).orElseThrow(); + assertThat(row.status()).isEqualTo(NotificationStatus.FAILED); + assertThat(row.lastResponseStatus()).isEqualTo(404); + } + + @Test + void activeSilence_silencesInstanceAndFailsNotification() { + // Seed a silence matching by ruleId — SilenceMatcher.ruleId field + UUID silenceId = UUID.randomUUID(); + jdbcTemplate.update(""" + INSERT INTO alert_silences (id, environment_id, matcher, reason, starts_at, ends_at, created_by) + VALUES (?, ?, ?::jsonb, 'test silence', now() - interval '1 minute', now() + interval '1 hour', ?)""", + silenceId, envId, + "{\"ruleId\": \"" + ruleId + "\"}", + SYS_USER); + + UUID notifId = seedNotification(); + + job.tick(); + + // Dispatcher must NOT have been called + verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any()); + + // Notification marked failed with "silenced" + var notifRow = notificationRepo.findById(notifId).orElseThrow(); + assertThat(notifRow.status()).isEqualTo(NotificationStatus.FAILED); + assertThat(notifRow.lastResponseSnippet()).isEqualTo("silenced"); + + // Instance marked silenced=true + var instRow = instanceRepo.findById(instanceId).orElseThrow(); + assertThat(instRow.silenced()).isTrue(); + } + + @Test + void deletedConnection_failsWithMessage() { + // Seed notification while connection still exists (FK constraint) + UUID notifId = seedNotification(); + + // Now delete the connection — dispatch job should detect the missing conn + outboundRepo.delete(tenantId, connId); + + job.tick(); + + verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any()); + var row = notificationRepo.findById(notifId).orElseThrow(); + assertThat(row.status()).isEqualTo(NotificationStatus.FAILED); + assertThat(row.lastResponseSnippet()).isEqualTo("outbound connection deleted"); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private UUID seedNotification() { + UUID notifId = UUID.randomUUID(); + // Use raw SQL (simpler) — notification table has no complex JSON columns to deserialize here + jdbcTemplate.update(""" + INSERT INTO alert_notifications (id, alert_instance_id, webhook_id, outbound_connection_id, + status, attempts, next_attempt_at, payload) + VALUES (?, ?, ?, ?, 'PENDING', 0, now() - interval '1 second', '{}'::jsonb)""", + notifId, instanceId, UUID.randomUUID(), connId); + return notifId; + } +}