diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java new file mode 100644 index 00000000..f7451483 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java @@ -0,0 +1,149 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.search.ClickHouseSearchIndex; +import com.cameleer.server.core.alerting.AlertMatchSpec; +import com.cameleer.server.core.alerting.AlertRule; +import com.cameleer.server.core.alerting.ConditionKind; +import com.cameleer.server.core.alerting.ExchangeMatchCondition; +import com.cameleer.server.core.alerting.FireMode; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import com.cameleer.server.core.search.ExecutionSummary; +import com.cameleer.server.core.search.SearchRequest; +import com.cameleer.server.core.search.SearchResult; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class ExchangeMatchEvaluator implements ConditionEvaluator { + + private final ClickHouseSearchIndex searchIndex; + private final EnvironmentRepository envRepo; + + public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex, EnvironmentRepository envRepo) { + this.searchIndex = searchIndex; + this.envRepo = envRepo; + } + + @Override + public ConditionKind kind() { return ConditionKind.EXCHANGE_MATCH; } + + @Override + public EvalResult evaluate(ExchangeMatchCondition c, AlertRule rule, EvalContext ctx) { + String envSlug = envRepo.findById(rule.environmentId()) + .map(e -> e.slug()) + .orElse(null); + + return switch (c.fireMode()) { + case COUNT_IN_WINDOW -> evaluateCount(c, rule, ctx, envSlug); + case PER_EXCHANGE -> evaluatePerExchange(c, rule, ctx, envSlug); + }; + } + + // ── COUNT_IN_WINDOW ─────────────────────────────────────────────────────── + + private EvalResult evaluateCount(ExchangeMatchCondition c, AlertRule rule, + EvalContext ctx, String envSlug) { + String appSlug = c.scope() != null ? c.scope().appSlug() : null; + String routeId = c.scope() != null ? c.scope().routeId() : null; + ExchangeMatchCondition.ExchangeFilter filter = c.filter(); + + var spec = new AlertMatchSpec( + ctx.tenantId(), + envSlug, + appSlug, + routeId, + filter != null ? filter.status() : null, + filter != null ? filter.attributes() : Map.of(), + ctx.now().minusSeconds(c.windowSeconds()), + ctx.now(), + null + ); + + long count = searchIndex.countExecutionsForAlerting(spec); + if (count <= c.threshold()) return EvalResult.Clear.INSTANCE; + + return new EvalResult.Firing( + (double) count, + c.threshold().doubleValue(), + Map.of( + "app", Map.of("slug", appSlug == null ? "" : appSlug), + "route", Map.of("id", routeId == null ? "" : routeId) + ) + ); + } + + // ── PER_EXCHANGE ────────────────────────────────────────────────────────── + + private EvalResult evaluatePerExchange(ExchangeMatchCondition c, AlertRule rule, + EvalContext ctx, String envSlug) { + String appSlug = c.scope() != null ? c.scope().appSlug() : null; + String routeId = c.scope() != null ? c.scope().routeId() : null; + ExchangeMatchCondition.ExchangeFilter filter = c.filter(); + + // Resolve cursor from evalState + Instant cursor = null; + Object raw = rule.evalState().get("lastExchangeTs"); + if (raw instanceof String s && !s.isBlank()) { + try { cursor = Instant.parse(s); } catch (Exception ignored) {} + } else if (raw instanceof Instant i) { + cursor = i; + } + + // Build SearchRequest — use cursor as timeFrom so we only see exchanges after last run + var req = new SearchRequest( + filter != null ? filter.status() : null, + cursor, // timeFrom = cursor (or null for first run) + ctx.now(), // timeTo + null, null, null, // durationMin/Max, correlationId + null, null, null, null, // text variants + routeId, + null, // instanceId + null, // processorType + appSlug, + null, // instanceIds + 0, + 50, + "startTime", + "asc", // asc so we process oldest first + envSlug + ); + + SearchResult result = searchIndex.search(req); + List matches = result.data(); + + if (matches.isEmpty()) return new EvalResult.Batch(List.of()); + + // Find the latest startTime across all matches — becomes the next cursor + Instant latestTs = matches.stream() + .map(ExecutionSummary::startTime) + .max(Instant::compareTo) + .orElse(ctx.now()); + + List firings = new ArrayList<>(); + for (int i = 0; i < matches.size(); i++) { + ExecutionSummary ex = matches.get(i); + Map ctx2 = new HashMap<>(); + ctx2.put("exchange", Map.of( + "id", ex.executionId(), + "routeId", ex.routeId() == null ? "" : ex.routeId(), + "status", ex.status() == null ? "" : ex.status(), + "startTime", ex.startTime() == null ? "" : ex.startTime().toString() + )); + ctx2.put("app", Map.of("slug", ex.applicationId() == null ? "" : ex.applicationId())); + + // Attach the next-cursor to the last firing so the job can extract it + if (i == matches.size() - 1) { + ctx2.put("_nextCursor", latestTs); + } + + firings.add(new EvalResult.Firing(1.0, null, ctx2)); + } + + return new EvalResult.Batch(firings); + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java new file mode 100644 index 00000000..7d7e696c --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java @@ -0,0 +1,204 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.app.search.ClickHouseSearchIndex; +import com.cameleer.server.core.alerting.*; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import com.cameleer.server.core.search.ExecutionSummary; +import com.cameleer.server.core.search.SearchResult; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +class ExchangeMatchEvaluatorTest { + + private ClickHouseSearchIndex searchIndex; + private EnvironmentRepository envRepo; + private ExchangeMatchEvaluator eval; + + private static final UUID ENV_ID = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"); + private static final UUID RULE_ID = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"); + private static final Instant NOW = Instant.parse("2026-04-19T10:00:00Z"); + + @BeforeEach + void setUp() { + searchIndex = mock(ClickHouseSearchIndex.class); + envRepo = mock(EnvironmentRepository.class); + eval = new ExchangeMatchEvaluator(searchIndex, envRepo); + + var env = new Environment(ENV_ID, "prod", "Production", false, true, null, null, null); + when(envRepo.findById(ENV_ID)).thenReturn(Optional.of(env)); + } + + private AlertRule ruleWith(AlertCondition condition) { + return ruleWith(condition, Map.of()); + } + + private AlertRule ruleWith(AlertCondition condition, Map evalState) { + return new AlertRule(RULE_ID, ENV_ID, "test", null, + AlertSeverity.WARNING, true, condition.kind(), condition, + 60, 0, 0, null, null, List.of(), List.of(), + null, null, null, evalState, null, null, null, null); + } + + private ExecutionSummary summary(String id, Instant startTime, String status) { + return new ExecutionSummary(id, "direct:test", "inst-1", "orders", + status, startTime, startTime.plusSeconds(1), 100L, + null, "", null, null, Map.of(), false, false); + } + + // ── COUNT_IN_WINDOW ─────────────────────────────────────────────────────── + + @Test + void countMode_firesWhenCountExceedsThreshold() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.COUNT_IN_WINDOW, 5, 300, null); + + when(searchIndex.countExecutionsForAlerting(any())).thenReturn(7L); + + EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + assertThat(r).isInstanceOf(EvalResult.Firing.class); + assertThat(((EvalResult.Firing) r).currentValue()).isEqualTo(7.0); + assertThat(((EvalResult.Firing) r).threshold()).isEqualTo(5.0); + } + + @Test + void countMode_clearWhenCountBelowThreshold() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.COUNT_IN_WINDOW, 5, 300, null); + + when(searchIndex.countExecutionsForAlerting(any())).thenReturn(3L); + + EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + assertThat(r).isEqualTo(EvalResult.Clear.INSTANCE); + } + + @Test + void countMode_passesCorrectSpecToIndex() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", "direct:pay", null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of("orderId", "123")), + FireMode.COUNT_IN_WINDOW, 1, 120, null); + + when(searchIndex.countExecutionsForAlerting(any())).thenReturn(2L); + + eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + + ArgumentCaptor captor = ArgumentCaptor.forClass(AlertMatchSpec.class); + verify(searchIndex).countExecutionsForAlerting(captor.capture()); + AlertMatchSpec spec = captor.getValue(); + + assertThat(spec.applicationId()).isEqualTo("orders"); + assertThat(spec.routeId()).isEqualTo("direct:pay"); + assertThat(spec.status()).isEqualTo("FAILED"); + assertThat(spec.attributes()).containsEntry("orderId", "123"); + assertThat(spec.environment()).isEqualTo("prod"); + assertThat(spec.from()).isEqualTo(NOW.minusSeconds(120)); + assertThat(spec.to()).isEqualTo(NOW); + assertThat(spec.after()).isNull(); + } + + // ── PER_EXCHANGE ────────────────────────────────────────────────────────── + + @Test + void perExchange_returnsEmptyBatchWhenNoMatches() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.PER_EXCHANGE, null, null, 60); + + when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50)); + + EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + assertThat(r).isInstanceOf(EvalResult.Batch.class); + assertThat(((EvalResult.Batch) r).firings()).isEmpty(); + } + + @Test + void perExchange_returnsOneFiringPerMatch() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.PER_EXCHANGE, null, null, 60); + + Instant t1 = NOW.minusSeconds(50); + Instant t2 = NOW.minusSeconds(30); + Instant t3 = NOW.minusSeconds(10); + + when(searchIndex.search(any())).thenReturn(new SearchResult<>( + List.of( + summary("ex-1", t1, "FAILED"), + summary("ex-2", t2, "FAILED"), + summary("ex-3", t3, "FAILED") + ), 3L, 0, 50)); + + EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + assertThat(r).isInstanceOf(EvalResult.Batch.class); + var batch = (EvalResult.Batch) r; + assertThat(batch.firings()).hasSize(3); + } + + @Test + void perExchange_lastFiringCarriesNextCursor() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.PER_EXCHANGE, null, null, 60); + + Instant t1 = NOW.minusSeconds(50); + Instant t2 = NOW.minusSeconds(10); // latest + + when(searchIndex.search(any())).thenReturn(new SearchResult<>( + List.of(summary("ex-1", t1, "FAILED"), summary("ex-2", t2, "FAILED")), + 2L, 0, 50)); + + EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); + var batch = (EvalResult.Batch) r; + + // last firing carries the _nextCursor key with the latest startTime + EvalResult.Firing last = batch.firings().get(batch.firings().size() - 1); + assertThat(last.context()).containsKey("_nextCursor"); + assertThat(last.context().get("_nextCursor")).isEqualTo(t2); + } + + @Test + void perExchange_usesLastExchangeTsFromEvalState() { + var condition = new ExchangeMatchCondition( + new AlertScope("orders", null, null), + new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), + FireMode.PER_EXCHANGE, null, null, 60); + + Instant cursor = NOW.minusSeconds(120); + var rule = ruleWith(condition, Map.of("lastExchangeTs", cursor.toString())); + + when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50)); + + eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache())); + + // Verify the search request used the cursor as the lower-bound + ArgumentCaptor captor = + ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class); + verify(searchIndex).search(captor.capture()); + // timeFrom should be the cursor value + assertThat(captor.getValue().timeFrom()).isEqualTo(cursor); + } + + @Test + void kindIsExchangeMatch() { + assertThat(eval.kind()).isEqualTo(ConditionKind.EXCHANGE_MATCH); + } +}