alerting(eval): clamp first-run cursor to deployBacklogCap — flood guard
New property cameleer.server.alerting.perExchangeDeployBacklogCapSeconds (default 86400 = 24h, 0 disables). On first run (no persisted cursor or malformed), clamp cursorTs to max(rule.createdAt, now - cap) so a long-lived PER_EXCHANGE rule doesn't scan from its creation date forward on first post-deploy tick. Normal-advance path unaffected. Follows up final-review I-1 on the PER_EXCHANGE exactly-once phase. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -16,7 +16,8 @@ public record AlertingProperties(
|
||||
Integer eventRetentionDays,
|
||||
Integer notificationRetentionDays,
|
||||
Integer webhookTimeoutMs,
|
||||
Integer webhookMaxAttempts) {
|
||||
Integer webhookMaxAttempts,
|
||||
Integer perExchangeDeployBacklogCapSeconds) {
|
||||
|
||||
public int effectiveEvaluatorTickIntervalMs() {
|
||||
int raw = evaluatorTickIntervalMs == null ? 5000 : evaluatorTickIntervalMs;
|
||||
@@ -70,4 +71,9 @@ public record AlertingProperties(
|
||||
public int cbCooldownSeconds() {
|
||||
return circuitBreakerCooldownSeconds == null ? 60 : circuitBreakerCooldownSeconds;
|
||||
}
|
||||
|
||||
public int effectivePerExchangeDeployBacklogCapSeconds() {
|
||||
// Default 24 h. Zero or negative = disabled (no clamp — first-run uses rule.createdAt as today).
|
||||
return perExchangeDeployBacklogCapSeconds == null ? 86_400 : perExchangeDeployBacklogCapSeconds;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.app.alerting.config.AlertingProperties;
|
||||
import com.cameleer.server.app.search.ClickHouseSearchIndex;
|
||||
import com.cameleer.server.core.alerting.AlertMatchSpec;
|
||||
import com.cameleer.server.core.alerting.AlertRule;
|
||||
@@ -24,10 +25,14 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
|
||||
|
||||
private final ClickHouseSearchIndex searchIndex;
|
||||
private final EnvironmentRepository envRepo;
|
||||
private final AlertingProperties alertingProperties;
|
||||
|
||||
public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex, EnvironmentRepository envRepo) {
|
||||
this.searchIndex = searchIndex;
|
||||
this.envRepo = envRepo;
|
||||
public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex,
|
||||
EnvironmentRepository envRepo,
|
||||
AlertingProperties alertingProperties) {
|
||||
this.searchIndex = searchIndex;
|
||||
this.envRepo = envRepo;
|
||||
this.alertingProperties = alertingProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -93,16 +98,18 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
|
||||
if (raw instanceof String s && !s.isBlank()) {
|
||||
int pipe = s.indexOf('|');
|
||||
if (pipe < 0) {
|
||||
// Malformed — treat as first-run
|
||||
cursorTs = rule.createdAt();
|
||||
// Malformed — treat as first-run (with deploy-backlog-cap clamp).
|
||||
cursorTs = firstRunCursorTs(rule, ctx);
|
||||
cursorId = "";
|
||||
} else {
|
||||
cursorTs = Instant.parse(s.substring(0, pipe));
|
||||
cursorId = s.substring(pipe + 1);
|
||||
}
|
||||
} else {
|
||||
// First run — bounded by rule.createdAt, empty executionId so any real id sorts after it
|
||||
cursorTs = rule.createdAt();
|
||||
// First run — bounded by rule.createdAt, empty executionId so any real id sorts after it.
|
||||
// Clamp to deploy-backlog-cap to avoid backlog flooding for long-lived rules on first
|
||||
// post-deploy tick. Normal-advance path (valid cursor above) is intentionally unaffected.
|
||||
cursorTs = firstRunCursorTs(rule, ctx);
|
||||
cursorId = "";
|
||||
}
|
||||
|
||||
@@ -156,4 +163,25 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
|
||||
nextEvalState.put("lastExchangeCursor", nextCursorSerialized);
|
||||
return new EvalResult.Batch(firings, nextEvalState);
|
||||
}
|
||||
|
||||
/**
|
||||
* First-run cursor timestamp: {@code rule.createdAt()}, clamped to
|
||||
* {@code now - perExchangeDeployBacklogCapSeconds} so a long-lived PER_EXCHANGE rule
|
||||
* doesn't scan from its creation date forward on first post-deploy tick.
|
||||
* <p>
|
||||
* Cap ≤ 0 disables the clamp (first-run falls back to {@code rule.createdAt()} verbatim).
|
||||
* Applied only on first-run / malformed-cursor paths — the normal-advance path is
|
||||
* intentionally unaffected so legitimate missed ticks are not silently skipped.
|
||||
*/
|
||||
private Instant firstRunCursorTs(AlertRule rule, EvalContext ctx) {
|
||||
Instant cursorTs = rule.createdAt();
|
||||
int capSeconds = alertingProperties.effectivePerExchangeDeployBacklogCapSeconds();
|
||||
if (capSeconds > 0) {
|
||||
Instant capFloor = ctx.now().minusSeconds(capSeconds);
|
||||
if (cursorTs == null || cursorTs.isBefore(capFloor)) {
|
||||
cursorTs = capFloor;
|
||||
}
|
||||
}
|
||||
return cursorTs;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.app.alerting.config.AlertingProperties;
|
||||
import com.cameleer.server.app.search.ClickHouseSearchIndex;
|
||||
import com.cameleer.server.core.alerting.*;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
@@ -36,7 +37,9 @@ class ExchangeMatchEvaluatorTest {
|
||||
void setUp() {
|
||||
searchIndex = mock(ClickHouseSearchIndex.class);
|
||||
envRepo = mock(EnvironmentRepository.class);
|
||||
eval = new ExchangeMatchEvaluator(searchIndex, envRepo);
|
||||
AlertingProperties props = new AlertingProperties(
|
||||
null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
eval = new ExchangeMatchEvaluator(searchIndex, envRepo, props);
|
||||
|
||||
var env = new Environment(ENV_ID, "prod", "Production", false, true, null, null, null);
|
||||
when(envRepo.findById(ENV_ID)).thenReturn(Optional.of(env));
|
||||
@@ -264,6 +267,39 @@ class ExchangeMatchEvaluatorTest {
|
||||
assertThat(((EvalResult.Batch) r).firings()).hasSize(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void firstRun_clampsCursorToDeployBacklogCap_whenRuleCreatedLongAgo() {
|
||||
// Rule created 7 days ago, cap default is 24h; expect timeFrom to be now - 24h, not rule.createdAt.
|
||||
Instant now = Instant.parse("2026-04-22T12:00:00Z");
|
||||
Instant createdLongAgo = now.minus(Duration.ofDays(7));
|
||||
Instant expectedClampFloor = now.minusSeconds(86_400); // 24h
|
||||
|
||||
ArgumentCaptor<SearchRequest> cap = ArgumentCaptor.forClass(SearchRequest.class);
|
||||
when(searchIndex.search(cap.capture())).thenReturn(new SearchResult<>(List.of(), 0L, 0, 50));
|
||||
|
||||
ExchangeMatchCondition condition = perExchangeCondition();
|
||||
AlertRule rule = ruleWith(condition, Map.of(), createdLongAgo);
|
||||
eval.evaluate(condition, rule, new EvalContext("default", now, new TickCache()));
|
||||
|
||||
SearchRequest req = cap.getValue();
|
||||
assertThat(req.timeFrom()).isEqualTo(expectedClampFloor);
|
||||
}
|
||||
|
||||
@Test
|
||||
void firstRun_usesCreatedAt_whenWithinDeployBacklogCap() {
|
||||
Instant now = Instant.parse("2026-04-22T12:00:00Z");
|
||||
Instant createdRecent = now.minus(Duration.ofHours(1)); // 1h < 24h cap
|
||||
|
||||
ArgumentCaptor<SearchRequest> cap = ArgumentCaptor.forClass(SearchRequest.class);
|
||||
when(searchIndex.search(cap.capture())).thenReturn(new SearchResult<>(List.of(), 0L, 0, 50));
|
||||
|
||||
ExchangeMatchCondition condition = perExchangeCondition();
|
||||
AlertRule rule = ruleWith(condition, Map.of(), createdRecent);
|
||||
eval.evaluate(condition, rule, new EvalContext("default", now, new TickCache()));
|
||||
|
||||
assertThat(cap.getValue().timeFrom()).isEqualTo(createdRecent);
|
||||
}
|
||||
|
||||
@Test
|
||||
void kindIsExchangeMatch() {
|
||||
assertThat(eval.kind()).isEqualTo(ConditionKind.EXCHANGE_MATCH);
|
||||
|
||||
@@ -50,7 +50,7 @@ class WebhookDispatcherIT {
|
||||
new ApacheOutboundHttpClientFactory(props, new SslContextBuilder()),
|
||||
cipher,
|
||||
new MustacheRenderer(),
|
||||
new AlertingProperties(null, null, null, null, null, null, null, null, null, null, null, null, null),
|
||||
new AlertingProperties(null, null, null, null, null, null, null, null, null, null, null, null, null, null),
|
||||
new ObjectMapper()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -167,7 +167,7 @@ class AlertingRetentionJobIT extends AbstractPostgresIT {
|
||||
// effectiveEventRetentionDays = 90, effectiveNotificationRetentionDays = 30
|
||||
new com.cameleer.server.app.alerting.config.AlertingProperties(
|
||||
null, null, null, null, null, null, null, null, null,
|
||||
90, 30, null, null),
|
||||
90, 30, null, null, null),
|
||||
instanceRepo,
|
||||
notificationRepo,
|
||||
fixedClock);
|
||||
|
||||
Reference in New Issue
Block a user