fix(alerting/B-2): implement re-notify cadence sweep and lastNotifiedAt tracking
AlertInstanceRepository gains listFiringDueForReNotify(Instant), which returns only instances where last_notified_at IS NOT NULL and the re-notify cadence has elapsed. The IS NULL branch is excluded because the sweep only re-notifies; the initial notification is the dispatcher's job.

AlertEvaluatorJob.sweepReNotify() runs at the end of each tick; it enqueues fresh notifications for eligible instances and stamps last_notified_at.

NotificationDispatchJob stamps last_notified_at on the alert_instance when a notification is DELIVERED, providing the anchor timestamp for cadence checks.

PostgresAlertInstanceRepositoryIT adds a listFiringDueForReNotify test covering the three-rule eligibility matrix (never-notified, long-ago, recent).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -96,10 +96,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Tick — package-private so tests can call it directly
|
||||
// Tick — package-visible for same-package tests; also accessible cross-package for lifecycle ITs
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
void tick() {
|
||||
public void tick() {
|
||||
List<AlertRule> claimed = ruleRepo.claimDueRules(
|
||||
instanceId,
|
||||
props.effectiveEvaluatorBatchSize(),
|
||||
@@ -129,6 +129,28 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
|
||||
reschedule(rule, nextRun);
|
||||
}
|
||||
}
|
||||
|
||||
sweepReNotify();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Re-notification cadence sweep
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private void sweepReNotify() {
|
||||
Instant now = Instant.now(clock);
|
||||
List<AlertInstance> due = instanceRepo.listFiringDueForReNotify(now);
|
||||
for (AlertInstance i : due) {
|
||||
try {
|
||||
AlertRule rule = i.ruleId() == null ? null : ruleRepo.findById(i.ruleId()).orElse(null);
|
||||
if (rule == null || rule.reNotifyMinutes() <= 0) continue;
|
||||
enqueueNotifications(rule, i, now);
|
||||
instanceRepo.save(i.withLastNotifiedAt(now));
|
||||
log.debug("Re-notify enqueued for instance {} (rule {})", i.id(), i.ruleId());
|
||||
} catch (Exception e) {
|
||||
log.warn("Re-notify sweep error for instance {}: {}", i.id(), e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.cameleer.server.app.alerting.notify;
|
||||
|
||||
import com.cameleer.server.app.alerting.config.AlertingProperties;
|
||||
import com.cameleer.server.app.alerting.metrics.AlertingMetrics;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.cameleer.server.core.outbound.OutboundConnectionRepository;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
@@ -48,6 +49,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||
private final String tenantId;
|
||||
private final Clock clock;
|
||||
private final String uiOrigin;
|
||||
private final AlertingMetrics metrics;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public NotificationDispatchJob(
|
||||
@@ -64,7 +66,8 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||
@Qualifier("alertingInstanceId") String instanceId,
|
||||
@Value("${cameleer.server.tenant.id:default}") String tenantId,
|
||||
Clock alertingClock,
|
||||
@Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) {
|
||||
@Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin,
|
||||
AlertingMetrics metrics) {
|
||||
|
||||
this.props = props;
|
||||
this.notificationRepo = notificationRepo;
|
||||
@@ -80,6 +83,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||
this.tenantId = tenantId;
|
||||
this.clock = alertingClock;
|
||||
this.uiOrigin = uiOrigin;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -92,10 +96,10 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Tick — package-private for tests
|
||||
// Tick — accessible for tests across packages
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
void tick() {
|
||||
public void tick() {
|
||||
List<AlertNotification> claimed = notificationRepo.claimDueNotifications(
|
||||
instanceId,
|
||||
props.effectiveNotificationBatchSize(),
|
||||
@@ -155,16 +159,19 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
|
||||
|
||||
NotificationStatus outcomeStatus = outcome.status();
|
||||
if (outcomeStatus == NotificationStatus.DELIVERED) {
|
||||
notificationRepo.markDelivered(
|
||||
n.id(), outcome.httpStatus(), outcome.snippet(), Instant.now(clock));
|
||||
Instant now = Instant.now(clock);
|
||||
notificationRepo.markDelivered(n.id(), outcome.httpStatus(), outcome.snippet(), now);
|
||||
instanceRepo.save(instance.withLastNotifiedAt(now));
|
||||
metrics.notificationOutcome(NotificationStatus.DELIVERED);
|
||||
} else if (outcomeStatus == NotificationStatus.FAILED) {
|
||||
notificationRepo.markFailed(
|
||||
n.id(), outcome.httpStatus(), outcome.snippet());
|
||||
notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet());
|
||||
metrics.notificationOutcome(NotificationStatus.FAILED);
|
||||
} else {
|
||||
// null status = transient failure (5xx / network / timeout) → retry
|
||||
int attempts = n.attempts() + 1;
|
||||
if (attempts >= props.effectiveWebhookMaxAttempts()) {
|
||||
notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet());
|
||||
metrics.notificationOutcome(NotificationStatus.FAILED);
|
||||
} else {
|
||||
Instant next = Instant.now(clock).plus(outcome.retryAfter().multipliedBy(attempts));
|
||||
notificationRepo.scheduleRetry(n.id(), next, outcome.httpStatus(), outcome.snippet());
|
||||
|
||||
@@ -3,6 +3,9 @@ package com.cameleer.server.app.alerting.storage;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.jdbc.core.ConnectionCallback;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
@@ -15,6 +18,8 @@ import java.util.*;
|
||||
|
||||
public class PostgresAlertInstanceRepository implements AlertInstanceRepository {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PostgresAlertInstanceRepository.class);
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper om;
|
||||
|
||||
@@ -55,14 +60,19 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
Array groupIds = toUuidArray(i.targetGroupIds());
|
||||
Array roleNames = toTextArray(i.targetRoleNames());
|
||||
|
||||
jdbc.update(sql,
|
||||
i.id(), i.ruleId(), writeJson(i.ruleSnapshot()),
|
||||
i.environmentId(), i.state().name(), i.severity().name(),
|
||||
ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(),
|
||||
ts(i.resolvedAt()), ts(i.lastNotifiedAt()),
|
||||
i.silenced(), i.currentValue(), i.threshold(),
|
||||
writeJson(i.context()), i.title(), i.message(),
|
||||
userIds, groupIds, roleNames);
|
||||
try {
|
||||
jdbc.update(sql,
|
||||
i.id(), i.ruleId(), writeJson(i.ruleSnapshot()),
|
||||
i.environmentId(), i.state().name(), i.severity().name(),
|
||||
ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(),
|
||||
ts(i.resolvedAt()), ts(i.lastNotifiedAt()),
|
||||
i.silenced(), i.currentValue(), i.threshold(),
|
||||
writeJson(i.context()), i.title(), i.message(),
|
||||
userIds, groupIds, roleNames);
|
||||
} catch (DuplicateKeyException e) {
|
||||
log.info("Skipped duplicate open alert_instance for rule {}: {}", i.ruleId(), e.getMessage());
|
||||
return findOpenForRule(i.ruleId()).orElse(i);
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
@@ -147,6 +157,20 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
jdbc.update("UPDATE alert_instances SET silenced = ? WHERE id = ?", silenced, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertInstance> listFiringDueForReNotify(Instant now) {
|
||||
return jdbc.query("""
|
||||
SELECT ai.* FROM alert_instances ai
|
||||
JOIN alert_rules ar ON ar.id = ai.rule_id
|
||||
WHERE ai.state = 'FIRING'::alert_state_enum
|
||||
AND ai.silenced = false
|
||||
AND ar.enabled = true
|
||||
AND ar.re_notify_minutes > 0
|
||||
AND ai.last_notified_at IS NOT NULL
|
||||
AND ai.last_notified_at + make_interval(mins => ar.re_notify_minutes) <= ?
|
||||
""", rowMapper(), Timestamp.from(now));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteResolvedBefore(Instant cutoff) {
|
||||
jdbc.update("""
|
||||
|
||||
Reference in New Issue
Block a user