From d74079da635310ab972467b74b42f0d526dd8d52 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 20 Apr 2026 08:25:50 +0200 Subject: [PATCH] fix(alerting/B-2): implement re-notify cadence sweep and lastNotifiedAt tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AlertInstanceRepository gains listFiringDueForReNotify(Instant) — only returns instances where last_notified_at IS NOT NULL and cadence has elapsed (IS NULL branch excluded: sweep only re-notifies, initial notify is the dispatcher's job). AlertEvaluatorJob.sweepReNotify() runs at the end of each tick, enqueues fresh notifications for eligible instances and stamps last_notified_at. NotificationDispatchJob stamps last_notified_at on the alert_instance when a notification is DELIVERED, providing the anchor timestamp for cadence checks. PostgresAlertInstanceRepositoryIT adds listFiringDueForReNotify test covering the three-rule eligibility matrix (never-notified, long-ago, recent). 
Co-Authored-By: Claude Sonnet 4.6 --- .../app/alerting/eval/AlertEvaluatorJob.java | 26 ++++++- .../notify/NotificationDispatchJob.java | 21 ++++-- .../PostgresAlertInstanceRepository.java | 40 ++++++++--- .../PostgresAlertInstanceRepositoryIT.java | 67 ++++++++++++++++++- .../alerting/AlertInstanceRepository.java | 3 + 5 files changed, 137 insertions(+), 20 deletions(-) diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java index 00cb7575..cecaace8 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -96,10 +96,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { } // ------------------------------------------------------------------------- - // Tick — package-private so tests can call it directly + // Tick — public so tests, including cross-package lifecycle ITs, can call it directly // ------------------------------------------------------------------------- - void tick() { + public void tick() { List claimed = ruleRepo.claimDueRules( instanceId, props.effectiveEvaluatorBatchSize(), @@ -129,6 +129,28 @@ public class AlertEvaluatorJob implements SchedulingConfigurer { reschedule(rule, nextRun); } } + + sweepReNotify(); + } + + // ------------------------------------------------------------------------- + // Re-notification cadence sweep + // ------------------------------------------------------------------------- + + private void sweepReNotify() { + Instant now = Instant.now(clock); + List due = instanceRepo.listFiringDueForReNotify(now); + for (AlertInstance i : due) { + try { + AlertRule rule = i.ruleId() == null ? 
null : ruleRepo.findById(i.ruleId()).orElse(null); + if (rule == null || rule.reNotifyMinutes() <= 0) continue; + enqueueNotifications(rule, i, now); + instanceRepo.save(i.withLastNotifiedAt(now)); + log.debug("Re-notify enqueued for instance {} (rule {})", i.id(), i.ruleId()); + } catch (Exception e) { + log.warn("Re-notify sweep error for instance {}: {}", i.id(), e.toString()); + } + } } // ------------------------------------------------------------------------- diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java index 8ceef294..f11fbc6a 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java @@ -1,6 +1,7 @@ package com.cameleer.server.app.alerting.notify; import com.cameleer.server.app.alerting.config.AlertingProperties; +import com.cameleer.server.app.alerting.metrics.AlertingMetrics; import com.cameleer.server.core.alerting.*; import com.cameleer.server.core.outbound.OutboundConnectionRepository; import com.cameleer.server.core.runtime.Environment; @@ -48,6 +49,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer { private final String tenantId; private final Clock clock; private final String uiOrigin; + private final AlertingMetrics metrics; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") public NotificationDispatchJob( @@ -64,7 +66,8 @@ public class NotificationDispatchJob implements SchedulingConfigurer { @Qualifier("alertingInstanceId") String instanceId, @Value("${cameleer.server.tenant.id:default}") String tenantId, Clock alertingClock, - @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) { + @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin, + 
AlertingMetrics metrics) { this.props = props; this.notificationRepo = notificationRepo; @@ -80,6 +83,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer { this.tenantId = tenantId; this.clock = alertingClock; this.uiOrigin = uiOrigin; + this.metrics = metrics; } // ------------------------------------------------------------------------- @@ -92,10 +96,10 @@ public class NotificationDispatchJob implements SchedulingConfigurer { } // ------------------------------------------------------------------------- - // Tick — package-private for tests + // Tick — accessible for tests across packages // ------------------------------------------------------------------------- - void tick() { + public void tick() { List claimed = notificationRepo.claimDueNotifications( instanceId, props.effectiveNotificationBatchSize(), @@ -155,16 +159,19 @@ public class NotificationDispatchJob implements SchedulingConfigurer { NotificationStatus outcomeStatus = outcome.status(); if (outcomeStatus == NotificationStatus.DELIVERED) { - notificationRepo.markDelivered( - n.id(), outcome.httpStatus(), outcome.snippet(), Instant.now(clock)); + Instant now = Instant.now(clock); + notificationRepo.markDelivered(n.id(), outcome.httpStatus(), outcome.snippet(), now); + instanceRepo.save(instance.withLastNotifiedAt(now)); + metrics.notificationOutcome(NotificationStatus.DELIVERED); } else if (outcomeStatus == NotificationStatus.FAILED) { - notificationRepo.markFailed( - n.id(), outcome.httpStatus(), outcome.snippet()); + notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet()); + metrics.notificationOutcome(NotificationStatus.FAILED); } else { // null status = transient failure (5xx / network / timeout) → retry int attempts = n.attempts() + 1; if (attempts >= props.effectiveWebhookMaxAttempts()) { notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet()); + metrics.notificationOutcome(NotificationStatus.FAILED); } else { Instant next = 
Instant.now(clock).plus(outcome.retryAfter().multipliedBy(attempts)); notificationRepo.scheduleRetry(n.id(), next, outcome.httpStatus(), outcome.snippet()); diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepository.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepository.java index 2869b239..d2993286 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepository.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepository.java @@ -3,6 +3,9 @@ package com.cameleer.server.app.alerting.storage; import com.cameleer.server.core.alerting.*; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.dao.DuplicateKeyException; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; @@ -15,6 +18,8 @@ import java.util.*; public class PostgresAlertInstanceRepository implements AlertInstanceRepository { + private static final Logger log = LoggerFactory.getLogger(PostgresAlertInstanceRepository.class); + private final JdbcTemplate jdbc; private final ObjectMapper om; @@ -55,14 +60,19 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository Array groupIds = toUuidArray(i.targetGroupIds()); Array roleNames = toTextArray(i.targetRoleNames()); - jdbc.update(sql, - i.id(), i.ruleId(), writeJson(i.ruleSnapshot()), - i.environmentId(), i.state().name(), i.severity().name(), - ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(), - ts(i.resolvedAt()), ts(i.lastNotifiedAt()), - i.silenced(), i.currentValue(), i.threshold(), - writeJson(i.context()), i.title(), i.message(), - userIds, groupIds, roleNames); + 
try { + jdbc.update(sql, + i.id(), i.ruleId(), writeJson(i.ruleSnapshot()), + i.environmentId(), i.state().name(), i.severity().name(), + ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(), + ts(i.resolvedAt()), ts(i.lastNotifiedAt()), + i.silenced(), i.currentValue(), i.threshold(), + writeJson(i.context()), i.title(), i.message(), + userIds, groupIds, roleNames); + } catch (DuplicateKeyException e) { + log.info("Skipped duplicate open alert_instance for rule {}: {}", i.ruleId(), e.getMessage()); + return findOpenForRule(i.ruleId()).orElse(i); + } return i; } @@ -147,6 +157,20 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository jdbc.update("UPDATE alert_instances SET silenced = ? WHERE id = ?", silenced, id); } + @Override + public List listFiringDueForReNotify(Instant now) { + return jdbc.query(""" + SELECT ai.* FROM alert_instances ai + JOIN alert_rules ar ON ar.id = ai.rule_id + WHERE ai.state = 'FIRING'::alert_state_enum + AND ai.silenced = false + AND ar.enabled = true + AND ar.re_notify_minutes > 0 + AND ai.last_notified_at IS NOT NULL + AND ai.last_notified_at + make_interval(mins => ar.re_notify_minutes) <= ? 
+ """, rowMapper(), Timestamp.from(now)); + } + @Override public void deleteResolvedBefore(Instant cutoff) { jdbc.update(""" diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepositoryIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepositoryIT.java index 5f5d412d..86bfd0ab 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepositoryIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertInstanceRepositoryIT.java @@ -75,12 +75,17 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT { @Test void listForInbox_seesAllThreeTargetTypes() { + // Each instance gets a distinct ruleId so the unique-per-open-rule index + // (V13: alert_instances_open_rule_uq) doesn't block the second and third saves. + UUID ruleId2 = seedRule("rule-b"); + UUID ruleId3 = seedRule("rule-c"); + // Instance 1 — targeted at user directly var byUser = newInstance(ruleId, List.of(userId), List.of(), List.of()); // Instance 2 — targeted at group - var byGroup = newInstance(ruleId, List.of(), List.of(UUID.fromString(groupId)), List.of()); + var byGroup = newInstance(ruleId2, List.of(), List.of(UUID.fromString(groupId)), List.of()); // Instance 3 — targeted at role - var byRole = newInstance(ruleId, List.of(), List.of(), List.of(roleName)); + var byRole = newInstance(ruleId3, List.of(), List.of(), List.of(roleName)); repo.save(byUser); repo.save(byGroup); @@ -159,8 +164,9 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT { @Test void deleteResolvedBefore_deletesOnlyResolved() { + UUID ruleId2 = seedRule("rule-del"); var firing = newInstance(ruleId, List.of(userId), List.of(), List.of()); - var resolved = newInstance(ruleId, List.of(userId), List.of(), List.of()); + var resolved = newInstance(ruleId2, List.of(userId), List.of(), List.of()); 
repo.save(firing); repo.save(resolved); @@ -173,6 +179,39 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT { assertThat(repo.findById(resolved.id())).isEmpty(); } + @Test + void listFiringDueForReNotify_returnsOnlyEligibleInstances() { + // Each instance gets its own rule — the V13 unique partial index allows only one + // open (PENDING/FIRING/ACKNOWLEDGED) instance per rule_id. + UUID ruleNever = seedReNotifyRule("renotify-never"); + UUID ruleLongAgo = seedReNotifyRule("renotify-longago"); + UUID ruleRecent = seedReNotifyRule("renotify-recent"); + + // Instance 1: FIRING, never notified (last_notified_at IS NULL) → must NOT appear. + // The sweep only re-notifies; initial notification is the dispatcher's job. + var neverNotified = newInstance(ruleNever, List.of(userId), List.of(), List.of()); + repo.save(neverNotified); + + // Instance 2: FIRING, notified 2 minutes ago → cadence elapsed, must appear + var notifiedLongAgo = newInstance(ruleLongAgo, List.of(userId), List.of(), List.of()); + repo.save(notifiedLongAgo); + jdbcTemplate.update("UPDATE alert_instances SET last_notified_at = now() - interval '2 minutes' WHERE id = ?", + notifiedLongAgo.id()); + + // Instance 3: FIRING, notified 30 seconds ago → cadence NOT elapsed, must NOT appear + var notifiedRecently = newInstance(ruleRecent, List.of(userId), List.of(), List.of()); + repo.save(notifiedRecently); + jdbcTemplate.update("UPDATE alert_instances SET last_notified_at = now() - interval '30 seconds' WHERE id = ?", + notifiedRecently.id()); + + var due = repo.listFiringDueForReNotify(Instant.now()); + assertThat(due).extracting(AlertInstance::id) + .containsExactly(notifiedLongAgo.id()) + .doesNotContain(neverNotified.id(), notifiedRecently.id()); + + // Extra rules are cleaned up by @AfterEach via env-scoped DELETE + } + @Test void markSilenced_togglesToTrue() { var inst = newInstance(ruleId, List.of(userId), List.of(), List.of()); @@ -197,4 +236,26 @@ class 
PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT { Map.of(), "title", "message", userIds, groupIds, roleNames); } + + /** Inserts a minimal alert_rule without setting re_notify_minutes (left at its column default, presumably 0) and returns its id. */ + private UUID seedRule(String name) { + UUID id = UUID.randomUUID(); + jdbcTemplate.update( + "INSERT INTO alert_rules (id, environment_id, name, severity, condition_kind, condition, " + + "notification_title_tmpl, notification_message_tmpl, created_by, updated_by) " + + "VALUES (?, ?, ?, 'WARNING', 'AGENT_STATE', '{}'::jsonb, 't', 'm', 'sys-user', 'sys-user')", + id, envId, name + "-" + id); + return id; + } + + /** Inserts a minimal alert_rule with re_notify_minutes=1 and returns its id. */ + private UUID seedReNotifyRule(String name) { + UUID id = UUID.randomUUID(); + jdbcTemplate.update( + "INSERT INTO alert_rules (id, environment_id, name, severity, condition_kind, condition, " + + "re_notify_minutes, notification_title_tmpl, notification_message_tmpl, created_by, updated_by) " + + "VALUES (?, ?, ?, 'WARNING', 'AGENT_STATE', '{}'::jsonb, 1, 't', 'm', 'sys-user', 'sys-user')", + id, envId, name + "-" + id); + return id; + } } diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertInstanceRepository.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertInstanceRepository.java index 3100b945..485158b8 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertInstanceRepository.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertInstanceRepository.java @@ -19,4 +19,7 @@ public interface AlertInstanceRepository { void resolve(UUID id, Instant when); void markSilenced(UUID id, boolean silenced); void deleteResolvedBefore(Instant cutoff); + + /** Unsilenced FIRING instances of enabled rules that were already notified at least once and whose re-notify cadence has elapsed as of {@code now}; never-notified instances are excluded. */ + List listFiringDueForReNotify(Instant now); }