From 15c0a8273c86f406f6bb2ef0624fe9b2aea23697 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 19 Apr 2026 19:58:27 +0200 Subject: [PATCH] feat(alerting): AlertEvaluatorJob with claim-polling + circuit breaker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AlertEvaluatorJob implements SchedulingConfigurer; fixed-delay tick from AlertingProperties.effectiveEvaluatorTickIntervalMs (5 s floor) - Claim-polling via AlertRuleRepository.claimDueRules (FOR UPDATE SKIP LOCKED) - Per-kind circuit breaker guards each evaluator; failures recorded, open kinds skipped and rescheduled without evaluation - Single-Firing path delegates to AlertStateTransitions; new FIRING instances enqueue AlertNotification rows per rule.webhooks() - Batch (PER_EXCHANGE) path creates one FIRING AlertInstance per Firing entry - PENDING→FIRING promotion handled in applyResult via state machine - Title/message rendered via MustacheRenderer + NotificationContextBuilder; environment resolved from EnvironmentRepository.findById per tick - AlertEvaluatorJobIT (4 tests): uses named @MockBean replacements for ClickHouseSearchIndex + ClickHouseLogStore; @MockBean AgentRegistryService drives Clear/Firing/resolve cycle without timing sensitivity Co-Authored-By: Claude Sonnet 4.6 --- .../app/alerting/eval/AlertEvaluatorJob.java | 254 ++++++++++++++++++ .../alerting/eval/AlertEvaluatorJobIT.java | 199 ++++++++++++++ 2 files changed, 453 insertions(+) create mode 100644 cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java create mode 100644 cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java new file mode 100644 index 00000000..7002ad80 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -0,0 +1,254 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.alerting.config.AlertingProperties; +import com.cameleer.server.app.alerting.notify.MustacheRenderer; +import com.cameleer.server.app.alerting.notify.NotificationContextBuilder; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Claim-polling evaluator job. + *

+ * On each tick, claims a batch of due {@link AlertRule}s via {@code FOR UPDATE SKIP LOCKED}, + * invokes the matching {@link ConditionEvaluator}, applies the {@link AlertStateTransitions} + * state machine, persists any new/updated {@link AlertInstance}, enqueues webhook + * {@link AlertNotification}s on first-fire, and releases the claim. + */ +@Component +public class AlertEvaluatorJob implements SchedulingConfigurer { + + private static final Logger log = LoggerFactory.getLogger(AlertEvaluatorJob.class); + + private final AlertingProperties props; + private final AlertRuleRepository ruleRepo; + private final AlertInstanceRepository instanceRepo; + private final AlertNotificationRepository notificationRepo; + private final Map> evaluators; + private final PerKindCircuitBreaker circuitBreaker; + private final MustacheRenderer renderer; + private final NotificationContextBuilder contextBuilder; + private final EnvironmentRepository environmentRepo; + private final ObjectMapper objectMapper; + private final String instanceId; + private final String tenantId; + private final Clock clock; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public AlertEvaluatorJob( + AlertingProperties props, + AlertRuleRepository ruleRepo, + AlertInstanceRepository instanceRepo, + AlertNotificationRepository notificationRepo, + List> evaluatorList, + PerKindCircuitBreaker circuitBreaker, + MustacheRenderer renderer, + NotificationContextBuilder contextBuilder, + EnvironmentRepository environmentRepo, + ObjectMapper objectMapper, + @Qualifier("alertingInstanceId") String instanceId, + @Value("${cameleer.server.tenant.id:default}") String tenantId, + Clock alertingClock) { + + this.props = props; + this.ruleRepo = ruleRepo; + this.instanceRepo = instanceRepo; + this.notificationRepo = notificationRepo; + this.evaluators = evaluatorList.stream() + .collect(Collectors.toMap(ConditionEvaluator::kind, e -> e)); + this.circuitBreaker = circuitBreaker; + this.renderer = renderer; + this.contextBuilder = contextBuilder; + this.environmentRepo = environmentRepo; + this.objectMapper = objectMapper; + this.instanceId = instanceId; + this.tenantId = tenantId; + this.clock = alertingClock; + } + + // ------------------------------------------------------------------------- + // SchedulingConfigurer — register the tick as a fixed-delay task + // ------------------------------------------------------------------------- + + @Override + public void configureTasks(ScheduledTaskRegistrar registrar) { + registrar.addFixedDelayTask(this::tick, props.effectiveEvaluatorTickIntervalMs()); + } + + // ------------------------------------------------------------------------- + // Tick — package-private so tests can call it directly + // ------------------------------------------------------------------------- + + void tick() { + List claimed = ruleRepo.claimDueRules( + instanceId, + props.effectiveEvaluatorBatchSize(), + props.effectiveClaimTtlSeconds()); + + if (claimed.isEmpty()) return; + + TickCache cache = new TickCache(); + EvalContext ctx = new EvalContext(tenantId, Instant.now(clock), cache); + + for (AlertRule rule : claimed) { + Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds()); + try { + if (circuitBreaker.isOpen(rule.conditionKind())) { + log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id()); + continue; + } + EvalResult result = evaluateSafely(rule, ctx); + applyResult(rule, result); + circuitBreaker.recordSuccess(rule.conditionKind()); + } catch (Exception e) { + circuitBreaker.recordFailure(rule.conditionKind()); + log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString()); + } finally { + reschedule(rule, nextRun); + } + } + } + + // ------------------------------------------------------------------------- + // Evaluation + // ------------------------------------------------------------------------- + + @SuppressWarnings({"rawtypes", "unchecked"}) + private EvalResult evaluateSafely(AlertRule rule, EvalContext ctx) { + ConditionEvaluator evaluator = evaluators.get(rule.conditionKind()); + if (evaluator == null) { + throw new IllegalStateException("No evaluator registered for " + rule.conditionKind()); + } + return evaluator.evaluate(rule.condition(), rule, ctx); + } + + // ------------------------------------------------------------------------- + // State machine application + // ------------------------------------------------------------------------- + + private void applyResult(AlertRule rule, EvalResult result) { + if (result instanceof EvalResult.Batch b) { + // PER_EXCHANGE mode: each Firing in the batch creates its own AlertInstance + for (EvalResult.Firing f : b.firings()) { + applyBatchFiring(rule, f); + } + return; + } + + AlertInstance current = instanceRepo.findOpenForRule(rule.id()).orElse(null); + Instant now = Instant.now(clock); + + AlertStateTransitions.apply(current, result, rule, now).ifPresent(next -> { + // Determine whether this is a newly created instance transitioning to FIRING + boolean isFirstFire = current == null && next.state() == AlertState.FIRING; + boolean promotedFromPending = current != null + && current.state() == AlertState.PENDING + && next.state() == AlertState.FIRING; + + AlertInstance enriched = enrichTitleMessage(rule, next); + AlertInstance persisted = instanceRepo.save(enriched); + + if (isFirstFire || promotedFromPending) { + enqueueNotifications(rule, persisted, now); + } + }); + } + + /** + * Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry. + * No forDuration check — each exchange is its own event. + */ + private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) { + Instant now = Instant.now(clock); + AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now); + AlertInstance enriched = enrichTitleMessage(rule, instance); + AlertInstance persisted = instanceRepo.save(enriched); + enqueueNotifications(rule, persisted, now); + } + + // ------------------------------------------------------------------------- + // Title / message rendering + // ------------------------------------------------------------------------- + + private AlertInstance enrichTitleMessage(AlertRule rule, AlertInstance instance) { + Environment env = environmentRepo.findById(rule.environmentId()).orElse(null); + Map ctx = contextBuilder.build(rule, instance, env, null); + String title = renderer.render(rule.notificationTitleTmpl(), ctx); + String message = renderer.render(rule.notificationMessageTmpl(), ctx); + return instance.withTitleMessage(title, message); + } + + // ------------------------------------------------------------------------- + // Notification enqueue + // ------------------------------------------------------------------------- + + private void enqueueNotifications(AlertRule rule, AlertInstance instance, Instant now) { + for (WebhookBinding w : rule.webhooks()) { + Map payload = buildPayload(rule, instance); + notificationRepo.save(new AlertNotification( + UUID.randomUUID(), + instance.id(), + w.id(), + w.outboundConnectionId(), + NotificationStatus.PENDING, + 0, + now, + null, null, null, null, + payload, + null, + now)); + } + } + + private Map buildPayload(AlertRule rule, AlertInstance instance) { + Environment env = environmentRepo.findById(rule.environmentId()).orElse(null); + return contextBuilder.build(rule, instance, env, null); + } + + // ------------------------------------------------------------------------- + // Claim release + // ------------------------------------------------------------------------- + + private void reschedule(AlertRule rule, Instant nextRun) { + ruleRepo.releaseClaim(rule.id(), nextRun, rule.evalState()); + } + + // ------------------------------------------------------------------------- + // Rule snapshot helper (used by tests / future extensions) + // ------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + Map snapshotRule(AlertRule rule) { + try { + return objectMapper.convertValue(rule, Map.class); + } catch (Exception e) { + log.warn("Failed to snapshot rule {}: {}", rule.id(), e.getMessage()); + return Map.of("id", rule.id().toString(), "name", rule.name()); + } + } + + // ------------------------------------------------------------------------- + // Visible for testing + // ------------------------------------------------------------------------- + + /** Returns the evaluator map (for inspection in tests). */ + Map> evaluators() { + return evaluators; + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java new file mode 100644 index 00000000..9c3e5659 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJobIT.java @@ -0,0 +1,199 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.AbstractPostgresIT; +import com.cameleer.server.app.search.ClickHouseLogStore; +import com.cameleer.server.app.search.ClickHouseSearchIndex; +import com.cameleer.server.core.agent.AgentInfo; +import com.cameleer.server.core.agent.AgentRegistryService; +import com.cameleer.server.core.agent.AgentState; +import com.cameleer.server.core.alerting.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +/** + * Integration test for {@link AlertEvaluatorJob}. + *

+ * Uses real Postgres (Testcontainers) for the full claim→persist pipeline. + * {@code ClickHouseSearchIndex} and {@code ClickHouseLogStore} are mocked so + * {@code ExchangeMatchEvaluator} and {@code LogPatternEvaluator} wire up even + * though those concrete types are not directly registered as Spring beans. + * {@code AgentRegistryService} is mocked so tests can control which agents + * are DEAD without depending on in-memory timing. + */ +class AlertEvaluatorJobIT extends AbstractPostgresIT { + + // Replace the named beans so ExchangeMatchEvaluator / LogPatternEvaluator can wire their + // concrete-type constructor args without duplicating the SearchIndex / LogIndex beans. + @MockBean(name = "clickHouseSearchIndex") ClickHouseSearchIndex clickHouseSearchIndex; + @MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore; + + // Control agent state per test without timing sensitivity + @MockBean AgentRegistryService agentRegistryService; + + @Autowired private AlertEvaluatorJob job; + @Autowired private AlertRuleRepository ruleRepo; + @Autowired private AlertInstanceRepository instanceRepo; + + private UUID envId; + private UUID ruleId; + private static final String SYS_USER = "sys-eval-it"; + private static final String APP_SLUG = "orders"; + private static final String AGENT_ID = "test-agent-01"; + + @BeforeEach + void setup() { + // Default: empty registry — all evaluators return Clear + when(agentRegistryService.findAll()).thenReturn(List.of()); + + envId = UUID.randomUUID(); + ruleId = UUID.randomUUID(); + + jdbcTemplate.update( + "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)", + envId, "eval-it-env-" + envId, "Eval IT Env"); + jdbcTemplate.update( + "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING", + SYS_USER, SYS_USER + "@test.example.com"); + + // Rule: AGENT_STATE = DEAD, forSeconds=60, forDurationSeconds=0 (straight to FIRING) + var condition = new AgentStateCondition( + new AlertScope(APP_SLUG, null, null), "DEAD", 60); + var rule = new AlertRule( + ruleId, envId, "dead-agent-rule", "fires when orders agent is dead", + AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition, + 60, 0, 60, + "Agent dead: {{agent.name}}", "Agent {{agent.id}} is {{agent.state}}", + List.of(), List.of(), + Instant.now().minusSeconds(5), // due now + null, null, Map.of(), + Instant.now(), SYS_USER, Instant.now(), SYS_USER); + ruleRepo.save(rule); + } + + @AfterEach + void cleanup() { + jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " + + "(SELECT id FROM alert_instances WHERE environment_id = ?)", envId); + jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId); + jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId); + jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private AgentInfo deadAgent(Instant lastHeartbeat) { + return new AgentInfo(AGENT_ID, "orders-service", APP_SLUG, + envId.toString(), "1.0", List.of(), Map.of(), + AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null); + } + + // ------------------------------------------------------------------------- + // Tests + // ------------------------------------------------------------------------- + + @Test + void noMatchingAgentProducesNoInstance() { + // Registry empty → evaluator returns Clear → no alert_instance + when(agentRegistryService.findAll()).thenReturn(List.of()); + + job.tick(); + + assertThat(instanceRepo.findOpenForRule(ruleId)).isEmpty(); + } + + @Test + void deadAgentProducesFiringInstance() { + // Agent has been DEAD for 2 minutes (> forSeconds=60) → FIRING + when(agentRegistryService.findAll()) + .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120)))); + + job.tick(); + + assertThat(instanceRepo.findOpenForRule(ruleId)).hasValueSatisfying(i -> { + assertThat(i.state()).isEqualTo(AlertState.FIRING); + assertThat(i.ruleId()).isEqualTo(ruleId); + assertThat(i.environmentId()).isEqualTo(envId); + assertThat(i.severity()).isEqualTo(AlertSeverity.WARNING); + }); + } + + @Test + void claimDueResolveCycle() { + // Tick 1: dead agent → FIRING + when(agentRegistryService.findAll()) + .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120)))); + job.tick(); + assertThat(instanceRepo.findOpenForRule(ruleId)).hasValueSatisfying(i -> + assertThat(i.state()).isEqualTo(AlertState.FIRING)); + + // Bump next_evaluation_at so rule is due again + jdbcTemplate.update( + "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " + + "claimed_by = NULL, claimed_until = NULL WHERE id = ?", ruleId); + + // Tick 2: empty registry → Clear → RESOLVED + when(agentRegistryService.findAll()).thenReturn(List.of()); + job.tick(); + + assertThat(instanceRepo.findOpenForRule(ruleId)).isEmpty(); + long resolvedCount = jdbcTemplate.queryForObject( + "SELECT count(*) FROM alert_instances WHERE rule_id = ? AND state = 'RESOLVED'", + Long.class, ruleId); + assertThat(resolvedCount).isEqualTo(1L); + } + + @Test + void firingWithForDurationCreatesPendingThenPromotes() { + UUID ruleId2 = UUID.randomUUID(); + var condition = new AgentStateCondition(new AlertScope(APP_SLUG, null, null), "DEAD", 60); + var ruleWithDuration = new AlertRule( + ruleId2, envId, "pending-rule", null, + AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition, + 60, 60, 60, // forDurationSeconds = 60 + "title", "msg", + List.of(), List.of(), + Instant.now().minusSeconds(5), + null, null, Map.of(), + Instant.now(), SYS_USER, Instant.now(), SYS_USER); + ruleRepo.save(ruleWithDuration); + + // Dead agent for both rules + when(agentRegistryService.findAll()) + .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120)))); + job.tick(); + + // ruleId2 has forDuration=60 → PENDING + assertThat(instanceRepo.findOpenForRule(ruleId2)).hasValueSatisfying(i -> + assertThat(i.state()).isEqualTo(AlertState.PENDING)); + + // Backdate firedAt so promotion window is met + jdbcTemplate.update( + "UPDATE alert_instances SET fired_at = now() - interval '90 seconds' WHERE rule_id = ?", + ruleId2); + jdbcTemplate.update( + "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " + + "claimed_by = NULL, claimed_until = NULL WHERE id = ?", ruleId2); + + job.tick(); + + assertThat(instanceRepo.findOpenForRule(ruleId2)).hasValueSatisfying(i -> + assertThat(i.state()).isEqualTo(AlertState.FIRING)); + + jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", ruleId2); + jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", ruleId2); + } +}