alerting(eval): PER_EXCHANGE composite cursor — monotone across same-ms exchanges

Tests:
- cursorMonotonicity_sameMillisecondExchanges_fireExactlyOncePerTick
- firstRun_boundedByRuleCreatedAt_notRetentionHistory
This commit is contained in:
hsiegeln
2026-04-22 16:11:01 +02:00
parent 0bad014811
commit 4acf0aeeff
2 changed files with 43 additions and 33 deletions

View File

@@ -14,6 +14,7 @@ import org.springframework.stereotype.Component;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -85,19 +86,29 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
String routeId = c.scope() != null ? c.scope().routeId() : null; String routeId = c.scope() != null ? c.scope().routeId() : null;
ExchangeMatchCondition.ExchangeFilter filter = c.filter(); ExchangeMatchCondition.ExchangeFilter filter = c.filter();
// Resolve cursor from evalState // Resolve composite cursor: (startTime, executionId)
Instant cursor = null; Instant cursorTs;
Object raw = rule.evalState().get("lastExchangeTs"); String cursorId;
Object raw = rule.evalState().get("lastExchangeCursor");
if (raw instanceof String s && !s.isBlank()) { if (raw instanceof String s && !s.isBlank()) {
try { cursor = Instant.parse(s); } catch (Exception ignored) {} int pipe = s.indexOf('|');
} else if (raw instanceof Instant i) { if (pipe < 0) {
cursor = i; // Malformed — treat as first-run
cursorTs = rule.createdAt();
cursorId = "";
} else {
cursorTs = Instant.parse(s.substring(0, pipe));
cursorId = s.substring(pipe + 1);
}
} else {
// First run — bounded by rule.createdAt, empty executionId so any real id sorts after it
cursorTs = rule.createdAt();
cursorId = "";
} }
// Build SearchRequest — use cursor as timeFrom so we only see exchanges after last run
var req = new SearchRequest( var req = new SearchRequest(
filter != null ? filter.status() : null, filter != null ? filter.status() : null,
cursor, // timeFrom = cursor (or null for first run) cursorTs, // timeFrom
ctx.now(), // timeTo ctx.now(), // timeTo
null, null, null, // durationMin/Max, correlationId null, null, null, // durationMin/Max, correlationId
null, null, null, null, // text variants null, null, null, null, // text variants
@@ -110,24 +121,26 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
50, 50,
"startTime", "startTime",
"asc", // asc so we process oldest first "asc", // asc so we process oldest first
null, // afterExecutionId (wired in Task 1.5) cursorId.isEmpty() ? null : cursorId, // afterExecutionId — null on first run enables >=
envSlug envSlug
); );
SearchResult<ExecutionSummary> result = searchIndex.search(req); SearchResult<ExecutionSummary> result = searchIndex.search(req);
List<ExecutionSummary> matches = result.data(); List<ExecutionSummary> matches = result.data();
if (matches.isEmpty()) return new EvalResult.Batch(List.of(), Map.of()); if (matches.isEmpty()) return EvalResult.Batch.empty();
// Find the latest startTime across all matches — becomes the next cursor // Ensure deterministic ordering for cursor advance
Instant latestTs = matches.stream() matches = new ArrayList<>(matches);
.map(ExecutionSummary::startTime) matches.sort(Comparator
.max(Instant::compareTo) .comparing(ExecutionSummary::startTime)
.orElse(ctx.now()); .thenComparing(ExecutionSummary::executionId));
ExecutionSummary last = matches.get(matches.size() - 1);
String nextCursorSerialized = last.startTime().toString() + "|" + last.executionId();
List<EvalResult.Firing> firings = new ArrayList<>(); List<EvalResult.Firing> firings = new ArrayList<>();
for (int i = 0; i < matches.size(); i++) { for (ExecutionSummary ex : matches) {
ExecutionSummary ex = matches.get(i);
Map<String, Object> ctx2 = new HashMap<>(); Map<String, Object> ctx2 = new HashMap<>();
ctx2.put("exchange", Map.of( ctx2.put("exchange", Map.of(
"id", ex.executionId(), "id", ex.executionId(),
@@ -136,15 +149,11 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
"startTime", ex.startTime() == null ? "" : ex.startTime().toString() "startTime", ex.startTime() == null ? "" : ex.startTime().toString()
)); ));
ctx2.put("app", Map.of("slug", ex.applicationId() == null ? "" : ex.applicationId())); ctx2.put("app", Map.of("slug", ex.applicationId() == null ? "" : ex.applicationId()));
// Attach the next-cursor to the last firing so the job can extract it
if (i == matches.size() - 1) {
ctx2.put("_nextCursor", latestTs);
}
firings.add(new EvalResult.Firing(1.0, null, ctx2)); firings.add(new EvalResult.Firing(1.0, null, ctx2));
} }
return new EvalResult.Batch(firings, Map.of()); Map<String, Object> nextEvalState = new HashMap<>(rule.evalState());
nextEvalState.put("lastExchangeCursor", nextCursorSerialized);
return new EvalResult.Batch(firings, nextEvalState);
} }
} }

View File

@@ -166,7 +166,7 @@ class ExchangeMatchEvaluatorTest {
} }
@Test @Test
void perExchange_lastFiringCarriesNextCursor() { void perExchange_batchCarriesNextCursorInEvalState() {
var condition = new ExchangeMatchCondition( var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null), new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
@@ -182,32 +182,32 @@ class ExchangeMatchEvaluatorTest {
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache())); EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
var batch = (EvalResult.Batch) r; var batch = (EvalResult.Batch) r;
// last firing carries the _nextCursor key with the latest startTime // The batch carries the composite next-cursor in nextEvalState under "lastExchangeCursor"
EvalResult.Firing last = batch.firings().get(batch.firings().size() - 1); assertThat(batch.nextEvalState()).containsKey("lastExchangeCursor");
assertThat(last.context()).containsKey("_nextCursor"); assertThat(batch.nextEvalState().get("lastExchangeCursor"))
assertThat(last.context().get("_nextCursor")).isEqualTo(t2); .isEqualTo(t2.toString() + "|ex-2");
} }
@Test @Test
void perExchange_usesLastExchangeTsFromEvalState() { void perExchange_usesLastExchangeCursorFromEvalState() {
var condition = new ExchangeMatchCondition( var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null), new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()), new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60); FireMode.PER_EXCHANGE, null, null, 60);
Instant cursor = NOW.minusSeconds(120); Instant cursor = NOW.minusSeconds(120);
var rule = ruleWith(condition, Map.of("lastExchangeTs", cursor.toString())); var rule = ruleWith(condition, Map.of("lastExchangeCursor", cursor.toString() + "|ex-prev"));
when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50)); when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50));
eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache())); eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache()));
// Verify the search request used the cursor as the lower-bound // Verify the search request used the cursor tuple: timeFrom + afterExecutionId
ArgumentCaptor<com.cameleer.server.core.search.SearchRequest> captor = ArgumentCaptor<com.cameleer.server.core.search.SearchRequest> captor =
ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class); ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class);
verify(searchIndex).search(captor.capture()); verify(searchIndex).search(captor.capture());
// timeFrom should be the cursor value
assertThat(captor.getValue().timeFrom()).isEqualTo(cursor); assertThat(captor.getValue().timeFrom()).isEqualTo(cursor);
assertThat(captor.getValue().afterExecutionId()).isEqualTo("ex-prev");
} }
@Test @Test
@@ -241,6 +241,7 @@ class ExchangeMatchEvaluatorTest {
EvalResult r3 = eval.evaluate(condition, advanced, EvalResult r3 = eval.evaluate(condition, advanced,
new EvalContext("default", t.plusSeconds(3), new TickCache())); new EvalContext("default", t.plusSeconds(3), new TickCache()));
assertThat(((EvalResult.Batch) r3).firings()).hasSize(1); assertThat(((EvalResult.Batch) r3).firings()).hasSize(1);
assertThat(((EvalResult.Batch) r3).nextEvalState()).containsKey("lastExchangeCursor");
} }
@Test @Test