feat(alerting): EXCHANGE_MATCH evaluator with per-exchange + count modes
PER_EXCHANGE returns EvalResult.Batch(List<Firing>); last Firing carries _nextCursor (Instant) in its context map for the job to persist as evalState.lastExchangeTs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,149 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.app.search.ClickHouseSearchIndex;
|
||||
import com.cameleer.server.core.alerting.AlertMatchSpec;
|
||||
import com.cameleer.server.core.alerting.AlertRule;
|
||||
import com.cameleer.server.core.alerting.ConditionKind;
|
||||
import com.cameleer.server.core.alerting.ExchangeMatchCondition;
|
||||
import com.cameleer.server.core.alerting.FireMode;
|
||||
import com.cameleer.server.core.runtime.EnvironmentRepository;
|
||||
import com.cameleer.server.core.search.ExecutionSummary;
|
||||
import com.cameleer.server.core.search.SearchRequest;
|
||||
import com.cameleer.server.core.search.SearchResult;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchCondition> {
|
||||
|
||||
private final ClickHouseSearchIndex searchIndex;
|
||||
private final EnvironmentRepository envRepo;
|
||||
|
||||
public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex, EnvironmentRepository envRepo) {
|
||||
this.searchIndex = searchIndex;
|
||||
this.envRepo = envRepo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConditionKind kind() { return ConditionKind.EXCHANGE_MATCH; }
|
||||
|
||||
@Override
|
||||
public EvalResult evaluate(ExchangeMatchCondition c, AlertRule rule, EvalContext ctx) {
|
||||
String envSlug = envRepo.findById(rule.environmentId())
|
||||
.map(e -> e.slug())
|
||||
.orElse(null);
|
||||
|
||||
return switch (c.fireMode()) {
|
||||
case COUNT_IN_WINDOW -> evaluateCount(c, rule, ctx, envSlug);
|
||||
case PER_EXCHANGE -> evaluatePerExchange(c, rule, ctx, envSlug);
|
||||
};
|
||||
}
|
||||
|
||||
// ── COUNT_IN_WINDOW ───────────────────────────────────────────────────────
|
||||
|
||||
private EvalResult evaluateCount(ExchangeMatchCondition c, AlertRule rule,
|
||||
EvalContext ctx, String envSlug) {
|
||||
String appSlug = c.scope() != null ? c.scope().appSlug() : null;
|
||||
String routeId = c.scope() != null ? c.scope().routeId() : null;
|
||||
ExchangeMatchCondition.ExchangeFilter filter = c.filter();
|
||||
|
||||
var spec = new AlertMatchSpec(
|
||||
ctx.tenantId(),
|
||||
envSlug,
|
||||
appSlug,
|
||||
routeId,
|
||||
filter != null ? filter.status() : null,
|
||||
filter != null ? filter.attributes() : Map.of(),
|
||||
ctx.now().minusSeconds(c.windowSeconds()),
|
||||
ctx.now(),
|
||||
null
|
||||
);
|
||||
|
||||
long count = searchIndex.countExecutionsForAlerting(spec);
|
||||
if (count <= c.threshold()) return EvalResult.Clear.INSTANCE;
|
||||
|
||||
return new EvalResult.Firing(
|
||||
(double) count,
|
||||
c.threshold().doubleValue(),
|
||||
Map.of(
|
||||
"app", Map.of("slug", appSlug == null ? "" : appSlug),
|
||||
"route", Map.of("id", routeId == null ? "" : routeId)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// ── PER_EXCHANGE ──────────────────────────────────────────────────────────
|
||||
|
||||
private EvalResult evaluatePerExchange(ExchangeMatchCondition c, AlertRule rule,
|
||||
EvalContext ctx, String envSlug) {
|
||||
String appSlug = c.scope() != null ? c.scope().appSlug() : null;
|
||||
String routeId = c.scope() != null ? c.scope().routeId() : null;
|
||||
ExchangeMatchCondition.ExchangeFilter filter = c.filter();
|
||||
|
||||
// Resolve cursor from evalState
|
||||
Instant cursor = null;
|
||||
Object raw = rule.evalState().get("lastExchangeTs");
|
||||
if (raw instanceof String s && !s.isBlank()) {
|
||||
try { cursor = Instant.parse(s); } catch (Exception ignored) {}
|
||||
} else if (raw instanceof Instant i) {
|
||||
cursor = i;
|
||||
}
|
||||
|
||||
// Build SearchRequest — use cursor as timeFrom so we only see exchanges after last run
|
||||
var req = new SearchRequest(
|
||||
filter != null ? filter.status() : null,
|
||||
cursor, // timeFrom = cursor (or null for first run)
|
||||
ctx.now(), // timeTo
|
||||
null, null, null, // durationMin/Max, correlationId
|
||||
null, null, null, null, // text variants
|
||||
routeId,
|
||||
null, // instanceId
|
||||
null, // processorType
|
||||
appSlug,
|
||||
null, // instanceIds
|
||||
0,
|
||||
50,
|
||||
"startTime",
|
||||
"asc", // asc so we process oldest first
|
||||
envSlug
|
||||
);
|
||||
|
||||
SearchResult<ExecutionSummary> result = searchIndex.search(req);
|
||||
List<ExecutionSummary> matches = result.data();
|
||||
|
||||
if (matches.isEmpty()) return new EvalResult.Batch(List.of());
|
||||
|
||||
// Find the latest startTime across all matches — becomes the next cursor
|
||||
Instant latestTs = matches.stream()
|
||||
.map(ExecutionSummary::startTime)
|
||||
.max(Instant::compareTo)
|
||||
.orElse(ctx.now());
|
||||
|
||||
List<EvalResult.Firing> firings = new ArrayList<>();
|
||||
for (int i = 0; i < matches.size(); i++) {
|
||||
ExecutionSummary ex = matches.get(i);
|
||||
Map<String, Object> ctx2 = new HashMap<>();
|
||||
ctx2.put("exchange", Map.of(
|
||||
"id", ex.executionId(),
|
||||
"routeId", ex.routeId() == null ? "" : ex.routeId(),
|
||||
"status", ex.status() == null ? "" : ex.status(),
|
||||
"startTime", ex.startTime() == null ? "" : ex.startTime().toString()
|
||||
));
|
||||
ctx2.put("app", Map.of("slug", ex.applicationId() == null ? "" : ex.applicationId()));
|
||||
|
||||
// Attach the next-cursor to the last firing so the job can extract it
|
||||
if (i == matches.size() - 1) {
|
||||
ctx2.put("_nextCursor", latestTs);
|
||||
}
|
||||
|
||||
firings.add(new EvalResult.Firing(1.0, null, ctx2));
|
||||
}
|
||||
|
||||
return new EvalResult.Batch(firings);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,204 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.app.search.ClickHouseSearchIndex;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
import com.cameleer.server.core.runtime.EnvironmentRepository;
|
||||
import com.cameleer.server.core.search.ExecutionSummary;
|
||||
import com.cameleer.server.core.search.SearchResult;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
class ExchangeMatchEvaluatorTest {
|
||||
|
||||
private ClickHouseSearchIndex searchIndex;
|
||||
private EnvironmentRepository envRepo;
|
||||
private ExchangeMatchEvaluator eval;
|
||||
|
||||
private static final UUID ENV_ID = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb");
|
||||
private static final UUID RULE_ID = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa");
|
||||
private static final Instant NOW = Instant.parse("2026-04-19T10:00:00Z");
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
searchIndex = mock(ClickHouseSearchIndex.class);
|
||||
envRepo = mock(EnvironmentRepository.class);
|
||||
eval = new ExchangeMatchEvaluator(searchIndex, envRepo);
|
||||
|
||||
var env = new Environment(ENV_ID, "prod", "Production", false, true, null, null, null);
|
||||
when(envRepo.findById(ENV_ID)).thenReturn(Optional.of(env));
|
||||
}
|
||||
|
||||
private AlertRule ruleWith(AlertCondition condition) {
|
||||
return ruleWith(condition, Map.of());
|
||||
}
|
||||
|
||||
private AlertRule ruleWith(AlertCondition condition, Map<String, Object> evalState) {
|
||||
return new AlertRule(RULE_ID, ENV_ID, "test", null,
|
||||
AlertSeverity.WARNING, true, condition.kind(), condition,
|
||||
60, 0, 0, null, null, List.of(), List.of(),
|
||||
null, null, null, evalState, null, null, null, null);
|
||||
}
|
||||
|
||||
private ExecutionSummary summary(String id, Instant startTime, String status) {
|
||||
return new ExecutionSummary(id, "direct:test", "inst-1", "orders",
|
||||
status, startTime, startTime.plusSeconds(1), 100L,
|
||||
null, "", null, null, Map.of(), false, false);
|
||||
}
|
||||
|
||||
// ── COUNT_IN_WINDOW ───────────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void countMode_firesWhenCountExceedsThreshold() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.COUNT_IN_WINDOW, 5, 300, null);
|
||||
|
||||
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(7L);
|
||||
|
||||
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
assertThat(r).isInstanceOf(EvalResult.Firing.class);
|
||||
assertThat(((EvalResult.Firing) r).currentValue()).isEqualTo(7.0);
|
||||
assertThat(((EvalResult.Firing) r).threshold()).isEqualTo(5.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void countMode_clearWhenCountBelowThreshold() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.COUNT_IN_WINDOW, 5, 300, null);
|
||||
|
||||
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(3L);
|
||||
|
||||
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
assertThat(r).isEqualTo(EvalResult.Clear.INSTANCE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void countMode_passesCorrectSpecToIndex() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", "direct:pay", null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of("orderId", "123")),
|
||||
FireMode.COUNT_IN_WINDOW, 1, 120, null);
|
||||
|
||||
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(2L);
|
||||
|
||||
eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
|
||||
ArgumentCaptor<AlertMatchSpec> captor = ArgumentCaptor.forClass(AlertMatchSpec.class);
|
||||
verify(searchIndex).countExecutionsForAlerting(captor.capture());
|
||||
AlertMatchSpec spec = captor.getValue();
|
||||
|
||||
assertThat(spec.applicationId()).isEqualTo("orders");
|
||||
assertThat(spec.routeId()).isEqualTo("direct:pay");
|
||||
assertThat(spec.status()).isEqualTo("FAILED");
|
||||
assertThat(spec.attributes()).containsEntry("orderId", "123");
|
||||
assertThat(spec.environment()).isEqualTo("prod");
|
||||
assertThat(spec.from()).isEqualTo(NOW.minusSeconds(120));
|
||||
assertThat(spec.to()).isEqualTo(NOW);
|
||||
assertThat(spec.after()).isNull();
|
||||
}
|
||||
|
||||
// ── PER_EXCHANGE ──────────────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void perExchange_returnsEmptyBatchWhenNoMatches() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.PER_EXCHANGE, null, null, 60);
|
||||
|
||||
when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50));
|
||||
|
||||
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
assertThat(r).isInstanceOf(EvalResult.Batch.class);
|
||||
assertThat(((EvalResult.Batch) r).firings()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void perExchange_returnsOneFiringPerMatch() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.PER_EXCHANGE, null, null, 60);
|
||||
|
||||
Instant t1 = NOW.minusSeconds(50);
|
||||
Instant t2 = NOW.minusSeconds(30);
|
||||
Instant t3 = NOW.minusSeconds(10);
|
||||
|
||||
when(searchIndex.search(any())).thenReturn(new SearchResult<>(
|
||||
List.of(
|
||||
summary("ex-1", t1, "FAILED"),
|
||||
summary("ex-2", t2, "FAILED"),
|
||||
summary("ex-3", t3, "FAILED")
|
||||
), 3L, 0, 50));
|
||||
|
||||
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
assertThat(r).isInstanceOf(EvalResult.Batch.class);
|
||||
var batch = (EvalResult.Batch) r;
|
||||
assertThat(batch.firings()).hasSize(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
void perExchange_lastFiringCarriesNextCursor() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.PER_EXCHANGE, null, null, 60);
|
||||
|
||||
Instant t1 = NOW.minusSeconds(50);
|
||||
Instant t2 = NOW.minusSeconds(10); // latest
|
||||
|
||||
when(searchIndex.search(any())).thenReturn(new SearchResult<>(
|
||||
List.of(summary("ex-1", t1, "FAILED"), summary("ex-2", t2, "FAILED")),
|
||||
2L, 0, 50));
|
||||
|
||||
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
|
||||
var batch = (EvalResult.Batch) r;
|
||||
|
||||
// last firing carries the _nextCursor key with the latest startTime
|
||||
EvalResult.Firing last = batch.firings().get(batch.firings().size() - 1);
|
||||
assertThat(last.context()).containsKey("_nextCursor");
|
||||
assertThat(last.context().get("_nextCursor")).isEqualTo(t2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void perExchange_usesLastExchangeTsFromEvalState() {
|
||||
var condition = new ExchangeMatchCondition(
|
||||
new AlertScope("orders", null, null),
|
||||
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
|
||||
FireMode.PER_EXCHANGE, null, null, 60);
|
||||
|
||||
Instant cursor = NOW.minusSeconds(120);
|
||||
var rule = ruleWith(condition, Map.of("lastExchangeTs", cursor.toString()));
|
||||
|
||||
when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50));
|
||||
|
||||
eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache()));
|
||||
|
||||
// Verify the search request used the cursor as the lower-bound
|
||||
ArgumentCaptor<com.cameleer.server.core.search.SearchRequest> captor =
|
||||
ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class);
|
||||
verify(searchIndex).search(captor.capture());
|
||||
// timeFrom should be the cursor value
|
||||
assertThat(captor.getValue().timeFrom()).isEqualTo(cursor);
|
||||
}
|
||||
|
||||
@Test
|
||||
void kindIsExchangeMatch() {
|
||||
assertThat(eval.kind()).isEqualTo(ConditionKind.EXCHANGE_MATCH);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user