diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java new file mode 100644 index 00000000..7002ad80 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/AlertEvaluatorJob.java @@ -0,0 +1,254 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.alerting.config.AlertingProperties; +import com.cameleer.server.app.alerting.notify.MustacheRenderer; +import com.cameleer.server.app.alerting.notify.NotificationContextBuilder; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Claim-polling evaluator job. + *
+ * On each tick, claims a batch of due {@link AlertRule}s via {@code FOR UPDATE SKIP LOCKED},
+ * invokes the matching {@link ConditionEvaluator}, applies the {@link AlertStateTransitions}
+ * state machine, persists any new/updated {@link AlertInstance}, enqueues webhook
+ * {@link AlertNotification}s on first-fire, and releases the claim.
+ */
+@Component
+public class AlertEvaluatorJob implements SchedulingConfigurer {
+
+ private static final Logger log = LoggerFactory.getLogger(AlertEvaluatorJob.class);
+
+ private final AlertingProperties props;
+ private final AlertRuleRepository ruleRepo;
+ private final AlertInstanceRepository instanceRepo;
+ private final AlertNotificationRepository notificationRepo;
+ private final Map
+ * Uses real Postgres (Testcontainers) for the full claim→persist pipeline.
+ * {@code ClickHouseSearchIndex} and {@code ClickHouseLogStore} are mocked so
+ * {@code ExchangeMatchEvaluator} and {@code LogPatternEvaluator} wire up even
+ * though those concrete types are not directly registered as Spring beans.
+ * {@code AgentRegistryService} is mocked so tests can control which agents
+ * are DEAD without depending on in-memory timing.
+ */
+class AlertEvaluatorJobIT extends AbstractPostgresIT {
+
+ // Replace the named beans so ExchangeMatchEvaluator / LogPatternEvaluator can wire their
+ // concrete-type constructor args without duplicating the SearchIndex / LogIndex beans.
+ @MockBean(name = "clickHouseSearchIndex") ClickHouseSearchIndex clickHouseSearchIndex;
+ @MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
+
+ // Control agent state per test without timing sensitivity
+ @MockBean AgentRegistryService agentRegistryService;
+
+ @Autowired private AlertEvaluatorJob job;
+ @Autowired private AlertRuleRepository ruleRepo;
+ @Autowired private AlertInstanceRepository instanceRepo;
+
+ private UUID envId;
+ private UUID ruleId;
+ private static final String SYS_USER = "sys-eval-it";
+ private static final String APP_SLUG = "orders";
+ private static final String AGENT_ID = "test-agent-01";
+
+ @BeforeEach
+ void setup() {
+ // Default: empty registry — all evaluators return Clear
+ when(agentRegistryService.findAll()).thenReturn(List.of());
+
+ envId = UUID.randomUUID();
+ ruleId = UUID.randomUUID();
+
+ jdbcTemplate.update(
+ "INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)",
+ envId, "eval-it-env-" + envId, "Eval IT Env");
+ jdbcTemplate.update(
+ "INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING",
+ SYS_USER, SYS_USER + "@test.example.com");
+
+ // Rule: AGENT_STATE = DEAD, forSeconds=60, forDurationSeconds=0 (straight to FIRING)
+ var condition = new AgentStateCondition(
+ new AlertScope(APP_SLUG, null, null), "DEAD", 60);
+ var rule = new AlertRule(
+ ruleId, envId, "dead-agent-rule", "fires when orders agent is dead",
+ AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition,
+ 60, 0, 60,
+ "Agent dead: {{agent.name}}", "Agent {{agent.id}} is {{agent.state}}",
+ List.of(), List.of(),
+ Instant.now().minusSeconds(5), // due now
+ null, null, Map.of(),
+ Instant.now(), SYS_USER, Instant.now(), SYS_USER);
+ ruleRepo.save(rule);
+ }
+
+ @AfterEach
+ void cleanup() {
+ jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
+ "(SELECT id FROM alert_instances WHERE environment_id = ?)", envId);
+ jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
+ jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
+ jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
+ jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER);
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ private AgentInfo deadAgent(Instant lastHeartbeat) {
+ return new AgentInfo(AGENT_ID, "orders-service", APP_SLUG,
+ envId.toString(), "1.0", List.of(), Map.of(),
+ AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null);
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ void noMatchingAgentProducesNoInstance() {
+ // Registry empty → evaluator returns Clear → no alert_instance
+ when(agentRegistryService.findAll()).thenReturn(List.of());
+
+ job.tick();
+
+ assertThat(instanceRepo.findOpenForRule(ruleId)).isEmpty();
+ }
+
+ @Test
+ void deadAgentProducesFiringInstance() {
+ // Agent has been DEAD for 2 minutes (> forSeconds=60) → FIRING
+ when(agentRegistryService.findAll())
+ .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120))));
+
+ job.tick();
+
+ assertThat(instanceRepo.findOpenForRule(ruleId)).hasValueSatisfying(i -> {
+ assertThat(i.state()).isEqualTo(AlertState.FIRING);
+ assertThat(i.ruleId()).isEqualTo(ruleId);
+ assertThat(i.environmentId()).isEqualTo(envId);
+ assertThat(i.severity()).isEqualTo(AlertSeverity.WARNING);
+ });
+ }
+
+ @Test
+ void claimDueResolveCycle() {
+ // Tick 1: dead agent → FIRING
+ when(agentRegistryService.findAll())
+ .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120))));
+ job.tick();
+ assertThat(instanceRepo.findOpenForRule(ruleId)).hasValueSatisfying(i ->
+ assertThat(i.state()).isEqualTo(AlertState.FIRING));
+
+ // Bump next_evaluation_at so rule is due again
+ jdbcTemplate.update(
+ "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
+ "claimed_by = NULL, claimed_until = NULL WHERE id = ?", ruleId);
+
+ // Tick 2: empty registry → Clear → RESOLVED
+ when(agentRegistryService.findAll()).thenReturn(List.of());
+ job.tick();
+
+ assertThat(instanceRepo.findOpenForRule(ruleId)).isEmpty();
+ long resolvedCount = jdbcTemplate.queryForObject(
+ "SELECT count(*) FROM alert_instances WHERE rule_id = ? AND state = 'RESOLVED'",
+ Long.class, ruleId);
+ assertThat(resolvedCount).isEqualTo(1L);
+ }
+
+ @Test
+ void firingWithForDurationCreatesPendingThenPromotes() {
+ UUID ruleId2 = UUID.randomUUID();
+ var condition = new AgentStateCondition(new AlertScope(APP_SLUG, null, null), "DEAD", 60);
+ var ruleWithDuration = new AlertRule(
+ ruleId2, envId, "pending-rule", null,
+ AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE, condition,
+ 60, 60, 60, // forDurationSeconds = 60
+ "title", "msg",
+ List.of(), List.of(),
+ Instant.now().minusSeconds(5),
+ null, null, Map.of(),
+ Instant.now(), SYS_USER, Instant.now(), SYS_USER);
+ ruleRepo.save(ruleWithDuration);
+
+ // Dead agent for both rules
+ when(agentRegistryService.findAll())
+ .thenReturn(List.of(deadAgent(Instant.now().minusSeconds(120))));
+ job.tick();
+
+ // ruleId2 has forDuration=60 → PENDING
+ assertThat(instanceRepo.findOpenForRule(ruleId2)).hasValueSatisfying(i ->
+ assertThat(i.state()).isEqualTo(AlertState.PENDING));
+
+ // Backdate firedAt so promotion window is met
+ jdbcTemplate.update(
+ "UPDATE alert_instances SET fired_at = now() - interval '90 seconds' WHERE rule_id = ?",
+ ruleId2);
+ jdbcTemplate.update(
+ "UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
+ "claimed_by = NULL, claimed_until = NULL WHERE id = ?", ruleId2);
+
+ job.tick();
+
+ assertThat(instanceRepo.findOpenForRule(ruleId2)).hasValueSatisfying(i ->
+ assertThat(i.state()).isEqualTo(AlertState.FIRING));
+
+ jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", ruleId2);
+ jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", ruleId2);
+ }
+}