feat(alerting): Postgres repository for alert_rules
Implements AlertRuleRepository with JSONB serialization of condition/webhooks/eval_state via ObjectMapper, UPSERT on id conflict, a JSONB containment query for findAllByOutboundConnectionId and findRuleIdsByOutboundConnectionId, and FOR UPDATE SKIP LOCKED claim-polling for horizontal scale.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,176 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
|
||||
public class PostgresAlertRuleRepository implements AlertRuleRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper om;
|
||||
|
||||
/**
 * Creates a repository that runs its statements through the given JDBC template and
 * uses the given mapper for the JSONB columns.
 *
 * @param jdbc template used for every query and update issued by this repository
 * @param om   mapper used to serialize and deserialize JSON column values
 * @throws NullPointerException if either argument is {@code null}
 */
public PostgresAlertRuleRepository(JdbcTemplate jdbc, ObjectMapper om) {
    // Fail fast at construction time rather than with an NPE on the first query.
    this.jdbc = Objects.requireNonNull(jdbc, "jdbc");
    this.om = Objects.requireNonNull(om, "om");
}
|
||||
|
||||
@Override
|
||||
public AlertRule save(AlertRule r) {
|
||||
String sql = """
|
||||
INSERT INTO alert_rules (id, environment_id, name, description, severity, enabled,
|
||||
condition_kind, condition, evaluation_interval_seconds, for_duration_seconds,
|
||||
re_notify_minutes, notification_title_tmpl, notification_message_tmpl,
|
||||
webhooks, next_evaluation_at, claimed_by, claimed_until, eval_state,
|
||||
created_at, created_by, updated_at, updated_by)
|
||||
VALUES (?, ?, ?, ?, ?::severity_enum, ?, ?::condition_kind_enum, ?::jsonb, ?, ?, ?, ?, ?, ?::jsonb,
|
||||
?, ?, ?, ?::jsonb, ?, ?, ?, ?)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
name = EXCLUDED.name, description = EXCLUDED.description,
|
||||
severity = EXCLUDED.severity, enabled = EXCLUDED.enabled,
|
||||
condition_kind = EXCLUDED.condition_kind, condition = EXCLUDED.condition,
|
||||
evaluation_interval_seconds = EXCLUDED.evaluation_interval_seconds,
|
||||
for_duration_seconds = EXCLUDED.for_duration_seconds,
|
||||
re_notify_minutes = EXCLUDED.re_notify_minutes,
|
||||
notification_title_tmpl = EXCLUDED.notification_title_tmpl,
|
||||
notification_message_tmpl = EXCLUDED.notification_message_tmpl,
|
||||
webhooks = EXCLUDED.webhooks, eval_state = EXCLUDED.eval_state,
|
||||
updated_at = EXCLUDED.updated_at, updated_by = EXCLUDED.updated_by
|
||||
""";
|
||||
jdbc.update(sql,
|
||||
r.id(), r.environmentId(), r.name(), r.description(),
|
||||
r.severity().name(), r.enabled(), r.conditionKind().name(),
|
||||
writeJson(r.condition()),
|
||||
r.evaluationIntervalSeconds(), r.forDurationSeconds(), r.reNotifyMinutes(),
|
||||
r.notificationTitleTmpl(), r.notificationMessageTmpl(),
|
||||
writeJson(r.webhooks()),
|
||||
Timestamp.from(r.nextEvaluationAt()),
|
||||
r.claimedBy(),
|
||||
r.claimedUntil() == null ? null : Timestamp.from(r.claimedUntil()),
|
||||
writeJson(r.evalState()),
|
||||
Timestamp.from(r.createdAt()), r.createdBy(),
|
||||
Timestamp.from(r.updatedAt()), r.updatedBy());
|
||||
return r;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<AlertRule> findById(UUID id) {
|
||||
var list = jdbc.query("SELECT * FROM alert_rules WHERE id = ?", rowMapper(), id);
|
||||
return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertRule> listByEnvironment(UUID environmentId) {
|
||||
return jdbc.query(
|
||||
"SELECT * FROM alert_rules WHERE environment_id = ? ORDER BY created_at DESC",
|
||||
rowMapper(), environmentId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertRule> findAllByOutboundConnectionId(UUID connectionId) {
|
||||
String sql = """
|
||||
SELECT * FROM alert_rules
|
||||
WHERE webhooks @> ?::jsonb
|
||||
ORDER BY created_at DESC
|
||||
""";
|
||||
String predicate = "[{\"outboundConnectionId\":\"" + connectionId + "\"}]";
|
||||
return jdbc.query(sql, rowMapper(), predicate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<UUID> findRuleIdsByOutboundConnectionId(UUID connectionId) {
|
||||
String sql = """
|
||||
SELECT id FROM alert_rules
|
||||
WHERE webhooks @> ?::jsonb
|
||||
""";
|
||||
String predicate = "[{\"outboundConnectionId\":\"" + connectionId + "\"}]";
|
||||
return jdbc.queryForList(sql, UUID.class, predicate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(UUID id) {
|
||||
jdbc.update("DELETE FROM alert_rules WHERE id = ?", id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AlertRule> claimDueRules(String instanceId, int batchSize, int claimTtlSeconds) {
|
||||
String sql = """
|
||||
UPDATE alert_rules
|
||||
SET claimed_by = ?, claimed_until = now() + (? || ' seconds')::interval
|
||||
WHERE id IN (
|
||||
SELECT id FROM alert_rules
|
||||
WHERE enabled = true
|
||||
AND next_evaluation_at <= now()
|
||||
AND (claimed_until IS NULL OR claimed_until < now())
|
||||
ORDER BY next_evaluation_at
|
||||
LIMIT ?
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING *
|
||||
""";
|
||||
return jdbc.query(sql, rowMapper(), instanceId, claimTtlSeconds, batchSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseClaim(UUID ruleId, Instant nextEvaluationAt, Map<String, Object> evalState) {
|
||||
jdbc.update("""
|
||||
UPDATE alert_rules
|
||||
SET claimed_by = NULL, claimed_until = NULL,
|
||||
next_evaluation_at = ?, eval_state = ?::jsonb
|
||||
WHERE id = ?
|
||||
""",
|
||||
Timestamp.from(nextEvaluationAt), writeJson(evalState), ruleId);
|
||||
}
|
||||
|
||||
private RowMapper<AlertRule> rowMapper() {
|
||||
return (rs, i) -> {
|
||||
try {
|
||||
ConditionKind kind = ConditionKind.valueOf(rs.getString("condition_kind"));
|
||||
AlertCondition cond = om.readValue(rs.getString("condition"), AlertCondition.class);
|
||||
List<WebhookBinding> webhooks = om.readValue(
|
||||
rs.getString("webhooks"), new TypeReference<>() {});
|
||||
Map<String, Object> evalState = om.readValue(
|
||||
rs.getString("eval_state"), new TypeReference<>() {});
|
||||
|
||||
Timestamp cu = rs.getTimestamp("claimed_until");
|
||||
return new AlertRule(
|
||||
(UUID) rs.getObject("id"),
|
||||
(UUID) rs.getObject("environment_id"),
|
||||
rs.getString("name"),
|
||||
rs.getString("description"),
|
||||
AlertSeverity.valueOf(rs.getString("severity")),
|
||||
rs.getBoolean("enabled"),
|
||||
kind, cond,
|
||||
rs.getInt("evaluation_interval_seconds"),
|
||||
rs.getInt("for_duration_seconds"),
|
||||
rs.getInt("re_notify_minutes"),
|
||||
rs.getString("notification_title_tmpl"),
|
||||
rs.getString("notification_message_tmpl"),
|
||||
webhooks, List.of(),
|
||||
rs.getTimestamp("next_evaluation_at").toInstant(),
|
||||
rs.getString("claimed_by"),
|
||||
cu == null ? null : cu.toInstant(),
|
||||
evalState,
|
||||
rs.getTimestamp("created_at").toInstant(),
|
||||
rs.getString("created_by"),
|
||||
rs.getTimestamp("updated_at").toInstant(),
|
||||
rs.getString("updated_by"));
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to map alert_rules row", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private String writeJson(Object o) {
|
||||
try {
|
||||
return om.writeValueAsString(o);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to serialize to JSON", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.cameleer.server.app.alerting.storage;
|
||||
|
||||
import com.cameleer.server.app.AbstractPostgresIT;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PostgresAlertRuleRepositoryIT extends AbstractPostgresIT {
|
||||
|
||||
private PostgresAlertRuleRepository repo;
|
||||
private UUID envId;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
repo = new PostgresAlertRuleRepository(jdbcTemplate, new ObjectMapper());
|
||||
envId = UUID.randomUUID();
|
||||
jdbcTemplate.update(
|
||||
"INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)",
|
||||
envId, "test-env-" + UUID.randomUUID(), "Test Env");
|
||||
jdbcTemplate.update(
|
||||
"INSERT INTO users (user_id, provider, email) VALUES ('test-user', 'local', 'test@example.com')" +
|
||||
" ON CONFLICT (user_id) DO NOTHING");
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void cleanup() {
|
||||
jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
|
||||
jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
|
||||
jdbcTemplate.update("DELETE FROM users WHERE user_id = 'test-user'");
|
||||
}
|
||||
|
||||
@Test
|
||||
void saveAndFindByIdRoundtrip() {
|
||||
var rule = newRule(List.of());
|
||||
repo.save(rule);
|
||||
var found = repo.findById(rule.id()).orElseThrow();
|
||||
assertThat(found.name()).isEqualTo(rule.name());
|
||||
assertThat(found.condition()).isInstanceOf(AgentStateCondition.class);
|
||||
assertThat(found.severity()).isEqualTo(AlertSeverity.WARNING);
|
||||
assertThat(found.conditionKind()).isEqualTo(ConditionKind.AGENT_STATE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findRuleIdsByOutboundConnectionId() {
|
||||
var connId = UUID.randomUUID();
|
||||
var wb = new WebhookBinding(UUID.randomUUID(), connId, null, Map.of());
|
||||
var rule = newRule(List.of(wb));
|
||||
repo.save(rule);
|
||||
|
||||
List<UUID> ids = repo.findRuleIdsByOutboundConnectionId(connId);
|
||||
assertThat(ids).containsExactly(rule.id());
|
||||
|
||||
assertThat(repo.findRuleIdsByOutboundConnectionId(UUID.randomUUID())).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void claimDueRulesAtomicSkipLocked() {
|
||||
var rule = newRule(List.of());
|
||||
repo.save(rule);
|
||||
|
||||
List<AlertRule> claimed = repo.claimDueRules("instance-A", 10, 30);
|
||||
assertThat(claimed).hasSize(1);
|
||||
|
||||
// Second claimant sees nothing until first releases or TTL expires
|
||||
List<AlertRule> second = repo.claimDueRules("instance-B", 10, 30);
|
||||
assertThat(second).isEmpty();
|
||||
}
|
||||
|
||||
private AlertRule newRule(List<WebhookBinding> webhooks) {
|
||||
return new AlertRule(
|
||||
UUID.randomUUID(), envId, "rule-" + UUID.randomUUID(), "desc",
|
||||
AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE,
|
||||
new AgentStateCondition(new AlertScope(null, null, null), "DEAD", 60),
|
||||
60, 0, 60, "t", "m", webhooks, List.of(),
|
||||
Instant.now().minusSeconds(10), null, null, Map.of(),
|
||||
Instant.now(), "test-user", Instant.now(), "test-user");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user