From 4acf0aeeff691101b34cafe1c726bc6b35f24703 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:11:01 +0200 Subject: [PATCH] =?UTF-8?q?alerting(eval):=20PER=5FEXCHANGE=20composite=20?= =?UTF-8?q?cursor=20=E2=80=94=20monotone=20across=20same-ms=20exchanges?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests: - cursorMonotonicity_sameMillisecondExchanges_fireExactlyOncePerTick - firstRun_boundedByRuleCreatedAt_notRetentionHistory --- .../alerting/eval/ExchangeMatchEvaluator.java | 57 +++++++++++-------- .../eval/ExchangeMatchEvaluatorTest.java | 19 ++++--- 2 files changed, 43 insertions(+), 33 deletions(-) diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java index dff12099..e05da07e 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java @@ -14,6 +14,7 @@ import org.springframework.stereotype.Component; import java.time.Instant; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -85,19 +86,29 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator= envSlug ); SearchResult result = searchIndex.search(req); List matches = result.data(); - if (matches.isEmpty()) return new EvalResult.Batch(List.of(), Map.of()); + if (matches.isEmpty()) return EvalResult.Batch.empty(); - // Find the latest startTime across all matches — becomes the next cursor - Instant latestTs = matches.stream() - .map(ExecutionSummary::startTime) - .max(Instant::compareTo) - .orElse(ctx.now()); + // Ensure deterministic ordering for cursor advance + matches = new ArrayList<>(matches); + matches.sort(Comparator + .comparing(ExecutionSummary::startTime) + .thenComparing(ExecutionSummary::executionId)); + + ExecutionSummary last = matches.get(matches.size() - 1); + String nextCursorSerialized = last.startTime().toString() + "|" + last.executionId(); List firings = new ArrayList<>(); - for (int i = 0; i < matches.size(); i++) { - ExecutionSummary ex = matches.get(i); + for (ExecutionSummary ex : matches) { Map ctx2 = new HashMap<>(); ctx2.put("exchange", Map.of( "id", ex.executionId(), @@ -136,15 +149,11 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator nextEvalState = new HashMap<>(rule.evalState()); + nextEvalState.put("lastExchangeCursor", nextCursorSerialized); + return new EvalResult.Batch(firings, nextEvalState); } } diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java index 22308e77..6096bbe1 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluatorTest.java @@ -166,7 +166,7 @@ class ExchangeMatchEvaluatorTest { } @Test - void perExchange_lastFiringCarriesNextCursor() { + void perExchange_batchCarriesNextCursorInEvalState() { var condition = new ExchangeMatchCondition( new AlertScope("orders", null, null), new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), @@ -182,32 +182,32 @@ class ExchangeMatchEvaluatorTest { EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); var batch = (EvalResult.Batch) r; - // last firing carries the _nextCursor key with the latest startTime - EvalResult.Firing last = batch.firings().get(batch.firings().size() - 1); - assertThat(last.context()).containsKey("_nextCursor"); - assertThat(last.context().get("_nextCursor")).isEqualTo(t2); + // The batch carries the composite next-cursor in nextEvalState under "lastExchangeCursor" + assertThat(batch.nextEvalState()).containsKey("lastExchangeCursor"); + assertThat(batch.nextEvalState().get("lastExchangeCursor")) + .isEqualTo(t2.toString() + "|ex-2"); } @Test - void perExchange_usesLastExchangeTsFromEvalState() { + void perExchange_usesLastExchangeCursorFromEvalState() { var condition = new ExchangeMatchCondition( new AlertScope("orders", null, null), new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), FireMode.PER_EXCHANGE, null, null, 60); Instant cursor = NOW.minusSeconds(120); - var rule = ruleWith(condition, Map.of("lastExchangeTs", cursor.toString())); + var rule = ruleWith(condition, Map.of("lastExchangeCursor", cursor.toString() + "|ex-prev")); when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50)); eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache())); - // Verify the search request used the cursor as the lower-bound + // Verify the search request used the cursor tuple: timeFrom + afterExecutionId ArgumentCaptor captor = ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class); verify(searchIndex).search(captor.capture()); - // timeFrom should be the cursor value assertThat(captor.getValue().timeFrom()).isEqualTo(cursor); + assertThat(captor.getValue().afterExecutionId()).isEqualTo("ex-prev"); } @Test @@ -241,6 +241,7 @@ class ExchangeMatchEvaluatorTest { EvalResult r3 = eval.evaluate(condition, advanced, new EvalContext("default", t.plusSeconds(3), new TickCache())); assertThat(((EvalResult.Batch) r3).firings()).hasSize(1); + assertThat(((EvalResult.Batch) r3).nextEvalState()).containsKey("lastExchangeCursor"); } @Test