feat(alerts): Postgres repo — read_at/deleted_at columns, filter params, new mutations
- save/rowMapper read+write read_at and deleted_at - listForInbox: tri-state acked/read filters; always excludes deleted - countUnreadBySeverity: rewire without alert_reads join, preserve zero-fill - new: markRead/bulkMarkRead/softDelete/bulkSoftDelete/bulkAck/restore - delete PostgresAlertReadRepository + its bean - restore zero-fill Javadoc on interface - mechanical compile-fixes in AlertController, InAppInboxQuery, AlertControllerIT, InAppInboxQueryTest; Task 6 owns the rewrite - PostgresAlertReadRepositoryIT stubbed @Disabled; Task 7 owns migration Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,7 +3,10 @@ package com.cameleer.server.app.alerting.config;
|
||||
import com.cameleer.server.app.alerting.eval.PerKindCircuitBreaker;
|
||||
import com.cameleer.server.app.alerting.metrics.AlertingMetrics;
|
||||
import com.cameleer.server.app.alerting.storage.*;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.cameleer.server.core.alerting.AlertInstanceRepository;
|
||||
import com.cameleer.server.core.alerting.AlertNotificationRepository;
|
||||
import com.cameleer.server.core.alerting.AlertRuleRepository;
|
||||
import com.cameleer.server.core.alerting.AlertSilenceRepository;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -41,11 +44,6 @@ public class AlertingBeanConfig {
|
||||
return new PostgresAlertNotificationRepository(jdbc, om);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AlertReadRepository alertReadRepository(JdbcTemplate jdbc) {
|
||||
return new PostgresAlertReadRepository(jdbc);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Clock alertingClock() {
|
||||
return Clock.systemDefaultZone();
|
||||
|
||||
@@ -7,7 +7,6 @@ import com.cameleer.server.app.alerting.notify.InAppInboxQuery;
|
||||
import com.cameleer.server.app.web.EnvPath;
|
||||
import com.cameleer.server.core.alerting.AlertInstance;
|
||||
import com.cameleer.server.core.alerting.AlertInstanceRepository;
|
||||
import com.cameleer.server.core.alerting.AlertReadRepository;
|
||||
import com.cameleer.server.core.alerting.AlertSeverity;
|
||||
import com.cameleer.server.core.alerting.AlertState;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
@@ -43,14 +42,11 @@ public class AlertController {
|
||||
|
||||
private final InAppInboxQuery inboxQuery;
|
||||
private final AlertInstanceRepository instanceRepo;
|
||||
private final AlertReadRepository readRepo;
|
||||
|
||||
public AlertController(InAppInboxQuery inboxQuery,
|
||||
AlertInstanceRepository instanceRepo,
|
||||
AlertReadRepository readRepo) {
|
||||
AlertInstanceRepository instanceRepo) {
|
||||
this.inboxQuery = inboxQuery;
|
||||
this.instanceRepo = instanceRepo;
|
||||
this.readRepo = readRepo;
|
||||
}
|
||||
|
||||
@GetMapping
|
||||
@@ -89,14 +85,12 @@ public class AlertController {
|
||||
@PostMapping("/{id}/read")
|
||||
public void read(@EnvPath Environment env, @PathVariable UUID id) {
|
||||
requireInstance(id, env.id());
|
||||
String userId = currentUserId();
|
||||
readRepo.markRead(userId, id);
|
||||
instanceRepo.markRead(id, Instant.now());
|
||||
}
|
||||
|
||||
@PostMapping("/bulk-read")
|
||||
public void bulkRead(@EnvPath Environment env,
|
||||
@Valid @RequestBody BulkReadRequest req) {
|
||||
String userId = currentUserId();
|
||||
// filter to only instances in this env
|
||||
List<UUID> filtered = req.instanceIds().stream()
|
||||
.filter(instanceId -> instanceRepo.findById(instanceId)
|
||||
@@ -104,7 +98,7 @@ public class AlertController {
|
||||
.orElse(false))
|
||||
.toList();
|
||||
if (!filtered.isEmpty()) {
|
||||
readRepo.bulkMarkRead(userId, filtered);
|
||||
instanceRepo.bulkMarkRead(filtered, Instant.now());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ public class InAppInboxQuery {
|
||||
int limit) {
|
||||
List<String> groupIds = resolveGroupIds(userId);
|
||||
List<String> roleNames = resolveRoleNames(userId);
|
||||
return instanceRepo.listForInbox(envId, groupIds, userId, roleNames, states, severities, limit);
|
||||
return instanceRepo.listForInbox(envId, groupIds, userId, roleNames, states, severities, null, null, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -85,7 +85,9 @@ public class InAppInboxQuery {
|
||||
if (cached != null && now.isBefore(cached.expiresAt())) {
|
||||
return cached.response();
|
||||
}
|
||||
Map<AlertSeverity, Long> bySeverity = instanceRepo.countUnreadBySeverityForUser(envId, userId);
|
||||
List<String> groupIds = resolveGroupIds(userId);
|
||||
List<String> roleNames = resolveRoleNames(userId);
|
||||
Map<AlertSeverity, Long> bySeverity = instanceRepo.countUnreadBySeverity(envId, userId, groupIds, roleNames);
|
||||
UnreadCountResponse response = UnreadCountResponse.from(bySeverity);
|
||||
memo.put(key, new Entry(response, now.plusMillis(MEMO_TTL_MS)));
|
||||
return response;
|
||||
|
||||
@@ -34,10 +34,12 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
INSERT INTO alert_instances (
|
||||
id, rule_id, rule_snapshot, environment_id, state, severity,
|
||||
fired_at, acked_at, acked_by, resolved_at, last_notified_at,
|
||||
read_at, deleted_at,
|
||||
silenced, current_value, threshold, context, title, message,
|
||||
target_user_ids, target_group_ids, target_role_names)
|
||||
VALUES (?, ?, ?::jsonb, ?, ?::alert_state_enum, ?::severity_enum,
|
||||
?, ?, ?, ?, ?,
|
||||
?, ?,
|
||||
?, ?, ?, ?::jsonb, ?, ?,
|
||||
?, ?, ?)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
@@ -46,6 +48,8 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
acked_by = EXCLUDED.acked_by,
|
||||
resolved_at = EXCLUDED.resolved_at,
|
||||
last_notified_at = EXCLUDED.last_notified_at,
|
||||
read_at = EXCLUDED.read_at,
|
||||
deleted_at = EXCLUDED.deleted_at,
|
||||
silenced = EXCLUDED.silenced,
|
||||
current_value = EXCLUDED.current_value,
|
||||
threshold = EXCLUDED.threshold,
|
||||
@@ -66,6 +70,7 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
i.environmentId(), i.state().name(), i.severity().name(),
|
||||
ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(),
|
||||
ts(i.resolvedAt()), ts(i.lastNotifiedAt()),
|
||||
ts(i.readAt()), ts(i.deletedAt()),
|
||||
i.silenced(), i.currentValue(), i.threshold(),
|
||||
writeJson(i.context()), i.title(), i.message(),
|
||||
userIds, groupIds, roleNames);
|
||||
@@ -101,8 +106,9 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
List<String> userRoleNames,
|
||||
List<AlertState> states,
|
||||
List<AlertSeverity> severities,
|
||||
Boolean acked,
|
||||
Boolean read,
|
||||
int limit) {
|
||||
// Build arrays for group UUIDs and role names
|
||||
Array groupArray = toUuidArrayFromStrings(userGroupIdFilter);
|
||||
Array roleArray = toTextArray(userRoleNames);
|
||||
|
||||
@@ -127,7 +133,13 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
sql.append(" AND severity::text = ANY(?)");
|
||||
args.add(severityArray);
|
||||
}
|
||||
|
||||
if (acked != null) {
|
||||
sql.append(acked ? " AND acked_at IS NOT NULL" : " AND acked_at IS NULL");
|
||||
}
|
||||
if (read != null) {
|
||||
sql.append(read ? " AND read_at IS NOT NULL" : " AND read_at IS NULL");
|
||||
}
|
||||
sql.append(" AND deleted_at IS NULL");
|
||||
sql.append(" ORDER BY fired_at DESC LIMIT ?");
|
||||
args.add(limit);
|
||||
|
||||
@@ -135,23 +147,30 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<AlertSeverity, Long> countUnreadBySeverityForUser(UUID environmentId, String userId) {
|
||||
public Map<AlertSeverity, Long> countUnreadBySeverity(UUID environmentId,
|
||||
String userId,
|
||||
List<String> groupIds,
|
||||
List<String> roleNames) {
|
||||
Array groupArray = toUuidArrayFromStrings(groupIds);
|
||||
Array roleArray = toTextArray(roleNames);
|
||||
String sql = """
|
||||
SELECT ai.severity::text AS severity, COUNT(*) AS cnt
|
||||
FROM alert_instances ai
|
||||
WHERE ai.environment_id = ?
|
||||
AND ? = ANY(ai.target_user_ids)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM alert_reads ar
|
||||
WHERE ar.user_id = ? AND ar.alert_instance_id = ai.id
|
||||
SELECT severity::text AS severity, COUNT(*) AS cnt
|
||||
FROM alert_instances
|
||||
WHERE environment_id = ?
|
||||
AND read_at IS NULL
|
||||
AND deleted_at IS NULL
|
||||
AND (
|
||||
? = ANY(target_user_ids)
|
||||
OR target_group_ids && ?
|
||||
OR target_role_names && ?
|
||||
)
|
||||
GROUP BY ai.severity
|
||||
GROUP BY severity
|
||||
""";
|
||||
EnumMap<AlertSeverity, Long> counts = new EnumMap<>(AlertSeverity.class);
|
||||
for (AlertSeverity s : AlertSeverity.values()) counts.put(s, 0L);
|
||||
jdbc.query(sql, rs -> {
|
||||
counts.put(AlertSeverity.valueOf(rs.getString("severity")), rs.getLong("cnt"));
|
||||
}, environmentId, userId, userId);
|
||||
jdbc.query(sql, (org.springframework.jdbc.core.RowCallbackHandler) rs -> counts.put(
|
||||
AlertSeverity.valueOf(rs.getString("severity")), rs.getLong("cnt")
|
||||
), environmentId, userId, groupArray, roleArray);
|
||||
return counts;
|
||||
}
|
||||
|
||||
@@ -164,6 +183,56 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
""", Timestamp.from(when), userId, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markRead(UUID id, Instant when) {
|
||||
jdbc.update("UPDATE alert_instances SET read_at = ? WHERE id = ? AND read_at IS NULL",
|
||||
Timestamp.from(when), id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkMarkRead(List<UUID> ids, Instant when) {
|
||||
if (ids == null || ids.isEmpty()) return;
|
||||
Array idArray = jdbc.execute((ConnectionCallback<Array>) c ->
|
||||
c.createArrayOf("uuid", ids.toArray()));
|
||||
jdbc.update("""
|
||||
UPDATE alert_instances SET read_at = ?
|
||||
WHERE id = ANY(?) AND read_at IS NULL
|
||||
""", Timestamp.from(when), idArray);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void softDelete(UUID id, Instant when) {
|
||||
jdbc.update("UPDATE alert_instances SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL",
|
||||
Timestamp.from(when), id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkSoftDelete(List<UUID> ids, Instant when) {
|
||||
if (ids == null || ids.isEmpty()) return;
|
||||
Array idArray = jdbc.execute((ConnectionCallback<Array>) c ->
|
||||
c.createArrayOf("uuid", ids.toArray()));
|
||||
jdbc.update("""
|
||||
UPDATE alert_instances SET deleted_at = ?
|
||||
WHERE id = ANY(?) AND deleted_at IS NULL
|
||||
""", Timestamp.from(when), idArray);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restore(UUID id) {
|
||||
jdbc.update("UPDATE alert_instances SET deleted_at = NULL WHERE id = ?", id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkAck(List<UUID> ids, String userId, Instant when) {
|
||||
if (ids == null || ids.isEmpty()) return;
|
||||
Array idArray = jdbc.execute((ConnectionCallback<Array>) c ->
|
||||
c.createArrayOf("uuid", ids.toArray()));
|
||||
jdbc.update("""
|
||||
UPDATE alert_instances SET acked_at = ?, acked_by = ?
|
||||
WHERE id = ANY(?) AND acked_at IS NULL AND deleted_at IS NULL
|
||||
""", Timestamp.from(when), userId, idArray);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resolve(UUID id, Instant when) {
|
||||
jdbc.update("""
|
||||
@@ -215,6 +284,8 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
Timestamp ackedAt = rs.getTimestamp("acked_at");
|
||||
Timestamp resolvedAt = rs.getTimestamp("resolved_at");
|
||||
Timestamp lastNotifiedAt = rs.getTimestamp("last_notified_at");
|
||||
Timestamp readAt = rs.getTimestamp("read_at");
|
||||
Timestamp deletedAt = rs.getTimestamp("deleted_at");
|
||||
|
||||
Object cvObj = rs.getObject("current_value");
|
||||
Double currentValue = cvObj == null ? null : ((Number) cvObj).doubleValue();
|
||||
@@ -235,8 +306,8 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
|
||||
rs.getString("acked_by"),
|
||||
resolvedAt == null ? null : resolvedAt.toInstant(),
|
||||
lastNotifiedAt == null ? null : lastNotifiedAt.toInstant(),
|
||||
null,
|
||||
null,
|
||||
readAt == null ? null : readAt.toInstant(),
|
||||
deletedAt == null ? null : deletedAt.toInstant(),
|
||||
rs.getBoolean("silenced"),
|
||||
currentValue,
|
||||
threshold,
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.core.alerting.AlertReadRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class PostgresAlertReadRepository implements AlertReadRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresAlertReadRepository(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markRead(String userId, UUID alertInstanceId) {
|
||||
jdbc.update("""
|
||||
INSERT INTO alert_reads (user_id, alert_instance_id)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (user_id, alert_instance_id) DO NOTHING
|
||||
""", userId, alertInstanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkMarkRead(String userId, List<UUID> alertInstanceIds) {
|
||||
if (alertInstanceIds == null || alertInstanceIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (UUID id : alertInstanceIds) {
|
||||
markRead(userId, id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user