From f80bc006c14f77a752487214df446a71ff5781bf Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 19 Apr 2026 18:48:15 +0200 Subject: [PATCH] feat(alerting): Postgres repository for alert_rules Implements AlertRuleRepository with JSONB condition/webhooks/eval_state serialization via ObjectMapper, UPSERT on conflict, JSONB containment query for findRuleIdsByOutboundConnectionId, and FOR UPDATE SKIP LOCKED claim-polling for horizontal scale. Co-Authored-By: Claude Sonnet 4.6 --- .../storage/PostgresAlertRuleRepository.java | 176 ++++++++++++++++++ .../PostgresAlertRuleRepositoryIT.java | 87 +++++++++ 2 files changed, 263 insertions(+) create mode 100644 cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java create mode 100644 cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepositoryIT.java diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java new file mode 100644 index 00000000..efbdd07e --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java @@ -0,0 +1,176 @@ +package com.cameleer.server.app.alerting.storage; + +import com.cameleer.server.core.alerting.*; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +public class PostgresAlertRuleRepository implements AlertRuleRepository { + + private final JdbcTemplate jdbc; + private final ObjectMapper om; + + public PostgresAlertRuleRepository(JdbcTemplate jdbc, ObjectMapper om) { + this.jdbc = jdbc; + this.om = om; + } + + @Override + public AlertRule save(AlertRule r) { + String sql = """ + INSERT INTO alert_rules (id, environment_id, name, description, severity, enabled, + condition_kind, condition, evaluation_interval_seconds, for_duration_seconds, + re_notify_minutes, notification_title_tmpl, notification_message_tmpl, + webhooks, next_evaluation_at, claimed_by, claimed_until, eval_state, + created_at, created_by, updated_at, updated_by) + VALUES (?, ?, ?, ?, ?::severity_enum, ?, ?::condition_kind_enum, ?::jsonb, ?, ?, ?, ?, ?, ?::jsonb, + ?, ?, ?, ?::jsonb, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, description = EXCLUDED.description, + severity = EXCLUDED.severity, enabled = EXCLUDED.enabled, + condition_kind = EXCLUDED.condition_kind, condition = EXCLUDED.condition, + evaluation_interval_seconds = EXCLUDED.evaluation_interval_seconds, + for_duration_seconds = EXCLUDED.for_duration_seconds, + re_notify_minutes = EXCLUDED.re_notify_minutes, + notification_title_tmpl = EXCLUDED.notification_title_tmpl, + notification_message_tmpl = EXCLUDED.notification_message_tmpl, + webhooks = EXCLUDED.webhooks, eval_state = EXCLUDED.eval_state, + updated_at = EXCLUDED.updated_at, updated_by = EXCLUDED.updated_by + """; + jdbc.update(sql, + r.id(), r.environmentId(), r.name(), r.description(), + r.severity().name(), r.enabled(), r.conditionKind().name(), + writeJson(r.condition()), + r.evaluationIntervalSeconds(), r.forDurationSeconds(), r.reNotifyMinutes(), + r.notificationTitleTmpl(), r.notificationMessageTmpl(), + writeJson(r.webhooks()), + Timestamp.from(r.nextEvaluationAt()), + r.claimedBy(), + r.claimedUntil() == null ? null : Timestamp.from(r.claimedUntil()), + writeJson(r.evalState()), + Timestamp.from(r.createdAt()), r.createdBy(), + Timestamp.from(r.updatedAt()), r.updatedBy()); + return r; + } + + @Override + public Optional findById(UUID id) { + var list = jdbc.query("SELECT * FROM alert_rules WHERE id = ?", rowMapper(), id); + return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0)); + } + + @Override + public List listByEnvironment(UUID environmentId) { + return jdbc.query( + "SELECT * FROM alert_rules WHERE environment_id = ? ORDER BY created_at DESC", + rowMapper(), environmentId); + } + + @Override + public List findAllByOutboundConnectionId(UUID connectionId) { + String sql = """ + SELECT * FROM alert_rules + WHERE webhooks @> ?::jsonb + ORDER BY created_at DESC + """; + String predicate = "[{\"outboundConnectionId\":\"" + connectionId + "\"}]"; + return jdbc.query(sql, rowMapper(), predicate); + } + + @Override + public List findRuleIdsByOutboundConnectionId(UUID connectionId) { + String sql = """ + SELECT id FROM alert_rules + WHERE webhooks @> ?::jsonb + """; + String predicate = "[{\"outboundConnectionId\":\"" + connectionId + "\"}]"; + return jdbc.queryForList(sql, UUID.class, predicate); + } + + @Override + public void delete(UUID id) { + jdbc.update("DELETE FROM alert_rules WHERE id = ?", id); + } + + @Override + public List claimDueRules(String instanceId, int batchSize, int claimTtlSeconds) { + String sql = """ + UPDATE alert_rules + SET claimed_by = ?, claimed_until = now() + (? || ' seconds')::interval + WHERE id IN ( + SELECT id FROM alert_rules + WHERE enabled = true + AND next_evaluation_at <= now() + AND (claimed_until IS NULL OR claimed_until < now()) + ORDER BY next_evaluation_at + LIMIT ? + FOR UPDATE SKIP LOCKED + ) + RETURNING * + """; + return jdbc.query(sql, rowMapper(), instanceId, claimTtlSeconds, batchSize); + } + + @Override + public void releaseClaim(UUID ruleId, Instant nextEvaluationAt, Map evalState) { + jdbc.update(""" + UPDATE alert_rules + SET claimed_by = NULL, claimed_until = NULL, + next_evaluation_at = ?, eval_state = ?::jsonb + WHERE id = ? + """, + Timestamp.from(nextEvaluationAt), writeJson(evalState), ruleId); + } + + private RowMapper rowMapper() { + return (rs, i) -> { + try { + ConditionKind kind = ConditionKind.valueOf(rs.getString("condition_kind")); + AlertCondition cond = om.readValue(rs.getString("condition"), AlertCondition.class); + List webhooks = om.readValue( + rs.getString("webhooks"), new TypeReference<>() {}); + Map evalState = om.readValue( + rs.getString("eval_state"), new TypeReference<>() {}); + + Timestamp cu = rs.getTimestamp("claimed_until"); + return new AlertRule( + (UUID) rs.getObject("id"), + (UUID) rs.getObject("environment_id"), + rs.getString("name"), + rs.getString("description"), + AlertSeverity.valueOf(rs.getString("severity")), + rs.getBoolean("enabled"), + kind, cond, + rs.getInt("evaluation_interval_seconds"), + rs.getInt("for_duration_seconds"), + rs.getInt("re_notify_minutes"), + rs.getString("notification_title_tmpl"), + rs.getString("notification_message_tmpl"), + webhooks, List.of(), + rs.getTimestamp("next_evaluation_at").toInstant(), + rs.getString("claimed_by"), + cu == null ? null : cu.toInstant(), + evalState, + rs.getTimestamp("created_at").toInstant(), + rs.getString("created_by"), + rs.getTimestamp("updated_at").toInstant(), + rs.getString("updated_by")); + } catch (Exception e) { + throw new IllegalStateException("Failed to map alert_rules row", e); + } + }; + } + + private String writeJson(Object o) { + try { + return om.writeValueAsString(o); + } catch (Exception e) { + throw new IllegalStateException("Failed to serialize to JSON", e); + } + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepositoryIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepositoryIT.java new file mode 100644 index 00000000..64d8f76d --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepositoryIT.java @@ -0,0 +1,87 @@ +package com.cameleer.server.app.alerting.storage; + +import com.cameleer.server.app.AbstractPostgresIT; +import com.cameleer.server.core.alerting.*; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +class PostgresAlertRuleRepositoryIT extends AbstractPostgresIT { + + private PostgresAlertRuleRepository repo; + private UUID envId; + + @BeforeEach + void setup() { + repo = new PostgresAlertRuleRepository(jdbcTemplate, new ObjectMapper()); + envId = UUID.randomUUID(); + jdbcTemplate.update( + "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", + envId, "test-env-" + UUID.randomUUID(), "Test Env"); + jdbcTemplate.update( + "INSERT INTO users (user_id, provider, email) VALUES ('test-user', 'local', 'test@example.com')" + + " ON CONFLICT (user_id) DO NOTHING"); + } + + @AfterEach + void cleanup() { + jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); + jdbcTemplate.update("DELETE FROM users WHERE user_id = 'test-user'"); + } + + @Test + void saveAndFindByIdRoundtrip() { + var rule = newRule(List.of()); + repo.save(rule); + var found = repo.findById(rule.id()).orElseThrow(); + assertThat(found.name()).isEqualTo(rule.name()); + assertThat(found.condition()).isInstanceOf(AgentStateCondition.class); + assertThat(found.severity()).isEqualTo(AlertSeverity.WARNING); + assertThat(found.conditionKind()).isEqualTo(ConditionKind.AGENT_STATE); + } + + @Test + void findRuleIdsByOutboundConnectionId() { + var connId = UUID.randomUUID(); + var wb = new WebhookBinding(UUID.randomUUID(), connId, null, Map.of()); + var rule = newRule(List.of(wb)); + repo.save(rule); + + List ids = repo.findRuleIdsByOutboundConnectionId(connId); + assertThat(ids).containsExactly(rule.id()); + + assertThat(repo.findRuleIdsByOutboundConnectionId(UUID.randomUUID())).isEmpty(); + } + + @Test + void claimDueRulesAtomicSkipLocked() { + var rule = newRule(List.of()); + repo.save(rule); + + List claimed = repo.claimDueRules("instance-A", 10, 30); + assertThat(claimed).hasSize(1); + + // Second claimant sees nothing until first releases or TTL expires + List second = repo.claimDueRules("instance-B", 10, 30); + assertThat(second).isEmpty(); + } + + private AlertRule newRule(List webhooks) { + return new AlertRule( + UUID.randomUUID(), envId, "rule-" + UUID.randomUUID(), "desc", + AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, + new AgentStateCondition(new AlertScope(null, null, null), "DEAD", 60), + 60, 0, 60, "t", "m", webhooks, List.of(), + Instant.now().minusSeconds(10), null, null, Map.of(), + Instant.now(), "test-user", Instant.now(), "test-user"); + } +}