feat(alerting): AlertEvaluatorJob with claim-polling + circuit breaker

- AlertEvaluatorJob implements SchedulingConfigurer; fixed-delay tick from
  AlertingProperties.effectiveEvaluatorTickIntervalMs (5 s floor)
- Claim-polling via AlertRuleRepository.claimDueRules (FOR UPDATE SKIP LOCKED)
- Per-kind circuit breaker guards each evaluator; failures recorded, open kinds
  skipped and rescheduled without evaluation
- Single-Firing path delegates to AlertStateTransitions; new FIRING instances
  enqueue AlertNotification rows per rule.webhooks()
- Batch (PER_EXCHANGE) path creates one FIRING AlertInstance per Firing entry
- PENDING→FIRING promotion handled in applyResult via state machine
- Title/message rendered via MustacheRenderer + NotificationContextBuilder;
  environment resolved from EnvironmentRepository.findById per tick
- AlertEvaluatorJobIT (4 tests): uses named @MockBean replacements for
  ClickHouseSearchIndex + ClickHouseLogStore; @MockBean AgentRegistryService
  drives Clear/Firing/resolve cycle without timing sensitivity

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-19 19:58:27 +02:00
parent 657dc2d407
commit 15c0a8273c
2 changed files with 453 additions and 0 deletions

View File

@@ -0,0 +1,254 @@
package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.alerting.config.AlertingProperties;
import com.cameleer.server.app.alerting.notify.MustacheRenderer;
import com.cameleer.server.app.alerting.notify.NotificationContextBuilder;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.runtime.Environment;
import com.cameleer.server.core.runtime.EnvironmentRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Claim-polling evaluator job.
* <p>
* On each tick, claims a batch of due {@link AlertRule}s via {@code FOR UPDATE SKIP LOCKED},
* invokes the matching {@link ConditionEvaluator}, applies the {@link AlertStateTransitions}
* state machine, persists any new/updated {@link AlertInstance}, enqueues webhook
* {@link AlertNotification}s on first-fire, and releases the claim.
*/
@Component
public class AlertEvaluatorJob implements SchedulingConfigurer {
private static final Logger log = LoggerFactory.getLogger(AlertEvaluatorJob.class);
private final AlertingProperties props;
private final AlertRuleRepository ruleRepo;
private final AlertInstanceRepository instanceRepo;
private final AlertNotificationRepository notificationRepo;
private final Map<ConditionKind, ConditionEvaluator<?>> evaluators;
private final PerKindCircuitBreaker circuitBreaker;
private final MustacheRenderer renderer;
private final NotificationContextBuilder contextBuilder;
private final EnvironmentRepository environmentRepo;
private final ObjectMapper objectMapper;
private final String instanceId;
private final String tenantId;
private final Clock clock;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public AlertEvaluatorJob(
AlertingProperties props,
AlertRuleRepository ruleRepo,
AlertInstanceRepository instanceRepo,
AlertNotificationRepository notificationRepo,
List<ConditionEvaluator<?>> evaluatorList,
PerKindCircuitBreaker circuitBreaker,
MustacheRenderer renderer,
NotificationContextBuilder contextBuilder,
EnvironmentRepository environmentRepo,
ObjectMapper objectMapper,
@Qualifier("alertingInstanceId") String instanceId,
@Value("${cameleer.server.tenant.id:default}") String tenantId,
Clock alertingClock) {
this.props = props;
this.ruleRepo = ruleRepo;
this.instanceRepo = instanceRepo;
this.notificationRepo = notificationRepo;
this.evaluators = evaluatorList.stream()
.collect(Collectors.toMap(ConditionEvaluator::kind, e -> e));
this.circuitBreaker = circuitBreaker;
this.renderer = renderer;
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.instanceId = instanceId;
this.tenantId = tenantId;
this.clock = alertingClock;
}
// -------------------------------------------------------------------------
// SchedulingConfigurer — register the tick as a fixed-delay task
// -------------------------------------------------------------------------
@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
registrar.addFixedDelayTask(this::tick, props.effectiveEvaluatorTickIntervalMs());
}
// -------------------------------------------------------------------------
// Tick — package-private so tests can call it directly
// -------------------------------------------------------------------------
void tick() {
List<AlertRule> claimed = ruleRepo.claimDueRules(
instanceId,
props.effectiveEvaluatorBatchSize(),
props.effectiveClaimTtlSeconds());
if (claimed.isEmpty()) return;
TickCache cache = new TickCache();
EvalContext ctx = new EvalContext(tenantId, Instant.now(clock), cache);
for (AlertRule rule : claimed) {
Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds());
try {
if (circuitBreaker.isOpen(rule.conditionKind())) {
log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id());
continue;
}
EvalResult result = evaluateSafely(rule, ctx);
applyResult(rule, result);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString());
} finally {
reschedule(rule, nextRun);
}
}
}
// -------------------------------------------------------------------------
// Evaluation
// -------------------------------------------------------------------------
@SuppressWarnings({"rawtypes", "unchecked"})
private EvalResult evaluateSafely(AlertRule rule, EvalContext ctx) {
ConditionEvaluator evaluator = evaluators.get(rule.conditionKind());
if (evaluator == null) {
throw new IllegalStateException("No evaluator registered for " + rule.conditionKind());
}
return evaluator.evaluate(rule.condition(), rule, ctx);
}
// -------------------------------------------------------------------------
// State machine application
// -------------------------------------------------------------------------
private void applyResult(AlertRule rule, EvalResult result) {
if (result instanceof EvalResult.Batch b) {
// PER_EXCHANGE mode: each Firing in the batch creates its own AlertInstance
for (EvalResult.Firing f : b.firings()) {
applyBatchFiring(rule, f);
}
return;
}
AlertInstance current = instanceRepo.findOpenForRule(rule.id()).orElse(null);
Instant now = Instant.now(clock);
AlertStateTransitions.apply(current, result, rule, now).ifPresent(next -> {
// Determine whether this is a newly created instance transitioning to FIRING
boolean isFirstFire = current == null && next.state() == AlertState.FIRING;
boolean promotedFromPending = current != null
&& current.state() == AlertState.PENDING
&& next.state() == AlertState.FIRING;
AlertInstance enriched = enrichTitleMessage(rule, next);
AlertInstance persisted = instanceRepo.save(enriched);
if (isFirstFire || promotedFromPending) {
enqueueNotifications(rule, persisted, now);
}
});
}
/**
* Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry.
* No forDuration check — each exchange is its own event.
*/
private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) {
Instant now = Instant.now(clock);
AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now);
AlertInstance enriched = enrichTitleMessage(rule, instance);
AlertInstance persisted = instanceRepo.save(enriched);
enqueueNotifications(rule, persisted, now);
}
// -------------------------------------------------------------------------
// Title / message rendering
// -------------------------------------------------------------------------
private AlertInstance enrichTitleMessage(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
Map<String, Object> ctx = contextBuilder.build(rule, instance, env, null);
String title = renderer.render(rule.notificationTitleTmpl(), ctx);
String message = renderer.render(rule.notificationMessageTmpl(), ctx);
return instance.withTitleMessage(title, message);
}
// -------------------------------------------------------------------------
// Notification enqueue
// -------------------------------------------------------------------------
private void enqueueNotifications(AlertRule rule, AlertInstance instance, Instant now) {
for (WebhookBinding w : rule.webhooks()) {
Map<String, Object> payload = buildPayload(rule, instance);
notificationRepo.save(new AlertNotification(
UUID.randomUUID(),
instance.id(),
w.id(),
w.outboundConnectionId(),
NotificationStatus.PENDING,
0,
now,
null, null, null, null,
payload,
null,
now));
}
}
private Map<String, Object> buildPayload(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
return contextBuilder.build(rule, instance, env, null);
}
// -------------------------------------------------------------------------
// Claim release
// -------------------------------------------------------------------------
private void reschedule(AlertRule rule, Instant nextRun) {
ruleRepo.releaseClaim(rule.id(), nextRun, rule.evalState());
}
// -------------------------------------------------------------------------
// Rule snapshot helper (used by tests / future extensions)
// -------------------------------------------------------------------------
@SuppressWarnings("unchecked")
Map<String, Object> snapshotRule(AlertRule rule) {
try {
return objectMapper.convertValue(rule, Map.class);
} catch (Exception e) {
log.warn("Failed to snapshot rule {}: {}", rule.id(), e.getMessage());
return Map.of("id", rule.id().toString(), "name", rule.name());
}
}
// -------------------------------------------------------------------------
// Visible for testing
// -------------------------------------------------------------------------
/** Returns the evaluator map (for inspection in tests). */
Map<ConditionKind, ConditionEvaluator<?>> evaluators() {
return evaluators;
}
}