feat(alerting): Postgres repositories for silences, notifications, reads
PostgresAlertSilenceRepository: save/findById roundtrip, listActive (BETWEEN starts_at AND ends_at), listByEnvironment, delete. JSONB SilenceMatcher via ObjectMapper. PostgresAlertNotificationRepository: save/findById, listForInstance, claimDueNotifications (UPDATE...RETURNING with FOR UPDATE SKIP LOCKED), markDelivered, scheduleRetry (bumps attempts + next_attempt_at), markFailed, deleteSettledBefore (DELIVERED+FAILED rows older than cutoff). JSONB payload. PostgresAlertReadRepository: markRead (ON CONFLICT DO NOTHING idempotent), bulkMarkRead (iterates, handles empty list without error). 16 IT scenarios across 3 classes, all passing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,185 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.core.alerting.AlertNotification;
|
||||
import com.cameleer.server.core.alerting.AlertNotificationRepository;
|
||||
import com.cameleer.server.core.alerting.NotificationStatus;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
public class PostgresAlertNotificationRepository implements AlertNotificationRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper om;
|
||||
|
||||
public PostgresAlertNotificationRepository(JdbcTemplate jdbc, ObjectMapper om) {
|
||||
this.jdbc = jdbc;
|
||||
this.om = om;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlertNotification save(AlertNotification n) {
|
||||
jdbc.update("""
|
||||
INSERT INTO alert_notifications (
|
||||
id, alert_instance_id, webhook_id, outbound_connection_id,
|
||||
status, attempts, next_attempt_at, claimed_by, claimed_until,
|
||||
last_response_status, last_response_snippet, payload, delivered_at, created_at)
|
||||
VALUES (?, ?, ?, ?,
|
||||
?::notification_status_enum, ?, ?, ?, ?,
|
||||
?, ?, ?::jsonb, ?, ?)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
attempts = EXCLUDED.attempts,
|
||||
next_attempt_at = EXCLUDED.next_attempt_at,
|
||||
claimed_by = EXCLUDED.claimed_by,
|
||||
claimed_until = EXCLUDED.claimed_until,
|
||||
last_response_status = EXCLUDED.last_response_status,
|
||||
last_response_snippet = EXCLUDED.last_response_snippet,
|
||||
payload = EXCLUDED.payload,
|
||||
delivered_at = EXCLUDED.delivered_at
|
||||
""",
|
||||
n.id(), n.alertInstanceId(), n.webhookId(), n.outboundConnectionId(),
|
||||
n.status().name(), n.attempts(), Timestamp.from(n.nextAttemptAt()),
|
||||
n.claimedBy(), n.claimedUntil() == null ? null : Timestamp.from(n.claimedUntil()),
|
||||
n.lastResponseStatus(), n.lastResponseSnippet(),
|
||||
writeJson(n.payload()),
|
||||
n.deliveredAt() == null ? null : Timestamp.from(n.deliveredAt()),
|
||||
Timestamp.from(n.createdAt()));
|
||||
return n;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<AlertNotification> findById(UUID id) {
|
||||
var list = jdbc.query("SELECT * FROM alert_notifications WHERE id = ?", rowMapper(), id);
|
||||
return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertNotification> listForInstance(UUID alertInstanceId) {
|
||||
return jdbc.query("""
|
||||
SELECT * FROM alert_notifications
|
||||
WHERE alert_instance_id = ?
|
||||
ORDER BY created_at DESC
|
||||
""", rowMapper(), alertInstanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertNotification> claimDueNotifications(String instanceId, int batchSize, int claimTtlSeconds) {
|
||||
String sql = """
|
||||
UPDATE alert_notifications
|
||||
SET claimed_by = ?, claimed_until = now() + (? || ' seconds')::interval
|
||||
WHERE id IN (
|
||||
SELECT id FROM alert_notifications
|
||||
WHERE status = 'PENDING'::notification_status_enum
|
||||
AND next_attempt_at <= now()
|
||||
AND (claimed_until IS NULL OR claimed_until < now())
|
||||
ORDER BY next_attempt_at
|
||||
LIMIT ?
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING *
|
||||
""";
|
||||
return jdbc.query(sql, rowMapper(), instanceId, claimTtlSeconds, batchSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markDelivered(UUID id, int status, String snippet, Instant when) {
|
||||
jdbc.update("""
|
||||
UPDATE alert_notifications
|
||||
SET status = 'DELIVERED'::notification_status_enum,
|
||||
last_response_status = ?,
|
||||
last_response_snippet = ?,
|
||||
delivered_at = ?,
|
||||
claimed_by = NULL,
|
||||
claimed_until = NULL
|
||||
WHERE id = ?
|
||||
""", status, snippet, Timestamp.from(when), id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleRetry(UUID id, Instant nextAttemptAt, int status, String snippet) {
|
||||
jdbc.update("""
|
||||
UPDATE alert_notifications
|
||||
SET attempts = attempts + 1,
|
||||
next_attempt_at = ?,
|
||||
last_response_status = ?,
|
||||
last_response_snippet = ?,
|
||||
claimed_by = NULL,
|
||||
claimed_until = NULL
|
||||
WHERE id = ?
|
||||
""", Timestamp.from(nextAttemptAt), status, snippet, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markFailed(UUID id, int status, String snippet) {
|
||||
jdbc.update("""
|
||||
UPDATE alert_notifications
|
||||
SET status = 'FAILED'::notification_status_enum,
|
||||
attempts = attempts + 1,
|
||||
last_response_status = ?,
|
||||
last_response_snippet = ?,
|
||||
claimed_by = NULL,
|
||||
claimed_until = NULL
|
||||
WHERE id = ?
|
||||
""", status, snippet, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteSettledBefore(Instant cutoff) {
|
||||
jdbc.update("""
|
||||
DELETE FROM alert_notifications
|
||||
WHERE status IN ('DELIVERED'::notification_status_enum, 'FAILED'::notification_status_enum)
|
||||
AND created_at < ?
|
||||
""", Timestamp.from(cutoff));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private RowMapper<AlertNotification> rowMapper() {
|
||||
return (rs, i) -> {
|
||||
try {
|
||||
Map<String, Object> payload = om.readValue(
|
||||
rs.getString("payload"), new TypeReference<>() {});
|
||||
Timestamp claimedUntil = rs.getTimestamp("claimed_until");
|
||||
Timestamp deliveredAt = rs.getTimestamp("delivered_at");
|
||||
Object lastStatus = rs.getObject("last_response_status");
|
||||
|
||||
Object webhookIdObj = rs.getObject("webhook_id");
|
||||
UUID webhookId = webhookIdObj == null ? null : (UUID) webhookIdObj;
|
||||
Object connIdObj = rs.getObject("outbound_connection_id");
|
||||
UUID connId = connIdObj == null ? null : (UUID) connIdObj;
|
||||
|
||||
return new AlertNotification(
|
||||
(UUID) rs.getObject("id"),
|
||||
(UUID) rs.getObject("alert_instance_id"),
|
||||
webhookId,
|
||||
connId,
|
||||
NotificationStatus.valueOf(rs.getString("status")),
|
||||
rs.getInt("attempts"),
|
||||
rs.getTimestamp("next_attempt_at").toInstant(),
|
||||
rs.getString("claimed_by"),
|
||||
claimedUntil == null ? null : claimedUntil.toInstant(),
|
||||
lastStatus == null ? null : ((Number) lastStatus).intValue(),
|
||||
rs.getString("last_response_snippet"),
|
||||
payload,
|
||||
deliveredAt == null ? null : deliveredAt.toInstant(),
|
||||
rs.getTimestamp("created_at").toInstant());
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to map alert_notifications row", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private String writeJson(Object o) {
|
||||
try { return om.writeValueAsString(o); }
|
||||
catch (Exception e) { throw new IllegalStateException("Failed to serialize JSON", e); }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.core.alerting.AlertReadRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class PostgresAlertReadRepository implements AlertReadRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresAlertReadRepository(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markRead(String userId, UUID alertInstanceId) {
|
||||
jdbc.update("""
|
||||
INSERT INTO alert_reads (user_id, alert_instance_id)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (user_id, alert_instance_id) DO NOTHING
|
||||
""", userId, alertInstanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkMarkRead(String userId, List<UUID> alertInstanceIds) {
|
||||
if (alertInstanceIds == null || alertInstanceIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (UUID id : alertInstanceIds) {
|
||||
markRead(userId, id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.core.alerting.AlertSilence;
|
||||
import com.cameleer.server.core.alerting.AlertSilenceRepository;
|
||||
import com.cameleer.server.core.alerting.AlertSeverity;
|
||||
import com.cameleer.server.core.alerting.SilenceMatcher;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
public class PostgresAlertSilenceRepository implements AlertSilenceRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper om;
|
||||
|
||||
public PostgresAlertSilenceRepository(JdbcTemplate jdbc, ObjectMapper om) {
|
||||
this.jdbc = jdbc;
|
||||
this.om = om;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlertSilence save(AlertSilence s) {
|
||||
jdbc.update("""
|
||||
INSERT INTO alert_silences (id, environment_id, matcher, reason, starts_at, ends_at, created_by, created_at)
|
||||
VALUES (?, ?, ?::jsonb, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
matcher = EXCLUDED.matcher,
|
||||
reason = EXCLUDED.reason,
|
||||
starts_at = EXCLUDED.starts_at,
|
||||
ends_at = EXCLUDED.ends_at
|
||||
""",
|
||||
s.id(), s.environmentId(), writeJson(s.matcher()),
|
||||
s.reason(),
|
||||
Timestamp.from(s.startsAt()), Timestamp.from(s.endsAt()),
|
||||
s.createdBy(), Timestamp.from(s.createdAt()));
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<AlertSilence> findById(UUID id) {
|
||||
var list = jdbc.query("SELECT * FROM alert_silences WHERE id = ?", rowMapper(), id);
|
||||
return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertSilence> listActive(UUID environmentId, Instant when) {
|
||||
Timestamp t = Timestamp.from(when);
|
||||
return jdbc.query("""
|
||||
SELECT * FROM alert_silences
|
||||
WHERE environment_id = ?
|
||||
AND starts_at <= ? AND ends_at >= ?
|
||||
ORDER BY starts_at
|
||||
""", rowMapper(), environmentId, t, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertSilence> listByEnvironment(UUID environmentId) {
|
||||
return jdbc.query("""
|
||||
SELECT * FROM alert_silences
|
||||
WHERE environment_id = ?
|
||||
ORDER BY starts_at DESC
|
||||
""", rowMapper(), environmentId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(UUID id) {
|
||||
jdbc.update("DELETE FROM alert_silences WHERE id = ?", id);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private RowMapper<AlertSilence> rowMapper() {
|
||||
return (rs, i) -> {
|
||||
try {
|
||||
SilenceMatcher matcher = om.readValue(rs.getString("matcher"), SilenceMatcher.class);
|
||||
return new AlertSilence(
|
||||
(UUID) rs.getObject("id"),
|
||||
(UUID) rs.getObject("environment_id"),
|
||||
matcher,
|
||||
rs.getString("reason"),
|
||||
rs.getTimestamp("starts_at").toInstant(),
|
||||
rs.getTimestamp("ends_at").toInstant(),
|
||||
rs.getString("created_by"),
|
||||
rs.getTimestamp("created_at").toInstant());
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to map alert_silences row", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private String writeJson(Object o) {
|
||||
try { return om.writeValueAsString(o); }
|
||||
catch (Exception e) { throw new IllegalStateException("Failed to serialize JSON", e); }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user