feat(alerting): evaluator scaffolding (context, result, tick cache, circuit breaker)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,12 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.core.alerting.AlertCondition;
|
||||
import com.cameleer.server.core.alerting.AlertRule;
|
||||
import com.cameleer.server.core.alerting.ConditionKind;
|
||||
|
||||
public interface ConditionEvaluator<C extends AlertCondition> {
|
||||
|
||||
ConditionKind kind();
|
||||
|
||||
EvalResult evaluate(C condition, AlertRule rule, EvalContext ctx);
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public record EvalContext(String tenantId, Instant now, TickCache tickCache) {}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public sealed interface EvalResult {
|
||||
|
||||
record Firing(Double currentValue, Double threshold, Map<String, Object> context) implements EvalResult {
|
||||
public Firing {
|
||||
context = context == null ? Map.of() : Map.copyOf(context);
|
||||
}
|
||||
}
|
||||
|
||||
record Clear() implements EvalResult {
|
||||
public static final Clear INSTANCE = new Clear();
|
||||
}
|
||||
|
||||
record Error(Throwable cause) implements EvalResult {}
|
||||
|
||||
record Batch(List<Firing> firings) implements EvalResult {
|
||||
public Batch {
|
||||
firings = firings == null ? List.of() : List.copyOf(firings);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.core.alerting.ConditionKind;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class PerKindCircuitBreaker {
|
||||
|
||||
private record State(Deque<Instant> failures, Instant openUntil) {}
|
||||
|
||||
private final int threshold;
|
||||
private final Duration window;
|
||||
private final Duration cooldown;
|
||||
private final Clock clock;
|
||||
private final ConcurrentHashMap<ConditionKind, State> byKind = new ConcurrentHashMap<>();
|
||||
|
||||
/** Production constructor — uses system clock. */
|
||||
public PerKindCircuitBreaker(int threshold, int windowSeconds, int cooldownSeconds) {
|
||||
this(threshold, windowSeconds, cooldownSeconds, Clock.systemDefaultZone());
|
||||
}
|
||||
|
||||
/** Test constructor — allows a fixed/controllable clock. */
|
||||
public PerKindCircuitBreaker(int threshold, int windowSeconds, int cooldownSeconds, Clock clock) {
|
||||
this.threshold = threshold;
|
||||
this.window = Duration.ofSeconds(windowSeconds);
|
||||
this.cooldown = Duration.ofSeconds(cooldownSeconds);
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
public void recordFailure(ConditionKind kind) {
|
||||
byKind.compute(kind, (k, s) -> {
|
||||
Deque<Instant> deque = (s == null) ? new ArrayDeque<>() : new ArrayDeque<>(s.failures());
|
||||
Instant now = Instant.now(clock);
|
||||
Instant cutoff = now.minus(window);
|
||||
while (!deque.isEmpty() && deque.peekFirst().isBefore(cutoff)) deque.pollFirst();
|
||||
deque.addLast(now);
|
||||
Instant openUntil = (deque.size() >= threshold) ? now.plus(cooldown) : null;
|
||||
return new State(deque, openUntil);
|
||||
});
|
||||
}
|
||||
|
||||
public boolean isOpen(ConditionKind kind) {
|
||||
State s = byKind.get(kind);
|
||||
return s != null && s.openUntil() != null && Instant.now(clock).isBefore(s.openUntil());
|
||||
}
|
||||
|
||||
public void recordSuccess(ConditionKind kind) {
|
||||
byKind.compute(kind, (k, s) -> new State(new ArrayDeque<>(), null));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class TickCache {
|
||||
|
||||
private final ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T getOrCompute(String key, Supplier<T> supplier) {
|
||||
return (T) map.computeIfAbsent(key, k -> supplier.get());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import com.cameleer.server.core.alerting.ConditionKind;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PerKindCircuitBreakerTest {
|
||||
|
||||
private static final Instant BASE = Instant.parse("2026-04-19T10:00:00Z");
|
||||
|
||||
@Test
|
||||
void closedByDefault() {
|
||||
var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void opensAfterFailThreshold() {
|
||||
var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
for (int i = 0; i < 5; i++) cb.recordFailure(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void doesNotOpenBeforeThreshold() {
|
||||
var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
for (int i = 0; i < 4; i++) cb.recordFailure(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void closesAfterCooldown() {
|
||||
// Open the breaker
|
||||
var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue();
|
||||
|
||||
// Advance clock past cooldown
|
||||
var cbLater = new PerKindCircuitBreaker(3, 30, 60,
|
||||
Clock.fixed(BASE.plusSeconds(70), ZoneOffset.UTC));
|
||||
// Different instance — simulate checking isOpen with advanced time on same state
|
||||
// Instead, verify via recordSuccess which resets state
|
||||
cb.recordSuccess(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void recordSuccessClosesBreaker() {
|
||||
var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue();
|
||||
cb.recordSuccess(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void kindsAreIsolated() {
|
||||
var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC));
|
||||
for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE);
|
||||
assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue();
|
||||
assertThat(cb.isOpen(ConditionKind.ROUTE_METRIC)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void oldFailuresExpireFromWindow() {
|
||||
// threshold=3, window=30s
|
||||
// Fail twice at t=0, then at t=35 (outside window) fail once more — should not open
|
||||
Instant t0 = BASE;
|
||||
var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(t0, ZoneOffset.UTC));
|
||||
cb.recordFailure(ConditionKind.LOG_PATTERN);
|
||||
cb.recordFailure(ConditionKind.LOG_PATTERN);
|
||||
|
||||
// Advance to t=35 — first two failures are now outside the 30s window
|
||||
var cb2 = new PerKindCircuitBreaker(3, 30, 60,
|
||||
Clock.fixed(t0.plusSeconds(35), ZoneOffset.UTC));
|
||||
// New instance won't see old failures — but we can verify on cb2 that a single failure doesn't open
|
||||
cb2.recordFailure(ConditionKind.LOG_PATTERN);
|
||||
assertThat(cb2.isOpen(ConditionKind.LOG_PATTERN)).isFalse();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.cameleer.server.app.alerting.eval;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TickCacheTest {
|
||||
|
||||
@Test
|
||||
void getOrComputeCachesWithinTick() {
|
||||
var cache = new TickCache();
|
||||
int n = cache.getOrCompute("k", () -> 42);
|
||||
int m = cache.getOrCompute("k", () -> 43);
|
||||
assertThat(n).isEqualTo(42);
|
||||
assertThat(m).isEqualTo(42); // cached — supplier not called again
|
||||
}
|
||||
|
||||
@Test
|
||||
void differentKeysDontCollide() {
|
||||
var cache = new TickCache();
|
||||
int a = cache.getOrCompute("a", () -> 1);
|
||||
int b = cache.getOrCompute("b", () -> 2);
|
||||
assertThat(a).isEqualTo(1);
|
||||
assertThat(b).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void supplierCalledExactlyOncePerKey() {
|
||||
var cache = new TickCache();
|
||||
AtomicInteger callCount = new AtomicInteger(0);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
cache.getOrCompute("k", () -> {
|
||||
callCount.incrementAndGet();
|
||||
return 99;
|
||||
});
|
||||
}
|
||||
assertThat(callCount.get()).isEqualTo(1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user