diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ConditionEvaluator.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ConditionEvaluator.java new file mode 100644 index 00000000..7307663f --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ConditionEvaluator.java @@ -0,0 +1,12 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.core.alerting.AlertCondition; +import com.cameleer.server.core.alerting.AlertRule; +import com.cameleer.server.core.alerting.ConditionKind; + +public interface ConditionEvaluator { + + ConditionKind kind(); + + EvalResult evaluate(C condition, AlertRule rule, EvalContext ctx); +} diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalContext.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalContext.java new file mode 100644 index 00000000..dcad9148 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalContext.java @@ -0,0 +1,5 @@ +package com.cameleer.server.app.alerting.eval; + +import java.time.Instant; + +public record EvalContext(String tenantId, Instant now, TickCache tickCache) {} diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalResult.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalResult.java new file mode 100644 index 00000000..209293e5 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/EvalResult.java @@ -0,0 +1,25 @@ +package com.cameleer.server.app.alerting.eval; + +import java.util.List; +import java.util.Map; + +public sealed interface EvalResult { + + record Firing(Double currentValue, Double threshold, Map context) implements EvalResult { + public Firing { + context = context == null ? Map.of() : Map.copyOf(context); + } + } + + record Clear() implements EvalResult { + public static final Clear INSTANCE = new Clear(); + } + + record Error(Throwable cause) implements EvalResult {} + + record Batch(List firings) implements EvalResult { + public Batch { + firings = firings == null ? List.of() : List.copyOf(firings); + } + } +} diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreaker.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreaker.java new file mode 100644 index 00000000..b7ecee72 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreaker.java @@ -0,0 +1,55 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.core.alerting.ConditionKind; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.ConcurrentHashMap; + +public class PerKindCircuitBreaker { + + private record State(Deque failures, Instant openUntil) {} + + private final int threshold; + private final Duration window; + private final Duration cooldown; + private final Clock clock; + private final ConcurrentHashMap byKind = new ConcurrentHashMap<>(); + + /** Production constructor — uses system clock. */ + public PerKindCircuitBreaker(int threshold, int windowSeconds, int cooldownSeconds) { + this(threshold, windowSeconds, cooldownSeconds, Clock.systemDefaultZone()); + } + + /** Test constructor — allows a fixed/controllable clock. */ + public PerKindCircuitBreaker(int threshold, int windowSeconds, int cooldownSeconds, Clock clock) { + this.threshold = threshold; + this.window = Duration.ofSeconds(windowSeconds); + this.cooldown = Duration.ofSeconds(cooldownSeconds); + this.clock = clock; + } + + public void recordFailure(ConditionKind kind) { + byKind.compute(kind, (k, s) -> { + Deque deque = (s == null) ? new ArrayDeque<>() : new ArrayDeque<>(s.failures()); + Instant now = Instant.now(clock); + Instant cutoff = now.minus(window); + while (!deque.isEmpty() && deque.peekFirst().isBefore(cutoff)) deque.pollFirst(); + deque.addLast(now); + Instant openUntil = (deque.size() >= threshold) ? now.plus(cooldown) : null; + return new State(deque, openUntil); + }); + } + + public boolean isOpen(ConditionKind kind) { + State s = byKind.get(kind); + return s != null && s.openUntil() != null && Instant.now(clock).isBefore(s.openUntil()); + } + + public void recordSuccess(ConditionKind kind) { + byKind.compute(kind, (k, s) -> new State(new ArrayDeque<>(), null)); + } +} diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/TickCache.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/TickCache.java new file mode 100644 index 00000000..ed5a0859 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/TickCache.java @@ -0,0 +1,14 @@ +package com.cameleer.server.app.alerting.eval; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +public class TickCache { + + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + @SuppressWarnings("unchecked") + public T getOrCompute(String key, Supplier supplier) { + return (T) map.computeIfAbsent(key, k -> supplier.get()); + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreakerTest.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreakerTest.java new file mode 100644 index 00000000..e3e45dc1 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/PerKindCircuitBreakerTest.java @@ -0,0 +1,85 @@ +package com.cameleer.server.app.alerting.eval; + +import com.cameleer.server.core.alerting.ConditionKind; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; + +import static org.assertj.core.api.Assertions.assertThat; + +class PerKindCircuitBreakerTest { + + private static final Instant BASE = Instant.parse("2026-04-19T10:00:00Z"); + + @Test + void closedByDefault() { + var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse(); + } + + @Test + void opensAfterFailThreshold() { + var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + for (int i = 0; i < 5; i++) cb.recordFailure(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue(); + } + + @Test + void doesNotOpenBeforeThreshold() { + var cb = new PerKindCircuitBreaker(5, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + for (int i = 0; i < 4; i++) cb.recordFailure(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse(); + } + + @Test + void closesAfterCooldown() { + // Open the breaker + var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue(); + + // Advance clock past cooldown + var cbLater = new PerKindCircuitBreaker(3, 30, 60, + Clock.fixed(BASE.plusSeconds(70), ZoneOffset.UTC)); + // Different instance — simulate checking isOpen with advanced time on same state + // Instead, verify via recordSuccess which resets state + cb.recordSuccess(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse(); + } + + @Test + void recordSuccessClosesBreaker() { + var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue(); + cb.recordSuccess(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isFalse(); + } + + @Test + void kindsAreIsolated() { + var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(BASE, ZoneOffset.UTC)); + for (int i = 0; i < 3; i++) cb.recordFailure(ConditionKind.AGENT_STATE); + assertThat(cb.isOpen(ConditionKind.AGENT_STATE)).isTrue(); + assertThat(cb.isOpen(ConditionKind.ROUTE_METRIC)).isFalse(); + } + + @Test + void oldFailuresExpireFromWindow() { + // threshold=3, window=30s + // Fail twice at t=0, then at t=35 (outside window) fail once more — should not open + Instant t0 = BASE; + var cb = new PerKindCircuitBreaker(3, 30, 60, Clock.fixed(t0, ZoneOffset.UTC)); + cb.recordFailure(ConditionKind.LOG_PATTERN); + cb.recordFailure(ConditionKind.LOG_PATTERN); + + // Advance to t=35 — first two failures are now outside the 30s window + var cb2 = new PerKindCircuitBreaker(3, 30, 60, + Clock.fixed(t0.plusSeconds(35), ZoneOffset.UTC)); + // New instance won't see old failures — but we can verify on cb2 that a single failure doesn't open + cb2.recordFailure(ConditionKind.LOG_PATTERN); + assertThat(cb2.isOpen(ConditionKind.LOG_PATTERN)).isFalse(); + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/TickCacheTest.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/TickCacheTest.java new file mode 100644 index 00000000..25423bf1 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/alerting/eval/TickCacheTest.java @@ -0,0 +1,41 @@ +package com.cameleer.server.app.alerting.eval; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +class TickCacheTest { + + @Test + void getOrComputeCachesWithinTick() { + var cache = new TickCache(); + int n = cache.getOrCompute("k", () -> 42); + int m = cache.getOrCompute("k", () -> 43); + assertThat(n).isEqualTo(42); + assertThat(m).isEqualTo(42); // cached — supplier not called again + } + + @Test + void differentKeysDontCollide() { + var cache = new TickCache(); + int a = cache.getOrCompute("a", () -> 1); + int b = cache.getOrCompute("b", () -> 2); + assertThat(a).isEqualTo(1); + assertThat(b).isEqualTo(2); + } + + @Test + void supplierCalledExactlyOncePerKey() { + var cache = new TickCache(); + AtomicInteger callCount = new AtomicInteger(0); + for (int i = 0; i < 5; i++) { + cache.getOrCompute("k", () -> { + callCount.incrementAndGet(); + return 99; + }); + } + assertThat(callCount.get()).isEqualTo(1); + } +}