Compare commits


27 Commits

Author SHA1 Message Date
hsiegeln
eda74b7339 docs(alerting): PER_EXCHANGE exactly-once — fireMode reference + deploy-backlog-cap
Fix stale `AGGREGATE` label (actual enum: `COUNT_IN_WINDOW`). Expand
EXCHANGE_MATCH section with both fire modes, PER_EXCHANGE config-surface
restrictions (0 for reNotifyMinutes/forDurationSeconds, at-least-one-sink
rule), exactly-once guarantee scope, and the first-run backlog-cap knob.

Surface the new config in application.yml with the 24h default and the
opt-out-to-0 semantics.
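The config surface this commit documents might look like the following in application.yml — a sketch assuming the property path from the commit below (`cameleer.server.alerting.perExchangeDeployBacklogCapSeconds`), with the 24h default and the opt-out-to-0 semantics spelled out:

```yaml
cameleer:
  server:
    alerting:
      # First-run backlog cap for PER_EXCHANGE rules.
      # Default 86400 (24h); 0 opts out (no clamp — first run scans from rule.createdAt).
      perExchangeDeployBacklogCapSeconds: 86400
```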

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 18:39:49 +02:00
hsiegeln
e470fc0dab alerting(eval): clamp first-run cursor to deployBacklogCap — flood guard
New property cameleer.server.alerting.perExchangeDeployBacklogCapSeconds
(default 86400 = 24h, 0 disables). On first run (no persisted cursor
or malformed), clamp cursorTs to max(rule.createdAt, now - cap) so a
long-lived PER_EXCHANGE rule doesn't scan from its creation date
forward on first post-deploy tick. Normal-advance path unaffected.

Follows up final-review I-1 on the PER_EXCHANGE exactly-once phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 18:34:23 +02:00
hsiegeln
32c52aa22e docs(rules): update app-classes for BatchResultApplier
Task 6.2 housekeeping — add BatchResultApplier to the class map per
CLAUDE.md convention. Introduced in Task 2.2 as the @Transactional
wrapper for atomic per-rule batch commits (instance writes + notification
enqueues + cursor advance).

Also refreshes GitNexus index stats auto-emitted into AGENTS.md /
CLAUDE.md (8778 -> 8893 nodes, 22647 -> 23049 edges).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 18:13:57 +02:00
hsiegeln
cfc619505a alerting(it): AlertingFullLifecycleIT — exactly-once across ticks, ack isolation
End-to-end lifecycle test: 5 FAILED exchanges across 2 ticks produces
exactly 5 FIRING instances + 5 PENDING notifications. Tick 3 with no
new exchanges produces zero new instances or notifications. Ack on one
instance leaves the other four untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 18:07:45 +02:00
hsiegeln
e0496fdba2 ui(alerts): ReviewStep — render-preview pane for existing rules
Wire up the existing POST /alerts/rules/{id}/render-preview endpoint
so rule authors can preview their Mustache-templated notification
before saving. Available in edit mode only (new rules require save
first — endpoint is id-bound). Matches the njams gap: their rules
builder ships no in-builder preview and operators compensate with
trial-and-error save/retry.

Implementation notes:
- ReviewStep gains an optional `ruleId` prop; when present, a
  "Preview notification" button calls `useRenderPreview` (the
  existing TanStack mutation in api/queries/alertRules.ts) and
  renders title + message in a titled, read-only pane styled like
  a notification card.
- Errors surface as a DS Alert (variant=error) beneath the button.
- `RuleEditorWizard` passes `ruleId={id}` through — mirrors the
  existing TriggerStep / NotifyStep wiring.
- No stateless (/render-preview without id) variant exists on the
  backend, so for new rules the button is simply omitted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:58:17 +02:00
hsiegeln
f096365e05 ui(alerts): ReviewStep blocks save on empty webhooks+targets
Shows a warning banner and disables the Save button when a rule has
neither webhooks nor targets. Previously such a rule was only rejected
at the server edge (Task 3.3 validator); now it is caught earlier in the
wizard with a clear reason.
2026-04-22 17:55:13 +02:00
hsiegeln
36cb93ecdd ui(alerts): ExchangeMatchForm — enforce PER_EXCHANGE UI constraints
Disable reNotifyMinutes at 0 with tooltip when PER_EXCHANGE is selected
(server rejects non-zero per Task 3.3 validator). Hide forDurationSeconds
entirely for PER_EXCHANGE (not applicable to per-exchange semantics).
Values stay zeroed via Task 4.3's applyFireModeChange helper on any
mode toggle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:51:58 +02:00
hsiegeln
9960fd8c36 ui(alerts): applyFireModeChange — clear mode-specific fields on toggle
Prevents stale COUNT_IN_WINDOW threshold/windowSeconds from surviving
PER_EXCHANGE save (would trip the Task 3.3 server-side validator).
Also forces reNotifyMinutes=0 and forDurationSeconds=0 when switching to
PER_EXCHANGE.

Turns green: form-state.test.ts#applyFireModeChange (3 tests).
2026-04-22 17:48:51 +02:00
hsiegeln
4d37dff9f8 ui(alerts): RED tests for form-state fireMode toggle clearing
Three failing tests pinning Task 4.3's mode-toggle state hygiene:
- clears threshold+windowSeconds on COUNT_IN_WINDOW -> PER_EXCHANGE
- returns to defaults (not stale values) on PER_EXCHANGE -> COUNT_IN_WINDOW
- forces reNotifyMinutes=0 and forDurationSeconds=0 on PER_EXCHANGE

Targets a to-be-introduced pure helper `applyFireModeChange(form, newMode)`
in form-state.ts. Task 4.3 will implement the helper and wire it into
ExchangeMatchForm so the Fire-mode <Select> calls it instead of the current
raw patch({ fireMode }) that leaves stale fields.
2026-04-22 17:46:11 +02:00
hsiegeln
7677df33e5 ui(api): regen types + drop perExchangeLingerSeconds from SPA
Follows backend removal of the field (Task 3.1). Typechecker confirms
zero remaining references. The ExchangeMatchForm linger-input is
visually removed in Task 4.4.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:40:43 +02:00
hsiegeln
0f6bafae8e alerting(api): cross-field validation for PER_EXCHANGE + empty-targets guard
PER_EXCHANGE rules: 400 if reNotifyMinutes != 0 or forDurationSeconds != 0.
Any rule: 400 if webhooks + targets are both empty (never notifies anyone).

Turns green: AlertRuleControllerIT#createPerExchangeRule_with*NonZero_returns400,
AlertRuleControllerIT#createAnyRule_withEmptyWebhooksAndTargets_returns400.
2026-04-22 17:31:11 +02:00
hsiegeln
377968eb53 alerting(it): RED tests for PER_EXCHANGE cross-field validation + empty targets
Three failing IT tests documenting the contract Task 3.3 will satisfy:
- createPerExchangeRule_withReNotifyMinutesNonZero_returns400
- createPerExchangeRule_withForDurationSecondsNonZero_returns400
- createAnyRule_withEmptyWebhooksAndTargets_returns400
2026-04-22 17:17:47 +02:00
hsiegeln
e483e52eee alerting(core): drop unused perExchangeLingerSeconds from ExchangeMatchCondition
Dead field — was enforced by compact ctor as required for PER_EXCHANGE,
but never read anywhere in the codebase. Removal tightens the API surface
and is precondition for the Task 3.3 cross-field validator.

Pre-prod; no shim / migration.
2026-04-22 17:10:53 +02:00
hsiegeln
ba4e2bb68f alerting(eval): atomic per-rule batch commit via @Transactional — Phase 2 close
Wraps instance writes, notification enqueues, and cursor advance in one
transactional boundary per rule tick. Rollback leaves the rule replayable
on next tick. Turns the Phase 2 atomicity IT green (see AlertEvaluatorJobIT
#tickRollback_faultOnSecondNotificationInsert_leavesCursorUnchanged).
2026-04-22 17:03:07 +02:00
hsiegeln
989dde23eb alerting(it): RED test pinning Phase 2 tick-atomicity contract
Fault-injection IT asserts that a crash mid-batch rolls back every
instance + notification write AND leaves the cursor unchanged. Fails
against current (Phase 1 only) code — turns green when Task 2.2
wraps batch processing in @Transactional.
2026-04-22 16:51:09 +02:00
hsiegeln
3c3d90c45b test(alerting): align AlertEvaluatorJobIT CH cleanup with house style
Replace async @AfterEach ALTER...DELETE with @BeforeEach TRUNCATE TABLE
executions — matches the convention used in ClickHouseExecutionStoreIT
and peers. Env-slug isolation was already preventing cross-test pollution;
this change is about hygiene and determinism (TRUNCATE is synchronous).
2026-04-22 16:45:28 +02:00
hsiegeln
5bd0e09df3 alerting(eval): persist advanced cursor via releaseClaim — Phase 1 close
Fixes the notification-bleed regression pinned by
AlertEvaluatorJobIT#tick2_noNewExchanges_enqueuesZeroAdditionalNotifications.
2026-04-22 16:36:01 +02:00
hsiegeln
b8d4b59f40 alerting(eval): AlertEvaluatorJob persists advanced cursor via withEvalState
Thread EvalResult.Batch.nextEvalState into releaseClaim so the composite
cursor from Task 1.5 actually lands in rule.evalState across tick boundaries.
Guards against empty-batch wipe (would regress to first-run scan).
2026-04-22 16:24:27 +02:00
hsiegeln
850c030642 search: compose ORDER BY with execution_id when afterExecutionId set
Follow-up to Task 1.2 flagged by Task 1.5 review (I-1). Single-column
ORDER BY could drop tail rows in a same-millisecond group >50 when
paginating via the composite cursor. Appending ', execution_id <dir>'
as secondary key only when afterExecutionId is set preserves existing
behaviour for UI/stats callers.
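The conditional ORDER BY composition described above can be sketched as follows — a hypothetical illustration of the approach, not the actual `ClickHouseSearchIndex` code (names here are invented):

```java
// Illustrative sketch: append execution_id as a secondary sort key only when
// paginating via the composite cursor, so a same-millisecond group larger than
// one page cannot drop tail rows. Plain callers (UI/stats) keep the existing
// single-column ORDER BY.
final class OrderByComposer {
    static String buildOrderBy(String direction, boolean hasAfterExecutionId) {
        String base = "ORDER BY start_time " + direction;
        return hasAfterExecutionId ? base + ", execution_id " + direction : base;
    }
}
```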
2026-04-22 16:21:52 +02:00
hsiegeln
4acf0aeeff alerting(eval): PER_EXCHANGE composite cursor — monotone across same-ms exchanges
Tests:
- cursorMonotonicity_sameMillisecondExchanges_fireExactlyOncePerTick
- firstRun_boundedByRuleCreatedAt_notRetentionHistory
2026-04-22 16:11:01 +02:00
hsiegeln
0bad014811 core(alerting): AlertRule.withEvalState wither for cursor threading 2026-04-22 16:04:55 +02:00
hsiegeln
c2252a0e72 alerting(eval): RED tests for PER_EXCHANGE cursor monotonicity + first-run bound
Two failing tests documenting the contract Task 1.5 will satisfy:
- cursorMonotonicity_sameMillisecondExchanges_fireExactlyOncePerTick
- firstRun_boundedByRuleCreatedAt_notRetentionHistory

Compile may fail until Task 1.4 adds AlertRule.withEvalState wither.
2026-04-22 15:58:16 +02:00
hsiegeln
b41f34c090 search: SearchRequest.afterExecutionId — composite (startTime, execId) predicate
Adds an optional afterExecutionId field to SearchRequest. When combined
with a non-null timeFrom, ClickHouseSearchIndex applies a strictly-after
tuple predicate (start_time > ts OR (start_time = ts AND execution_id > id))
so same-millisecond exchanges can be consumed exactly once across ticks.

When afterExecutionId is null, timeFrom keeps its existing >= semantics —
no behaviour change for any current caller.
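The strictly-after tuple predicate can be sketched as a plain comparison — a hypothetical in-memory equivalent of the SQL above, with invented names:

```java
import java.time.Instant;

// In-memory equivalent of the SQL tuple predicate above:
// start_time > ts OR (start_time = ts AND execution_id > id).
final class CompositeCursor {
    static boolean strictlyAfter(Instant rowTs, String rowId, Instant cursorTs, String cursorId) {
        if (rowTs.isAfter(cursorTs)) return true;
        // Same-millisecond tie-break: comparing execution_id makes the ordering
        // total, so each exchange is consumed exactly once across ticks.
        return rowTs.equals(cursorTs) && rowId.compareTo(cursorId) > 0;
    }
}
```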

Also adds the SearchRequest.withCursor(ts, id) wither. Threads the field
through the existing withInstanceIds / withEnvironment withers. All existing
positional call-sites (SearchController, ExchangeMatchEvaluator,
ClickHouseSearchIndexIT, ClickHouseChunkPipelineIT) pass null for the new
slot.

Task 1.2 of docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md.
The evaluator-side wiring that actually supplies the cursor is Task 1.5.
2026-04-22 15:49:05 +02:00
hsiegeln
6fa8e3aa30 alerting(eval): EvalResult.Batch carries nextEvalState for cursor threading 2026-04-22 15:42:20 +02:00
hsiegeln
031fe725b5 docs(plan): PER_EXCHANGE exactly-once — implementation plan (21 tasks, 6 phases)
Plan for executing the tightened spec. TDD per task: RED test first,
minimal GREEN impl, commit. Phases 1-2 land the cursor + atomic batch
commit; phase 3 validates config; phase 4 fixes the UI mode-toggle
leakage + empty-targets guard + render-preview pane; phases 5-6 close
with full-lifecycle IT and regression sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 15:39:31 +02:00
hsiegeln
2f9b9c9b0f docs(spec): PER_EXCHANGE — tighten motivation, fold in njams review
Correct the factual claim that the cursor advances — it is dead code:
_nextCursor is computed but never persisted by applyBatchFiring/reschedule,
so every tick re-enqueues notifications for every matching exchange in
retention. Clarify that instance-level dedup already works via the unique
index; notification-level dedup is what's broken. Reframe §2 as "make it
atomic before §1 goes live."

Add builder-UX lessons from the njams Server_4 rules editor: clear stale
fields on fireMode toggle (not just hide them); block save on empty
webhooks+targets; wire the already-existing /render-preview endpoint into
the Review step. Add Test 5 (red-first notification-bleed regression) and
Test 6 (form-state clear on mode toggle).

Park two follow-ups explicitly: sealed condition-type hierarchy (backend
lags the UI's condition-forms/* sharding) and a coalesceSeconds primitive
for Inbox-storm taming. Amend cursor-format-churn risk: benign in theory,
but first post-deploy tick against long-standing rules could scan from
rule.createdAt forward — suggests a deployBacklogCap clamp to bound the
one-time backlog flood.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 14:57:25 +02:00
hsiegeln
817b61058a docs(spec): PER_EXCHANGE exactly-once-per-exchange alerting
Four focused correctness fixes for the "fire exactly once per FAILED
exchange" use case (alerting layer only; HTTP-level idempotency is a
separate scope):

1. Composite cursor (startTime, executionId) replaces the current
   single-timestamp, inclusive cursor — prevents same-millisecond
   drops and same-exchange re-selection.
2. First-run cursor initialized to rule createdAt (not null) —
   prevents the current unbounded historical-retention scan on first
   tick of a new rule.
3. Transactional coupling of instance writes + notification enqueue +
   cursor advance — eliminates partial-progress failure modes on crash
   or rollback.
4. Config hygiene: reNotifyMinutes forced to 0, forDurationSeconds
   rejected, perExchangeLingerSeconds removed entirely (was validated
   as required but never read) — the rule shape stops admitting
   nonsensical PER_EXCHANGE combinations.

Alert stays FIRING until human ack/resolve (no auto-resolve); webhook
fires exactly once per AlertInstance; Inbox never sees duplicates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 14:17:18 +02:00
39 changed files with 2806 additions and 195 deletions

View File

@@ -171,6 +171,14 @@ Env-scoped read-path controllers (`AlertController`, `AlertRuleController`, `Ale
- `JarRetentionJob`@Scheduled 03:00 daily, per-environment retention, skips deployed versions
## alerting/eval/ — Rule evaluation
- `AlertEvaluatorJob`@Scheduled tick driver; per-rule claim/release via `AlertRuleRepository`, dispatches to per-kind `ConditionEvaluator`, persists advanced cursor on release via `AlertRule.withEvalState`.
- `BatchResultApplier` — `@Component` that wraps a single rule's tick outcome (`EvalResult.Batch` = `firings` + `nextEvalState`) in one `@Transactional` boundary: instance upserts + notification enqueues + cursor advance commit atomically or roll back together. This is the exactly-once-per-exchange guarantee for `PER_EXCHANGE` fire mode.
- `ConditionEvaluator` — interface; per-kind implementations: `ExchangeMatchEvaluator`, `AgentLifecycleEvaluator`, `AgentStateEvaluator`, `DeploymentStateEvaluator`, `JvmMetricEvaluator`, `LogPatternEvaluator`, `RouteMetricEvaluator`.
- `AlertStateTransitions` — PER_EXCHANGE vs rule-level FSM helpers (fire/resolve/ack).
- `PerKindCircuitBreaker` — trips noisy per-kind evaluators; `TickCache` — per-tick shared lookups (apps, envs, silences).
## http/ — Outbound HTTP client implementation
- `SslContextBuilder` — composes SSL context from `OutboundHttpProperties` + `OutboundHttpRequestContext`. Supports SYSTEM_DEFAULT (JDK roots + configured CA extras), TRUST_ALL (short-circuit no-op TrustManager), TRUST_PATHS (JDK roots + system extras + per-request extras). Throws `IllegalArgumentException("CA file not found: ...")` on missing PEM.

View File

@@ -1,7 +1,7 @@
 <!-- gitnexus:start -->
 # GitNexus — Code Intelligence
-This project is indexed by GitNexus as **cameleer-server** (8778 symbols, 22647 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
+This project is indexed by GitNexus as **cameleer-server** (8893 symbols, 23049 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
 > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

View File

@@ -85,7 +85,7 @@ When adding, removing, or renaming classes, controllers, endpoints, UI components
 <!-- gitnexus:start -->
 # GitNexus — Code Intelligence
-This project is indexed by GitNexus as **cameleer-server** (8778 symbols, 22647 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
+This project is indexed by GitNexus as **cameleer-server** (8893 symbols, 23049 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
 > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

View File

@@ -16,7 +16,8 @@ public record AlertingProperties(
     Integer eventRetentionDays,
     Integer notificationRetentionDays,
     Integer webhookTimeoutMs,
-    Integer webhookMaxAttempts) {
+    Integer webhookMaxAttempts,
+    Integer perExchangeDeployBacklogCapSeconds) {

     public int effectiveEvaluatorTickIntervalMs() {
         int raw = evaluatorTickIntervalMs == null ? 5000 : evaluatorTickIntervalMs;
@@ -70,4 +71,9 @@ public record AlertingProperties(
     public int cbCooldownSeconds() {
         return circuitBreakerCooldownSeconds == null ? 60 : circuitBreakerCooldownSeconds;
     }
+
+    public int effectivePerExchangeDeployBacklogCapSeconds() {
+        // Default 24 h. Zero or negative = disabled (no clamp — first-run uses rule.createdAt as today).
+        return perExchangeDeployBacklogCapSeconds == null ? 86_400 : perExchangeDeployBacklogCapSeconds;
+    }
 }

View File

@@ -22,6 +22,7 @@ import com.cameleer.server.core.alerting.AlertRuleRepository;
 import com.cameleer.server.core.alerting.AlertRuleTarget;
 import com.cameleer.server.core.alerting.ConditionKind;
 import com.cameleer.server.core.alerting.ExchangeMatchCondition;
+import com.cameleer.server.core.alerting.FireMode;
 import com.cameleer.server.core.alerting.WebhookBinding;
 import com.cameleer.server.core.outbound.OutboundConnection;
 import com.cameleer.server.core.outbound.OutboundConnectionService;
@@ -126,6 +127,7 @@ public class AlertRuleController {
             HttpServletRequest httpRequest) {
         validateAttributeKeys(req.condition());
+        validateBusinessRules(req);
         validateWebhooks(req.webhooks(), env.id());
         AlertRule draft = buildRule(null, env.id(), req, currentUserId());
@@ -147,6 +149,7 @@ public class AlertRuleController {
         AlertRule existing = requireRule(id, env.id());
         validateAttributeKeys(req.condition());
+        validateBusinessRules(req);
         validateWebhooks(req.webhooks(), env.id());
         AlertRule updated = buildRule(existing, env.id(), req, currentUserId());
@@ -258,6 +261,36 @@ public class AlertRuleController {
     // Helpers
     // -------------------------------------------------------------------------

+    /**
+     * Cross-field business-rule validation for {@link AlertRuleRequest}.
+     *
+     * <p>PER_EXCHANGE rules: re-notify and for-duration are nonsensical (each fire is its own
+     * exchange — there's no "still firing" window and nothing to re-notify about). Reject 400
+     * if either is non-zero.
+     *
+     * <p>All rules: reject 400 if both webhooks and targets are empty — such a rule can never
+     * notify anyone and is a pure footgun.
+     */
+    private void validateBusinessRules(AlertRuleRequest req) {
+        if (req.condition() instanceof ExchangeMatchCondition ex
+                && ex.fireMode() == FireMode.PER_EXCHANGE) {
+            if (req.reNotifyMinutes() != null && req.reNotifyMinutes() != 0) {
+                throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
+                        "reNotifyMinutes must be 0 for PER_EXCHANGE rules (re-notify does not apply)");
+            }
+            if (req.forDurationSeconds() != null && req.forDurationSeconds() != 0) {
+                throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
+                        "forDurationSeconds must be 0 for PER_EXCHANGE rules");
+            }
+        }
+        boolean noWebhooks = req.webhooks() == null || req.webhooks().isEmpty();
+        boolean noTargets = req.targets() == null || req.targets().isEmpty();
+        if (noWebhooks && noTargets) {
+            throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
+                    "rule must have at least one webhook or target — otherwise it never notifies anyone");
+        }
+    }
+
     /**
      * Validates that all attribute keys in an {@link ExchangeMatchCondition} match
      * {@code ^[a-zA-Z0-9._-]+$}. Keys are inlined into ClickHouse SQL, making this

View File

@@ -64,13 +64,13 @@ public class AgentLifecycleEvaluator implements ConditionEvaluator<AgentLifecycl
         List<AgentEventRecord> matches = eventRepo.findInWindow(
                 envSlug, appSlug, agentId, typeNames, from, to, MAX_EVENTS_PER_TICK);
-        if (matches.isEmpty()) return new EvalResult.Batch(List.of());
+        if (matches.isEmpty()) return new EvalResult.Batch(List.of(), Map.of());

         List<EvalResult.Firing> firings = new ArrayList<>(matches.size());
         for (AgentEventRecord ev : matches) {
             firings.add(toFiring(ev));
         }
-        return new EvalResult.Batch(firings);
+        return new EvalResult.Batch(firings, Map.of());
     }

     private static EvalResult.Firing toFiring(AgentEventRecord ev) {

View File

@@ -47,6 +47,7 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
private final NotificationContextBuilder contextBuilder;
private final EnvironmentRepository environmentRepo;
private final ObjectMapper objectMapper;
private final BatchResultApplier batchResultApplier;
private final String instanceId;
private final String tenantId;
private final Clock clock;
@@ -64,26 +65,28 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
NotificationContextBuilder contextBuilder,
EnvironmentRepository environmentRepo,
ObjectMapper objectMapper,
BatchResultApplier batchResultApplier,
@Qualifier("alertingInstanceId") String instanceId,
@Value("${cameleer.server.tenant.id:default}") String tenantId,
Clock alertingClock,
AlertingMetrics metrics) {
this.props = props;
this.ruleRepo = ruleRepo;
this.instanceRepo = instanceRepo;
this.notificationRepo = notificationRepo;
this.evaluators = evaluatorList.stream()
this.props = props;
this.ruleRepo = ruleRepo;
this.instanceRepo = instanceRepo;
this.notificationRepo = notificationRepo;
this.evaluators = evaluatorList.stream()
.collect(Collectors.toMap(ConditionEvaluator::kind, e -> e));
this.circuitBreaker = circuitBreaker;
this.renderer = renderer;
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.instanceId = instanceId;
this.tenantId = tenantId;
this.clock = alertingClock;
this.metrics = metrics;
this.circuitBreaker = circuitBreaker;
this.renderer = renderer;
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.batchResultApplier = batchResultApplier;
this.instanceId = instanceId;
this.tenantId = tenantId;
this.clock = alertingClock;
this.metrics = metrics;
}
// -------------------------------------------------------------------------
@@ -112,21 +115,61 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
for (AlertRule rule : claimed) {
Instant nextRun = Instant.now(clock).plusSeconds(rule.evaluationIntervalSeconds());
if (circuitBreaker.isOpen(rule.conditionKind())) {
log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id());
reschedule(rule, nextRun);
continue;
}
EvalResult result;
try {
if (circuitBreaker.isOpen(rule.conditionKind())) {
log.debug("Circuit breaker open for {}; skipping rule {}", rule.conditionKind(), rule.id());
continue;
}
EvalResult result = metrics.evalDuration(rule.conditionKind())
result = metrics.evalDuration(rule.conditionKind())
.recordCallable(() -> evaluateSafely(rule, ctx));
applyResult(rule, result);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Evaluator error for rule {} ({}): {}", rule.id(), rule.conditionKind(), e.toString());
} finally {
// Evaluation itself failed — release the claim so the rule can be
// retried on the next tick. Cursor stays put.
reschedule(rule, nextRun);
continue;
}
if (result instanceof EvalResult.Batch b) {
// Phase 2: the Batch path is atomic. The @Transactional apply() on
// BatchResultApplier wraps instance writes, notification enqueues,
// AND the cursor advance + releaseClaim into a single tx. A
// mid-batch fault rolls everything back — including the cursor —
// so the next tick replays the whole batch exactly once.
try {
batchResultApplier.apply(rule, b, nextRun);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("Batch apply failed for rule {} ({}): {} — rolling back; next tick will retry",
rule.id(), rule.conditionKind(), e.toString());
// The transaction rolled back. Do NOT call reschedule here —
// leaving claim + next_evaluation_at as they were means the
// claim TTL takes over and the rule becomes due on its own.
// Rethrowing is unnecessary for correctness — the cursor
// stayed put, so exactly-once-per-exchange is preserved.
}
} else {
// Non-Batch path (FIRING / Clear / Error): classic apply + rule
// reschedule. Not wrapped in a single tx — semantics unchanged
// from pre-Phase-2.
try {
applyResult(rule, result);
circuitBreaker.recordSuccess(rule.conditionKind());
} catch (Exception e) {
metrics.evalError(rule.conditionKind(), rule.id());
circuitBreaker.recordFailure(rule.conditionKind());
log.warn("applyResult failed for rule {} ({}): {}",
rule.id(), rule.conditionKind(), e.toString());
} finally {
reschedule(rule, nextRun);
}
}
}
@@ -171,14 +214,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
// -------------------------------------------------------------------------
private void applyResult(AlertRule rule, EvalResult result) {
if (result instanceof EvalResult.Batch b) {
// PER_EXCHANGE mode: each Firing in the batch creates its own AlertInstance
for (EvalResult.Firing f : b.firings()) {
applyBatchFiring(rule, f);
}
return;
}
// Note: the Batch path is handled by BatchResultApplier (transactional) —
// tick() routes Batch results there directly and never calls applyResult
// for them. This method only handles FIRING / Clear / Error state-machine
// transitions for the classic (non-PER_EXCHANGE) path.
AlertInstance current = instanceRepo.findOpenForRule(rule.id()).orElse(null);
Instant now = Instant.now(clock);
@@ -199,19 +238,6 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
});
}
/**
* Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry.
* No forDuration check — each exchange is its own event.
*/
private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) {
Instant now = Instant.now(clock);
AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now)
.withRuleSnapshot(snapshotRule(rule));
AlertInstance enriched = enrichTitleMessage(rule, instance);
AlertInstance persisted = instanceRepo.save(enriched);
enqueueNotifications(rule, persisted, now);
}
// -------------------------------------------------------------------------
// Title / message rendering
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,144 @@
package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.alerting.notify.MustacheRenderer;
import com.cameleer.server.app.alerting.notify.NotificationContextBuilder;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.runtime.Environment;
import com.cameleer.server.core.runtime.EnvironmentRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Clock;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
/**
* Applies a {@link EvalResult.Batch} result to persistent state inside a single
* transaction: instance writes, notification enqueues, and the rule's cursor
* advance + {@code releaseClaim} either all commit or all roll back together.
* <p>
* Lives in its own bean so the {@code @Transactional} annotation engages via the
* Spring proxy when invoked from {@link AlertEvaluatorJob#tick()}; calling it as
* {@code this.apply(...)} from {@code AlertEvaluatorJob} (a bean calling its own
* method) would bypass the proxy and silently disable the transaction.
* <p>
* Phase 2 of the per-exchange exactly-once plan (see
* {@code docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md}).
*/
@Component
public class BatchResultApplier {
private static final Logger log = LoggerFactory.getLogger(BatchResultApplier.class);
private final AlertRuleRepository ruleRepo;
private final AlertInstanceRepository instanceRepo;
private final AlertNotificationRepository notificationRepo;
private final MustacheRenderer renderer;
private final NotificationContextBuilder contextBuilder;
private final EnvironmentRepository environmentRepo;
private final ObjectMapper objectMapper;
private final Clock clock;
public BatchResultApplier(
AlertRuleRepository ruleRepo,
AlertInstanceRepository instanceRepo,
AlertNotificationRepository notificationRepo,
MustacheRenderer renderer,
NotificationContextBuilder contextBuilder,
EnvironmentRepository environmentRepo,
ObjectMapper objectMapper,
Clock alertingClock) {
this.ruleRepo = ruleRepo;
this.instanceRepo = instanceRepo;
this.notificationRepo = notificationRepo;
this.renderer = renderer;
this.contextBuilder = contextBuilder;
this.environmentRepo = environmentRepo;
this.objectMapper = objectMapper;
this.clock = alertingClock;
}
/**
* Atomically apply a Batch result for a single rule:
* <ol>
* <li>persist a FIRING instance per firing + enqueue its notifications</li>
* <li>advance the rule's cursor ({@code evalState}) iff the batch supplied one</li>
* <li>release the claim with the new {@code nextRun} + {@code evalState}</li>
* </ol>
* Any exception thrown from the repo calls rolls back every write — including
* the cursor advance — so the rule is replayable on the next tick.
*/
@Transactional
public void apply(AlertRule rule, EvalResult.Batch batch, Instant nextRun) {
for (EvalResult.Firing f : batch.firings()) {
applyBatchFiring(rule, f);
}
Map<String, Object> nextEvalState =
batch.nextEvalState().isEmpty() ? rule.evalState() : batch.nextEvalState();
ruleRepo.releaseClaim(rule.id(), nextRun, nextEvalState);
}
/**
* Batch (PER_EXCHANGE) mode: always create a fresh FIRING instance per Firing entry.
* No forDuration check — each exchange is its own event.
*/
private void applyBatchFiring(AlertRule rule, EvalResult.Firing f) {
Instant now = Instant.now(clock);
AlertInstance instance = AlertStateTransitions.newInstance(rule, f, AlertState.FIRING, now)
.withRuleSnapshot(snapshotRule(rule));
AlertInstance enriched = enrichTitleMessage(rule, instance);
AlertInstance persisted = instanceRepo.save(enriched);
enqueueNotifications(rule, persisted, now);
}
private AlertInstance enrichTitleMessage(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
Map<String, Object> ctx = contextBuilder.build(rule, instance, env, null);
String title = renderer.render(rule.notificationTitleTmpl(), ctx);
String message = renderer.render(rule.notificationMessageTmpl(), ctx);
return instance.withTitleMessage(title, message);
}
private void enqueueNotifications(AlertRule rule, AlertInstance instance, Instant now) {
for (WebhookBinding w : rule.webhooks()) {
Map<String, Object> payload = buildPayload(rule, instance);
notificationRepo.save(new AlertNotification(
UUID.randomUUID(),
instance.id(),
w.id(),
w.outboundConnectionId(),
NotificationStatus.PENDING,
0,
now,
null, null, null, null,
payload,
null,
now));
}
}
private Map<String, Object> buildPayload(AlertRule rule, AlertInstance instance) {
Environment env = environmentRepo.findById(rule.environmentId()).orElse(null);
return contextBuilder.build(rule, instance, env, null);
}
@SuppressWarnings("unchecked")
private Map<String, Object> snapshotRule(AlertRule rule) {
try {
Map<String, Object> raw = objectMapper.convertValue(rule, Map.class);
// Map.copyOf (used in AlertInstance compact ctor) rejects null values —
// strip them so the snapshot is safe to store.
Map<String, Object> safe = new LinkedHashMap<>();
raw.forEach((k, v) -> { if (v != null) safe.put(k, v); });
return safe;
} catch (Exception e) {
log.warn("Failed to snapshot rule {}: {}", rule.id(), e.getMessage());
return Map.of("id", rule.id().toString(), "name", rule.name());
}
}
}
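The null-stripping in `snapshotRule` exists because `Map.copyOf` rejects null values, so a raw Jackson-converted rule map with any null field would blow up in the `AlertInstance` compact constructor. A minimal standalone demonstration of that behaviour (class and field names here are illustrative, not from the codebase):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class NullStripDemo {
    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("id", "rule-1");
        raw.put("description", null); // nullable field straight from Jackson

        boolean rejected = false;
        try {
            Map.copyOf(raw); // throws NullPointerException on the null value
        } catch (NullPointerException e) {
            rejected = true;
        }
        System.out.println("copyOf rejected nulls: " + rejected);

        // The workaround from snapshotRule: copy only non-null entries first.
        Map<String, Object> safe = new LinkedHashMap<>();
        raw.forEach((k, v) -> { if (v != null) safe.put(k, v); });
        System.out.println("safe snapshot: " + Map.copyOf(safe));
    }
}
```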

View File

@@ -17,9 +17,14 @@ public sealed interface EvalResult {
record Error(Throwable cause) implements EvalResult {}
record Batch(List<Firing> firings) implements EvalResult {
record Batch(List<Firing> firings, Map<String, Object> nextEvalState) implements EvalResult {
public Batch {
firings = firings == null ? List.of() : List.copyOf(firings);
nextEvalState = nextEvalState == null ? Map.of() : Map.copyOf(nextEvalState);
}
/** Convenience: a Batch with no cursor update (first-run empty, or no matches). */
public static Batch empty() {
return new Batch(List.of(), Map.of());
}
}
}

View File

@@ -1,5 +1,6 @@
package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.alerting.config.AlertingProperties;
import com.cameleer.server.app.search.ClickHouseSearchIndex;
import com.cameleer.server.core.alerting.AlertMatchSpec;
import com.cameleer.server.core.alerting.AlertRule;
@@ -14,6 +15,7 @@ import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -23,10 +25,14 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
private final ClickHouseSearchIndex searchIndex;
private final EnvironmentRepository envRepo;
private final AlertingProperties alertingProperties;
public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex, EnvironmentRepository envRepo) {
this.searchIndex = searchIndex;
this.envRepo = envRepo;
public ExchangeMatchEvaluator(ClickHouseSearchIndex searchIndex,
EnvironmentRepository envRepo,
AlertingProperties alertingProperties) {
this.searchIndex = searchIndex;
this.envRepo = envRepo;
this.alertingProperties = alertingProperties;
}
@Override
@@ -85,19 +91,31 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
String routeId = c.scope() != null ? c.scope().routeId() : null;
ExchangeMatchCondition.ExchangeFilter filter = c.filter();
// Resolve cursor from evalState
Instant cursor = null;
Object raw = rule.evalState().get("lastExchangeTs");
// Resolve composite cursor: (startTime, executionId)
Instant cursorTs;
String cursorId;
Object raw = rule.evalState().get("lastExchangeCursor");
if (raw instanceof String s && !s.isBlank()) {
try { cursor = Instant.parse(s); } catch (Exception ignored) {}
} else if (raw instanceof Instant i) {
cursor = i;
int pipe = s.indexOf('|');
if (pipe < 0) {
// Malformed — treat as first-run (with deploy-backlog-cap clamp).
cursorTs = firstRunCursorTs(rule, ctx);
cursorId = "";
} else {
cursorTs = Instant.parse(s.substring(0, pipe));
cursorId = s.substring(pipe + 1);
}
} else {
// First run — bounded by rule.createdAt, empty executionId so any real id sorts after it.
// Clamp to deploy-backlog-cap to avoid backlog flooding for long-lived rules on first
// post-deploy tick. Normal-advance path (valid cursor above) is intentionally unaffected.
cursorTs = firstRunCursorTs(rule, ctx);
cursorId = "";
}
// Build SearchRequest — use cursor as timeFrom so we only see exchanges after last run
var req = new SearchRequest(
filter != null ? filter.status() : null,
cursor, // timeFrom = cursor (or null for first run)
cursorTs, // timeFrom
ctx.now(), // timeTo
null, null, null, // durationMin/Max, correlationId
null, null, null, null, // text variants
@@ -110,23 +128,26 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
50,
"startTime",
"asc", // asc so we process oldest first
cursorId.isEmpty() ? null : cursorId, // afterExecutionId — null on first run enables >=
envSlug
);
SearchResult<ExecutionSummary> result = searchIndex.search(req);
List<ExecutionSummary> matches = result.data();
if (matches.isEmpty()) return new EvalResult.Batch(List.of());
if (matches.isEmpty()) return EvalResult.Batch.empty();
// Find the latest startTime across all matches — becomes the next cursor
Instant latestTs = matches.stream()
.map(ExecutionSummary::startTime)
.max(Instant::compareTo)
.orElse(ctx.now());
// Ensure deterministic ordering for cursor advance
matches = new ArrayList<>(matches);
matches.sort(Comparator
.comparing(ExecutionSummary::startTime)
.thenComparing(ExecutionSummary::executionId));
ExecutionSummary last = matches.get(matches.size() - 1);
String nextCursorSerialized = last.startTime().toString() + "|" + last.executionId();
List<EvalResult.Firing> firings = new ArrayList<>();
for (int i = 0; i < matches.size(); i++) {
ExecutionSummary ex = matches.get(i);
for (ExecutionSummary ex : matches) {
Map<String, Object> ctx2 = new HashMap<>();
ctx2.put("exchange", Map.of(
"id", ex.executionId(),
@@ -135,15 +156,32 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
"startTime", ex.startTime() == null ? "" : ex.startTime().toString()
));
ctx2.put("app", Map.of("slug", ex.applicationId() == null ? "" : ex.applicationId()));
// Attach the next-cursor to the last firing so the job can extract it
if (i == matches.size() - 1) {
ctx2.put("_nextCursor", latestTs);
}
firings.add(new EvalResult.Firing(1.0, null, ctx2));
}
return new EvalResult.Batch(firings);
Map<String, Object> nextEvalState = new HashMap<>(rule.evalState());
nextEvalState.put("lastExchangeCursor", nextCursorSerialized);
return new EvalResult.Batch(firings, nextEvalState);
}
/**
* First-run cursor timestamp: {@code rule.createdAt()}, clamped to
* {@code now - perExchangeDeployBacklogCapSeconds} so a long-lived PER_EXCHANGE rule
* doesn't scan from its creation date forward on first post-deploy tick.
* <p>
* Cap ≤ 0 disables the clamp (first-run falls back to {@code rule.createdAt()} verbatim).
* Applied only on first-run / malformed-cursor paths — the normal-advance path is
* intentionally unaffected so legitimate missed ticks are not silently skipped.
*/
private Instant firstRunCursorTs(AlertRule rule, EvalContext ctx) {
Instant cursorTs = rule.createdAt();
int capSeconds = alertingProperties.effectivePerExchangeDeployBacklogCapSeconds();
if (capSeconds > 0) {
Instant capFloor = ctx.now().minusSeconds(capSeconds);
if (cursorTs == null || cursorTs.isBefore(capFloor)) {
cursorTs = capFloor;
}
}
return cursorTs;
}
}
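The composite cursor serialized above is a plain `"<startTime>|<executionId>"` string; the first `|` splits unambiguously because ISO-8601 `Instant` strings never contain a pipe. A minimal sketch of that round-trip, including the malformed-cursor fallback (class name `CursorCodec` is illustrative):

```java
import java.time.Instant;

public final class CursorCodec {
    // Serialization mirrors the evaluator: "<ISO-8601 startTime>|<executionId>".
    static String serialize(Instant ts, String executionId) {
        return ts.toString() + "|" + executionId;
    }

    // Returns {tsString, executionId}; a missing pipe signals a malformed
    // cursor, which the evaluator treats as first-run.
    static String[] parse(String raw) {
        int pipe = raw.indexOf('|');
        if (pipe < 0) return null;
        return new String[] { raw.substring(0, pipe), raw.substring(pipe + 1) };
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2026-04-22T16:00:00Z");
        String wire = serialize(ts, "exec-42");
        System.out.println(wire);
        String[] parts = parse(wire);
        System.out.println(parts[0] + " / " + parts[1]);
        System.out.println(parse("garbage-no-pipe") == null); // malformed -> first-run path
    }
}
```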

View File

@@ -71,6 +71,7 @@ public class SearchController {
application, null,
offset, limit,
sortField, sortDir,
null,
env.slug()
);

View File

@@ -81,13 +81,24 @@ public class ClickHouseSearchIndex implements SearchIndex {
String sortColumn = SORT_FIELD_MAP.getOrDefault(request.sortField(), "start_time");
String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";
// Composite-cursor callers (afterExecutionId set) need a deterministic tiebreak inside
// same-millisecond groups so the client-side last-row pick matches ClickHouse's row order.
// Without this, a same-start_time group that spills past LIMIT can silently drop rows:
// the page ends mid-millisecond, the cursor advances past the last returned execution_id,
// and the skipped rows with smaller execution_id values never reappear. Other callers
// (UI/stats) keep the unchanged single-column ORDER BY — they don't use the composite
// cursor.
String orderBy = sortColumn + " " + sortDir;
if (request.afterExecutionId() != null) {
orderBy += ", execution_id " + sortDir;
}
String dataSql = "SELECT execution_id, route_id, instance_id, application_id, "
+ "status, start_time, end_time, duration_ms, correlation_id, "
+ "error_message, error_stacktrace, diagram_content_hash, attributes, "
+ "has_trace_data, is_replay, "
+ "input_body, output_body, input_headers, output_headers, root_cause_message "
+ "FROM executions FINAL WHERE " + whereClause
+ " ORDER BY " + sortColumn + " " + sortDir
+ " ORDER BY " + orderBy
+ " LIMIT ? OFFSET ?";
List<Object> dataParams = new ArrayList<>(params);
@@ -124,7 +135,13 @@ public class ClickHouseSearchIndex implements SearchIndex {
conditions.add("tenant_id = ?");
params.add(tenantId);
if (request.timeFrom() != null) {
if (request.timeFrom() != null && request.afterExecutionId() != null) {
// composite predicate: strictly-after in (start_time, execution_id) tuple order
conditions.add("(start_time > ? OR (start_time = ? AND execution_id > ?))");
params.add(Timestamp.from(request.timeFrom()));
params.add(Timestamp.from(request.timeFrom()));
params.add(request.afterExecutionId());
} else if (request.timeFrom() != null) {
conditions.add("start_time >= ?");
params.add(Timestamp.from(request.timeFrom()));
}
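The composite predicate reads as strictly-after in `(start_time, execution_id)` tuple order: a row passes when its start_time is later, or when start_time ties and its execution_id sorts after the cursor's. A small sketch pinning that semantics (names `TupleOrderDemo`/`Row` are illustrative):

```java
import java.time.Instant;

public final class TupleOrderDemo {
    record Row(Instant startTime, String executionId) {}

    // Mirrors (start_time > ? OR (start_time = ? AND execution_id > ?)).
    static boolean strictlyAfter(Row row, Instant cursorTs, String cursorId) {
        return row.startTime().isAfter(cursorTs)
            || (row.startTime().equals(cursorTs)
                && row.executionId().compareTo(cursorId) > 0);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2026-04-22T16:00:00Z");
        // Cursor points at (t, "exec-b"): the same-millisecond sibling "exec-c"
        // is still admitted, while "exec-a" and the cursor row itself are not.
        System.out.println(strictlyAfter(new Row(t, "exec-a"), t, "exec-b")); // false
        System.out.println(strictlyAfter(new Row(t, "exec-b"), t, "exec-b")); // false
        System.out.println(strictlyAfter(new Row(t, "exec-c"), t, "exec-b")); // true
        System.out.println(strictlyAfter(new Row(t.plusMillis(1), "exec-a"), t, "exec-b")); // true
    }
}
```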

View File

@@ -93,6 +93,10 @@ cameleer:
notification-retention-days: ${CAMELEER_SERVER_ALERTING_NOTIFICATIONRETENTIONDAYS:30}
webhook-timeout-ms: ${CAMELEER_SERVER_ALERTING_WEBHOOKTIMEOUTMS:5000}
webhook-max-attempts: ${CAMELEER_SERVER_ALERTING_WEBHOOKMAXATTEMPTS:3}
# PER_EXCHANGE first-run cursor clamp: on first tick with no persisted cursor, evaluator
# scans no further back than (now - this cap). Prevents one-time backlog flood for rules
# whose createdAt predates a migration. Set to 0 to disable and replay from createdAt.
per-exchange-deploy-backlog-cap-seconds: ${CAMELEER_SERVER_ALERTING_PEREXCHANGEDEPLOYBACKLOGCAPSECONDS:86400}
outbound-http:
trust-all: false
trusted-ca-pem-paths: []
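The cap's semantics can be restated as a standalone sketch of the evaluator's `firstRunCursorTs` clamp (class name is illustrative): the first-run cursor is `rule.createdAt`, floored to `now - cap` when the cap is positive, and a cap of 0 disables the clamp so the rule replays from `createdAt` verbatim.

```java
import java.time.Instant;

public final class BacklogCapDemo {
    static Instant firstRunCursorTs(Instant createdAt, Instant now, int capSeconds) {
        Instant cursorTs = createdAt;
        if (capSeconds > 0) {
            Instant capFloor = now.minusSeconds(capSeconds);
            if (cursorTs == null || cursorTs.isBefore(capFloor)) {
                cursorTs = capFloor;
            }
        }
        return cursorTs;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2026-04-22T18:00:00Z");
        Instant oldRule = Instant.parse("2026-01-01T00:00:00Z");   // predates the cap window
        Instant youngRule = Instant.parse("2026-04-22T17:00:00Z"); // inside the window
        System.out.println(firstRunCursorTs(oldRule, now, 86_400));   // clamped to now - 24h
        System.out.println(firstRunCursorTs(youngRule, now, 86_400)); // untouched: createdAt
        System.out.println(firstRunCursorTs(oldRule, now, 0));        // cap disabled: createdAt
    }
}
```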

View File

@@ -7,8 +7,10 @@ import com.cameleer.server.app.alerting.eval.AlertEvaluatorJob;
import com.cameleer.server.app.alerting.notify.NotificationDispatchJob;
import com.cameleer.server.app.outbound.crypto.SecretCipher;
import com.cameleer.server.app.search.ClickHouseLogStore;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.ingestion.BufferedLogEntry;
import com.cameleer.server.core.ingestion.MergedExecution;
import com.cameleer.server.core.outbound.OutboundConnectionRepository;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,6 +64,7 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
@Autowired private AlertSilenceRepository silenceRepo;
@Autowired private OutboundConnectionRepository outboundRepo;
@Autowired private ClickHouseLogStore logStore;
@Autowired private ClickHouseExecutionStore executionStore;
@Autowired private SecretCipher secretCipher;
@Autowired private TestRestTemplate restTemplate;
@Autowired private TestSecurityHelper securityHelper;
@@ -399,6 +402,102 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", reNotifyRuleId);
}
/**
* Exactly-once-per-exchange end-to-end lifecycle.
* <p>
* 5 FAILED exchanges across 2 evaluator ticks must produce exactly
* 5 FIRING instances + 5 PENDING notifications (one per exchange, one webhook).
* A third tick with no new exchanges must be a no-op. Acking one instance
* must leave the other four untouched.
* <p>
* Exercises the full Phase-1+2+3 stack: evaluator cursor persistence across
* ticks, per-tick rollback isolation, and the ack-doesn't-cascade invariant.
* See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md
*/
@Test
@Order(7)
void perExchange_5FailuresAcross2Ticks_exactlyOncePerExchange() {
// Relative-to-now timestamps so they fall inside the evaluator's
// [rule.createdAt .. ctx.now()] window. Hard-coded Instant.parse(...)
// timestamps would require reconciling with both the mocked alertingClock
// AND rule.createdAt, which is wall-clock in createPerExchangeRuleWithWebhook.
Instant base = Instant.now().minusSeconds(30);
// Pin the mocked alertingClock to current wall time so ctx.now() is >
// every seeded execution timestamp (base + 0..4s) AND > rule.createdAt
// (now - 60s). Prior tests may have set simulatedNow far in the past
// (step1 used wall time but step6 advanced by 61s — test ordering means
// the last value lingers). Re-pinning here makes the window deterministic.
setSimulatedNow(Instant.now());
UUID perExRuleId = createPerExchangeRuleWithWebhook();
// ── Tick 1 — seed 3, tick ────────────────────────────────────────────
seedFailedExecution("ex1-exec-1", base);
seedFailedExecution("ex1-exec-2", base.plusSeconds(1));
seedFailedExecution("ex1-exec-3", base.plusSeconds(2));
evaluatorJob.tick();
// ── Tick 2 — seed 2 more, tick ───────────────────────────────────────
seedFailedExecution("ex1-exec-4", base.plusSeconds(3));
seedFailedExecution("ex1-exec-5", base.plusSeconds(4));
// Re-open the rule claim so it's due for tick 2.
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
evaluatorJob.tick();
// Assert: 5 instances, 5 PENDING notifications.
List<UUID> instanceIds = instanceIdsForRule(perExRuleId);
assertThat(instanceIds)
.as("5 FAILED exchanges across 2 ticks must produce exactly 5 FIRING instances")
.hasSize(5);
List<AlertNotification> allNotifs = notificationsForRule(perExRuleId);
assertThat(allNotifs)
.as("5 instances × 1 webhook must produce exactly 5 notifications")
.hasSize(5);
assertThat(allNotifs.stream().allMatch(n -> n.status() == NotificationStatus.PENDING))
.as("all notifications must be PENDING before dispatch")
.isTrue();
// ── Dispatch all pending, then tick 3 — expect no change ────────────
dispatchAllPending();
// Re-open the rule claim so it's due for tick 3.
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
evaluatorJob.tick();
assertThat(instanceIdsForRule(perExRuleId))
.as("tick 3 with no new exchanges must not create new instances")
.hasSize(5);
long pending = notificationsForRule(perExRuleId).stream()
.filter(n -> n.status() == NotificationStatus.PENDING)
.count();
assertThat(pending)
.as("tick 3 must not re-enqueue notifications — all prior were dispatched")
.isZero();
// ── Ack one — others unchanged ──────────────────────────────────────
UUID firstInstanceId = instanceIds.get(0);
instanceRepo.ack(firstInstanceId, "test-operator", Instant.now());
List<AlertInstance> all = instanceIds.stream()
.map(id -> instanceRepo.findById(id).orElseThrow())
.toList();
long ackedCount = all.stream().filter(i -> i.ackedBy() != null).count();
assertThat(ackedCount)
.as("ack on one instance must not cascade to peers")
.isEqualTo(1);
// Cleanup — the @AfterAll cleans by envId which covers us, but be explicit.
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
}
// ── Helpers ───────────────────────────────────────────────────────────────
/** POST the main lifecycle rule via REST API. Returns the created rule ID. */
@@ -513,4 +612,96 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
logStore.insertBufferedBatch(List.of(
new BufferedLogEntry(tenantId, envSlug, "lc-agent-01", "lc-app", entry)));
}
// ── Helpers for perExchange exactly-once test ────────────────────────────
private static final String PER_EX_APP_SLUG = "per-ex-app";
/**
* Create a PER_EXCHANGE rule bound to {@link #PER_EX_APP_SLUG} that fires on
* {@code status=FAILED} and enqueues one notification per match via the
* pre-seeded webhook connection ({@link #connId}). Returns the new rule id.
* <p>
* Replicates the pattern from {@code AlertEvaluatorJobIT#createPerExchangeRuleWithWebhook}
* but reuses this test's env + outbound connection.
*/
private UUID createPerExchangeRuleWithWebhook() {
UUID rid = UUID.randomUUID();
Instant now = Instant.now();
var condition = new ExchangeMatchCondition(
new AlertScope(PER_EX_APP_SLUG, null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null);
var webhook = new WebhookBinding(connId, null, null, Map.of());
var rule = new AlertRule(
rid, envId, "per-ex-lc-rule-" + rid, null,
AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
60, 0, 60,
"Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
List.of(webhook), List.of(),
now.minusSeconds(5), // due now
null, null, Map.of(),
now.minusSeconds(60), "test-operator", // createdAt bounds first-run cursor
now.minusSeconds(60), "test-operator");
ruleRepo.save(rule);
return rid;
}
/**
* Seed one FAILED execution into ClickHouse, scoped to this test's tenant/env/app
* so it's picked up by a PER_EXCHANGE rule targeting {@link #PER_EX_APP_SLUG}.
*/
private void seedFailedExecution(String executionId, Instant startTime) {
executionStore.insertExecutionBatch(List.of(new MergedExecution(
tenantId, 1L, executionId, "route-a", "inst-1", PER_EX_APP_SLUG, envSlug,
"FAILED", "", "exchange-" + executionId,
startTime, startTime.plusMillis(100), 100L,
"", "", "", "", "", "", // error fields
"", "FULL", // diagramContentHash, engineLevel
"", "", "", "", "", "", // bodies / headers / properties
"{}", // attributes (JSON)
"", "", // traceId, spanId
false, false,
null, null
)));
}
/** All instance ids for a rule, ordered by fired_at ascending (deterministic). */
private List<UUID> instanceIdsForRule(UUID rid) {
return jdbcTemplate.queryForList(
"SELECT id FROM alert_instances WHERE rule_id = ? ORDER BY fired_at ASC",
UUID.class, rid);
}
/** All notifications across every instance of a rule. */
private List<AlertNotification> notificationsForRule(UUID rid) {
List<UUID> ids = instanceIdsForRule(rid);
List<AlertNotification> out = new java.util.ArrayList<>();
for (UUID iid : ids) {
out.addAll(notificationRepo.listForInstance(iid));
}
return out;
}
/**
 * Simulate a dispatch pass without hitting the real webhook — marks every
 * PENDING notification in this test's environment as DELIVERED. Using
 * {@code dispatchJob.tick()} would round-trip through WireMock and require
 * extra plumbing; the exactly-once contract under test is about the
 * evaluator's re-enqueueing behaviour, not webhook delivery.
 */
private void dispatchAllPending() {
Instant now = Instant.now();
// Drain PENDING notifications across the whole env (safe because the
// rule-scoped assertions further down only look at this rule's notifications).
List<UUID> pendingIds = jdbcTemplate.queryForList(
"SELECT n.id FROM alert_notifications n " +
"JOIN alert_instances i ON n.alert_instance_id = i.id " +
"WHERE i.environment_id = ? " +
"AND n.status = 'PENDING'::notification_status_enum",
UUID.class, envId);
for (UUID nid : pendingIds) {
notificationRepo.markDelivered(nid, 200, "OK", now);
}
}
}

View File

@@ -145,7 +145,7 @@ class AlertRuleControllerIT extends AbstractPostgresIT {
{"name":"sqli-test","severity":"WARNING","conditionKind":"EXCHANGE_MATCH",
"condition":{"kind":"EXCHANGE_MATCH","scope":{},
"filter":{"status":"FAILED","attributes":{"foo'; DROP TABLE executions; --":"x"}},
"fireMode":"PER_EXCHANGE","perExchangeLingerSeconds":60}}
"fireMode":"PER_EXCHANGE"}}
""";
ResponseEntity<String> resp = restTemplate.exchange(
@@ -164,7 +164,8 @@ class AlertRuleControllerIT extends AbstractPostgresIT {
{"name":"valid-attr","severity":"WARNING","conditionKind":"EXCHANGE_MATCH",
"condition":{"kind":"EXCHANGE_MATCH","scope":{},
"filter":{"status":"FAILED","attributes":{"order.type":"x"}},
"fireMode":"PER_EXCHANGE","perExchangeLingerSeconds":60}}
"fireMode":"PER_EXCHANGE"},
"targets":[{"kind":"USER","targetId":"test-operator"}]}
""";
ResponseEntity<String> resp = restTemplate.exchange(
@@ -246,6 +247,61 @@ class AlertRuleControllerIT extends AbstractPostgresIT {
assertThat(preview.getStatusCode()).isEqualTo(HttpStatus.OK);
}
// --- PER_EXCHANGE cross-field validation + empty-targets validation ---
// RED tests: today's controller accepts these bodies; Task 3.3 adds the validator.
@Test
void createPerExchangeRule_withReNotifyMinutesNonZero_returns400() {
String body = perExchangeRuleBodyWithExtras(
"per-exchange-renotify",
/*reNotifyMinutes*/ 60,
/*forDurationSeconds*/ null);
ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/rules",
HttpMethod.POST,
new HttpEntity<>(body, securityHelper.authHeaders(operatorJwt)),
String.class);
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
assertThat(resp.getBody()).contains("reNotifyMinutes");
}
@Test
void createPerExchangeRule_withForDurationSecondsNonZero_returns400() {
String body = perExchangeRuleBodyWithExtras(
"per-exchange-forduration",
/*reNotifyMinutes*/ null,
/*forDurationSeconds*/ 60);
ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/rules",
HttpMethod.POST,
new HttpEntity<>(body, securityHelper.authHeaders(operatorJwt)),
String.class);
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
assertThat(resp.getBody()).contains("forDurationSeconds");
}
@Test
void createAnyRule_withEmptyWebhooksAndTargets_returns400() {
// baseValidPerExchangeRuleRequest() already produces no webhooks / no targets — that's
// precisely the "empty webhooks + empty targets" shape this test pins as a 400.
String body = baseValidPerExchangeRuleRequest("no-sinks");
ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/rules",
HttpMethod.POST,
new HttpEntity<>(body, securityHelper.authHeaders(operatorJwt)),
String.class);
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
assertThat(resp.getBody()).satisfiesAnyOf(
s -> assertThat(s).contains("webhook"),
s -> assertThat(s).contains("target"));
}
// --- Unknown env returns 404 ---
@Test
@@ -269,10 +325,49 @@ class AlertRuleControllerIT extends AbstractPostgresIT {
}
private static String routeMetricRuleBody(String name) {
// Includes a USER target so the rule passes the "at least one webhook or target" guard.
return """
{"name":"%s","severity":"WARNING","conditionKind":"ROUTE_METRIC",
"condition":{"kind":"ROUTE_METRIC","scope":{},
"metric":"ERROR_RATE","comparator":"GT","threshold":0.05,"windowSeconds":60}}
"metric":"ERROR_RATE","comparator":"GT","threshold":0.05,"windowSeconds":60},
"targets":[{"kind":"USER","targetId":"test-operator"}]}
""".formatted(name);
}
/**
* Produces a request body for a valid PER_EXCHANGE rule (baseline) — no webhooks,
* no targets, no reNotifyMinutes, no forDurationSeconds. The controller currently
* accepts this shape; Task 3.3 tightens that (empty sinks will 400).
*/
private static String baseValidPerExchangeRuleRequest(String name) {
return """
{"name":"%s","severity":"WARNING","conditionKind":"EXCHANGE_MATCH",
"condition":{"kind":"EXCHANGE_MATCH","scope":{},
"filter":{"status":"FAILED","attributes":{}},
"fireMode":"PER_EXCHANGE"}}
""".formatted(name);
}
/**
* Variant of {@link #baseValidPerExchangeRuleRequest(String)} that sets
* reNotifyMinutes and/or forDurationSeconds at the top-level request. Used to pin
* the PER_EXCHANGE cross-field validation contract (Task 3.3).
*/
private static String perExchangeRuleBodyWithExtras(String name,
Integer reNotifyMinutes,
Integer forDurationSeconds) {
StringBuilder extras = new StringBuilder();
if (reNotifyMinutes != null) {
extras.append(",\"reNotifyMinutes\":").append(reNotifyMinutes);
}
if (forDurationSeconds != null) {
extras.append(",\"forDurationSeconds\":").append(forDurationSeconds);
}
return """
{"name":"%s","severity":"WARNING","conditionKind":"EXCHANGE_MATCH",
"condition":{"kind":"EXCHANGE_MATCH","scope":{},
"filter":{"status":"FAILED","attributes":{}},
"fireMode":"PER_EXCHANGE"}%s}
""".formatted(name, extras.toString());
}
}

View File

@@ -2,20 +2,30 @@ package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.app.search.ClickHouseLogStore;
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
import com.cameleer.server.core.agent.AgentInfo;
import com.cameleer.server.core.agent.AgentRegistryService;
import com.cameleer.server.core.agent.AgentState;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.ingestion.MergedExecution;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@@ -29,6 +39,7 @@ import static org.mockito.Mockito.when;
* {@code AgentRegistryService} is mocked so tests can control which agents
* are DEAD without depending on in-memory timing.
*/
@Import(AlertEvaluatorJobIT.FaultInjectingNotificationRepoConfig.class)
class AlertEvaluatorJobIT extends AbstractPostgresIT {
@MockBean(name = "clickHouseLogStore") ClickHouseLogStore clickHouseLogStore;
@@ -37,24 +48,37 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
@Autowired private AlertEvaluatorJob job;
@Autowired private AlertRuleRepository ruleRepo;
@Autowired private AlertInstanceRepository instanceRepo;
@Autowired private AlertNotificationRepository notificationRepo;
@Autowired private FaultInjectingNotificationRepository faultInjectingNotificationRepo;
@Autowired private ClickHouseExecutionStore executionStore;
@Autowired @Qualifier("clickHouseJdbcTemplate") private JdbcTemplate clickHouseJdbc;
private UUID envId;
private UUID ruleId;
private static final String SYS_USER = "sys-eval-it";
private static final String APP_SLUG = "orders";
private static final String AGENT_ID = "test-agent-01";
private String envSlug;
@BeforeEach
void setup() {
// ClickHouse — purge any executions left over from prior tests in the
// shared CH instance. Matches the house-style used across the CH IT
// suite (see ClickHouseExecutionStoreIT, ClickHouseStatsStoreIT, etc.).
// TRUNCATE is synchronous, unlike ALTER ... DELETE (mutations_sync=0).
clickHouseJdbc.execute("TRUNCATE TABLE executions");
clickHouseJdbc.execute("TRUNCATE TABLE processor_executions");
// Default: empty registry — all evaluators return Clear
when(agentRegistryService.findAll()).thenReturn(List.of());
envId = UUID.randomUUID();
ruleId = UUID.randomUUID();
envId = UUID.randomUUID();
ruleId = UUID.randomUUID();
envSlug = "eval-it-env-" + envId;
jdbcTemplate.update(
"INSERT INTO environments (id, slug, display_name) VALUES (?, ?, ?)",
envId, "eval-it-env-" + envId, "Eval IT Env");
envId, envSlug, "Eval IT Env");
jdbcTemplate.update(
"INSERT INTO users (user_id, provider, email) VALUES (?, 'local', ?) ON CONFLICT (user_id) DO NOTHING",
SYS_USER, SYS_USER + "@test.example.com");
@@ -76,12 +100,17 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
@AfterEach
void cleanup() {
// Always reset the fault-injector — a prior @Test may have left it armed,
// and Spring reuses the same context (and thus the same decorator bean)
// across tests in this class.
faultInjectingNotificationRepo.clearFault();
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
"(SELECT id FROM alert_instances WHERE environment_id = ?)", envId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
jdbcTemplate.update("DELETE FROM users WHERE user_id = ?", SYS_USER);
// ClickHouse `executions` is truncated in @BeforeEach (house style).
}
// -------------------------------------------------------------------------
@@ -94,6 +123,77 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
AgentState.DEAD, lastHeartbeat.minusSeconds(300), lastHeartbeat, null);
}
/**
* Seed one FAILED execution row into the ClickHouse {@code executions} table,
* scoped to this test's tenant/env/app so it's picked up by a PER_EXCHANGE rule
* targeting {@code APP_SLUG}. Executions older than {@code rule.createdAt} are
* filtered out by the evaluator; callers must pick {@code startTime} accordingly.
*/
private void seedFailedExecution(String executionId, Instant startTime) {
executionStore.insertExecutionBatch(List.of(new MergedExecution(
"default", 1L, executionId, "route-a", "inst-1", APP_SLUG, envSlug,
"FAILED", "", "exchange-" + executionId,
startTime, startTime.plusMillis(100), 100L,
"", "", "", "", "", "", // errorMessage..rootCauseMessage
"", "FULL", // diagramContentHash, engineLevel
"", "", "", "", "", "", // bodies / headers / properties
"{}", // attributes (JSON)
"", "", // traceId, spanId
false, false,
null, null
)));
}
/**
* Create a PER_EXCHANGE rule targeting {@code APP_SLUG} + status=FAILED with a
* single webhook binding. Returns the persisted rule id.
* <p>
* {@code createdAt} is set {@code 60s} before {@code Instant.now()} so the
* evaluator's first-run lower bound ({@code timeFrom = rule.createdAt}) picks
* up the seeded executions.
*/
private UUID createPerExchangeRuleWithWebhook() {
UUID ruleId2 = UUID.randomUUID();
Instant now = Instant.now();
var condition = new ExchangeMatchCondition(
new AlertScope(APP_SLUG, null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null);
var webhook = new WebhookBinding(UUID.randomUUID(), null, null, Map.of());
var rule = new AlertRule(
ruleId2, envId, "per-exchange-rule-" + ruleId2, null,
AlertSeverity.WARNING, true, ConditionKind.EXCHANGE_MATCH, condition,
60, 0, 60,
"Exchange FAILED: {{exchange.id}}", "route={{exchange.routeId}}",
List.of(webhook), List.of(),
now.minusSeconds(5), // due now
null, null, Map.of(),
now.minusSeconds(60), SYS_USER, // createdAt — bounds first-run cursor
now.minusSeconds(60), SYS_USER);
ruleRepo.save(rule);
return ruleId2;
}
/** List all notifications enqueued for any instance of {@code ruleId}. */
private List<AlertNotification> listNotificationsByRule(UUID ruleId) {
List<UUID> instanceIds = jdbcTemplate.queryForList(
"SELECT id FROM alert_instances WHERE rule_id = ?",
UUID.class, ruleId);
List<AlertNotification> out = new java.util.ArrayList<>();
for (UUID iid : instanceIds) {
out.addAll(notificationRepo.listForInstance(iid));
}
return out;
}
/** Count all instances for {@code ruleId} (open or resolved). */
private int countInstancesByRule(UUID ruleId) {
Long c = jdbcTemplate.queryForObject(
"SELECT count(*) FROM alert_instances WHERE rule_id = ?",
Long.class, ruleId);
return c == null ? 0 : c.intValue();
}
// -------------------------------------------------------------------------
// Tests
// -------------------------------------------------------------------------
@@ -238,4 +338,247 @@ class AlertEvaluatorJobIT extends AbstractPostgresIT {
assertThat(snapshotAfter).contains("\"name\": \"dead-agent-rule\"");
assertThat(snapshotAfter).contains("\"severity\": \"WARNING\"");
}
// -------------------------------------------------------------------------
// PER_EXCHANGE regression pin — notifications must not re-enqueue for
// already-matched exchanges across tick boundaries (cursor must be persisted
// via releaseClaim, then read back on the next tick).
// See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md
// -------------------------------------------------------------------------
@Test
void tick2_noNewExchanges_enqueuesZeroAdditionalNotifications() {
// Arrange: 2 FAILED executions in ClickHouse, 1 PER_EXCHANGE rule with 1 webhook.
// Use relative-to-now timestamps so they sort within the evaluator's
// [rule.createdAt .. ctx.now()] window.
Instant t0 = Instant.now().minusSeconds(30);
seedFailedExecution("exec-1", t0);
seedFailedExecution("exec-2", t0.plusSeconds(1));
UUID perExRuleId = createPerExchangeRuleWithWebhook();
// Tick 1 — expect 2 instances, 2 notifications.
job.tick();
assertThat(countInstancesByRule(perExRuleId))
.as("tick 1 must create one FIRING instance per matched exchange")
.isEqualTo(2);
List<AlertNotification> afterTick1 = listNotificationsByRule(perExRuleId);
assertThat(afterTick1)
.as("tick 1 must enqueue one notification per instance (1 webhook × 2 instances)")
.hasSize(2);
// Simulate NotificationDispatchJob draining the queue.
Instant now = Instant.now();
afterTick1.forEach(n -> notificationRepo.markDelivered(n.id(), 200, "OK", now));
// Reopen the claim so the rule is due for Tick 2.
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
// Tick 2 — no new ClickHouse rows; cursor should have advanced past exec-2
// during tick 1 and persisted via releaseClaim. Therefore: no new firings,
// no new notifications.
job.tick();
// Instance count unchanged.
assertThat(countInstancesByRule(perExRuleId))
.as("tick 2 must not create new instances — cursor persisted past exec-2")
.isEqualTo(2);
// THE BLEED: any new PENDING notification after tick 2 indicates the
// evaluator re-matched already-processed exchanges (cursor not persisted
// across ticks). Must be zero after the Phase 1 fix.
long pending = listNotificationsByRule(perExRuleId).stream()
.filter(n -> n.status() == NotificationStatus.PENDING)
.count();
assertThat(pending)
.as("tick 2 must NOT re-enqueue notifications for already-matched exchanges")
.isZero();
// Clean up the extra rule (setup-created rule is handled by @AfterEach).
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
}
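The tick-boundary contract pinned above rests on a composite cursor persisted in `evalState`. A minimal sketch of the encode/parse round-trip, assuming the `<isoInstant>|<executionId>` wire format the evaluator tests assert on (the `Cursor` helper name is hypothetical, not a class in the codebase):

```java
import java.time.Instant;

// Hypothetical helper: encodes/decodes the composite PER_EXCHANGE cursor
// stored under evalState["lastExchangeCursor"] as "<isoInstant>|<executionId>".
final class Cursor {
    final Instant ts;
    final String executionId;

    Cursor(Instant ts, String executionId) {
        this.ts = ts;
        this.executionId = executionId;
    }

    String encode() {
        return ts.toString() + "|" + executionId;
    }

    // Split on the FIRST '|': an ISO-8601 instant never contains '|', so any
    // '|' characters inside the execution id are preserved.
    static Cursor parse(String raw) {
        int i = raw.indexOf('|');
        if (i < 0) {
            // Legacy/malformed value: treat the whole string as a bare timestamp.
            return new Cursor(Instant.parse(raw), "");
        }
        return new Cursor(Instant.parse(raw.substring(0, i)), raw.substring(i + 1));
    }
}
```

The malformed-value branch mirrors the first-run fallback: a cursor that fails to parse cleanly degrades to a timestamp-only bound rather than aborting the tick.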
// -------------------------------------------------------------------------
// Tick atomicity regression pin — a crash mid-batch must roll back every
// instance + notification write AND leave the cursor unchanged so the
// next tick re-processes the entire batch exactly once.
// See: docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md (Task 2.1)
// -------------------------------------------------------------------------
@Test
void tickRollback_faultOnSecondNotificationInsert_leavesCursorUnchanged() {
// Seed 3 FAILED executions so the rule's PER_EXCHANGE batch has 3 firings.
// Relative-to-now timestamps so they fall inside [rule.createdAt .. ctx.now()].
Instant t0 = Instant.now().minusSeconds(30);
seedFailedExecution("exec-1", t0);
seedFailedExecution("exec-2", t0.plusSeconds(1));
seedFailedExecution("exec-3", t0.plusSeconds(2));
UUID perExRuleId = createPerExchangeRuleWithWebhook();
var ruleBefore = ruleRepo.findById(perExRuleId).orElseThrow();
Object cursorBefore = ruleBefore.evalState().get("lastExchangeCursor"); // null on first run
Instant nextRunBefore = ruleBefore.nextEvaluationAt();
// Arm the fault injector: the 2nd notification save() throws.
// (Instance saves are NOT counted — the injector is scoped to notification saves.)
faultInjectingNotificationRepo.failOnSave(2);
// Today (Phase 1, non-transactional): the evaluator catches the exception
// per-rule and logs a warning — see AlertEvaluatorJob#tick's try/catch
// around applyResult. So tick() itself does NOT rethrow. That is exactly
// why this IT is RED pre-Phase-2: post-rollback asserts expect 0 instances
// and 0 notifications, but the current code will have persisted
// firing #1 (instance + notification) and firing #2's instance before the
// fault on firing #2's notification. Phase 2 wraps the per-rule body in
// @Transactional so the single-rule failure rolls back all of its writes.
try {
job.tick();
} catch (RuntimeException expectedAfterPhase2) {
// Phase 2 may choose to rethrow; either way the rollback assertions
// below are what pin the contract.
// intentionally empty — fault-injection swallow/rethrow tolerance; see comment above
}
// Post-rollback: zero instances, zero notifications, cursor unchanged,
// nextRunAt unchanged (Phase 2 will hold the claim so the next tick retries).
assertThat(countInstancesByRule(perExRuleId))
.as("Phase 2 contract: mid-batch fault rolls back every instance write")
.isZero();
assertThat(listNotificationsByRule(perExRuleId))
.as("Phase 2 contract: mid-batch fault rolls back every notification write")
.isEmpty();
var ruleAfter = ruleRepo.findById(perExRuleId).orElseThrow();
assertThat(ruleAfter.evalState().get("lastExchangeCursor"))
.as("Phase 2 contract: cursor MUST NOT advance when the tick fails")
.isEqualTo(cursorBefore);
assertThat(ruleAfter.nextEvaluationAt())
.as("Phase 2 contract: nextEvaluationAt MUST NOT advance when the tick fails")
.isEqualTo(nextRunBefore);
// Recover: clear the fault, reopen the claim, tick again.
// All 3 firings must land on the second tick — exactly-once-per-exchange.
faultInjectingNotificationRepo.clearFault();
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", perExRuleId);
job.tick();
assertThat(countInstancesByRule(perExRuleId))
.as("after recovery: all 3 exchanges produce exactly one instance each")
.isEqualTo(3);
assertThat(listNotificationsByRule(perExRuleId))
.as("after recovery: all 3 instances produce exactly one notification each")
.hasSize(3);
assertThat(ruleRepo.findById(perExRuleId).orElseThrow()
.evalState().get("lastExchangeCursor"))
.as("after recovery: cursor advanced past exec-3")
.isNotEqualTo(cursorBefore);
// Clean up the extra rule (setup-created rule is handled by @AfterEach).
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN " +
"(SELECT id FROM alert_instances WHERE rule_id = ?)", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE rule_id = ?", perExRuleId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", perExRuleId);
}
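The rollback contract this test pins — a mid-batch fault must leave instances, notifications, and the cursor all unchanged — can be sketched without a database. The class below is illustrative only (it is not the real `BatchResultApplier`, which uses `@Transactional`): writes go to working copies and are published together, so an exception thrown by the notification sink before "commit" leaves every field untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of the all-or-nothing per-rule batch contract.
final class AtomicBatchApplier {
    final List<String> instances = new ArrayList<>();
    final List<String> notifications = new ArrayList<>();
    String cursor; // null = no cursor persisted yet

    void apply(List<String> firings, String nextCursor, Consumer<String> notifySink) {
        List<String> newInstances = new ArrayList<>(instances);
        List<String> newNotifications = new ArrayList<>(notifications);
        for (String firing : firings) {
            newInstances.add(firing);
            notifySink.accept(firing); // may throw mid-batch (the injected fault)
            newNotifications.add(firing);
        }
        // "Commit": publish every write and advance the cursor together.
        instances.clear();
        instances.addAll(newInstances);
        notifications.clear();
        notifications.addAll(newNotifications);
        cursor = nextCursor;
    }
}
```

A retry after the fault is cleared then lands all firings exactly once, matching the recovery phase of the IT above.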
// -------------------------------------------------------------------------
// Fault-injecting AlertNotificationRepository decorator
//
// Delegates all calls to the real Postgres-backed repository except
// {@link #save(AlertNotification)} — that method increments a counter and
// throws a RuntimeException when the configured trigger-count is reached.
// Only notification saves are counted; instance saves go through a separate
// repo and are unaffected.
// -------------------------------------------------------------------------
static class FaultInjectingNotificationRepository implements AlertNotificationRepository {
private final AlertNotificationRepository delegate;
private final AtomicInteger saveCount = new AtomicInteger(0);
private volatile int failOnNthSave = -1; // -1 = disabled
FaultInjectingNotificationRepository(AlertNotificationRepository delegate) {
this.delegate = delegate;
}
/** Arms the fault: the {@code n}-th call to {@link #save} (1-indexed) throws. */
void failOnSave(int n) {
saveCount.set(0);
failOnNthSave = n;
}
/** Disarms the fault and resets the counter. */
void clearFault() {
failOnNthSave = -1;
saveCount.set(0);
}
@Override
public AlertNotification save(AlertNotification n) {
int current = saveCount.incrementAndGet();
if (failOnNthSave > 0 && current == failOnNthSave) {
throw new RuntimeException(
"FaultInjectingNotificationRepository: injected failure on save #" + current);
}
return delegate.save(n);
}
@Override
public Optional<AlertNotification> findById(UUID id) { return delegate.findById(id); }
@Override
public List<AlertNotification> listForInstance(UUID alertInstanceId) {
return delegate.listForInstance(alertInstanceId);
}
@Override
public List<AlertNotification> claimDueNotifications(String instanceId, int batchSize, int claimTtlSeconds) {
return delegate.claimDueNotifications(instanceId, batchSize, claimTtlSeconds);
}
@Override
public void markDelivered(UUID id, int status, String snippet, Instant when) {
delegate.markDelivered(id, status, snippet, when);
}
@Override
public void scheduleRetry(UUID id, Instant nextAttemptAt, int status, String snippet) {
delegate.scheduleRetry(id, nextAttemptAt, status, snippet);
}
@Override
public void markFailed(UUID id, int status, String snippet) {
delegate.markFailed(id, status, snippet);
}
@Override
public void resetForRetry(UUID id, Instant nextAttemptAt) {
delegate.resetForRetry(id, nextAttemptAt);
}
@Override
public void deleteSettledBefore(Instant cutoff) {
delegate.deleteSettledBefore(cutoff);
}
}
/**
* {@link TestConfiguration} that installs the fault-injecting decorator as the
* {@code @Primary} {@link AlertNotificationRepository}. The real Postgres repo is
* still registered (via {@code AlertingBeanConfig}) and is injected into the
* decorator as the delegate, so every non-instrumented call path exercises real SQL.
*/
@TestConfiguration
static class FaultInjectingNotificationRepoConfig {
@Bean
@Primary
FaultInjectingNotificationRepository faultInjectingNotificationRepository(
@Qualifier("alertNotificationRepository") AlertNotificationRepository realRepo) {
return new FaultInjectingNotificationRepository(realRepo);
}
}
}

View File

@@ -157,7 +157,7 @@ class AlertStateTransitionsTest {
@Test
void batchResultAlwaysEmpty() {
var batch = new EvalResult.Batch(List.of(FIRING_RESULT));
var batch = new EvalResult.Batch(List.of(FIRING_RESULT), Map.of());
var next = AlertStateTransitions.apply(null, batch, ruleWith(0), NOW);
assertThat(next).isEmpty();
}

View File

@@ -1,15 +1,18 @@
package com.cameleer.server.app.alerting.eval;
import com.cameleer.server.app.alerting.config.AlertingProperties;
import com.cameleer.server.app.search.ClickHouseSearchIndex;
import com.cameleer.server.core.alerting.*;
import com.cameleer.server.core.runtime.Environment;
import com.cameleer.server.core.runtime.EnvironmentRepository;
import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -34,7 +37,9 @@ class ExchangeMatchEvaluatorTest {
void setUp() {
searchIndex = mock(ClickHouseSearchIndex.class);
envRepo = mock(EnvironmentRepository.class);
eval = new ExchangeMatchEvaluator(searchIndex, envRepo);
AlertingProperties props = new AlertingProperties(
null, null, null, null, null, null, null, null, null, null, null, null, null, null);
eval = new ExchangeMatchEvaluator(searchIndex, envRepo, props);
var env = new Environment(ENV_ID, "prod", "Production", false, true, null, null, null);
when(envRepo.findById(ENV_ID)).thenReturn(Optional.of(env));
@@ -45,10 +50,21 @@ class ExchangeMatchEvaluatorTest {
}
private AlertRule ruleWith(AlertCondition condition, Map<String, Object> evalState) {
return ruleWith(condition, evalState, null);
}
private AlertRule ruleWith(AlertCondition condition, Map<String, Object> evalState, Instant createdAt) {
return new AlertRule(RULE_ID, ENV_ID, "test", null,
AlertSeverity.WARNING, true, condition.kind(), condition,
60, 0, 0, null, null, List.of(), List.of(),
null, null, null, evalState, null, null, null, null);
null, null, null, evalState, createdAt, null, null, null);
}
private ExchangeMatchCondition perExchangeCondition() {
return new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null);
}
private ExecutionSummary summary(String id, Instant startTime, String status) {
@@ -64,7 +80,7 @@ class ExchangeMatchEvaluatorTest {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.COUNT_IN_WINDOW, 5, 300, null);
FireMode.COUNT_IN_WINDOW, 5, 300);
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(7L);
@@ -79,7 +95,7 @@ class ExchangeMatchEvaluatorTest {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.COUNT_IN_WINDOW, 5, 300, null);
FireMode.COUNT_IN_WINDOW, 5, 300);
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(3L);
@@ -92,7 +108,7 @@ class ExchangeMatchEvaluatorTest {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", "direct:pay", null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of("orderId", "123")),
FireMode.COUNT_IN_WINDOW, 1, 120, null);
FireMode.COUNT_IN_WINDOW, 1, 120);
when(searchIndex.countExecutionsForAlerting(any())).thenReturn(2L);
@@ -119,7 +135,7 @@ class ExchangeMatchEvaluatorTest {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60);
FireMode.PER_EXCHANGE, null, null);
when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50));
@@ -133,7 +149,7 @@ class ExchangeMatchEvaluatorTest {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60);
FireMode.PER_EXCHANGE, null, null);
Instant t1 = NOW.minusSeconds(50);
Instant t2 = NOW.minusSeconds(30);
@@ -153,11 +169,11 @@ class ExchangeMatchEvaluatorTest {
}
@Test
void perExchange_lastFiringCarriesNextCursor() {
void perExchange_batchCarriesNextCursorInEvalState() {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60);
FireMode.PER_EXCHANGE, null, null);
Instant t1 = NOW.minusSeconds(50);
Instant t2 = NOW.minusSeconds(10); // latest
@@ -169,32 +185,119 @@ class ExchangeMatchEvaluatorTest {
EvalResult r = eval.evaluate(condition, ruleWith(condition), new EvalContext("default", NOW, new TickCache()));
var batch = (EvalResult.Batch) r;
// last firing carries the _nextCursor key with the latest startTime
EvalResult.Firing last = batch.firings().get(batch.firings().size() - 1);
assertThat(last.context()).containsKey("_nextCursor");
assertThat(last.context().get("_nextCursor")).isEqualTo(t2);
// The batch carries the composite next-cursor in nextEvalState under "lastExchangeCursor"
assertThat(batch.nextEvalState()).containsKey("lastExchangeCursor");
assertThat(batch.nextEvalState().get("lastExchangeCursor"))
.isEqualTo(t2.toString() + "|ex-2");
}
@Test
void perExchange_usesLastExchangeTsFromEvalState() {
void perExchange_usesLastExchangeCursorFromEvalState() {
var condition = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 60);
FireMode.PER_EXCHANGE, null, null);
Instant cursor = NOW.minusSeconds(120);
var rule = ruleWith(condition, Map.of("lastExchangeTs", cursor.toString()));
var rule = ruleWith(condition, Map.of("lastExchangeCursor", cursor.toString() + "|ex-prev"));
when(searchIndex.search(any())).thenReturn(SearchResult.empty(0, 50));
eval.evaluate(condition, rule, new EvalContext("default", NOW, new TickCache()));
// Verify the search request used the cursor as the lower-bound
// Verify the search request used the cursor tuple: timeFrom + afterExecutionId
ArgumentCaptor<com.cameleer.server.core.search.SearchRequest> captor =
ArgumentCaptor.forClass(com.cameleer.server.core.search.SearchRequest.class);
verify(searchIndex).search(captor.capture());
// timeFrom should be the cursor value
assertThat(captor.getValue().timeFrom()).isEqualTo(cursor);
assertThat(captor.getValue().afterExecutionId()).isEqualTo("ex-prev");
}
@Test
void cursorMonotonicity_sameMillisecondExchanges_fireExactlyOncePerTick() {
var t = Instant.parse("2026-04-22T10:00:00Z");
var exA = summary("exec-a", t, "FAILED");
var exB = summary("exec-b", t, "FAILED");
when(searchIndex.search(any())).thenReturn(new SearchResult<>(List.of(exA, exB), 2L, 0, 50));
ExchangeMatchCondition condition = perExchangeCondition();
AlertRule rule = ruleWith(condition, Map.of()).withEvalState(Map.of()); // first-run
EvalResult r1 = eval.evaluate(condition, rule,
new EvalContext("default", t.plusSeconds(1), new TickCache()));
assertThat(r1).isInstanceOf(EvalResult.Batch.class);
var batch1 = (EvalResult.Batch) r1;
assertThat(batch1.firings()).hasSize(2);
assertThat(batch1.nextEvalState()).containsKey("lastExchangeCursor");
// cursor is (t, "exec-b") since "exec-b" > "exec-a" lexicographically
// Tick 2: reflect the advanced cursor back; expect zero firings
AlertRule advanced = rule.withEvalState(batch1.nextEvalState());
when(searchIndex.search(any())).thenReturn(new SearchResult<>(List.of(), 0L, 0, 50));
EvalResult r2 = eval.evaluate(condition, advanced,
new EvalContext("default", t.plusSeconds(2), new TickCache()));
assertThat(((EvalResult.Batch) r2).firings()).isEmpty();
// Tick 3: a third exchange at the same t with exec-c > exec-b; expect exactly one firing
var exC = summary("exec-c", t, "FAILED");
when(searchIndex.search(any())).thenReturn(new SearchResult<>(List.of(exC), 1L, 0, 50));
EvalResult r3 = eval.evaluate(condition, advanced,
new EvalContext("default", t.plusSeconds(3), new TickCache()));
assertThat(((EvalResult.Batch) r3).firings()).hasSize(1);
assertThat(((EvalResult.Batch) r3).nextEvalState()).containsKey("lastExchangeCursor");
}
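The monotonicity test above hinges on same-millisecond exchanges being disambiguated by execution id. A minimal sketch of the assumed (startTime, executionId) tuple ordering (names are illustrative): an exchange is "new" iff its tuple sorts strictly after the persisted cursor tuple.

```java
import java.time.Instant;
import java.util.Comparator;

// Sketch of the composite keyset ordering: compare by startTime first,
// then break ties lexicographically on executionId.
record ExchangeKey(Instant startTime, String executionId) {
    static final Comparator<ExchangeKey> ORDER =
        Comparator.comparing(ExchangeKey::startTime)
                  .thenComparing(ExchangeKey::executionId);

    boolean isAfter(ExchangeKey cursor) {
        return ORDER.compare(this, cursor) > 0;
    }
}
```

With the cursor at (t, "exec-b"), both exec-a and exec-b at t are filtered out, while exec-c at the very same t still sorts after the cursor and fires exactly once.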
@Test
void firstRun_boundedByRuleCreatedAt_notRetentionHistory() {
var created = Instant.parse("2026-04-22T09:00:00Z");
var after = created.plus(Duration.ofMinutes(30));
// The evaluator must pass `timeFrom = created` to the search.
ArgumentCaptor<SearchRequest> cap = ArgumentCaptor.forClass(SearchRequest.class);
when(searchIndex.search(cap.capture())).thenReturn(
new SearchResult<>(List.of(summary("exec-after", after, "FAILED")), 1L, 0, 50));
ExchangeMatchCondition condition = perExchangeCondition();
AlertRule rule = ruleWith(condition, Map.of(), created).withEvalState(Map.of()); // no cursor
EvalResult r = eval.evaluate(condition, rule,
new EvalContext("default", after.plusSeconds(10), new TickCache()));
SearchRequest req = cap.getValue();
assertThat(req.timeFrom()).isEqualTo(created);
assertThat(((EvalResult.Batch) r).firings()).hasSize(1);
}
@Test
void firstRun_clampsCursorToDeployBacklogCap_whenRuleCreatedLongAgo() {
// Rule created 7 days ago, cap default is 24h; expect timeFrom to be now - 24h, not rule.createdAt.
Instant now = Instant.parse("2026-04-22T12:00:00Z");
Instant createdLongAgo = now.minus(Duration.ofDays(7));
Instant expectedClampFloor = now.minusSeconds(86_400); // 24h
ArgumentCaptor<SearchRequest> cap = ArgumentCaptor.forClass(SearchRequest.class);
when(searchIndex.search(cap.capture())).thenReturn(new SearchResult<>(List.of(), 0L, 0, 50));
ExchangeMatchCondition condition = perExchangeCondition();
AlertRule rule = ruleWith(condition, Map.of(), createdLongAgo);
eval.evaluate(condition, rule, new EvalContext("default", now, new TickCache()));
SearchRequest req = cap.getValue();
assertThat(req.timeFrom()).isEqualTo(expectedClampFloor);
}
@Test
void firstRun_usesCreatedAt_whenWithinDeployBacklogCap() {
Instant now = Instant.parse("2026-04-22T12:00:00Z");
Instant createdRecent = now.minus(Duration.ofHours(1)); // 1h < 24h cap
ArgumentCaptor<SearchRequest> cap = ArgumentCaptor.forClass(SearchRequest.class);
when(searchIndex.search(cap.capture())).thenReturn(new SearchResult<>(List.of(), 0L, 0, 50));
ExchangeMatchCondition condition = perExchangeCondition();
AlertRule rule = ruleWith(condition, Map.of(), createdRecent);
eval.evaluate(condition, rule, new EvalContext("default", now, new TickCache()));
assertThat(cap.getValue().timeFrom()).isEqualTo(createdRecent);
}
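The two first-run tests above pin the lower-bound rule `timeFrom = max(rule.createdAt, now − capSeconds)`, with a cap of 0 disabling the clamp (per the `perExchangeDeployBacklogCapSeconds` property). A minimal sketch of that rule (method name is illustrative):

```java
import java.time.Instant;

// Sketch of the first-run backlog-cap clamp:
// timeFrom = max(createdAt, now - backlogCapSeconds); cap <= 0 disables it.
final class FirstRunBound {
    static Instant timeFrom(Instant createdAt, Instant now, long backlogCapSeconds) {
        if (backlogCapSeconds <= 0) {
            return createdAt; // cap disabled: scan from rule creation
        }
        Instant floor = now.minusSeconds(backlogCapSeconds);
        return createdAt.isBefore(floor) ? floor : createdAt;
    }
}
```

The normal-advance path (a persisted cursor exists) never goes through this clamp; it applies only when no valid cursor is found.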
@Test

View File

@@ -39,7 +39,7 @@ class NotificationContextBuilderTest {
case EXCHANGE_MATCH -> new ExchangeMatchCondition(
new AlertScope("my-app", "route-1", null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.PER_EXCHANGE, null, null, 30);
FireMode.PER_EXCHANGE, null, null);
case AGENT_STATE -> new AgentStateCondition(
new AlertScope(null, null, null),
"DEAD", 0);

View File

@@ -50,7 +50,7 @@ class WebhookDispatcherIT {
new ApacheOutboundHttpClientFactory(props, new SslContextBuilder()),
cipher,
new MustacheRenderer(),
new AlertingProperties(null, null, null, null, null, null, null, null, null, null, null, null, null),
new AlertingProperties(null, null, null, null, null, null, null, null, null, null, null, null, null, null),
new ObjectMapper()
);
}

View File

@@ -167,7 +167,7 @@ class AlertingRetentionJobIT extends AbstractPostgresIT {
// effectiveEventRetentionDays = 90, effectiveNotificationRetentionDays = 30
new com.cameleer.server.app.alerting.config.AlertingProperties(
null, null, null, null, null, null, null, null, null,
90, 30, null, null),
90, 30, null, null, null),
instanceRepo,
notificationRepo,
fixedClock);

View File

@@ -118,7 +118,7 @@ class ClickHouseSearchIndexIT {
void search_withNoFilters_returnsAllExecutions() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -130,7 +130,7 @@ class ClickHouseSearchIndexIT {
void search_byStatus_filtersCorrectly() {
SearchRequest request = new SearchRequest(
"FAILED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -145,7 +145,7 @@ class ClickHouseSearchIndexIT {
// Time window covering exec-1 and exec-2 but not exec-3
SearchRequest request = new SearchRequest(
null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -158,7 +158,7 @@ class ClickHouseSearchIndexIT {
void search_fullTextSearch_findsInErrorMessage() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "NullPointerException", null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -170,7 +170,7 @@ class ClickHouseSearchIndexIT {
void search_fullTextSearch_findsInInputBody() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "12345", null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -182,7 +182,7 @@ class ClickHouseSearchIndexIT {
void search_textInBody_searchesProcessorBodies() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, "Hello World", null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -194,7 +194,7 @@ class ClickHouseSearchIndexIT {
void search_textInHeaders_searchesProcessorHeaders() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, "secret-token", null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -206,7 +206,7 @@ class ClickHouseSearchIndexIT {
void search_textInErrors_searchesErrorFields() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, "Foo.bar",
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -218,7 +218,7 @@ class ClickHouseSearchIndexIT {
void search_withHighlight_returnsSnippet() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "NullPointerException", null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -230,7 +230,7 @@ class ClickHouseSearchIndexIT {
void search_pagination_works() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 2, null, null, null);
null, null, null, null, null, 0, 2, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -244,7 +244,7 @@ class ClickHouseSearchIndexIT {
void search_byApplication_filtersCorrectly() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null,
null, null, null, "other-app", null, 0, 50, null, null, null);
null, null, null, "other-app", null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -256,7 +256,7 @@ class ClickHouseSearchIndexIT {
void search_byAgentIds_filtersCorrectly() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, List.of("agent-b"), 0, 50, null, null, null);
null, null, null, null, List.of("agent-b"), 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -268,7 +268,7 @@ class ClickHouseSearchIndexIT {
void count_returnsMatchingCount() {
SearchRequest request = new SearchRequest(
"COMPLETED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
long count = searchIndex.count(request);
@@ -279,7 +279,7 @@ class ClickHouseSearchIndexIT {
void search_multipleStatusFilter_works() {
SearchRequest request = new SearchRequest(
"COMPLETED,FAILED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -290,7 +290,7 @@ class ClickHouseSearchIndexIT {
void search_byCorrelationId_filtersCorrectly() {
SearchRequest request = new SearchRequest(
null, null, null, null, null, "corr-1", null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -302,7 +302,7 @@ class ClickHouseSearchIndexIT {
void search_byDurationRange_filtersCorrectly() {
SearchRequest request = new SearchRequest(
null, null, null, 300L, 600L, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null);
null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request);

View File

@@ -157,7 +157,7 @@ class ClickHouseChunkPipelineIT {
null, null, null, null, null, null,
"ORD-123", null, null, null,
null, null, null, null, null,
0, 50, null, null, null));
0, 50, null, null, null, null));
assertThat(result.total()).isEqualTo(1);
assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
@@ -168,7 +168,7 @@ class ClickHouseChunkPipelineIT {
null, null, null, null, null, null,
null, "ABC-123", null, null,
null, null, null, null, null,
0, 50, null, null, null));
0, 50, null, null, null, null));
assertThat(bodyResult.total()).isEqualTo(1);
// Verify iteration data in processor_executions

View File

@@ -35,4 +35,15 @@ public record AlertRule(
targets = targets == null ? List.of() : List.copyOf(targets);
evalState = evalState == null ? Map.of() : Map.copyOf(evalState);
}
public AlertRule withEvalState(Map<String, Object> newEvalState) {
return new AlertRule(
id, environmentId, name, description, severity, enabled,
conditionKind, condition, evaluationIntervalSeconds,
forDurationSeconds, reNotifyMinutes,
notificationTitleTmpl, notificationMessageTmpl,
webhooks, targets, nextEvaluationAt, claimedBy, claimedUntil,
newEvalState,
createdAt, createdBy, updatedAt, updatedBy);
}
}

View File

@@ -9,8 +9,7 @@ public record ExchangeMatchCondition(
ExchangeFilter filter,
FireMode fireMode,
Integer threshold, // required when COUNT_IN_WINDOW; null for PER_EXCHANGE
Integer windowSeconds, // required when COUNT_IN_WINDOW
Integer perExchangeLingerSeconds // required when PER_EXCHANGE
Integer windowSeconds // required when COUNT_IN_WINDOW
) implements AlertCondition {
public ExchangeMatchCondition {
@@ -18,8 +17,6 @@ public record ExchangeMatchCondition(
throw new IllegalArgumentException("fireMode is required (PER_EXCHANGE or COUNT_IN_WINDOW)");
if (fireMode == FireMode.COUNT_IN_WINDOW && (threshold == null || windowSeconds == null))
throw new IllegalArgumentException("COUNT_IN_WINDOW requires threshold + windowSeconds");
if (fireMode == FireMode.PER_EXCHANGE && perExchangeLingerSeconds == null)
throw new IllegalArgumentException("PER_EXCHANGE requires perExchangeLingerSeconds");
}
@Override

View File

@@ -9,26 +9,29 @@ import java.util.List;
* All filter fields are nullable/optional. When null, the filter is not applied.
* The compact constructor validates and normalizes pagination parameters.
*
* @param status execution status filter (COMPLETED, FAILED, RUNNING)
* @param timeFrom inclusive start of time range
* @param timeTo exclusive end of time range
* @param durationMin minimum duration in milliseconds (inclusive)
* @param durationMax maximum duration in milliseconds (inclusive)
* @param correlationId exact correlation ID match
* @param text global full-text search across all text fields
* @param textInBody full-text search scoped to exchange bodies
* @param textInHeaders full-text search scoped to exchange headers
* @param textInErrors full-text search scoped to error messages and stack traces
* @param routeId exact match on route_id
* @param instanceId exact match on instance_id
* @param processorType matches processor_types array via has()
* @param applicationId exact match on application_id
* @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents)
* @param offset pagination offset (0-based)
* @param limit page size (default 50, max 500)
* @param sortField column to sort by (default: startTime)
* @param sortDir sort direction: asc or desc (default: desc)
* @param environment optional environment filter (e.g. "dev", "staging", "prod")
* @param status execution status filter (COMPLETED, FAILED, RUNNING)
* @param timeFrom inclusive start of time range
* @param timeTo exclusive end of time range
* @param durationMin minimum duration in milliseconds (inclusive)
* @param durationMax maximum duration in milliseconds (inclusive)
* @param correlationId exact correlation ID match
* @param text global full-text search across all text fields
* @param textInBody full-text search scoped to exchange bodies
* @param textInHeaders full-text search scoped to exchange headers
* @param textInErrors full-text search scoped to error messages and stack traces
* @param routeId exact match on route_id
* @param instanceId exact match on instance_id
* @param processorType matches processor_types array via has()
* @param applicationId exact match on application_id
* @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents)
* @param offset pagination offset (0-based)
* @param limit page size (default 50, max 500)
* @param sortField column to sort by (default: startTime)
* @param sortDir sort direction: asc or desc (default: desc)
* @param afterExecutionId when combined with a non-null {@code timeFrom}, applies the composite predicate
* {@code (start_time > timeFrom) OR (start_time = timeFrom AND execution_id > afterExecutionId)}.
* When null, {@code timeFrom} is applied as a plain {@code >=} lower bound (existing behaviour).
* @param environment optional environment filter (e.g. "dev", "staging", "prod")
*/
public record SearchRequest(
String status,
@@ -50,6 +53,7 @@ public record SearchRequest(
int limit,
String sortField,
String sortDir,
String afterExecutionId,
String environment
) {
@@ -92,7 +96,7 @@ public record SearchRequest(
status, timeFrom, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, resolvedInstanceIds,
offset, limit, sortField, sortDir, afterExecutionId, environment
);
}
@@ -102,7 +106,23 @@ public record SearchRequest(
status, timeFrom, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, instanceIds,
offset, limit, sortField, sortDir, afterExecutionId, env
);
}
/**
* Create a copy with a composite {@code (start_time, execution_id)} cursor.
* <p>
* The resulting request applies a strictly-after tuple predicate
* {@code (start_time > ts) OR (start_time = ts AND execution_id > afterExecutionId)},
* enabling exactly-once consumption of same-millisecond exchanges across scheduler ticks.
*/
public SearchRequest withCursor(Instant ts, String afterExecutionId) {
return new SearchRequest(
status, ts, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, instanceIds,
offset, limit, sortField, sortDir, afterExecutionId, environment
);
}
}

View File

@@ -28,7 +28,7 @@ class AlertConditionJsonTest {
var c = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of("type","payment")),
FireMode.PER_EXCHANGE, null, null);
String json = om.writeValueAsString((AlertCondition) c);
AlertCondition parsed = om.readValue(json, AlertCondition.class);
assertThat(parsed).isInstanceOf(ExchangeMatchCondition.class);
@@ -39,7 +39,7 @@ class AlertConditionJsonTest {
var c = new ExchangeMatchCondition(
new AlertScope("orders", null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
FireMode.COUNT_IN_WINDOW, 5, 900);
AlertCondition parsed = om.readValue(om.writeValueAsString((AlertCondition) c), AlertCondition.class);
assertThat(((ExchangeMatchCondition) parsed).threshold()).isEqualTo(5);
}
@@ -49,7 +49,7 @@ class AlertConditionJsonTest {
assertThatThrownBy(() -> new ExchangeMatchCondition(
new AlertScope(null, null, null),
new ExchangeMatchCondition.ExchangeFilter("FAILED", Map.of()),
null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("fireMode");
}
@@ -63,8 +63,7 @@ class AlertConditionJsonTest {
"filter": {"status": "FAILED", "attributes": {}},
"fireMode": null,
"threshold": null,
"windowSeconds": null
}
""";
assertThatThrownBy(() -> om.readValue(json, AlertCondition.class))

View File

@@ -36,25 +36,60 @@ Comparators: `GT`, `GTE`, `LT`, `LTE`, `EQ`.
### EXCHANGE_MATCH
Fires on exchanges matching a filter. Two firing modes — pick the one that matches your operational intent.
#### `fireMode: COUNT_IN_WINDOW`
One alert when the count of matching exchanges in a rolling window crosses a threshold. Aggregation-style: good for "more than 3 payment failures in 10 minutes."
```json
{
"name": "Payment failures spike",
"severity": "WARNING",
"conditionKind": "EXCHANGE_MATCH",
"condition": {
"kind": "EXCHANGE_MATCH",
"scope": { "appSlug": "payment-service", "routeId": "processPayment" },
"filter": { "status": "FAILED", "attributes": { "payment.type": "card" } },
"fireMode": "COUNT_IN_WINDOW",
"threshold": 3,
"windowSeconds": 600
}
}
```
#### `fireMode: PER_EXCHANGE`
One alert per distinct failed exchange — **exactly once**. Each failure produces its own `AlertInstance` and its own notification. The Inbox contains one row per failed exchange, never a duplicate, across ticks or process restarts. Good for "page me for every failed order regardless of rate."
```json
{
"name": "Any order failure",
"severity": "CRITICAL",
"conditionKind": "EXCHANGE_MATCH",
"condition": {
"kind": "EXCHANGE_MATCH",
"scope": { "appSlug": "orders-service" },
"filter": { "status": "FAILED" },
"fireMode": "PER_EXCHANGE"
}
}
```
PER_EXCHANGE rules have a tighter configurable surface — the server rejects incoherent combinations at save time with HTTP 400:
| Field | PER_EXCHANGE | COUNT_IN_WINDOW |
|---|---|---|
| `threshold`, `windowSeconds` | must be absent / zero | required, positive |
| `reNotifyMinutes` | must be 0 (fires once; re-notify does not apply) | optional |
| `forDurationSeconds` | must be 0 | optional |
| `scope`, `filter`, `severity`, notification template, `webhooks` / `targets` | standard | standard |
Additionally, any rule (any `conditionKind`) with **both** empty `webhooks` and empty `targets` is rejected — a rule that notifies no one is always a misconfiguration.
**Exactly-once guarantee — scope.** One `AlertInstance` and one PENDING `AlertNotification` per exchange, preserved across evaluator ticks and process restarts. HTTP webhook delivery is still at-least-once under transient failure; for Slack and similar, include `{{alert.id}}` in the message template so the consumer can dedup.
**First post-deploy tick — backlog cap.** A PER_EXCHANGE rule's first run (no persisted cursor yet) would otherwise scan from `rule.createdAt` forward, which can trigger a one-time notification flood for long-lived rules after a DB migration or schema reset. The server clamps the first-run scan to `max(rule.createdAt, now - deployBacklogCap)`. Default cap: 24 h. Tune via `cameleer.server.alerting.per-exchange-deploy-backlog-cap-seconds` (set to 0 to disable the clamp and replay from `createdAt`).
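A minimal sketch of the clamp logic described above, assuming a method shape of our own invention (the real server code and its config wiring may differ; only the `max(rule.createdAt, now - cap)` rule and the 0-disables-clamp semantics come from the docs):

```java
import java.time.Duration;
import java.time.Instant;

public class FirstRunClamp {
    /**
     * Returns the timestamp a first-run PER_EXCHANGE scan starts from.
     * capSeconds mirrors per-exchange-deploy-backlog-cap-seconds; 0 disables the clamp.
     */
    public static Instant clampFirstRunCursor(Instant ruleCreatedAt, Instant now, long capSeconds) {
        if (capSeconds <= 0) {
            return ruleCreatedAt; // clamp disabled: replay from rule creation
        }
        Instant capFloor = now.minus(Duration.ofSeconds(capSeconds));
        // Never scan further back than the cap, but also never before the rule existed.
        return ruleCreatedAt.isAfter(capFloor) ? ruleCreatedAt : capFloor;
    }
}
```

With the 24 h default, a rule created weeks ago starts its first post-deploy scan at `now - 24h`; a rule created an hour ago starts at its own `createdAt`.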
### AGENT_STATE

File diff suppressed because it is too large

View File

@@ -0,0 +1,188 @@
# PER_EXCHANGE — Exactly-Once-Per-Exchange Alerting — Design
**Date:** 2026-04-22
**Scope:** alerting layer only (webhook-delivery-level idempotency is out of scope — see "Out of scope" below).
**Preceding context:** `.planning/sse-flakiness-diagnosis.md` (unrelated); `docs/superpowers/specs/2026-04-19-alerting-design.md` (foundational alerting design — original PER_EXCHANGE intent).
## Motivation
A user wants to create an alert rule of the shape "for every exchange that ends in FAILED status, notify Slack — exactly once per exchange." Exchanges are **terminal events**: once in FAILED state they never transition back, unlike agents which toggle between LIVE / STALE / DEAD. So "exactly once" is well-defined and achievable.
Today's PER_EXCHANGE mode partially supports this but has three gaps that, in combination, either miss exchanges or flood the Inbox with duplicates:
1. **The cursor is dead code.** `ExchangeMatchEvaluator.evaluatePerExchange` at `cameleer-server-app/.../eval/ExchangeMatchEvaluator.java:141` computes a `_nextCursor` and stamps it onto the **last firing's context map**, but `AlertEvaluatorJob.applyBatchFiring` (`:206-213`) never reads it, and `reschedule` (`:259`) calls `releaseClaim(rule.id(), nextRun, rule.evalState())` — passing the **original, unmodified** `evalState`. So `lastExchangeTs` is never persisted; every tick runs with `timeFrom = null` and re-scans ClickHouse from retention's beginning. The partial unique index on `alert_instances(rule_id, context->'exchange'->>'id')` silently de-dups duplicate *instances* on each tick — but `enqueueNotifications` in `applyBatchFiring` runs unconditionally on the returned row, so **every tick enqueues a fresh PENDING `AlertNotification` for every matching exchange already in retention**. The user-visible symptom is not "a same-millisecond collision"; it is "Slack gets re-spammed for every historical failed exchange on every tick." Same-millisecond collision is also real, but strictly subsumed once a working cursor exists.
2. Alert-instance writes, notification enqueues, and `evalState` cursor advance are not coupled transactionally. Once §1 is fixed and the cursor *does* advance, a crash between the instance write and the cursor persist would produce either silent data loss (cursor advanced, instances never persisted) or duplicate instances on recovery (instances persisted, cursor not advanced). §2 makes this atomic before the cursor goes live.
3. The rule-configuration surface for PER_EXCHANGE admits nonsensical combinations (`reNotifyMinutes > 0`, mandatory-but-unused `perExchangeLingerSeconds`, `forDurationSeconds`) — a user following the UI defaults can build a rule that re-notifies hourly even though they want one-shot semantics.
The product rules, agreed with the user:
- The AlertInstance **stays FIRING** until a human acks or resolves it. No auto-resolve sweep.
- The action (webhook) **fires exactly once** per AlertInstance.
- The Inbox **contains exactly one AlertInstance** per failed exchange — never a duplicate from cursor errors, tick re-runs, or process restarts.
"Exactly once" here is at the alerting layer — one AlertInstance, one PENDING AlertNotification per (instance × webhook binding). The HTTP dispatch that follows is still at-least-once on transient failures; that's a separate scope.
## Non-goals
- Auto-resolve after linger seconds. The existing spec reserved `perExchangeLingerSeconds` for this; we're explicitly dropping the field (unused + not desired).
- Resolve-on-delivery semantics. Alert stays FIRING until human intervention.
- Webhook-level idempotency / exactly-once HTTP delivery to Slack. Rare duplicate Slack messages on timeout retries are accepted; consumer-side dedup (via `alert.id` in the payload) is a template concern, not a server change.
- Any change to COUNT_IN_WINDOW mode.
- Backfilling duplicate instances already created by the existing broken cursor in any running environment. Pre-prod, manual cleanup if needed.
- **Sealed condition-type hierarchy.** A follow-up refactor could replace the current "one `AlertRule` with mode-gated fields" model with a sealed hierarchy (`PerExchangeCondition` / `CountInWindowCondition` / `AgentLifecycleCondition`) where each type carries only the knobs it supports. The UI already sharded condition forms under `ui/src/pages/Alerts/RuleEditor/condition-forms/` — the backend is the laggard. §3 here only patches the three known field conflicts; the structural cleanup is a separate phase (see §Follow-ups).
- **Throttle / coalesce primitive for PER_EXCHANGE.** njams bakes `throttling` + `throttlingEventCount` into its CEP queries ("fire once, count the rest in W seconds"). If operators later find the Inbox unwieldy during incident storms, a `coalesceSeconds` knob is the right cure — one FIRING per (rule × signature) per window, with `occurrenceCount` maintained on the instance. Explicitly parked; see §Follow-ups.
## Design
Four focused changes. Each is small on its own; together they make PER_EXCHANGE fire exactly once per failed exchange.
### 1. Composite cursor for cursor monotonicity
**Current.** `evalState.lastExchangeTs` is a single ISO-8601 string the evaluator *reads* as a lower bound, but **never writes**. The advance is computed (`latestTs = max(startTime)`) and attached to the last firing's context map as `_nextCursor`, then discarded by `applyBatchFiring`; `reschedule` passes the untouched `rule.evalState()` to `releaseClaim`. Net effect today: every tick queries ClickHouse with `timeFrom = null` (first-run path). The same-millisecond-collision bug described in the original spec assumes the cursor works; in practice the cursor has never worked. Fixing the advance is therefore both a correctness fix and a dead-code elimination.
**New.** Replace with a composite cursor `(startTime, executionId)`, serialized as `"<ISO-8601 startTime>|<executionId>"` in `evalState.lastExchangeCursor`.
- Selection predicate: `(start_time > cursor.ts) OR (start_time = cursor.ts AND execution_id > cursor.id)`.
- This is trivially monotone: every consumed exchange is strictly-after the cursor in the composite ordering.
- Handles two exchanges at the exact same millisecond correctly — both are selected on their turn, neither re-selected.
- Uses the existing ClickHouse primary-key order `(tenant_id, start_time, …, execution_id)`, so the predicate is a range scan on the PK.
- Advance: set cursor to the `(startTime, executionId)` of the lexicographically-last row in the batch (last row when sorted by `(start_time asc, execution_id asc)`).
- First run (no cursor): today's behaviour is `cursor = null → timeFrom = null → unbounded scan of ClickHouse history` — any pre-existing FAILED exchange in retention would fire an alert on the first tick. That's broken and needs fixing. New rule: initialize `lastExchangeCursor` to `(rule.createdAt, "")` at rule creation time — so a PER_EXCHANGE rule only alerts on exchanges that fail *after* it was created. The empty-string `executionId` component is correct: any real execution_id sorts strictly after it lexicographically, so the very first matching exchange post-creation gets picked up on the first tick. No ambient lookback window, no retention dependency, no backlog flood.
- `evalState` schema change: retire the `lastExchangeTs` key, add `lastExchangeCursor`. Pre-prod; no migration needed. Readers that see neither key treat the rule as first-run.
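The serialization and strictly-after ordering above can be sketched as a small value type. This is illustrative only: the record name, the `admits` method, and the field names are assumptions; the `"<ISO-8601>|<executionId>"` wire format and the composite predicate come from the spec:

```java
import java.time.Instant;

// Composite (startTime, executionId) cursor, serialized as "<ISO-8601>|<executionId>".
public record ExchangeCursor(Instant ts, String executionId) {
    public String serialize() {
        return ts.toString() + "|" + executionId;
    }

    public static ExchangeCursor parse(String s) {
        int sep = s.indexOf('|');
        return new ExchangeCursor(Instant.parse(s.substring(0, sep)), s.substring(sep + 1));
    }

    /** True when a row at (rowTs, rowId) is strictly after this cursor in composite order. */
    public boolean admits(Instant rowTs, String rowId) {
        return rowTs.isAfter(ts) || (rowTs.equals(ts) && rowId.compareTo(executionId) > 0);
    }
}
```

Note how the first-run cursor `(rule.createdAt, "")` falls out of the same comparison: the empty string sorts strictly before any real execution_id, so the first matching exchange at exactly `createdAt` is still admitted.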
**Affected files (scope estimate):**
- `ExchangeMatchEvaluator.evaluatePerExchange` — cursor parse/advance/selection.
- `SearchRequest` / `ClickHouseSearchIndex.search` — needs to accept the composite predicate. Option A: add an optional `afterExecutionId` param alongside `timeFrom`. Option B: introduce a dedicated `AfterCursor(ts, id)` type. Plan phase picks one — A is simpler.
- `evalState` JSON schema (documented in alerting spec).
### 2. Transactional coupling of instance writes + cursor advance
This section presumes §1 is landed — once the cursor actually advances, we need the advance and the instance writes to be atomic.
**Current (post-§1).** Per tick for a PER_EXCHANGE rule:
1. `applyResult` iterates the `EvalResult.Batch` firings and calls `applyBatchFiring` for each — one `AlertInstance` save + `enqueueNotifications` per firing, each its own transaction (or auto-commit).
2. After the rule loop, `reschedule(rule, nextRun)` saves the updated `evalState` + `nextRunAt` in a separate write.
Crash anywhere between steps 1 and 2 (or partway through the loop in step 1) produces one of two inconsistent states:
- Instances saved but cursor not advanced → next tick duplicates them.
- Cursor advanced but no instances saved → those exchanges never alerted.
(Note: today, pre-§1, the "cursor never advances" bug means only the first failure mode ever occurs. §2 prevents the second from appearing once §1 is live.)
**New.** Wrap the whole Batch-result processing for a single rule in one `TransactionTemplate.execute(...)`:
```
TX {
persist all AlertInstances for the batch
insert all PENDING AlertNotifications for those instances
update rule: evalState.lastExchangeCursor + nextRunAt
}
```
Commit: all three land atomically. Rollback: none do, and the rule stays claimed-but-cursor-unchanged so the next tick re-processes the same exchanges. Combined with the monotone cursor from §1, that gives exactly-once instance creation: if a batch half-succeeded and rolled back, the second attempt starts from the same cursor and produces the same set.
Notification dispatch (`NotificationDispatchJob` picking up PENDING rows) happens outside the transaction on its own schedule — webhook I/O must never hold a DB transaction open.
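The commit/rollback contract can be modeled without Spring as a toy in-memory store: all three mutations (instances, notifications, cursor) stage first and become visible only at the commit point, so an injected mid-batch failure leaves the cursor untouched and a retry reproduces the same batch. Everything here is a hypothetical model, not the real `TransactionTemplate` code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the per-rule transactional batch commit.
public class BatchTx {
    public final List<String> instances = new ArrayList<>();
    public final List<String> notifications = new ArrayList<>();
    public String cursor = "";

    /** Applies one batch atomically; failAtIndex >= 0 injects a fault that rolls everything back. */
    public boolean apply(List<String> batch, String newCursor, int failAtIndex) {
        List<String> stagedInstances = new ArrayList<>(instances);
        List<String> stagedNotifications = new ArrayList<>(notifications);
        for (int k = 0; k < batch.size(); k++) {
            if (k == failAtIndex) return false; // rollback: staged writes are discarded
            stagedInstances.add(batch.get(k));
            stagedNotifications.add("PENDING:" + batch.get(k));
        }
        // Commit point: all three mutations land together.
        instances.clear(); instances.addAll(stagedInstances);
        notifications.clear(); notifications.addAll(stagedNotifications);
        cursor = newCursor;
        return true;
    }
}
```

This mirrors Test 2 below: a faulted tick leaves zero instances and an unchanged cursor; the retry commits all three writes in one step.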
**Affected files (scope estimate):**
- `AlertEvaluatorJob.applyResult` + `applyBatchFiring` — fold into one transactional block when the result is a `Batch`.
- No change to the COUNT_IN_WINDOW path (`applyResult` for non-Batch results keeps its current semantics).
- `PostgresAlertInstanceRepository` / `PostgresAlertNotificationRepository` / `PostgresAlertRuleRepository` — existing methods usable from inside a transaction; verify no implicit auto-commit.
### 3. Config hygiene — enforce a coherent PER_EXCHANGE rule shape
Three knobs on the rule are wrong for PER_EXCHANGE and trap the user into buggy configurations.
| Knob | Current state | New state for PER_EXCHANGE |
|---|---|---|
| `reNotifyMinutes` | Default 60 in UI; re-notify sweep fires every N min while FIRING | **Must be `0`.** API 400s if non-zero. UI forces to `0` and disables the input with tooltip "Per-exchange rules fire exactly once — re-notify does not apply." |
| `perExchangeLingerSeconds` | Validated as required by `ExchangeMatchCondition` compact ctor; unused anywhere in the code | **Removed.** Drop the field entirely — from the record, the compact-ctor validation, `AlertRuleRequest` DTO, form state, UI. Pre-prod; no shim. |
| `forDurationSeconds` | Applied by the state machine in the COUNT_IN_WINDOW / agent-lifecycle path | **Must be `0`/null** for PER_EXCHANGE. 400 on save if non-zero. UI hides the field when PER_EXCHANGE is selected. Evaluator path already ignores it for `Batch` results, so this is a contract-tightening at the API edge only. |
Net effect: a PER_EXCHANGE rule's configurable surface becomes exactly `{scope, filter, severity, notification title/message, webhooks, targets}`. The user can't express an inconsistent combination.
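The cross-field rules can be sketched as a pure validator. Field and class names here follow the table above, not necessarily the real `AlertRuleRequest` DTO; the controller would turn a non-empty error list into the 400 response:

```java
import java.util.ArrayList;
import java.util.List;

public class PerExchangeRuleValidator {
    public static List<String> validate(String fireMode, Integer reNotifyMinutes,
                                        Integer forDurationSeconds,
                                        int webhookCount, int targetCount) {
        var errors = new ArrayList<String>();
        if ("PER_EXCHANGE".equals(fireMode)) {
            if (reNotifyMinutes != null && reNotifyMinutes != 0)
                errors.add("reNotifyMinutes must be 0 for PER_EXCHANGE");
            if (forDurationSeconds != null && forDurationSeconds != 0)
                errors.add("forDurationSeconds must be 0 for PER_EXCHANGE");
        }
        // Applies to every conditionKind: a rule with no sinks never notifies anyone.
        if (webhookCount == 0 && targetCount == 0)
            errors.add("rule must have at least one webhook or target");
        return errors;
    }
}
```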
**Mode-toggle state hygiene (UX).** When the user flips `fireMode` PER_EXCHANGE ↔ COUNT_IN_WINDOW inside `ExchangeMatchForm`, the form state for the *other* mode's fields must be cleared — not just hidden. The njams Server_4 frontend mode-gates fields via `*ngIf` and silently retains stale values behind the toggle (`src/app/rules/.../rule-view.component.html:96–112`), which produces save-time surprises. In `form-state.ts` the `setFireMode` reducer must reset the fields that are no longer in scope for the new mode (to their type-appropriate zero, not to undefined — the record compact-ctor still runs). That keeps the API-layer cross-field validator (400-on-save) and the form shape permanently consistent.
**Builder-UX lessons worth adopting (tiny, in-scope).**
- **Disabled "Add" gating.** `AlertRuleController` accepts `webhooks: []` and `targets: []` as valid, which lets the user save a rule that never notifies anyone. The form already splits by step; the Notify step's "Add webhook" button should stay enabled, but the wizard's "Save rule" in `ReviewStep.tsx` should block-with-reason if `webhooks.length === 0 && targets.length === 0`. njams's pattern of disabling "Add X" until the last row is complete (`rule-view.component.ts:38–45`) is the right shape.
- **Preserve `/test-evaluate` and `/render-preview`.** `AlertRuleController` exposes POST `{id}/test-evaluate` and `{id}/render-preview`; the wizard should surface at least render-preview in `ReviewStep.tsx` before save. njams ships no in-builder preview and operators compensate with trial-and-error creation. We already have the endpoints; not wiring them up would be leaving value on the floor.
**Affected files (scope estimate):**
- `ExchangeMatchCondition` — remove `perExchangeLingerSeconds`.
- `AlertRuleController` / `AlertRuleRequest` — cross-field validation (reNotify + forDuration vs fireMode; empty webhooks+targets).
- `ui/src/pages/Alerts/RuleEditor/condition-forms/ExchangeMatchForm.tsx` + `form-state.ts` — clear fields on mode toggle; disable reNotify + hide forDuration when PER_EXCHANGE; remove the linger field.
- `ui/src/pages/Alerts/RuleEditor/ReviewStep.tsx` — block save on empty webhooks+targets; render-preview pane.
- Tests (§4).
### 4. Tests that lock the guarantees
Six scenarios: four on the correctness core, one red-test that reproduces today's actual bleed (turns green on fix), one on the builder-UX state-clearing contract.
**Test 1 — cursor monotonicity** (`ExchangeMatchEvaluatorTest`, unit)
- Seed two FAILED executions with identical `start_time`, different `executionId`.
- Tick 1: both fire, batch of 2.
- Tick 2: neither fires.
- Seed a third at the same timestamp. Tick 3: that third only.
**Test 2 — tick atomicity** (`AlertEvaluatorJobIT`, integration with real Postgres)
- Seed 3 FAILED executions. Inject a fault on the second notification-insert.
- Tick → transaction rolls back: 0 AlertInstances, cursor unchanged, rule `nextRunAt` unchanged.
- Remove fault, tick again: 3 AlertInstances + 3 PENDING notifications, cursor advanced.
**Test 3 — full-lifecycle exactly-once** (extends `AlertingFullLifecycleIT`)
- PER_EXCHANGE rule, dummy webhook.
- Seed 5 FAILED executions across two ticks (3 + 2). After both ticks: exactly 5 FIRING AlertInstances, exactly 5 PENDING notifications.
- Third tick with no new executions: zero new instances, zero new notifications.
- Ack one instance: other four unchanged.
- Additionally: POST a PER_EXCHANGE rule with `reNotifyMinutes=60` via the controller → expect 400.
- Additionally: POST a PER_EXCHANGE rule with `forDurationSeconds=60` → expect 400.
**Test 4 — first-run uses rule creation time, not unbounded history** (unit, in `ExchangeMatchEvaluatorTest`)
- Seed 2 FAILED executions dated *before* rule creation, 1 *after*.
- Evaluate a freshly-created PER_EXCHANGE rule whose `evalState` is empty.
- Expect: exactly 1 firing (the one after creation). The pre-creation ones must not appear in the batch.
**Test 5 — pre-fix regression reproducer: notifications do not re-enqueue for already-matched exchanges** (integration, `AlertEvaluatorJobIT`)
- Seed 2 FAILED executions. Tick 1 → 2 FIRING instances, 2 PENDING notifications. Dispatcher drains them → 2 DELIVERED.
- Tick 2 with **no new executions**: expect zero new PENDING notifications. (Today, without the §1+§2 fix, tick 2 re-enqueues both. This test should be written red-first against `main`, then go green when the cursor is actually persisted.)
- This test directly pins the bug the original spec text understated: instance-level dedup via the unique index is already working; notification-level dedup is what's broken.
**Test 6 — form state clears on fireMode toggle** (unit, Vitest, `condition-forms/ExchangeMatchForm.test.tsx`)
- Build an initial form state with `fireMode=COUNT_IN_WINDOW, threshold=5, windowSeconds=300`.
- Dispatch `setFireMode(PER_EXCHANGE)`.
- Expect: `threshold` and `windowSeconds` are cleared to their zero-values (not merely hidden), and the record compact-ctor doesn't throw when `form-state.ts` rebuilds the condition object.
- Dispatch `setFireMode(COUNT_IN_WINDOW)` — expect threshold/window come back as defaults, not as stale values.
Plus a small unit test on the new cross-field validator to isolate its logic from the IT setup.
## Out of scope
- **Webhook-level idempotency.** `WebhookDispatcher` still retries on 5xx / network / timeout. For Slack, that means a timeout mid-POST can produce a duplicate channel message. The consumer-side fix is to include a stable ID (e.g. `{{alert.id}}`) in the message template and drop duplicates on Slack's side — doable today via the existing Mustache editor, no server change. If in the future we want strict exactly-once HTTP delivery, that's a separate design.
- **Auto-resolve of PER_EXCHANGE instances.** Alerts stay FIRING until humans intervene. If operational experience shows the Inbox gets unwieldy, a later phase can add a manual "resolve all" bulk action or an opt-in TTL sweep.
- **Rule-level dedup of identical alerts in a short window** (e.g. "same failure signature fires twice in 5 s"). Out of scope; every failed exchange is its own event by design.
- **COUNT_IN_WINDOW changes.** Untouched.
- **Migration of existing PER_EXCHANGE rules.** Pre-prod; any existing rule using the retired `perExchangeLingerSeconds` field gets the value silently dropped by the API's unknown-property handling on next PUT, or rejected on create (new shape). If needed, a one-shot cleanup is easier than a shim.
## Risks
- **ClickHouse predicate performance.** The composite predicate `(start_time > ? OR (start_time = ? AND execution_id > ?))` must hit the PK range efficiently. The table PK is `(tenant_id, start_time, environment, application_id, route_id, execution_id)`, so the OR-form should be fine, but we'll verify with `EXPLAIN PIPELINE` against the IT container during plan-phase. Fallback: `(start_time, execution_id)` tuple comparison if CH has native support (`(start_time, execution_id) > (?, ?)`), which it does in recent versions.
- **Transaction size.** A single tick caps at `limit = 50` matches (existing behaviour), so the transaction holds at most 50 AlertInstance + 50 AlertNotification writes + 1 rule update. Well within safe bounds.
- **Cursor format churn.** Dropping `lastExchangeTs` in favour of `lastExchangeCursor` is a one-line `evalState` JSON change. Pre-prod; no shim needed. In practice the churn is even more benign than it looks: today no rule has ever persisted a `lastExchangeTs` value (the advance path is dead code — see §1 Current.), so every existing PER_EXCHANGE rule will hit the first-run path `(rule.createdAt, "")` on first post-deploy tick. Side effect: on deploy, long-standing PER_EXCHANGE rules will immediately scan from `rule.createdAt` forward and enqueue notifications for every FAILED exchange in retention that matches. This is a **one-time backlog flood** proportional to ClickHouse retention × failure rate × number of PER_EXCHANGE rules. For pre-prod with small history this is tolerable; if a rule was created years ago on a real environment, bound the first-run scan by clamping initial cursor to `max(rule.createdAt, now() - deployBacklogCap)` where `deployBacklogCap` is a config (default 24 h). Call this out explicitly in the plan-phase so deployment order is "deploy first, then create rules" or "accept the one-time flood."
## Follow-ups (parked — separate phases)
Explicit list of ideas that are valuable but deliberately not in this spec's scope.
1. **Sealed condition-type hierarchy (backend).** Replace `AlertRule` + `fireMode` field with a sealed `Condition` hierarchy where each type carries only its own knobs. The UI is already sharded (`condition-forms/*Form.tsx`); the backend would follow. Biggest win: kills the whole "mode-gated field" class of bug at the record level, so cross-field validators become compact-ctor invariants instead of controller-layer glue. Estimated scope: medium (DTO migration, Jackson polymorphism, request compatibility). Trigger: when a 4th condition kind lands or when the next "silently-ignored field" bug surfaces.
2. **`coalesceSeconds` primitive on PER_EXCHANGE.** "One FIRING per (rule × signature) per window; attach occurrenceCount." Addresses the Inbox-flood scenario during incident storms without breaking the exactly-once-per-exchange guarantee for the default case. njams bakes this into its CEP template as `throttling` + `throttlingEventCount`; we'd express it as a post-match coalescer on the AlertInstance write path. Trigger: first operator complaint about Inbox volume during a real incident, or when we onboard a tenant with >100 failed exchanges/min.
3. **Cross-phase: ingestion-time rule-matching.** Today's tick+cursor model is correct but latency-bound to `evaluationIntervalSeconds`. A streaming path (agent → ClickHouse ingest → publish → rule matcher) would drop alert latency to seconds. Not needed today; flagged because the spec's design explicitly chooses batch over streaming and future requirements may flip that.
## Verification
- `mvn -pl cameleer-server-app -am -Dit.test='ExchangeMatchEvaluatorTest,AlertEvaluatorJobIT,AlertingFullLifecycleIT,AlertRuleControllerIT' ... verify` → 0 failures.
- Manual: create a PER_EXCHANGE / FAILED rule via UI. Verify `reNotifyMinutes` is fixed at 0 and disabled; verify the linger field is gone; verify toggling `fireMode` clears COUNT_IN_WINDOW-specific fields. Produce failing exchanges. Verify Inbox shows one instance per exchange, Slack gets exactly one message each. **Wait three evaluation-interval ticks with no new exchanges**; verify no additional notifications arrive (the pre-fix bleed). Ack one instance. Produce another failure. Verify only the new one appears. Save a rule with empty webhooks+targets → expect blocked at the Review step with a reason shown.

File diff suppressed because one or more lines are too long

View File

@@ -2334,8 +2334,6 @@ export interface components {
threshold?: number;
/** Format: int32 */
windowSeconds?: number;
/** @enum {string} */
readonly kind?: "ROUTE_METRIC" | "EXCHANGE_MATCH" | "AGENT_STATE" | "AGENT_LIFECYCLE" | "DEPLOYMENT_STATE" | "LOG_PATTERN" | "JVM_METRIC";
});
@@ -2636,6 +2634,7 @@ export interface components {
limit?: number;
sortField?: string;
sortDir?: string;
afterExecutionId?: string;
environment?: string;
};
ExecutionSummary: {

View File

@@ -17,11 +17,10 @@ export function ConditionStep({ form, setForm }: { form: FormState; setForm: (f:
const prev = form.condition as Record<string, unknown>;
const base: Record<string, unknown> = { kind, scope: prev.scope };
// EXCHANGE_MATCH must carry a fireMode — the backend ctor now rejects null.
// Seed PER_EXCHANGE so a user can save without touching every sub-field
// (matches what the form's Select displays by default).
if (kind === 'EXCHANGE_MATCH') {
base.fireMode = 'PER_EXCHANGE';
base.filter = {};
}
if (kind === 'AGENT_LIFECYCLE') {

View File

@@ -1,16 +1,68 @@
import { useMemo, useState } from 'react';
import { Alert, Button, Toggle } from '@cameleer/design-system';
import { useRenderPreview } from '../../../api/queries/alertRules';
import { describeApiError } from '../../../api/errors';
import { toRequest, type FormState } from './form-state';
/**
* Pure helper: returns a human-readable reason why saving should be blocked,
* or null when the rule is safe to save (from the wizard's perspective).
*
* Currently covers: a rule with no webhooks AND no targets would be rejected
* at the server edge (Task 3.3 validator) and would never notify anyone, so
* the wizard blocks it earlier with a clear reason. The helper is exported
 * state off the same single source of truth.
*/
export function computeSaveBlockReason(form: FormState): string | null {
const noWebhooks = (form.webhooks ?? []).length === 0;
const noTargets = (form.targets ?? []).length === 0;
if (noWebhooks && noTargets) {
return 'Add at least one webhook or target \u2014 a rule with no recipients never notifies anyone.';
}
return null;
}
export function ReviewStep({
form,
setForm,
ruleId,
}: {
form: FormState;
setForm?: (f: FormState) => void;
/**
 * Present only in edit mode. When absent, the notification-preview button is
* hidden, because the backend `/render-preview` endpoint is id-bound and
* has no stateless variant — rendering against an unsaved draft would
* require a new endpoint and is explicitly out of scope here.
*/
ruleId?: string;
}) {
const req = toRequest(form);
const saveBlockReason = useMemo(() => computeSaveBlockReason(form), [form]);
const previewMutation = useRenderPreview();
const [preview, setPreview] = useState<{ title: string; message: string } | null>(null);
const [previewError, setPreviewError] = useState<string | null>(null);
const onPreview = async () => {
setPreviewError(null);
try {
const res = await previewMutation.mutateAsync({ id: ruleId!, req: {} });
setPreview({ title: res.title ?? '', message: res.message ?? '' });
} catch (e) {
setPreview(null);
setPreviewError(describeApiError(e));
}
};
return (
<div style={{ display: 'grid', gap: 12, maxWidth: 720 }}>
{saveBlockReason && (
<Alert variant="error" title="Rule cannot be saved yet">
{saveBlockReason}
</Alert>
)}
<div>
<strong>Name:</strong> {form.name}
</div>
@@ -43,6 +95,57 @@ export function ReviewStep({
/>
</div>
)}
{ruleId && (
<div style={{ display: 'grid', gap: 8 }}>
<div>
<Button
variant="secondary"
onClick={onPreview}
disabled={previewMutation.isPending}
>
{previewMutation.isPending ? 'Rendering\u2026' : 'Preview notification'}
</Button>
</div>
{previewError && (
<Alert variant="error" title="Preview failed">
{previewError}
</Alert>
)}
{preview && (
<div
aria-label="Rendered notification preview"
style={{
border: '1px solid var(--border)',
borderRadius: 8,
padding: 12,
background: 'var(--surface-raised, var(--surface))',
display: 'grid',
gap: 6,
}}
>
<div style={{ fontSize: 11, color: 'var(--text-muted)' }}>
Rendered notification preview
</div>
<div style={{ fontWeight: 600, fontSize: 14 }}>
{preview.title || <em style={{ color: 'var(--text-muted)' }}>(empty title)</em>}
</div>
<pre
style={{
margin: 0,
fontSize: 12,
fontFamily: 'inherit',
whiteSpace: 'pre-wrap',
color: 'var(--text)',
}}
>
{preview.message || '(empty message)'}
</pre>
</div>
)}
</div>
)}
<details>
<summary>Raw request JSON</summary>
<pre


@@ -19,7 +19,7 @@ import { ScopeStep } from './ScopeStep';
import { ConditionStep } from './ConditionStep';
import { TriggerStep } from './TriggerStep';
import { NotifyStep } from './NotifyStep';
import { ReviewStep } from './ReviewStep';
import { ReviewStep, computeSaveBlockReason } from './ReviewStep';
import { prefillFromPromotion, type PrefillWarning } from './promotion-prefill';
import { useCatalog } from '../../../api/queries/catalog';
import { useOutboundConnections } from '../../../api/queries/admin/outboundConnections';
@@ -143,7 +143,7 @@ export default function RuleEditorWizard() {
) : step === 'notify' ? (
<NotifyStep form={form} setForm={setForm} ruleId={id} />
) : (
<ReviewStep form={form} setForm={setForm} />
<ReviewStep form={form} setForm={setForm} ruleId={id} />
);
return (
@@ -194,7 +194,11 @@ export default function RuleEditorWizard() {
Next
</Button>
) : (
<Button variant="primary" onClick={onSave} disabled={create.isPending || update.isPending}>
<Button
variant="primary"
onClick={onSave}
disabled={create.isPending || update.isPending || computeSaveBlockReason(form) !== null}
>
{isEdit ? 'Save changes' : 'Create rule'}
</Button>
)}


@@ -17,6 +17,13 @@ export function TriggerStep({
const { toast } = useToast();
const [lastResult, setLastResult] = useState<string | null>(null);
// PER_EXCHANGE fires exactly once per exchange (Task 3.3 server validator
// rejects non-zero reNotifyMinutes / forDurationSeconds for this mode).
// Gate the two inputs here so users can't type values the server will reject.
const isPerExchange =
form.conditionKind === 'EXCHANGE_MATCH' &&
(form.condition as Record<string, unknown>).fireMode === 'PER_EXCHANGE';
const onTest = async () => {
if (!ruleId) {
toast({ title: 'Save rule first to run test evaluate', variant: 'error' });
@@ -40,20 +47,28 @@ export function TriggerStep({
onChange={(e) => setForm({ ...form, evaluationIntervalSeconds: Number(e.target.value) })}
/>
</FormField>
<FormField label="For-duration before firing (seconds, 0 = fire immediately)">
<Input
type="number"
min={0}
value={form.forDurationSeconds}
onChange={(e) => setForm({ ...form, forDurationSeconds: Number(e.target.value) })}
/>
</FormField>
{!isPerExchange && (
<FormField label="For-duration before firing (seconds, 0 = fire immediately)">
<Input
type="number"
min={0}
value={form.forDurationSeconds}
onChange={(e) => setForm({ ...form, forDurationSeconds: Number(e.target.value) })}
/>
</FormField>
)}
<FormField label="Re-notify cadence (minutes, 0 = notify once)">
<Input
type="number"
min={0}
value={form.reNotifyMinutes}
onChange={(e) => setForm({ ...form, reNotifyMinutes: Number(e.target.value) })}
disabled={isPerExchange}
title={
isPerExchange
? 'Per-exchange rules fire exactly once per exchange — re-notify does not apply.'
: undefined
}
/>
</FormField>
<div>


@@ -1,6 +1,7 @@
import { FormField, Input, Select } from '@cameleer/design-system';
import type { FormState } from '../form-state';
import { EXCHANGE_FIRE_MODE_OPTIONS } from '../../enums';
import { applyFireModeChange } from '../form-state';
import { EXCHANGE_FIRE_MODE_OPTIONS, type ExchangeFireMode } from '../../enums';
// ExchangeFilter.status is typed as `String` on the backend (no @Schema
// allowableValues yet) so options stay hand-typed. Follow-up: annotate the
@@ -23,7 +24,7 @@ export function ExchangeMatchForm({ form, setForm }: { form: FormState; setForm:
<FormField label="Fire mode">
<Select
value={(c.fireMode as string) ?? 'PER_EXCHANGE'}
onChange={(e) => patch({ fireMode: e.target.value })}
onChange={(e) => setForm(applyFireModeChange(form, e.target.value as ExchangeFireMode))}
options={EXCHANGE_FIRE_MODE_OPTIONS}
/>
</FormField>
@@ -34,15 +35,6 @@ export function ExchangeMatchForm({ form, setForm }: { form: FormState; setForm:
options={STATUSES}
/>
</FormField>
{c.fireMode === 'PER_EXCHANGE' && (
<FormField label="Linger seconds (default 300)">
<Input
type="number"
value={(c.perExchangeLingerSeconds as number | undefined) ?? 300}
onChange={(e) => patch({ perExchangeLingerSeconds: Number(e.target.value) })}
/>
</FormField>
)}
{c.fireMode === 'COUNT_IN_WINDOW' && (
<>
<FormField label="Threshold (matches)">


@@ -1,5 +1,9 @@
import { describe, it, expect } from 'vitest';
import { initialForm, toRequest, validateStep } from './form-state';
// `applyFireModeChange` will be introduced by Task 4.3 as a pure helper in
// `form-state.ts` so that the ExchangeMatchForm can reuse it on mode-toggle
// and guarantee state hygiene (no stale COUNT_IN_WINDOW fields leaking into a
// PER_EXCHANGE rule payload and vice versa). These tests pin the contract.
import { applyFireModeChange, initialForm, toRequest, validateStep } from './form-state';
describe('initialForm', () => {
it('defaults to env-wide ROUTE_METRIC with safe intervals', () => {
@@ -69,15 +73,90 @@ describe('validateStep', () => {
expect(errs.some((e) => /Window/.test(e))).toBe(true);
});
it('passes when EXCHANGE_MATCH PER_EXCHANGE has linger seconds', () => {
it('passes when EXCHANGE_MATCH PER_EXCHANGE has a fire mode', () => {
const f = initialForm();
f.conditionKind = 'EXCHANGE_MATCH';
f.condition = {
kind: 'EXCHANGE_MATCH',
scope: {},
fireMode: 'PER_EXCHANGE',
perExchangeLingerSeconds: 300,
} as unknown as typeof f.condition;
expect(validateStep('condition', f)).toEqual([]);
});
});
// ----------------------------------------------------------------------------
// Task 4.2 RED: fire-mode toggle state hygiene
//
// `applyFireModeChange(form, newMode)` is the pure helper (Task 4.3) that the
// ExchangeMatchForm's Fire-mode <Select> will call in place of the current
// raw `patch({ fireMode })`. It must guarantee:
//
// - Switching to PER_EXCHANGE clears COUNT_IN_WINDOW-only fields
// (condition.threshold, condition.windowSeconds) AND forces
// top-level reNotifyMinutes = 0, forDurationSeconds = 0. PER_EXCHANGE
// is exactly-once-per-exchange — re-notify cadence and hold-duration
// are semantically meaningless and must not leak into toRequest().
// - Switching back to COUNT_IN_WINDOW resets to defaults (zero), never
// restoring stale values from the previous COUNT_IN_WINDOW session.
//
// These tests will fail until Task 4.3 introduces the helper.
// ----------------------------------------------------------------------------
describe('applyFireModeChange (fire-mode toggle hygiene)', () => {
const exchangeMatchForm = () => {
const f = initialForm();
f.conditionKind = 'EXCHANGE_MATCH';
f.condition = {
kind: 'EXCHANGE_MATCH',
scope: {},
fireMode: 'COUNT_IN_WINDOW',
} as unknown as typeof f.condition;
return f;
};
it('clears COUNT_IN_WINDOW fields when switching to PER_EXCHANGE', () => {
let f = exchangeMatchForm();
// Simulate a user who filled in COUNT_IN_WINDOW fields first.
f.condition = {
...(f.condition as Record<string, unknown>),
threshold: 5,
windowSeconds: 300,
} as typeof f.condition;
f = applyFireModeChange(f, 'PER_EXCHANGE');
const c = f.condition as Record<string, unknown>;
expect(c.fireMode).toBe('PER_EXCHANGE');
expect(c.threshold ?? 0).toBe(0);
expect(c.windowSeconds ?? 0).toBe(0);
});
it('resets to defaults (not stale values) when switching back to COUNT_IN_WINDOW', () => {
let f = exchangeMatchForm();
f.condition = {
...(f.condition as Record<string, unknown>),
threshold: 5,
windowSeconds: 300,
} as typeof f.condition;
f = applyFireModeChange(f, 'PER_EXCHANGE');
f = applyFireModeChange(f, 'COUNT_IN_WINDOW');
const c = f.condition as Record<string, unknown>;
expect(c.fireMode).toBe('COUNT_IN_WINDOW');
// Must be fresh defaults, not the stale 5 / 300 the user typed before.
expect(c.threshold ?? 0).toBe(0);
expect(c.windowSeconds ?? 0).toBe(0);
});
it('forces reNotifyMinutes=0 and forDurationSeconds=0 when switching to PER_EXCHANGE', () => {
let f = exchangeMatchForm();
f.reNotifyMinutes = 60;
f.forDurationSeconds = 120;
f = applyFireModeChange(f, 'PER_EXCHANGE');
expect(f.reNotifyMinutes).toBe(0);
expect(f.forDurationSeconds).toBe(0);
});
});
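The toggle-hygiene contract these tests pin can be sketched standalone. The snippet below is an illustrative reimplementation against minimal stand-in types — the real `FormState` and `applyFireModeChange` live in the project's `form-state.ts`, and the field names here simply mirror the diff:

```typescript
// Stand-in types: the real FormState carries many more fields.
type ExchangeFireMode = 'PER_EXCHANGE' | 'COUNT_IN_WINDOW';
interface FormState {
  reNotifyMinutes: number;
  forDurationSeconds: number;
  condition: Record<string, unknown>;
}

// Same contract as the diff's helper: every toggle resets the
// COUNT_IN_WINDOW-only fields to fresh defaults, and switching to
// PER_EXCHANGE also zeroes the top-level cadence fields, because an
// exactly-once-per-exchange rule has no re-notify or hold-duration.
function applyFireModeChange(form: FormState, newMode: ExchangeFireMode): FormState {
  const c = form.condition;
  if (c.kind !== 'EXCHANGE_MATCH') return form; // no-op for other condition kinds
  const base: FormState = {
    ...form,
    condition: { ...c, fireMode: newMode, threshold: 0, windowSeconds: 0 },
  };
  return newMode === 'PER_EXCHANGE'
    ? { ...base, reNotifyMinutes: 0, forDurationSeconds: 0 }
    : base;
}

// A user fills in COUNT_IN_WINDOW fields, then flips the mode:
let f: FormState = {
  reNotifyMinutes: 60,
  forDurationSeconds: 120,
  condition: { kind: 'EXCHANGE_MATCH', fireMode: 'COUNT_IN_WINDOW', threshold: 5, windowSeconds: 300 },
};
f = applyFireModeChange(f, 'PER_EXCHANGE');
console.log(f.reNotifyMinutes, f.condition.threshold); // 0 0 — nothing stale survives
```

Returning a new object (rather than mutating) is what lets the form's `setForm` consumer treat the toggle as a single atomic state transition.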


@@ -3,7 +3,7 @@ import type {
AlertRuleResponse,
AlertCondition,
} from '../../../api/queries/alertRules';
import type { ConditionKind, Severity, TargetKind } from '../enums';
import type { ConditionKind, ExchangeFireMode, Severity, TargetKind } from '../enums';
export type WizardStep = 'scope' | 'condition' | 'trigger' | 'notify' | 'review';
export const WIZARD_STEPS: WizardStep[] = ['scope', 'condition', 'trigger', 'notify', 'review'];
@@ -137,6 +137,38 @@ export function toRequest(f: FormState): AlertRuleRequest {
} as AlertRuleRequest;
}
/**
* Pure helper for the ExchangeMatchForm's Fire-mode <Select>. Guarantees state
* hygiene across toggles:
*
* - Switching to PER_EXCHANGE clears COUNT_IN_WINDOW-only condition fields
* (threshold, windowSeconds) AND forces top-level reNotifyMinutes = 0
* and forDurationSeconds = 0 — PER_EXCHANGE is exactly-once-per-exchange,
* so re-notify cadence and hold-duration are meaningless and must not leak
* into toRequest().
* - Switching back to COUNT_IN_WINDOW resets threshold/windowSeconds to 0
* (never restoring stale values from the previous COUNT_IN_WINDOW session).
*
* No-op for non-EXCHANGE_MATCH conditions. Returns a new form object.
*/
export function applyFireModeChange(form: FormState, newMode: ExchangeFireMode): FormState {
const c = form.condition as Record<string, unknown>;
if (c.kind !== 'EXCHANGE_MATCH') return form;
const base: FormState = {
...form,
condition: {
...c,
fireMode: newMode,
threshold: 0,
windowSeconds: 0,
} as FormState['condition'],
};
if (newMode === 'PER_EXCHANGE') {
return { ...base, reNotifyMinutes: 0, forDurationSeconds: 0 };
}
return base;
}
export function validateStep(step: WizardStep, f: FormState): string[] {
const errs: string[] = [];
if (step === 'scope') {
@@ -151,10 +183,6 @@ export function validateStep(step: WizardStep, f: FormState): string[] {
const c = f.condition as Record<string, unknown>;
if (!c.fireMode) {
errs.push('Fire mode is required.');
} else if (c.fireMode === 'PER_EXCHANGE') {
if (c.perExchangeLingerSeconds == null) {
errs.push('Linger seconds is required for PER_EXCHANGE.');
}
} else if (c.fireMode === 'COUNT_IN_WINDOW') {
if (c.threshold == null) errs.push('Threshold is required for COUNT_IN_WINDOW.');
if (c.windowSeconds == null) errs.push('Window (seconds) is required for COUNT_IN_WINDOW.');