diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java new file mode 100644 index 00000000..8ceef294 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java @@ -0,0 +1,174 @@ +package com.cameleer.server.app.alerting.notify; + +import com.cameleer.server.app.alerting.config.AlertingProperties; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.outbound.OutboundConnectionRepository; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * Claim-polling outbox loop that dispatches {@link AlertNotification} records. + *
+ * On each tick, claims a batch of due notifications, resolves the backing + * {@link AlertInstance} and {@link com.cameleer.server.core.outbound.OutboundConnection}, + * checks active silences, delegates to {@link WebhookDispatcher}, and persists the outcome. + *
+ * Retry backoff: {@code retryAfter × attempts} (30 s, 60 s, 90 s, …).
+ * After {@link AlertingProperties#effectiveWebhookMaxAttempts()} retries the notification
+ * is marked FAILED permanently.
+ */
+@Component
+public class NotificationDispatchJob implements SchedulingConfigurer {
+
+ private static final Logger log = LoggerFactory.getLogger(NotificationDispatchJob.class);
+
+ /** Alerting configuration; {@code configureTasks} reads the tick interval from it. */
+ private final AlertingProperties props;
+ // Repositories, dispatcher and helper services handed in by the constructor.
+ private final AlertNotificationRepository notificationRepo;
+ private final AlertInstanceRepository instanceRepo;
+ private final AlertRuleRepository ruleRepo;
+ private final AlertSilenceRepository silenceRepo;
+ private final OutboundConnectionRepository outboundRepo;
+ private final EnvironmentRepository envRepo;
+ private final WebhookDispatcher dispatcher;
+ private final SilenceMatcherService silenceMatcher;
+ private final NotificationContextBuilder contextBuilder;
+ /** Injected from the bean qualified as {@code alertingInstanceId}. */
+ private final String instanceId;
+ /** Value of {@code cameleer.server.tenant.id}; falls back to {@code default}. */
+ private final String tenantId;
+ /** Injected through the constructor parameter named {@code alertingClock}. */
+ private final Clock clock;
+ /** Value of {@code cameleer.server.ui-origin}; {@code null} when the property is not set. */
+ private final String uiOrigin;
+
+    /**
+     * Wires the job with its collaborators. Each argument is stored unchanged in the field of
+     * the matching name; nothing else happens during construction.
+     *
+     * @param props            alerting configuration
+     * @param notificationRepo repository of {@link AlertNotification} records
+     * @param instanceRepo     repository of {@link AlertInstance} records
+     * @param ruleRepo         repository of alert rules
+     * @param silenceRepo      repository of alert silences
+     * @param outboundRepo     repository of outbound connections
+     * @param envRepo          repository of environments
+     * @param dispatcher       webhook dispatcher
+     * @param silenceMatcher   silence matching service
+     * @param contextBuilder   notification context builder
+     * @param instanceId       bean qualified as {@code alertingInstanceId}
+     * @param tenantId         value of {@code cameleer.server.tenant.id}, {@code default} if unset
+     * @param alertingClock    clock stored in the {@code clock} field
+     * @param uiOrigin         value of {@code cameleer.server.ui-origin}, {@code null} if unset
+     */
+    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
+    public NotificationDispatchJob(
+            AlertingProperties props,
+            AlertNotificationRepository notificationRepo,
+            AlertInstanceRepository instanceRepo,
+            AlertRuleRepository ruleRepo,
+            AlertSilenceRepository silenceRepo,
+            OutboundConnectionRepository outboundRepo,
+            EnvironmentRepository envRepo,
+            WebhookDispatcher dispatcher,
+            SilenceMatcherService silenceMatcher,
+            NotificationContextBuilder contextBuilder,
+            @Qualifier("alertingInstanceId") String instanceId,
+            @Value("${cameleer.server.tenant.id:default}") String tenantId,
+            Clock alertingClock,
+            @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) {
+
+        // Configuration and identity values
+        this.props = props;
+        this.instanceId = instanceId;
+        this.tenantId = tenantId;
+        this.uiOrigin = uiOrigin;
+        this.clock = alertingClock;
+
+        // Repositories
+        this.notificationRepo = notificationRepo;
+        this.instanceRepo = instanceRepo;
+        this.ruleRepo = ruleRepo;
+        this.silenceRepo = silenceRepo;
+        this.outboundRepo = outboundRepo;
+        this.envRepo = envRepo;
+
+        // Services
+        this.dispatcher = dispatcher;
+        this.silenceMatcher = silenceMatcher;
+        this.contextBuilder = contextBuilder;
+    }
+
+ // -------------------------------------------------------------------------
+ // SchedulingConfigurer
+ // -------------------------------------------------------------------------
+
+ @Override
+ public void configureTasks(ScheduledTaskRegistrar registrar) {
+ registrar.addFixedDelayTask(this::tick, props.effectiveNotificationTickIntervalMs());
+ }
+
+ // -------------------------------------------------------------------------
+ // Tick — package-private for tests
+ // -------------------------------------------------------------------------
+
+ void tick() {
+ List
+ * Uses real Postgres repositories (Testcontainers). {@link WebhookDispatcher} is mocked
+ * so network dispatch is controlled per test without spinning up a real HTTP server.
+ * Other Spring components that need HTTP (ClickHouse, AgentRegistry) are also mocked.
+ */
+class NotificationDispatchJobIT extends AbstractPostgresIT {
+
+ // Mocked beans — the class Javadoc lists ClickHouse and AgentRegistry among the mocked components.
+ @MockBean(name = "clickHouseSearchIndex") ClickHouseSearchIndex clickHouseSearchIndex;
+ @MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
+ @MockBean AgentRegistryService agentRegistryService;
+
+ /** Mock the dispatcher — we control outcomes per test. */
+ @MockBean WebhookDispatcher webhookDispatcher;
+
+ // Job under test plus the repositories used to seed and inspect rows.
+ @Autowired private NotificationDispatchJob job;
+ @Autowired private AlertNotificationRepository notificationRepo;
+ @Autowired private AlertInstanceRepository instanceRepo;
+ @Autowired private AlertRuleRepository ruleRepo;
+ @Autowired private AlertSilenceRepository silenceRepo;
+ @Autowired private OutboundConnectionRepository outboundRepo;
+
+ /** Tenant id from {@code cameleer.server.tenant.id}; falls back to {@code default}. */
+ @Value("${cameleer.server.tenant.id:default}")
+ private String tenantId;
+
+ // Random ids regenerated by setUp() before every test.
+ private UUID envId;
+ private UUID ruleId;
+ private UUID connId;
+ private UUID instanceId;
+
+ /** user_id of the users row that setUp() inserts and cleanup() deletes. */
+ private static final String SYS_USER = "sys-dispatch-it";
+ /** Name prefix of the seeded outbound connection; setUp() appends connId to it. */
+ private static final String CONN_NAME_PREFIX = "test-conn-dispatch-it-";
+
+    /**
+     * Seeds the rows every test relies on, all keyed by ids generated fresh for the current test:
+     * an environment, the {@link #SYS_USER} user, an alert rule, a FIRING alert instance and an
+     * outbound connection. Also stubs {@code agentRegistryService.findAll()} to return an empty list.
+     */
+    @BeforeEach
+    void setUp() {
+        when(agentRegistryService.findAll()).thenReturn(List.of());
+
+        envId = UUID.randomUUID();
+        ruleId = UUID.randomUUID();
+        connId = UUID.randomUUID();
+        instanceId = UUID.randomUUID();
+
+        String insertEnvironment = "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)";
+        jdbcTemplate.update(insertEnvironment, envId, "dispatch-it-env-" + envId, "Dispatch IT Env");
+
+        String insertUser =
+                "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING";
+        jdbcTemplate.update(insertUser, SYS_USER, SYS_USER + "@test.example.com");
+
+        // Saved through ruleRepo so the condition column is properly serialized (AlertCondition JSON)
+        var scope = new AlertScope(null, null, null);
+        var deadAgentCondition = new AgentStateCondition(scope, "DEAD", 60);
+        var rule = new AlertRule(
+                ruleId, envId, "dispatch-rule", null,
+                AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, deadAgentCondition,
+                60, 0, 60, "title", "msg",
+                List.of(), List.of(),
+                Instant.now().minusSeconds(5), null, null, Map.of(),
+                Instant.now(), SYS_USER, Instant.now(), SYS_USER);
+        ruleRepo.save(rule);
+
+        // Saved through instanceRepo so all columns are correctly populated
+        var firingInstance = new AlertInstance(
+                instanceId, ruleId, Map.of(), envId,
+                AlertState.FIRING, AlertSeverity.WARNING,
+                Instant.now(), null, null, null, null, false,
+                null, null, Map.of(), "title", "msg",
+                List.of(), List.of(), List.of());
+        instanceRepo.save(firingInstance);
+
+        // A real outbound connection row, so findById works
+        var connection = new OutboundConnection(
+                connId, tenantId, CONN_NAME_PREFIX + connId, null,
+                "https://localhost:9999/webhook", OutboundMethod.POST,
+                Map.of(), null, TrustMode.SYSTEM_DEFAULT, List.of(),
+                null, new OutboundAuth.None(), List.of(),
+                Instant.now(), SYS_USER, Instant.now(), SYS_USER);
+        outboundRepo.save(connection);
+    }
+
+    /**
+     * Deletes the rows seeded for the current test: notifications of the instance, the
+     * environment-scoped rows, the environment, the outbound connection and finally the user.
+     */
+    @AfterEach
+    void cleanup() {
+        jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id = ?", instanceId);
+
+        // Environment-scoped rows: silences, then instances, then rules.
+        List<String> deleteByEnvironment = List.of(
+                "DELETE FROM alert_silences WHERE environment_id = ?",
+                "DELETE FROM alert_instances WHERE environment_id = ?",
+                "DELETE FROM alert_rules WHERE environment_id = ?");
+        for (String sql : deleteByEnvironment) {
+            jdbcTemplate.update(sql, envId);
+        }
+        jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
+
+        // connection may already be deleted in some tests — ignore if absent
+        try {
+            outboundRepo.delete(tenantId, connId);
+        } catch (Exception ignored) {
+            // best-effort: nothing to clean up when the row is already gone
+        }
+        jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER);
+    }
+
+ // -------------------------------------------------------------------------
+ // Tests
+ // -------------------------------------------------------------------------
+
+    /** A DELIVERED outcome with status 200 is persisted as DELIVERED with a delivery timestamp. */
+    @Test
+    void twoHundred_marksDelivered() {
+        UUID seededId = seedNotification();
+        var delivered = new WebhookDispatcher.Outcome(NotificationStatus.DELIVERED, 200, "ok", null);
+        when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())).thenReturn(delivered);
+
+        job.tick();
+
+        var persisted = notificationRepo.findById(seededId).orElseThrow();
+        assertThat(persisted.status()).isEqualTo(NotificationStatus.DELIVERED);
+        assertThat(persisted.lastResponseStatus()).isEqualTo(200);
+        assertThat(persisted.deliveredAt()).isNotNull();
+    }
+
+    /**
+     * An outcome without a status, carrying HTTP 503 and a 30 s retry-after, leaves the
+     * notification PENDING with one attempt recorded and a next attempt in the future.
+     */
+    @Test
+    void fiveOhThree_scheduleRetry() {
+        UUID seededId = seedNotification();
+        var retryLater = new WebhookDispatcher.Outcome(null, 503, "unavailable", Duration.ofSeconds(30));
+        when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())).thenReturn(retryLater);
+
+        job.tick();
+
+        var persisted = notificationRepo.findById(seededId).orElseThrow();
+        assertThat(persisted.status()).isEqualTo(NotificationStatus.PENDING);
+        assertThat(persisted.attempts()).isEqualTo(1);
+        assertThat(persisted.nextAttemptAt()).isAfter(Instant.now());
+        assertThat(persisted.lastResponseStatus()).isEqualTo(503);
+    }
+
+    /** A FAILED outcome with status 404 is persisted as FAILED after a single tick. */
+    @Test
+    void fourOhFour_failsImmediately() {
+        UUID seededId = seedNotification();
+        var notFound = new WebhookDispatcher.Outcome(NotificationStatus.FAILED, 404, "not found", null);
+        when(webhookDispatcher.dispatch(any(), any(), any(), any(), any())).thenReturn(notFound);
+
+        job.tick();
+
+        var persisted = notificationRepo.findById(seededId).orElseThrow();
+        assertThat(persisted.status()).isEqualTo(NotificationStatus.FAILED);
+        assertThat(persisted.lastResponseStatus()).isEqualTo(404);
+    }
+
+    /**
+     * With a currently active silence whose matcher names the rule, the dispatcher is never
+     * invoked, the notification ends up FAILED with snippet "silenced", and the instance is
+     * flagged as silenced.
+     */
+    @Test
+    void activeSilence_silencesInstanceAndFailsNotification() {
+        // Silence matching by ruleId (the SilenceMatcher.ruleId field), active from one minute ago
+        // until one hour from now.
+        UUID silenceId = UUID.randomUUID();
+        String matcherJson = "{\"ruleId\": \"" + ruleId + "\"}";
+        jdbcTemplate.update("""
+                INSERT INTO alert_silences (id, environment_id, matcher, reason, starts_at, ends_at, created_by)
+                VALUES (?, ?, ?::jsonb, 'test silence', now() - interval '1 minute', now() + interval '1 hour', ?)""",
+                silenceId, envId, matcherJson, SYS_USER);
+
+        UUID seededId = seedNotification();
+
+        job.tick();
+
+        // The dispatcher must not have been called at all
+        verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any());
+
+        // The notification is marked failed with "silenced"
+        var persistedNotification = notificationRepo.findById(seededId).orElseThrow();
+        assertThat(persistedNotification.status()).isEqualTo(NotificationStatus.FAILED);
+        assertThat(persistedNotification.lastResponseSnippet()).isEqualTo("silenced");
+
+        // The instance is marked silenced=true
+        var persistedInstance = instanceRepo.findById(instanceId).orElseThrow();
+        assertThat(persistedInstance.silenced()).isTrue();
+    }
+
+    /**
+     * When the outbound connection is deleted after the notification was seeded, the dispatcher
+     * is never invoked and the notification ends up FAILED with "outbound connection deleted".
+     */
+    @Test
+    void deletedConnection_failsWithMessage() {
+        // The notification has to be seeded while the connection still exists (FK constraint)
+        UUID seededId = seedNotification();
+
+        // Remove the connection afterwards — the job should notice it is missing
+        outboundRepo.delete(tenantId, connId);
+
+        job.tick();
+
+        verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any());
+
+        var persisted = notificationRepo.findById(seededId).orElseThrow();
+        assertThat(persisted.status()).isEqualTo(NotificationStatus.FAILED);
+        assertThat(persisted.lastResponseSnippet()).isEqualTo("outbound connection deleted");
+    }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+    /**
+     * Inserts a PENDING notification for the seeded instance and connection, with zero attempts,
+     * a next attempt time one second in the past, an empty JSON payload and a random webhook id.
+     *
+     * @return the id of the inserted notification
+     */
+    private UUID seedNotification() {
+        UUID notificationId = UUID.randomUUID();
+        UUID webhookId = UUID.randomUUID();
+        // Raw SQL is simpler here — the notification table has no complex JSON columns to deserialize
+        jdbcTemplate.update("""
+                INSERT INTO alert_notifications (id, alert_instance_id, webhook_id, outbound_connection_id,
+                status, attempts, next_attempt_at, payload)
+                VALUES (?, ?, ?, ?, 'PENDING', 0, now() - interval '1 second', '{}'::jsonb)""",
+                notificationId, instanceId, webhookId, connId);
+        return notificationId;
+    }
+}