feat(alerting): NotificationDispatchJob outbox loop with silence + retry
Claim-polling SchedulingConfigurer: claims due notifications, resolves instance/connection/rule, checks active silences, dispatches via WebhookDispatcher, classifies outcomes into DELIVERED/FAILED/retry. Guards null rule/env after deletion. 5 Testcontainers ITs: 200/503/404 outcomes, active silence suppression, deleted connection fast-fail. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,174 @@
|
|||||||
|
package com.cameleer.server.app.alerting.notify;
|
||||||
|
|
||||||
|
import com.cameleer.server.app.alerting.config.AlertingProperties;
|
||||||
|
import com.cameleer.server.core.alerting.*;
|
||||||
|
import com.cameleer.server.core.outbound.OutboundConnectionRepository;
|
||||||
|
import com.cameleer.server.core.runtime.Environment;
|
||||||
|
import com.cameleer.server.core.runtime.EnvironmentRepository;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
||||||
|
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Claim-polling outbox loop that dispatches {@link AlertNotification} records.
|
||||||
|
* <p>
|
||||||
|
* On each tick, claims a batch of due notifications, resolves the backing
|
||||||
|
* {@link AlertInstance} and {@link com.cameleer.server.core.outbound.OutboundConnection},
|
||||||
|
* checks active silences, delegates to {@link WebhookDispatcher}, and persists the outcome.
|
||||||
|
* <p>
|
||||||
|
* Retry backoff: {@code retryAfter × attempts} (30 s, 60 s, 90 s, …).
|
||||||
|
* After {@link AlertingProperties#effectiveWebhookMaxAttempts()} retries the notification
|
||||||
|
* is marked FAILED permanently.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(NotificationDispatchJob.class);
|
||||||
|
|
||||||
|
private final AlertingProperties props;
|
||||||
|
private final AlertNotificationRepository notificationRepo;
|
||||||
|
private final AlertInstanceRepository instanceRepo;
|
||||||
|
private final AlertRuleRepository ruleRepo;
|
||||||
|
private final AlertSilenceRepository silenceRepo;
|
||||||
|
private final OutboundConnectionRepository outboundRepo;
|
||||||
|
private final EnvironmentRepository envRepo;
|
||||||
|
private final WebhookDispatcher dispatcher;
|
||||||
|
private final SilenceMatcherService silenceMatcher;
|
||||||
|
private final NotificationContextBuilder contextBuilder;
|
||||||
|
private final String instanceId;
|
||||||
|
private final String tenantId;
|
||||||
|
private final Clock clock;
|
||||||
|
private final String uiOrigin;
|
||||||
|
|
||||||
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
|
public NotificationDispatchJob(
|
||||||
|
AlertingProperties props,
|
||||||
|
AlertNotificationRepository notificationRepo,
|
||||||
|
AlertInstanceRepository instanceRepo,
|
||||||
|
AlertRuleRepository ruleRepo,
|
||||||
|
AlertSilenceRepository silenceRepo,
|
||||||
|
OutboundConnectionRepository outboundRepo,
|
||||||
|
EnvironmentRepository envRepo,
|
||||||
|
WebhookDispatcher dispatcher,
|
||||||
|
SilenceMatcherService silenceMatcher,
|
||||||
|
NotificationContextBuilder contextBuilder,
|
||||||
|
@Qualifier("alertingInstanceId") String instanceId,
|
||||||
|
@Value("${cameleer.server.tenant.id:default}") String tenantId,
|
||||||
|
Clock alertingClock,
|
||||||
|
@Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) {
|
||||||
|
|
||||||
|
this.props = props;
|
||||||
|
this.notificationRepo = notificationRepo;
|
||||||
|
this.instanceRepo = instanceRepo;
|
||||||
|
this.ruleRepo = ruleRepo;
|
||||||
|
this.silenceRepo = silenceRepo;
|
||||||
|
this.outboundRepo = outboundRepo;
|
||||||
|
this.envRepo = envRepo;
|
||||||
|
this.dispatcher = dispatcher;
|
||||||
|
this.silenceMatcher = silenceMatcher;
|
||||||
|
this.contextBuilder = contextBuilder;
|
||||||
|
this.instanceId = instanceId;
|
||||||
|
this.tenantId = tenantId;
|
||||||
|
this.clock = alertingClock;
|
||||||
|
this.uiOrigin = uiOrigin;
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// SchedulingConfigurer
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configureTasks(ScheduledTaskRegistrar registrar) {
|
||||||
|
registrar.addFixedDelayTask(this::tick, props.effectiveNotificationTickIntervalMs());
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Tick — package-private for tests
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
void tick() {
|
||||||
|
List<AlertNotification> claimed = notificationRepo.claimDueNotifications(
|
||||||
|
instanceId,
|
||||||
|
props.effectiveNotificationBatchSize(),
|
||||||
|
props.effectiveClaimTtlSeconds());
|
||||||
|
|
||||||
|
for (AlertNotification n : claimed) {
|
||||||
|
try {
|
||||||
|
processOne(n);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Notification dispatch error for {}: {}", n.id(), e.toString());
|
||||||
|
notificationRepo.scheduleRetry(n.id(), Instant.now(clock).plusSeconds(30), -1, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Per-notification processing
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private void processOne(AlertNotification n) {
|
||||||
|
// 1. Resolve alert instance
|
||||||
|
AlertInstance instance = instanceRepo.findById(n.alertInstanceId()).orElse(null);
|
||||||
|
if (instance == null) {
|
||||||
|
notificationRepo.markFailed(n.id(), 0, "instance deleted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Resolve outbound connection
|
||||||
|
var conn = outboundRepo.findById(tenantId, n.outboundConnectionId()).orElse(null);
|
||||||
|
if (conn == null) {
|
||||||
|
notificationRepo.markFailed(n.id(), 0, "outbound connection deleted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Resolve rule and environment (may be null after deletion)
|
||||||
|
AlertRule rule = instance.ruleId() == null ? null
|
||||||
|
: ruleRepo.findById(instance.ruleId()).orElse(null);
|
||||||
|
Environment env = envRepo.findById(instance.environmentId()).orElse(null);
|
||||||
|
|
||||||
|
// 4. Build Mustache context (guard: rule or env may be null after deletion)
|
||||||
|
Map<String, Object> context = (rule != null && env != null)
|
||||||
|
? contextBuilder.build(rule, instance, env, uiOrigin)
|
||||||
|
: Map.of();
|
||||||
|
|
||||||
|
// 5. Silence check
|
||||||
|
List<AlertSilence> activeSilences = silenceRepo.listActive(instance.environmentId(), Instant.now(clock));
|
||||||
|
for (AlertSilence s : activeSilences) {
|
||||||
|
if (silenceMatcher.matches(s.matcher(), instance, rule)) {
|
||||||
|
instanceRepo.markSilenced(instance.id(), true);
|
||||||
|
notificationRepo.markFailed(n.id(), 0, "silenced");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. Dispatch
|
||||||
|
WebhookDispatcher.Outcome outcome = dispatcher.dispatch(n, rule, instance, conn, context);
|
||||||
|
|
||||||
|
NotificationStatus outcomeStatus = outcome.status();
|
||||||
|
if (outcomeStatus == NotificationStatus.DELIVERED) {
|
||||||
|
notificationRepo.markDelivered(
|
||||||
|
n.id(), outcome.httpStatus(), outcome.snippet(), Instant.now(clock));
|
||||||
|
} else if (outcomeStatus == NotificationStatus.FAILED) {
|
||||||
|
notificationRepo.markFailed(
|
||||||
|
n.id(), outcome.httpStatus(), outcome.snippet());
|
||||||
|
} else {
|
||||||
|
// null status = transient failure (5xx / network / timeout) → retry
|
||||||
|
int attempts = n.attempts() + 1;
|
||||||
|
if (attempts >= props.effectiveWebhookMaxAttempts()) {
|
||||||
|
notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet());
|
||||||
|
} else {
|
||||||
|
Instant next = Instant.now(clock).plus(outcome.retryAfter().multipliedBy(attempts));
|
||||||
|
notificationRepo.scheduleRetry(n.id(), next, outcome.httpStatus(), outcome.snippet());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,223 @@
|
|||||||
|
package com.cameleer.server.app.alerting.notify;
|
||||||
|
|
||||||
|
import com.cameleer.server.app.AbstractPostgresIT;
|
||||||
|
import com.cameleer.server.app.search.ClickHouseLogStore;
|
||||||
|
import com.cameleer.server.app.search.ClickHouseSearchIndex;
|
||||||
|
import com.cameleer.server.core.agent.AgentRegistryService;
|
||||||
|
import com.cameleer.server.core.alerting.*;
|
||||||
|
import com.cameleer.server.core.http.TrustMode;
|
||||||
|
import com.cameleer.server.core.outbound.OutboundAuth;
|
||||||
|
import com.cameleer.server.core.outbound.OutboundConnection;
|
||||||
|
import com.cameleer.server.core.outbound.OutboundConnectionRepository;
|
||||||
|
import com.cameleer.server.core.outbound.OutboundMethod;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test for {@link NotificationDispatchJob}.
|
||||||
|
* <p>
|
||||||
|
* Uses real Postgres repositories (Testcontainers). {@link WebhookDispatcher} is mocked
|
||||||
|
* so network dispatch is controlled per test without spinning up a real HTTP server.
|
||||||
|
* Other Spring components that need HTTP (ClickHouse, AgentRegistry) are also mocked.
|
||||||
|
*/
|
||||||
|
class NotificationDispatchJobIT extends AbstractPostgresIT {
|
||||||
|
|
||||||
|
@MockBean(name = "clickHouseSearchIndex") ClickHouseSearchIndex clickHouseSearchIndex;
|
||||||
|
@MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
|
||||||
|
@MockBean AgentRegistryService agentRegistryService;
|
||||||
|
|
||||||
|
/** Mock the dispatcher — we control outcomes per test. */
|
||||||
|
@MockBean WebhookDispatcher webhookDispatcher;
|
||||||
|
|
||||||
|
@Autowired private NotificationDispatchJob job;
|
||||||
|
@Autowired private AlertNotificationRepository notificationRepo;
|
||||||
|
@Autowired private AlertInstanceRepository instanceRepo;
|
||||||
|
@Autowired private AlertRuleRepository ruleRepo;
|
||||||
|
@Autowired private AlertSilenceRepository silenceRepo;
|
||||||
|
@Autowired private OutboundConnectionRepository outboundRepo;
|
||||||
|
|
||||||
|
@Value("${cameleer.server.tenant.id:default}")
|
||||||
|
private String tenantId;
|
||||||
|
|
||||||
|
private UUID envId;
|
||||||
|
private UUID ruleId;
|
||||||
|
private UUID connId;
|
||||||
|
private UUID instanceId;
|
||||||
|
|
||||||
|
private static final String SYS_USER = "sys-dispatch-it";
|
||||||
|
private static final String CONN_NAME_PREFIX = "test-conn-dispatch-it-";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
when(agentRegistryService.findAll()).thenReturn(List.of());
|
||||||
|
|
||||||
|
envId = UUID.randomUUID();
|
||||||
|
ruleId = UUID.randomUUID();
|
||||||
|
connId = UUID.randomUUID();
|
||||||
|
instanceId = UUID.randomUUID();
|
||||||
|
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)",
|
||||||
|
envId, "dispatch-it-env-" + envId, "Dispatch IT Env");
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING",
|
||||||
|
SYS_USER, SYS_USER + "@test.example.com");
|
||||||
|
|
||||||
|
// Use ruleRepo.save() so the condition column is properly serialized (AlertCondition JSON)
|
||||||
|
var condition = new AgentStateCondition(new AlertScope(null, null, null), "DEAD", 60);
|
||||||
|
ruleRepo.save(new AlertRule(
|
||||||
|
ruleId, envId, "dispatch-rule", null,
|
||||||
|
AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition,
|
||||||
|
60, 0, 60, "title", "msg",
|
||||||
|
List.of(), List.of(),
|
||||||
|
Instant.now().minusSeconds(5), null, null, Map.of(),
|
||||||
|
Instant.now(), SYS_USER, Instant.now(), SYS_USER));
|
||||||
|
|
||||||
|
// Use instanceRepo.save() so all columns are correctly populated
|
||||||
|
instanceRepo.save(new AlertInstance(
|
||||||
|
instanceId, ruleId, Map.of(), envId,
|
||||||
|
AlertState.FIRING, AlertSeverity.WARNING,
|
||||||
|
Instant.now(), null, null, null, null, false,
|
||||||
|
null, null, Map.of(), "title", "msg",
|
||||||
|
List.of(), List.of(), List.of()));
|
||||||
|
|
||||||
|
// Outbound connection (real row so findById works)
|
||||||
|
outboundRepo.save(new OutboundConnection(
|
||||||
|
connId, tenantId, CONN_NAME_PREFIX + connId, null,
|
||||||
|
"https://localhost:9999/webhook", OutboundMethod.POST,
|
||||||
|
Map.of(), null, TrustMode.SYSTEM_DEFAULT, List.of(),
|
||||||
|
null, new OutboundAuth.None(), List.of(),
|
||||||
|
Instant.now(), SYS_USER, Instant.now(), SYS_USER));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void cleanup() {
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id = ?", instanceId);
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_silences WHERE environment_id = ?", envId);
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
|
||||||
|
jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
|
||||||
|
jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
|
||||||
|
// connection may already be deleted in some tests — ignore if absent
|
||||||
|
try { outboundRepo.delete(tenantId, connId); } catch (Exception ignored) {}
|
||||||
|
jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void twoHundred_marksDelivered() {
|
||||||
|
UUID notifId = seedNotification();
|
||||||
|
when(webhookDispatcher.dispatch(any(), any(), any(), any(), any()))
|
||||||
|
.thenReturn(new WebhookDispatcher.Outcome(NotificationStatus.DELIVERED, 200, "ok", null));
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
var row = notificationRepo.findById(notifId).orElseThrow();
|
||||||
|
assertThat(row.status()).isEqualTo(NotificationStatus.DELIVERED);
|
||||||
|
assertThat(row.lastResponseStatus()).isEqualTo(200);
|
||||||
|
assertThat(row.deliveredAt()).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void fiveOhThree_scheduleRetry() {
|
||||||
|
UUID notifId = seedNotification();
|
||||||
|
when(webhookDispatcher.dispatch(any(), any(), any(), any(), any()))
|
||||||
|
.thenReturn(new WebhookDispatcher.Outcome(null, 503, "unavailable", Duration.ofSeconds(30)));
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
var row = notificationRepo.findById(notifId).orElseThrow();
|
||||||
|
assertThat(row.status()).isEqualTo(NotificationStatus.PENDING);
|
||||||
|
assertThat(row.attempts()).isEqualTo(1);
|
||||||
|
assertThat(row.nextAttemptAt()).isAfter(Instant.now());
|
||||||
|
assertThat(row.lastResponseStatus()).isEqualTo(503);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void fourOhFour_failsImmediately() {
|
||||||
|
UUID notifId = seedNotification();
|
||||||
|
when(webhookDispatcher.dispatch(any(), any(), any(), any(), any()))
|
||||||
|
.thenReturn(new WebhookDispatcher.Outcome(NotificationStatus.FAILED, 404, "not found", null));
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
var row = notificationRepo.findById(notifId).orElseThrow();
|
||||||
|
assertThat(row.status()).isEqualTo(NotificationStatus.FAILED);
|
||||||
|
assertThat(row.lastResponseStatus()).isEqualTo(404);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void activeSilence_silencesInstanceAndFailsNotification() {
|
||||||
|
// Seed a silence matching by ruleId — SilenceMatcher.ruleId field
|
||||||
|
UUID silenceId = UUID.randomUUID();
|
||||||
|
jdbcTemplate.update("""
|
||||||
|
INSERT INTO alert_silences (id, environment_id, matcher, reason, starts_at, ends_at, created_by)
|
||||||
|
VALUES (?, ?, ?::jsonb, 'test silence', now() - interval '1 minute', now() + interval '1 hour', ?)""",
|
||||||
|
silenceId, envId,
|
||||||
|
"{\"ruleId\": \"" + ruleId + "\"}",
|
||||||
|
SYS_USER);
|
||||||
|
|
||||||
|
UUID notifId = seedNotification();
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
// Dispatcher must NOT have been called
|
||||||
|
verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any());
|
||||||
|
|
||||||
|
// Notification marked failed with "silenced"
|
||||||
|
var notifRow = notificationRepo.findById(notifId).orElseThrow();
|
||||||
|
assertThat(notifRow.status()).isEqualTo(NotificationStatus.FAILED);
|
||||||
|
assertThat(notifRow.lastResponseSnippet()).isEqualTo("silenced");
|
||||||
|
|
||||||
|
// Instance marked silenced=true
|
||||||
|
var instRow = instanceRepo.findById(instanceId).orElseThrow();
|
||||||
|
assertThat(instRow.silenced()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void deletedConnection_failsWithMessage() {
|
||||||
|
// Seed notification while connection still exists (FK constraint)
|
||||||
|
UUID notifId = seedNotification();
|
||||||
|
|
||||||
|
// Now delete the connection — dispatch job should detect the missing conn
|
||||||
|
outboundRepo.delete(tenantId, connId);
|
||||||
|
|
||||||
|
job.tick();
|
||||||
|
|
||||||
|
verify(webhookDispatcher, never()).dispatch(any(), any(), any(), any(), any());
|
||||||
|
var row = notificationRepo.findById(notifId).orElseThrow();
|
||||||
|
assertThat(row.status()).isEqualTo(NotificationStatus.FAILED);
|
||||||
|
assertThat(row.lastResponseSnippet()).isEqualTo("outbound connection deleted");
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Helpers
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private UUID seedNotification() {
|
||||||
|
UUID notifId = UUID.randomUUID();
|
||||||
|
// Use raw SQL (simpler) — notification table has no complex JSON columns to deserialize here
|
||||||
|
jdbcTemplate.update("""
|
||||||
|
INSERT INTO alert_notifications (id, alert_instance_id, webhook_id, outbound_connection_id,
|
||||||
|
status, attempts, next_attempt_at, payload)
|
||||||
|
VALUES (?, ?, ?, ?, 'PENDING', 0, now() - interval '1 second', '{}'::jsonb)""",
|
||||||
|
notifId, instanceId, UUID.randomUUID(), connId);
|
||||||
|
return notifId;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user