Compare commits
10 commits: c79a6234af ... aa9e93369f
| Author | SHA1 | Date |
|---|---|---|
| | aa9e93369f | |
| | b0ba08e572 | |
| | 2c82b50ea2 | |
| | 7e79ff4d98 | |
| | 424894a3e2 | |
| | d74079da63 | |
| | 3f036da03d | |
| | 8bf45d5456 | |
| | f1abca3a45 | |
| | 144915563c | |
12
AGENTS.md
@@ -1,7 +1,7 @@
 <!-- gitnexus:start -->
 # GitNexus — Code Intelligence

-This project is indexed by GitNexus as **cameleer-server** (6306 symbols, 15892 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
+This project is indexed by GitNexus as **alerting-02** (7810 symbols, 20082 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.

 > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

@@ -17,7 +17,7 @@ This project is indexed by GitNexus as **cameleer-server** (6306 symbols, 15892
 1. `gitnexus_query({query: "<error or symptom>"})` — find execution flows related to the issue
 2. `gitnexus_context({name: "<suspect function>"})` — see all callers, callees, and process participation
-3. `READ gitnexus://repo/cameleer-server/process/{processName}` — trace the full execution flow step by step
+3. `READ gitnexus://repo/alerting-02/process/{processName}` — trace the full execution flow step by step
 4. For regressions: `gitnexus_detect_changes({scope: "compare", base_ref: "main"})` — see what your branch changed

 ## When Refactoring

@@ -56,10 +56,10 @@ This project is indexed by GitNexus as **cameleer-server** (6306 symbols, 15892
 | Resource | Use for |
 |----------|---------|
-| `gitnexus://repo/cameleer-server/context` | Codebase overview, check index freshness |
-| `gitnexus://repo/cameleer-server/clusters` | All functional areas |
-| `gitnexus://repo/cameleer-server/processes` | All execution flows |
-| `gitnexus://repo/cameleer-server/process/{name}` | Step-by-step execution trace |
+| `gitnexus://repo/alerting-02/context` | Codebase overview, check index freshness |
+| `gitnexus://repo/alerting-02/clusters` | All functional areas |
+| `gitnexus://repo/alerting-02/processes` | All execution flows |
+| `gitnexus://repo/alerting-02/process/{name}` | Step-by-step execution trace |

 ## Self-Check Before Finishing
15
CLAUDE.md
@@ -67,6 +67,9 @@ PostgreSQL (Flyway): `cameleer-server-app/src/main/resources/db/migration/`
 - V8 — Deployment active config (resolved_config JSONB on deployments)
 - V9 — Password hardening (failed_login_attempts, locked_until, token_revoked_before on users)
 - V10 — Runtime type detection (detected_runtime_type, detected_main_class on app_versions)
 - V11 — Outbound connections (outbound_connections table, enums)
+- V12 — Alerting tables (alert_rules, alert_rule_targets, alert_instances, alert_notifications, alert_reads, alert_silences)
+- V13 — alert_instances open-rule unique index (alert_instances_open_rule_uq partial index on rule_id WHERE state IN PENDING/FIRING/ACKNOWLEDGED)

 ClickHouse: `cameleer-server-app/src/main/resources/clickhouse/init.sql` (run idempotently on startup)

@@ -94,7 +97,7 @@ When adding, removing, or renaming classes, controllers, endpoints, UI component
 <!-- gitnexus:start -->
 # GitNexus — Code Intelligence

-This project is indexed by GitNexus as **cameleer-server** (6436 symbols, 16257 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
+This project is indexed by GitNexus as **alerting-02** (7810 symbols, 20082 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.

 > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

@@ -110,7 +113,7 @@ This project is indexed by GitNexus as **cameleer-server** (6436 symbols, 16257
 1. `gitnexus_query({query: "<error or symptom>"})` — find execution flows related to the issue
 2. `gitnexus_context({name: "<suspect function>"})` — see all callers, callees, and process participation
-3. `READ gitnexus://repo/cameleer-server/process/{processName}` — trace the full execution flow step by step
+3. `READ gitnexus://repo/alerting-02/process/{processName}` — trace the full execution flow step by step
 4. For regressions: `gitnexus_detect_changes({scope: "compare", base_ref: "main"})` — see what your branch changed

 ## When Refactoring

@@ -149,10 +152,10 @@ This project is indexed by GitNexus as **cameleer-server** (6436 symbols, 16257
 | Resource | Use for |
 |----------|---------|
-| `gitnexus://repo/cameleer-server/context` | Codebase overview, check index freshness |
-| `gitnexus://repo/cameleer-server/clusters` | All functional areas |
-| `gitnexus://repo/cameleer-server/processes` | All execution flows |
-| `gitnexus://repo/cameleer-server/process/{name}` | Step-by-step execution trace |
+| `gitnexus://repo/alerting-02/context` | Codebase overview, check index freshness |
+| `gitnexus://repo/alerting-02/clusters` | All functional areas |
+| `gitnexus://repo/alerting-02/processes` | All execution flows |
+| `gitnexus://repo/alerting-02/process/{name}` | Step-by-step execution trace |

 ## Self-Check Before Finishing
@@ -69,10 +69,7 @@ public class AlertNotificationController {
     }

     // Reset for retry: status -> PENDING, attempts -> 0, next_attempt_at -> now
-    // We use scheduleRetry to reset attempt timing; then we need to reset attempts count.
-    // The repository has scheduleRetry which sets next_attempt_at and records last status.
-    // We use a dedicated pattern: mark as pending by scheduling immediately.
-    notificationRepo.scheduleRetry(id, Instant.now(), 0, null);
+    notificationRepo.resetForRetry(id, Instant.now());

     return AlertNotificationDto.from(notificationRepo.findById(id)
         .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND)));
@@ -96,10 +96,10 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
     }

     // -------------------------------------------------------------------------
-    // Tick — package-private so tests can call it directly
+    // Tick — package-visible for same-package tests; also accessible cross-package for lifecycle ITs
    // -------------------------------------------------------------------------

-    void tick() {
+    public void tick() {
         List<AlertRule> claimed = ruleRepo.claimDueRules(
             instanceId,
             props.effectiveEvaluatorBatchSize(),

@@ -129,6 +129,28 @@ public class AlertEvaluatorJob implements SchedulingConfigurer {
             reschedule(rule, nextRun);
             }
         }
+
+        sweepReNotify();
     }
+
+    // -------------------------------------------------------------------------
+    // Re-notification cadence sweep
+    // -------------------------------------------------------------------------
+
+    private void sweepReNotify() {
+        Instant now = Instant.now(clock);
+        List<AlertInstance> due = instanceRepo.listFiringDueForReNotify(now);
+        for (AlertInstance i : due) {
+            try {
+                AlertRule rule = i.ruleId() == null ? null : ruleRepo.findById(i.ruleId()).orElse(null);
+                if (rule == null || rule.reNotifyMinutes() <= 0) continue;
+                enqueueNotifications(rule, i, now);
+                instanceRepo.save(i.withLastNotifiedAt(now));
+                log.debug("Re-notify enqueued for instance {} (rule {})", i.id(), i.ruleId());
+            } catch (Exception e) {
+                log.warn("Re-notify sweep error for instance {}: {}", i.id(), e.toString());
+            }
+        }
+    }

     // -------------------------------------------------------------------------
@@ -2,8 +2,10 @@ package com.cameleer.server.app.alerting.eval;

 import com.cameleer.server.core.alerting.AlertInstance;
 import com.cameleer.server.core.alerting.AlertRule;
+import com.cameleer.server.core.alerting.AlertRuleTarget;
 import com.cameleer.server.core.alerting.AlertSeverity;
 import com.cameleer.server.core.alerting.AlertState;
+import com.cameleer.server.core.alerting.TargetKind;

 import java.time.Instant;
 import java.util.List;

@@ -98,6 +100,20 @@ public final class AlertStateTransitions {
      * title/message are left empty here; the job enriches them via MustacheRenderer after.
      */
     static AlertInstance newInstance(AlertRule rule, EvalResult.Firing f, AlertState state, Instant now) {
+        List<AlertRuleTarget> targets = rule.targets() != null ? rule.targets() : List.of();
+        List<String> targetUserIds = targets.stream()
+            .filter(t -> t.kind() == TargetKind.USER)
+            .map(AlertRuleTarget::targetId)
+            .toList();
+        List<UUID> targetGroupIds = targets.stream()
+            .filter(t -> t.kind() == TargetKind.GROUP)
+            .map(t -> UUID.fromString(t.targetId()))
+            .toList();
+        List<String> targetRoleNames = targets.stream()
+            .filter(t -> t.kind() == TargetKind.ROLE)
+            .map(AlertRuleTarget::targetId)
+            .toList();
+
         return new AlertInstance(
             UUID.randomUUID(),
             rule.id(),

@@ -116,8 +132,8 @@ public final class AlertStateTransitions {
             f.context() != null ? f.context() : Map.of(),
             "", // title — rendered by job
             "", // message — rendered by job
-            List.of(),
-            List.of(),
-            List.of());
+            targetUserIds,
+            targetGroupIds,
+            targetRoleNames);
     }
 }
@@ -48,8 +48,7 @@ public class RouteMetricEvaluator implements ConditionEvaluator<RouteMetricCondi

         double actual = switch (c.metric()) {
             case ERROR_RATE -> errorRate(stats);
-            // ExecutionStats has no p95 field; avgDurationMs is the closest available proxy
-            case P95_LATENCY_MS -> (double) stats.avgDurationMs();
             case AVG_DURATION_MS -> (double) stats.avgDurationMs();
+            case P99_LATENCY_MS -> (double) stats.p99LatencyMs();
             case THROUGHPUT -> (double) stats.totalCount();
             case ERROR_COUNT -> (double) stats.failedCount();
@@ -1,6 +1,7 @@
 package com.cameleer.server.app.alerting.notify;

 import com.cameleer.server.app.alerting.config.AlertingProperties;
+import com.cameleer.server.app.alerting.metrics.AlertingMetrics;
 import com.cameleer.server.core.alerting.*;
 import com.cameleer.server.core.outbound.OutboundConnectionRepository;
 import com.cameleer.server.core.runtime.Environment;

@@ -48,6 +49,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
     private final String tenantId;
     private final Clock clock;
     private final String uiOrigin;
+    private final AlertingMetrics metrics;

     @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
     public NotificationDispatchJob(

@@ -64,7 +66,8 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
         @Qualifier("alertingInstanceId") String instanceId,
         @Value("${cameleer.server.tenant.id:default}") String tenantId,
         Clock alertingClock,
-        @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin) {
+        @Value("${cameleer.server.ui-origin:#{null}}") String uiOrigin,
+        AlertingMetrics metrics) {

         this.props = props;
         this.notificationRepo = notificationRepo;

@@ -80,6 +83,7 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
         this.tenantId = tenantId;
         this.clock = alertingClock;
         this.uiOrigin = uiOrigin;
+        this.metrics = metrics;
     }

     // -------------------------------------------------------------------------

@@ -92,10 +96,10 @@ public class NotificationDispatchJob implements SchedulingConfigurer {
     }

     // -------------------------------------------------------------------------
-    // Tick — package-private for tests
+    // Tick — accessible for tests across packages
     // -------------------------------------------------------------------------

-    void tick() {
+    public void tick() {
         List<AlertNotification> claimed = notificationRepo.claimDueNotifications(
             instanceId,
             props.effectiveNotificationBatchSize(),

@@ -155,16 +159,19 @@ public class NotificationDispatchJob implements SchedulingConfigurer {

         NotificationStatus outcomeStatus = outcome.status();
         if (outcomeStatus == NotificationStatus.DELIVERED) {
-            notificationRepo.markDelivered(
-                n.id(), outcome.httpStatus(), outcome.snippet(), Instant.now(clock));
+            Instant now = Instant.now(clock);
+            notificationRepo.markDelivered(n.id(), outcome.httpStatus(), outcome.snippet(), now);
+            instanceRepo.save(instance.withLastNotifiedAt(now));
+            metrics.notificationOutcome(NotificationStatus.DELIVERED);
         } else if (outcomeStatus == NotificationStatus.FAILED) {
-            notificationRepo.markFailed(
-                n.id(), outcome.httpStatus(), outcome.snippet());
+            notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet());
+            metrics.notificationOutcome(NotificationStatus.FAILED);
         } else {
             // null status = transient failure (5xx / network / timeout) → retry
             int attempts = n.attempts() + 1;
             if (attempts >= props.effectiveWebhookMaxAttempts()) {
                 notificationRepo.markFailed(n.id(), outcome.httpStatus(), outcome.snippet());
+                metrics.notificationOutcome(NotificationStatus.FAILED);
             } else {
                 Instant next = Instant.now(clock).plus(outcome.retryAfter().multipliedBy(attempts));
                 notificationRepo.scheduleRetry(n.id(), next, outcome.httpStatus(), outcome.snippet());
@@ -3,6 +3,9 @@ package com.cameleer.server.app.alerting.storage;
 import com.cameleer.server.core.alerting.*;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DuplicateKeyException;
 import org.springframework.jdbc.core.ConnectionCallback;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.RowMapper;

@@ -15,6 +18,8 @@ import java.util.*;

 public class PostgresAlertInstanceRepository implements AlertInstanceRepository {

+    private static final Logger log = LoggerFactory.getLogger(PostgresAlertInstanceRepository.class);
+
     private final JdbcTemplate jdbc;
     private final ObjectMapper om;

@@ -55,14 +60,19 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
         Array groupIds = toUuidArray(i.targetGroupIds());
         Array roleNames = toTextArray(i.targetRoleNames());

-        jdbc.update(sql,
-            i.id(), i.ruleId(), writeJson(i.ruleSnapshot()),
-            i.environmentId(), i.state().name(), i.severity().name(),
-            ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(),
-            ts(i.resolvedAt()), ts(i.lastNotifiedAt()),
-            i.silenced(), i.currentValue(), i.threshold(),
-            writeJson(i.context()), i.title(), i.message(),
-            userIds, groupIds, roleNames);
+        try {
+            jdbc.update(sql,
+                i.id(), i.ruleId(), writeJson(i.ruleSnapshot()),
+                i.environmentId(), i.state().name(), i.severity().name(),
+                ts(i.firedAt()), ts(i.ackedAt()), i.ackedBy(),
+                ts(i.resolvedAt()), ts(i.lastNotifiedAt()),
+                i.silenced(), i.currentValue(), i.threshold(),
+                writeJson(i.context()), i.title(), i.message(),
+                userIds, groupIds, roleNames);
+        } catch (DuplicateKeyException e) {
+            log.info("Skipped duplicate open alert_instance for rule {}: {}", i.ruleId(), e.getMessage());
+            return findOpenForRule(i.ruleId()).orElse(i);
+        }
         return i;
     }
@@ -147,6 +157,20 @@ public class PostgresAlertInstanceRepository implements AlertInstanceRepository
         jdbc.update("UPDATE alert_instances SET silenced = ? WHERE id = ?", silenced, id);
     }

+    @Override
+    public List<AlertInstance> listFiringDueForReNotify(Instant now) {
+        return jdbc.query("""
+            SELECT ai.* FROM alert_instances ai
+            JOIN alert_rules ar ON ar.id = ai.rule_id
+            WHERE ai.state = 'FIRING'::alert_state_enum
+              AND ai.silenced = false
+              AND ar.enabled = true
+              AND ar.re_notify_minutes > 0
+              AND ai.last_notified_at IS NOT NULL
+              AND ai.last_notified_at + make_interval(mins => ar.re_notify_minutes) <= ?
+            """, rowMapper(), Timestamp.from(now));
+    }
+
     @Override
     public void deleteResolvedBefore(Instant cutoff) {
         jdbc.update("""
@@ -118,6 +118,21 @@ public class PostgresAlertNotificationRepository implements AlertNotificationRep
         """, Timestamp.from(nextAttemptAt), status, snippet, id);
     }

+    @Override
+    public void resetForRetry(UUID id, Instant nextAttemptAt) {
+        jdbc.update("""
+            UPDATE alert_notifications
+            SET attempts = 0,
+                status = 'PENDING'::notification_status_enum,
+                next_attempt_at = ?,
+                claimed_by = NULL,
+                claimed_until = NULL,
+                last_response_status = NULL,
+                last_response_snippet = NULL
+            WHERE id = ?
+            """, Timestamp.from(nextAttemptAt), id);
+    }
+
     @Override
     public void markFailed(UUID id, int status, String snippet) {
         jdbc.update("""
@@ -55,20 +55,36 @@ public class PostgresAlertRuleRepository implements AlertRuleRepository {
             writeJson(r.evalState()),
             Timestamp.from(r.createdAt()), r.createdBy(),
             Timestamp.from(r.updatedAt()), r.updatedBy());
+        saveTargets(r.id(), r.targets());
         return r;
     }

+    private void saveTargets(UUID ruleId, List<AlertRuleTarget> targets) {
+        jdbc.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", ruleId);
+        if (targets == null || targets.isEmpty()) return;
+        jdbc.batchUpdate(
+            "INSERT INTO alert_rule_targets (id, rule_id, target_kind, target_id) VALUES (?, ?, ?::target_kind_enum, ?)",
+            targets, targets.size(), (ps, t) -> {
+                ps.setObject(1, t.id() != null ? t.id() : UUID.randomUUID());
+                ps.setObject(2, ruleId);
+                ps.setString(3, t.kind().name());
+                ps.setString(4, t.targetId());
+            });
+    }
+
     @Override
     public Optional<AlertRule> findById(UUID id) {
         var list = jdbc.query("SELECT * FROM alert_rules WHERE id = ?", rowMapper(), id);
-        return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
+        if (list.isEmpty()) return Optional.empty();
+        return Optional.of(withTargets(list).get(0));
     }

     @Override
     public List<AlertRule> listByEnvironment(UUID environmentId) {
-        return jdbc.query(
+        var list = jdbc.query(
             "SELECT * FROM alert_rules WHERE environment_id = ? ORDER BY created_at DESC",
             rowMapper(), environmentId);
+        return withTargets(list);
     }

     @Override

@@ -113,7 +129,38 @@ public class PostgresAlertRuleRepository implements AlertRuleRepository {
         )
         RETURNING *
         """;
-        return jdbc.query(sql, rowMapper(), instanceId, claimTtlSeconds, batchSize);
+        List<AlertRule> rules = jdbc.query(sql, rowMapper(), instanceId, claimTtlSeconds, batchSize);
+        return withTargets(rules);
     }

+    /** Batch-loads targets for the given rules and returns new rule instances with targets populated. */
+    private List<AlertRule> withTargets(List<AlertRule> rules) {
+        if (rules.isEmpty()) return rules;
+        // Build IN clause
+        String inClause = rules.stream()
+            .map(r -> "'" + r.id() + "'")
+            .collect(java.util.stream.Collectors.joining(","));
+        String sql = "SELECT * FROM alert_rule_targets WHERE rule_id IN (" + inClause + ")";
+        Map<UUID, List<AlertRuleTarget>> byRuleId = new HashMap<>();
+        jdbc.query(sql, rs -> {
+            UUID ruleId = (UUID) rs.getObject("rule_id");
+            AlertRuleTarget t = new AlertRuleTarget(
+                (UUID) rs.getObject("id"),
+                ruleId,
+                TargetKind.valueOf(rs.getString("target_kind")),
+                rs.getString("target_id"));
+            byRuleId.computeIfAbsent(ruleId, k -> new ArrayList<>()).add(t);
+        });
+        return rules.stream()
+            .map(r -> new AlertRule(
+                r.id(), r.environmentId(), r.name(), r.description(),
+                r.severity(), r.enabled(), r.conditionKind(), r.condition(),
+                r.evaluationIntervalSeconds(), r.forDurationSeconds(), r.reNotifyMinutes(),
+                r.notificationTitleTmpl(), r.notificationMessageTmpl(),
+                r.webhooks(), byRuleId.getOrDefault(r.id(), List.of()),
+                r.nextEvaluationAt(), r.claimedBy(), r.claimedUntil(), r.evalState(),
+                r.createdAt(), r.createdBy(), r.updatedAt(), r.updatedBy()))
+            .toList();
+    }

     @Override
@@ -1,12 +1,12 @@
 -- Alerting projections — additive and idempotent (IF NOT EXISTS).
 -- Safe to run on every startup alongside init.sql.
 --
--- NOTE: executions uses ReplacingMergeTree which requires deduplicate_merge_projection_mode='rebuild'
--- to support projections (ClickHouse 24.x). The ADD PROJECTION and MATERIALIZE statements for
--- executions are treated as best-effort by the schema initializer (non-fatal on failure).
--- logs and agent_metrics use plain MergeTree and always succeed.
+-- executions uses ReplacingMergeTree. ClickHouse 24.x requires deduplicate_merge_projection_mode='rebuild'
+-- for projections to work on ReplacingMergeTree. ALTER TABLE MODIFY SETTING persists the setting in
+-- table metadata (survives restarts) and runs before the ADD PROJECTION statements.
+-- logs and agent_metrics use plain MergeTree and do not need this setting.
 --
--- MATERIALIZE statements are also wrapped as non-fatal to handle empty tables in fresh deployments.
+-- MATERIALIZE statements are wrapped as non-fatal to handle empty tables in fresh deployments.

 -- Plain MergeTree tables: always succeed
 ALTER TABLE logs

@@ -17,7 +17,9 @@ ALTER TABLE agent_metrics
     ADD PROJECTION IF NOT EXISTS alerting_instance_metric
     (SELECT * ORDER BY (tenant_id, environment, instance_id, metric_name, collected_at));

--- ReplacingMergeTree tables: best-effort (requires deduplicate_merge_projection_mode='rebuild')
+-- ReplacingMergeTree: set table-level setting so ADD PROJECTION succeeds on any connection
+ALTER TABLE executions MODIFY SETTING deduplicate_merge_projection_mode = 'rebuild';
+
 ALTER TABLE executions
     ADD PROJECTION IF NOT EXISTS alerting_app_status
     (SELECT * ORDER BY (tenant_id, environment, application_id, status, start_time));
@@ -0,0 +1,7 @@
+-- V13 — Unique partial index: at most one open alert_instance per rule
+-- Prevents duplicate FIRING rows in multi-replica deployments.
+-- The Java save() path catches DuplicateKeyException and log-and-skips the losing insert.
+CREATE UNIQUE INDEX alert_instances_open_rule_uq
+ON alert_instances (rule_id)
+WHERE rule_id IS NOT NULL
+  AND state IN ('PENDING','FIRING','ACKNOWLEDGED');
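The invariant this index enforces can be tried out in isolation. The sketch below is an illustration only, not the project's schema: SQLite stands in for Postgres, the table is cut down to three columns, and the enum types are plain text. It shows that a second "open" row per rule is rejected while a RESOLVED row passes, which is the behavior the repository's DuplicateKeyException handler relies on.

```python
import sqlite3

# Simplified stand-in for the alert_instances table (hypothetical columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alert_instances (id TEXT PRIMARY KEY, rule_id TEXT, state TEXT)")

# Same partial-index idea as V13: uniqueness applies only to open states.
conn.execute("""
    CREATE UNIQUE INDEX alert_instances_open_rule_uq
    ON alert_instances (rule_id)
    WHERE rule_id IS NOT NULL
      AND state IN ('PENDING','FIRING','ACKNOWLEDGED')
""")

conn.execute("INSERT INTO alert_instances VALUES ('a', 'r1', 'FIRING')")

# A second open row for the same rule violates the partial unique index.
try:
    conn.execute("INSERT INTO alert_instances VALUES ('b', 'r1', 'PENDING')")
    raised = False
except sqlite3.IntegrityError:
    raised = True

# A RESOLVED row falls outside the WHERE clause, so it is allowed.
conn.execute("INSERT INTO alert_instances VALUES ('c', 'r1', 'RESOLVED')")
print(raised)  # True
```

Only one writer wins the open-row insert; the loser gets a constraint violation instead of a duplicate, which is why the Java side can simply catch and re-read.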
@@ -1,7 +1,10 @@
 package com.cameleer.server.app;

+import com.cameleer.server.app.search.ClickHouseSearchIndex;
+import com.cameleer.server.core.agent.AgentRegistryService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.DynamicPropertyRegistry;

@@ -14,6 +17,12 @@ import org.testcontainers.containers.PostgreSQLContainer;
 @ActiveProfiles("test")
 public abstract class AbstractPostgresIT {

+    // Mocked infrastructure beans required by the full application context.
+    // ClickHouseSearchIndex is not available in test without explicit ClickHouse wiring,
+    // and AgentRegistryService requires in-memory state that tests manage directly.
+    @MockBean(name = "clickHouseSearchIndex") protected ClickHouseSearchIndex clickHouseSearchIndex;
+    @MockBean protected AgentRegistryService agentRegistryService;
+
     static final PostgreSQLContainer<?> postgres;
     static final ClickHouseContainer clickhouse;
@@ -16,12 +16,16 @@ import com.github.tomakehurst.wiremock.WireMockServer;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.http.*;

+import java.time.Clock;
 import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -32,9 +36,14 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * Canary integration test — exercises the full alerting lifecycle end-to-end:
  * fire → notify → ack → silence → re-fire (suppressed) → resolve → rule delete.
+ * Also verifies the re-notification cadence (reNotifyMinutes).
+ *
+ * Rule creation is driven through the REST API (POST /alerts/rules), not raw SQL,
+ * so target persistence via saveTargets() is exercised on the critical path.
+ *
  * Uses real Postgres (Testcontainers) and real ClickHouse for log seeding.
  * WireMock provides the webhook target.
  * Clock is replaced with a @MockBean so the re-notify test can advance time.
  */
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestInstance(Lifecycle.PER_CLASS)

@@ -42,6 +51,9 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {

     // AbstractPostgresIT already declares clickHouseSearchIndex + agentRegistryService mocks.

+    // Replace the alertingClock bean so we can control time in re-notify test
+    @MockBean(name = "alertingClock") Clock alertingClock;
+
     // ── Spring beans ──────────────────────────────────────────────────────────

     @Autowired private AlertEvaluatorJob evaluatorJob;
@@ -71,15 +83,30 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
     private UUID connId;
     private UUID instanceId; // filled after first FIRING

+    // Current simulated clock time — starts at "now" and can be advanced
+    private Instant simulatedNow = Instant.now();
+
     // ── Setup / teardown ──────────────────────────────────────────────────────

+    /**
+     * Mockito resets @MockBean stubs between @Test methods even with PER_CLASS lifecycle.
+     * Re-stub the clock before every test so clock.instant() never returns null.
+     */
+    @BeforeEach
+    void refreshClock() {
+        stubClock();
+    }
+
     @BeforeAll
     void seedFixtures() throws Exception {
         wm = new WireMockServer(WireMockConfiguration.options()
             .httpDisabled(true)
             .dynamicHttpsPort());
         wm.start();
+        // ClickHouse schema is auto-initialized by ClickHouseSchemaInitializer on Spring context startup.

+        // Default clock behaviour: delegate to simulatedNow
+        stubClock();

         operatorJwt = securityHelper.operatorToken();

         // Seed operator user in Postgres
@@ -111,41 +138,8 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
             " 'test-operator', 'test-operator')",
             connId, tenantId, webhookUrl, hmacCiphertext);

-        // Seed alert rule (LOG_PATTERN, forDurationSeconds=0, threshold=0 so >=1 log fires immediately)
-        ruleId = UUID.randomUUID();
-        UUID webhookBindingId = UUID.randomUUID();
-        String webhooksJson = objectMapper.writeValueAsString(List.of(
-            Map.of("id", webhookBindingId.toString(),
-                "outboundConnectionId", connId.toString())));
-        String conditionJson = objectMapper.writeValueAsString(Map.of(
-            "kind", "LOG_PATTERN",
-            "scope", Map.of("appSlug", "lc-app"),
-            "level", "ERROR",
-            "pattern", "TimeoutException",
-            "threshold", 0,
-            "windowSeconds", 300));
-
-        jdbcTemplate.update("""
-            INSERT INTO alert_rules
-            (id, environment_id, name, severity, enabled,
-             condition_kind, condition,
-             evaluation_interval_seconds, for_duration_seconds,
-             notification_title_tmpl, notification_message_tmpl,
-             webhooks, next_evaluation_at,
-             created_by, updated_by)
-            VALUES (?, ?, 'lc-timeout-rule', 'WARNING'::severity_enum, true,
-                'LOG_PATTERN'::condition_kind_enum, ?::jsonb,
-                60, 0,
-                'Alert: {{rule.name}}', 'Instance {{alert.id}} fired',
-                ?::jsonb, now() - interval '1 second',
-                'test-operator', 'test-operator')
-            """,
-            ruleId, envId, conditionJson, webhooksJson);
-
-        // Seed alert_rule_targets so the instance shows up in inbox
-        jdbcTemplate.update(
-            "INSERT INTO alert_rule_targets (id, rule_id, target_kind, target_id) VALUES (gen_random_uuid(), ?, 'USER'::target_kind_enum, 'test-operator') ON CONFLICT (rule_id, target_kind, target_id) DO NOTHING",
-            ruleId);
+        // Create alert rule via REST API (exercises saveTargets on the write path)
+        ruleId = createRuleViaRestApi();
     }

     @AfterAll
@@ -154,8 +148,8 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
         jdbcTemplate.update("DELETE FROM alert_silences WHERE environment_id = ?", envId);
         jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id IN (SELECT id FROM alert_instances WHERE environment_id = ?)", envId);
         jdbcTemplate.update("DELETE FROM alert_instances WHERE environment_id = ?", envId);
-        jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", ruleId);
-        jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", ruleId);
+        jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id IN (SELECT id FROM alert_rules WHERE environment_id = ?)", envId);
+        jdbcTemplate.update("DELETE FROM alert_rules WHERE environment_id = ?", envId);
         jdbcTemplate.update("DELETE FROM outbound_connections WHERE id = ?", connId);
         jdbcTemplate.update("DELETE FROM environments WHERE id = ?", envId);
         jdbcTemplate.update("DELETE FROM users WHERE user_id = 'test-operator'");
@@ -169,9 +163,27 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
         // Stub WireMock to return 200
         wm.stubFor(post("/webhook").willReturn(aResponse().withStatus(200).withBody("accepted")));

-        // Seed a matching log into ClickHouse
+        // Seed a matching log into ClickHouse BEFORE capturing simulatedNow,
+        // so the log timestamp is guaranteed to fall inside [simulatedNow-300s, simulatedNow].
         seedMatchingLog();

+        // Set simulatedNow to current wall time — the log was inserted a few ms earlier,
+        // so its timestamp is guaranteed <= simulatedNow within the 300s window.
+        setSimulatedNow(Instant.now());
+
+        // Release any claim the background scheduler may have already placed on the rule,
+        // and backdate next_evaluation_at so it's due again for our manual tick.
+        jdbcTemplate.update(
+            "UPDATE alert_rules SET claimed_by = NULL, claimed_until = NULL, " +
+            "next_evaluation_at = now() - interval '1 second' WHERE id = ?", ruleId);
+
+        // Verify rule is in DB and due (no claim outstanding)
+        Integer ruleCount = jdbcTemplate.queryForObject(
+            "SELECT count(*) FROM alert_rules WHERE id = ? AND enabled = true " +
+            "AND next_evaluation_at <= now() AND (claimed_until IS NULL OR claimed_until < now())",
+            Integer.class, ruleId);
+        assertThat(ruleCount).as("rule must be unclaimed and due before tick").isEqualTo(1);
+
         // Tick evaluator
         evaluatorJob.tick();
@@ -181,6 +193,13 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
|
||||
assertThat(instances).hasSize(1);
|
||||
assertThat(instances.get(0).state()).isEqualTo(AlertState.FIRING);
|
||||
assertThat(instances.get(0).ruleId()).isEqualTo(ruleId);
|
||||
|
||||
// B-1 fix verification: targets were persisted via the REST API path,
|
||||
// so target_user_ids must be non-empty (not {} as before the fix)
|
||||
assertThat(instances.get(0).targetUserIds())
|
||||
.as("target_user_ids must be non-empty — verifies B-1 fix (saveTargets)")
|
||||
.isNotEmpty();
|
||||
|
||||
instanceId = instances.get(0).id();
|
||||
}
|
||||
|
||||
@@ -205,6 +224,12 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
// Body should contain rule name
wm.verify(postRequestedFor(urlEqualTo("/webhook"))
.withRequestBody(containing("lc-timeout-rule")));

// B-2: lastNotifiedAt must be set after dispatch (step sets it on DELIVERED)
AlertInstance inst = instanceRepo.findById(instanceId).orElseThrow();
assertThat(inst.lastNotifiedAt())
.as("lastNotifiedAt must be set after DELIVERED — verifies B-2 tracking fix")
.isNotNull();
}

@Test
@@ -234,8 +259,8 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
String silenceBody = objectMapper.writeValueAsString(Map.of(
"matcher", Map.of("ruleId", ruleId.toString()),
"reason", "lifecycle-test-silence",
"startsAt", Instant.now().minusSeconds(10).toString(),
"endsAt", Instant.now().plusSeconds(3600).toString()
"startsAt", simulatedNow.minusSeconds(10).toString(),
"endsAt", simulatedNow.plusSeconds(3600).toString()
));
ResponseEntity<String> silenceResp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/silences",
@@ -305,8 +330,178 @@ class AlertingFullLifecycleIT extends AbstractPostgresIT {
}
}

@Test
@Order(6)
void step6_reNotifyCadenceFiresSecondNotification() throws Exception {
// Standalone sub-test: create a fresh rule with reNotifyMinutes=1 and verify
// that the evaluator's re-notify sweep enqueues a second notification after 61 seconds.

wm.resetRequests();
wm.stubFor(post("/webhook").willReturn(aResponse().withStatus(200).withBody("accepted")));

// Create a new rule via REST with reNotifyMinutes=1, forDurationSeconds=0
UUID reNotifyRuleId = createReNotifyRuleViaRestApi();

// Seed the log BEFORE capturing T+0 so the log timestamp falls inside
// the evaluator window [t0-300s, t0].
seedMatchingLog();

// Set T+0 to current wall time — the log was inserted a few ms earlier,
// so its timestamp is guaranteed <= t0 within the 300s window.
Instant t0 = Instant.now();
setSimulatedNow(t0);

// Tick evaluator at T+0 → instance FIRING, notification PENDING
evaluatorJob.tick();

List<AlertInstance> instances = instanceRepo.listForInbox(
envId, List.of(), "test-operator", List.of("OPERATOR"), 10);
// Find the instance for the reNotify rule
AlertInstance inst = instances.stream()
.filter(i -> reNotifyRuleId.equals(i.ruleId()))
.findFirst()
.orElse(null);
assertThat(inst).as("FIRING instance for reNotify rule").isNotNull();
UUID reNotifyInstanceId = inst.id();

// Tick dispatcher at T+0 → notification DELIVERED, WireMock: 1 POST
dispatchJob.tick();
wm.verify(1, postRequestedFor(urlEqualTo("/webhook")));

// Verify lastNotifiedAt was stamped (B-2 tracking)
AlertInstance afterFirstDispatch = instanceRepo.findById(reNotifyInstanceId).orElseThrow();
assertThat(afterFirstDispatch.lastNotifiedAt()).isNotNull();

// --- Advance clock 61 seconds ---
setSimulatedNow(t0.plusSeconds(61));

// Backdate next_evaluation_at so the rule is claimed again
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second', " +
"claimed_by = NULL, claimed_until = NULL WHERE id = ?", reNotifyRuleId);

// Tick evaluator at T+61 — re-notify sweep fires because lastNotifiedAt + 1 min <= now
evaluatorJob.tick();

// The sweep saves notifications with nextAttemptAt = simulatedNow (T+61s) which is in the
// future relative to Postgres real clock. Backdate so the dispatcher can claim them.
jdbcTemplate.update(
"UPDATE alert_notifications SET next_attempt_at = now() - interval '1 second' " +
"WHERE alert_instance_id = ? AND status = 'PENDING'::notification_status_enum",
reNotifyInstanceId);

// Tick dispatcher → second POST
dispatchJob.tick();
wm.verify(2, postRequestedFor(urlEqualTo("/webhook")));

// Cleanup
jdbcTemplate.update("DELETE FROM alert_notifications WHERE alert_instance_id = ?", reNotifyInstanceId);
jdbcTemplate.update("DELETE FROM alert_instances WHERE id = ?", reNotifyInstanceId);
jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", reNotifyRuleId);
jdbcTemplate.update("DELETE FROM alert_rules WHERE id = ?", reNotifyRuleId);
}

// ── Helpers ───────────────────────────────────────────────────────────────

/** POST the main lifecycle rule via REST API. Returns the created rule ID. */
private UUID createRuleViaRestApi() throws Exception {
// Build JSON directly — Map.of() supports at most 10 entries
String ruleBody = """
{
"name": "lc-timeout-rule",
"severity": "WARNING",
"conditionKind": "LOG_PATTERN",
"condition": {
"kind": "LOG_PATTERN",
"scope": {"appSlug": "lc-app"},
"level": "ERROR",
"pattern": "TimeoutException",
"threshold": 0,
"windowSeconds": 300
},
"evaluationIntervalSeconds": 60,
"forDurationSeconds": 0,
"reNotifyMinutes": 0,
"notificationTitleTmpl": "Alert: {{rule.name}}",
"notificationMessageTmpl": "Instance {{alert.id}} fired",
"webhooks": [{"outboundConnectionId": "%s"}],
"targets": [{"kind": "USER", "targetId": "test-operator"}]
}
""".formatted(connId);

ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/rules",
HttpMethod.POST,
new HttpEntity<>(ruleBody, securityHelper.authHeaders(operatorJwt)),
String.class);

assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
JsonNode body = objectMapper.readTree(resp.getBody());
String id = body.path("id").asText();
assertThat(id).isNotBlank();

// Backdate next_evaluation_at so it's due immediately
UUID ruleUuid = UUID.fromString(id);
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second' WHERE id = ?",
ruleUuid);

return ruleUuid;
}

/** POST a short-cadence re-notify rule via REST API. Returns the created rule ID. */
private UUID createReNotifyRuleViaRestApi() throws Exception {
String ruleBody = """
{
"name": "lc-renotify-rule",
"severity": "WARNING",
"conditionKind": "LOG_PATTERN",
"condition": {
"kind": "LOG_PATTERN",
"scope": {"appSlug": "lc-app"},
"level": "ERROR",
"pattern": "TimeoutException",
"threshold": 0,
"windowSeconds": 300
},
"evaluationIntervalSeconds": 60,
"forDurationSeconds": 0,
"reNotifyMinutes": 1,
"notificationTitleTmpl": "ReNotify: {{rule.name}}",
"notificationMessageTmpl": "Re-fired {{alert.id}}",
"webhooks": [{"outboundConnectionId": "%s"}],
"targets": [{"kind": "USER", "targetId": "test-operator"}]
}
""".formatted(connId);

ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/environments/" + envSlug + "/alerts/rules",
HttpMethod.POST,
new HttpEntity<>(ruleBody, securityHelper.authHeaders(operatorJwt)),
String.class);

assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
JsonNode body = objectMapper.readTree(resp.getBody());
String id = body.path("id").asText();
assertThat(id).isNotBlank();

UUID ruleUuid = UUID.fromString(id);
jdbcTemplate.update(
"UPDATE alert_rules SET next_evaluation_at = now() - interval '1 second' WHERE id = ?",
ruleUuid);
return ruleUuid;
}

private void setSimulatedNow(Instant instant) {
simulatedNow = instant;
stubClock();
}

private void stubClock() {
Mockito.when(alertingClock.instant()).thenReturn(simulatedNow);
Mockito.when(alertingClock.getZone()).thenReturn(ZoneOffset.UTC);
}

private void seedMatchingLog() {
LogEntry entry = new LogEntry(
Instant.now(),

@@ -113,6 +113,35 @@ class AlertNotificationControllerIT extends AbstractPostgresIT {
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
}

@Test
void retryResetsAttemptsToZero() throws Exception {
// Verify Fix I-1: retry endpoint resets attempts to 0, not attempts+1
AlertInstance instance = seedInstance();
AlertNotification notification = seedNotification(instance.id());

// Mark as failed with attempts at max (simulate exhausted retries)
notificationRepo.markFailed(notification.id(), 500, "server error");
notificationRepo.markFailed(notification.id(), 500, "server error");
notificationRepo.markFailed(notification.id(), 500, "server error");

// Verify attempts > 0 before retry
AlertNotification before = notificationRepo.findById(notification.id()).orElseThrow();
assertThat(before.attempts()).isGreaterThan(0);

// Operator retries
ResponseEntity<String> resp = restTemplate.exchange(
"/api/v1/alerts/notifications/" + notification.id() + "/retry",
HttpMethod.POST,
new HttpEntity<>(securityHelper.authHeaders(operatorJwt)),
String.class);
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);

// After retry: attempts must be 0 and status PENDING (not attempts+1)
AlertNotification after = notificationRepo.findById(notification.id()).orElseThrow();
assertThat(after.attempts()).as("retry must reset attempts to 0").isEqualTo(0);
assertThat(after.status()).isEqualTo(NotificationStatus.PENDING);
}

@Test
void viewerCannotRetry() throws Exception {
AlertInstance instance = seedInstance();

@@ -75,12 +75,17 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT {

@Test
void listForInbox_seesAllThreeTargetTypes() {
// Each instance gets a distinct ruleId so the unique-per-open-rule index
// (V13: alert_instances_open_rule_uq) doesn't block the second and third saves.
UUID ruleId2 = seedRule("rule-b");
UUID ruleId3 = seedRule("rule-c");

// Instance 1 — targeted at user directly
var byUser = newInstance(ruleId, List.of(userId), List.of(), List.of());
// Instance 2 — targeted at group
var byGroup = newInstance(ruleId, List.of(), List.of(UUID.fromString(groupId)), List.of());
var byGroup = newInstance(ruleId2, List.of(), List.of(UUID.fromString(groupId)), List.of());
// Instance 3 — targeted at role
var byRole = newInstance(ruleId, List.of(), List.of(), List.of(roleName));
var byRole = newInstance(ruleId3, List.of(), List.of(), List.of(roleName));

repo.save(byUser);
repo.save(byGroup);
@@ -159,8 +164,9 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT {

@Test
void deleteResolvedBefore_deletesOnlyResolved() {
UUID ruleId2 = seedRule("rule-del");
var firing = newInstance(ruleId, List.of(userId), List.of(), List.of());
var resolved = newInstance(ruleId, List.of(userId), List.of(), List.of());
var resolved = newInstance(ruleId2, List.of(userId), List.of(), List.of());
repo.save(firing);
repo.save(resolved);

@@ -173,6 +179,39 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT {
assertThat(repo.findById(resolved.id())).isEmpty();
}

@Test
void listFiringDueForReNotify_returnsOnlyEligibleInstances() {
// Each instance gets its own rule — the V13 unique partial index allows only one
// open (PENDING/FIRING/ACKNOWLEDGED) instance per rule_id.
UUID ruleNever = seedReNotifyRule("renotify-never");
UUID ruleLongAgo = seedReNotifyRule("renotify-longago");
UUID ruleRecent = seedReNotifyRule("renotify-recent");

// Instance 1: FIRING, never notified (last_notified_at IS NULL) → must NOT appear.
// The sweep only re-notifies; initial notification is the dispatcher's job.
var neverNotified = newInstance(ruleNever, List.of(userId), List.of(), List.of());
repo.save(neverNotified);

// Instance 2: FIRING, notified 2 minutes ago → cadence elapsed, must appear
var notifiedLongAgo = newInstance(ruleLongAgo, List.of(userId), List.of(), List.of());
repo.save(notifiedLongAgo);
jdbcTemplate.update("UPDATE alert_instances SET last_notified_at = now() - interval '2 minutes' WHERE id = ?",
notifiedLongAgo.id());

// Instance 3: FIRING, notified 30 seconds ago → cadence NOT elapsed, must NOT appear
var notifiedRecently = newInstance(ruleRecent, List.of(userId), List.of(), List.of());
repo.save(notifiedRecently);
jdbcTemplate.update("UPDATE alert_instances SET last_notified_at = now() - interval '30 seconds' WHERE id = ?",
notifiedRecently.id());

var due = repo.listFiringDueForReNotify(Instant.now());
assertThat(due).extracting(AlertInstance::id)
.containsExactly(notifiedLongAgo.id())
.doesNotContain(neverNotified.id(), notifiedRecently.id());

// Extra rules are cleaned up by @AfterEach via env-scoped DELETE
}

@Test
void markSilenced_togglesToTrue() {
var inst = newInstance(ruleId, List.of(userId), List.of(), List.of());
@@ -197,4 +236,26 @@ class PostgresAlertInstanceRepositoryIT extends AbstractPostgresIT {
Map.of(), "title", "message",
userIds, groupIds, roleNames);
}

/** Inserts a minimal alert_rule with re_notify_minutes=0 and returns its id. */
private UUID seedRule(String name) {
UUID id = UUID.randomUUID();
jdbcTemplate.update(
"INSERT INTO alert_rules (id, environment_id, name, severity, condition_kind, condition, " +
"notification_title_tmpl, notification_message_tmpl, created_by, updated_by) " +
"VALUES (?, ?, ?, 'WARNING', 'AGENT_STATE', '{}'::jsonb, 't', 'm', 'sys-user', 'sys-user')",
id, envId, name + "-" + id);
return id;
}

/** Inserts a minimal alert_rule with re_notify_minutes=1 and returns its id. */
private UUID seedReNotifyRule(String name) {
UUID id = UUID.randomUUID();
jdbcTemplate.update(
"INSERT INTO alert_rules (id, environment_id, name, severity, condition_kind, condition, " +
"re_notify_minutes, notification_title_tmpl, notification_message_tmpl, created_by, updated_by) " +
"VALUES (?, ?, ?, 'WARNING', 'AGENT_STATE', '{}'::jsonb, 1, 't', 'm', 'sys-user', 'sys-user')",
id, envId, name + "-" + id);
return id;
}
}

@@ -66,6 +66,44 @@ class PostgresAlertRuleRepositoryIT extends AbstractPostgresIT {
assertThat(repo.findRuleIdsByOutboundConnectionId(UUID.randomUUID())).isEmpty();
}

@Test
void saveTargets_roundtrip() {
// Rule saved with a USER target and a ROLE target
UUID ruleId = UUID.randomUUID();
AlertRuleTarget userTarget = new AlertRuleTarget(UUID.randomUUID(), ruleId, TargetKind.USER, "alice");
AlertRuleTarget roleTarget = new AlertRuleTarget(UUID.randomUUID(), ruleId, TargetKind.ROLE, "OPERATOR");
var rule = newRuleWithId(ruleId, List.of(), List.of(userTarget, roleTarget));

repo.save(rule);

// findById must return the targets that were persisted by saveTargets()
var found = repo.findById(ruleId).orElseThrow();
assertThat(found.targets()).hasSize(2);
assertThat(found.targets()).extracting(AlertRuleTarget::targetId)
.containsExactlyInAnyOrder("alice", "OPERATOR");
assertThat(found.targets()).extracting(t -> t.kind().name())
.containsExactlyInAnyOrder("USER", "ROLE");
}

@Test
void saveTargets_updateReplacesExistingTargets() {
// Save rule with one target
UUID ruleId = UUID.randomUUID();
AlertRuleTarget initial = new AlertRuleTarget(UUID.randomUUID(), ruleId, TargetKind.USER, "bob");
var rule = newRuleWithId(ruleId, List.of(), List.of(initial));
repo.save(rule);

// Update: replace with a different target
AlertRuleTarget updated = new AlertRuleTarget(UUID.randomUUID(), ruleId, TargetKind.GROUP, "team-ops");
var updated_rule = newRuleWithId(ruleId, List.of(), List.of(updated));
repo.save(updated_rule);

var found = repo.findById(ruleId).orElseThrow();
assertThat(found.targets()).hasSize(1);
assertThat(found.targets().get(0).targetId()).isEqualTo("team-ops");
assertThat(found.targets().get(0).kind()).isEqualTo(TargetKind.GROUP);
}

@Test
void claimDueRulesAtomicSkipLocked() {
var rule = newRule(List.of());
@@ -80,11 +118,15 @@ class PostgresAlertRuleRepositoryIT extends AbstractPostgresIT {
}

private AlertRule newRule(List<WebhookBinding> webhooks) {
return newRuleWithId(UUID.randomUUID(), webhooks, List.of());
}

private AlertRule newRuleWithId(UUID id, List<WebhookBinding> webhooks, List<AlertRuleTarget> targets) {
return new AlertRule(
UUID.randomUUID(), envId, "rule-" + UUID.randomUUID(), "desc",
id, envId, "rule-" + id, "desc",
AlertSeverity.WARNING, true, ConditionKind.AGENT_STATE,
new AgentStateCondition(new AlertScope(null, null, null), "DEAD", 60),
60, 0, 60, "t", "m", webhooks, List.of(),
60, 0, 60, "t", "m", webhooks, targets,
Instant.now().minusSeconds(10), null, null, Map.of(),
Instant.now(), "test-user", Instant.now(), "test-user");
}

@@ -34,17 +34,19 @@ class AlertingProjectionsIT {
}

@Test
void mergeTreeProjectionsExistAfterInit() {
// logs and agent_metrics are plain MergeTree — projections always succeed.
// executions is ReplacingMergeTree; its projections require the session setting
// deduplicate_merge_projection_mode='rebuild' which is unavailable via JDBC pool,
// so they are best-effort and not asserted here.
void allFourProjectionsExistAfterInit() {
// logs and agent_metrics are plain MergeTree — always succeed.
// executions is ReplacingMergeTree; its projections now succeed because
// alerting_projections.sql runs ALTER TABLE executions MODIFY SETTING
// deduplicate_merge_projection_mode='rebuild' before the ADD PROJECTION statements.
List<String> names = jdbc.queryForList(
"SELECT name FROM system.projections WHERE table IN ('logs', 'agent_metrics')",
"SELECT name FROM system.projections WHERE table IN ('logs', 'agent_metrics', 'executions')",
String.class);

assertThat(names).contains(
assertThat(names).containsExactlyInAnyOrder(
"alerting_app_level",
"alerting_instance_metric");
"alerting_instance_metric",
"alerting_app_status",
"alerting_route_status");
}
}

@@ -19,4 +19,7 @@ public interface AlertInstanceRepository {
void resolve(UUID id, Instant when);
void markSilenced(UUID id, boolean silenced);
void deleteResolvedBefore(Instant cutoff);

/** FIRING instances whose reNotify cadence has elapsed since last notification. */
List<AlertInstance> listFiringDueForReNotify(Instant now);
}

@@ -13,5 +13,7 @@ public interface AlertNotificationRepository {
void markDelivered(UUID id, int status, String snippet, Instant when);
void scheduleRetry(UUID id, Instant nextAttemptAt, int status, String snippet);
void markFailed(UUID id, int status, String snippet);
/** Resets a FAILED notification for operator-triggered retry: attempts → 0, status → PENDING. */
void resetForRetry(UUID id, Instant nextAttemptAt);
void deleteSettledBefore(Instant cutoff);
}

@@ -1,3 +1,10 @@
package com.cameleer.server.core.alerting;

public enum RouteMetric { ERROR_RATE, P95_LATENCY_MS, P99_LATENCY_MS, THROUGHPUT, ERROR_COUNT }
public enum RouteMetric {
ERROR_RATE,
/** Average execution duration — maps to stats_1m_route.avgDurationMs. */
AVG_DURATION_MS,
P99_LATENCY_MS,
THROUGHPUT,
ERROR_COUNT
}

122
docs/alerting-02-final-review.md
Normal file
@@ -0,0 +1,122 @@
# Plan 02 — Final Whole-Branch Review

**Verdict:** ⚠ FIX BEFORE SHIP

## Summary

The 43-commit, 14k-LOC implementation is structurally sound: the evaluator job, outbox loop, RBAC layering, SQL injection gate, state machine, and ClickHouse projections are all correct and well-tested. Three issues require fixing before production use. Two are functional blockers: (1) alert targets configured via the REST API are silently discarded because `PostgresAlertRuleRepository.save()` never writes to `alert_rule_targets`, making the entire in-app inbox feature non-functional for production-created rules; and (2) re-notification cadence (`reNotifyMinutes`) is stored and exposed but never acted on — `withLastNotifiedAt()` is defined but never called, so a still-FIRING alert will never re-notify no matter what the rule says. A third important issue is the retry endpoint calling `scheduleRetry` (which increments `attempts`) rather than resetting it, defeating the operator's intent. An SSRF guard (Plan 01 scope) is absent; it is flagged here for completeness.

---

## BLOCKER findings

### B-1: `PostgresAlertRuleRepository.save()` never persists targets — inbox is empty for all production-created rules

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java:24-58`

**Impact:** `AlertRuleController.buildRule()` accepts `req.targets()` and passes them into the `AlertRule` record (line 326/344), but `save()` only upserts the `alert_rules` row — it never touches `alert_rule_targets`. On re-load, `rowMapper()` returns `List.of()` for targets (line 185). When the evaluator creates a `newInstance()`, `AlertStateTransitions` copies `rule.targets()` — which is always empty for any rule created via the REST API. The result: `target_user_ids`, `target_group_ids`, and `target_role_names` on every `alert_instances` row are empty arrays, so `listForInbox()` returns nothing for any user. The IT only catches this because it seeds targets via raw SQL (`INSERT INTO alert_rule_targets … ON CONFLICT DO NOTHING`), not the API path.

**Repro:** POST a rule with `targets: [{kind:"USER", targetId:"alice"}]` via the REST API. Evaluate and fire. Check `alert_instances.target_user_ids` — it is `{}`.

**Fix:** Add a `saveTargets(UUID ruleId, List<AlertRuleTarget> targets)` step inside `save()`: delete existing targets for the rule, then insert new ones. Both operations must be inside the same logical unit (no transaction wrapper needed since JdbcTemplate auto-commits, but ordering matters: delete-then-insert).
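The delete-then-insert step could look like the following sketch, assuming the repository's `JdbcTemplate` field and the `AlertRuleTarget` accessors (`id()`, `kind()`, `targetId()`) used elsewhere in this diff:

```java
// Sketch only: column and enum names assume the schema shown in the ITs.
private void saveTargets(UUID ruleId, List<AlertRuleTarget> targets) {
    // Delete-then-insert replaces the rule's target set wholesale on every save.
    jdbcTemplate.update("DELETE FROM alert_rule_targets WHERE rule_id = ?", ruleId);
    for (AlertRuleTarget t : targets) {
        jdbcTemplate.update(
            "INSERT INTO alert_rule_targets (id, rule_id, target_kind, target_id) " +
            "VALUES (?, ?, ?::target_kind_enum, ?)",
            t.id(), ruleId, t.kind().name(), t.targetId());
    }
}
```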

---

### B-2: Re-notification cadence is completely unimplemented — `reNotifyMinutes` is stored but never consulted

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/notify/NotificationDispatchJob.java` (entire file), `cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertInstance.java:82`

**Impact:** The spec's §7 state diagram defines a FIRING→re-notify cycle driven by `rule.reNotifyMinutes`. The `withLastNotifiedAt()` wither method exists on `AlertInstance` but is never called anywhere in production code. `NotificationDispatchJob` has no logic to check `instance.lastNotifiedAt()` or `rule.reNotifyMinutes()`. A rule configured with `reNotifyMinutes=60` will send exactly one notification on first fire and nothing more, regardless of how long the alert stays FIRING. This is a silent spec violation visible to operators when an acknowledged-then-re-fired alert never pages again.

**Fix:** In `NotificationDispatchJob.markDelivered` path (or in the evaluator after `enqueueNotifications`), set `instance.withLastNotifiedAt(now)` and persist it. Add a scheduled re-notification enqueue: on each evaluator tick, for FIRING instances where `lastNotifiedAt` is older than `rule.reNotifyMinutes` minutes, enqueue fresh `AlertNotification` rows for each webhook binding.
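The eligibility check behind such a sweep reduces to a small pure predicate; a minimal sketch, assuming `lastNotifiedAt == null` means the instance has never been notified (initial delivery stays with the dispatcher):

```java
import java.time.Duration;
import java.time.Instant;

public final class ReNotifyCadence {
    private ReNotifyCadence() {}

    /**
     * True when a FIRING instance is due for a repeat notification: it has
     * been notified at least once, the rule opts in (reNotifyMinutes > 0),
     * and the cadence has elapsed since the last notification.
     */
    public static boolean isDue(Instant lastNotifiedAt, int reNotifyMinutes, Instant now) {
        if (lastNotifiedAt == null || reNotifyMinutes <= 0) {
            return false; // never notified, or re-notify disabled on the rule
        }
        return !lastNotifiedAt.plus(Duration.ofMinutes(reNotifyMinutes)).isAfter(now);
    }
}
```

The sweep itself would then select FIRING instances, apply this predicate against the owning rule's cadence, and enqueue fresh `AlertNotification` rows per webhook binding.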

---

## IMPORTANT findings

### I-1: Retry endpoint comment says "attempts → 0" but the SQL does `attempts + 1`

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/controller/AlertNotificationController.java:71-75` and `storage/PostgresAlertNotificationRepository.java:108-119`

**Impact:** The operator intent of `POST /api/v1/alerts/notifications/{id}/retry` is to reset a FAILED notification and re-dispatch it fresh. The controller comment says "attempts → 0". However, it calls `scheduleRetry(id, Instant.now(), 0, null)`, and `scheduleRetry`'s SQL is `SET attempts = attempts + 1`. If the notification had already hit `webhookMaxAttempts` (default 3), the retried notification will immediately re-fail on the first transient 5xx because `attempts` is now 4 (≥ maxAttempts). The spec says "retry resets attempts to 0"; the code does the opposite.

**Fix:** Add a dedicated `resetForRetry(UUID id, Instant nextAttemptAt)` method to the repo that sets `attempts = 0, status = 'PENDING', next_attempt_at = ?, claimed_by = NULL, claimed_until = NULL`. Call it from the retry endpoint instead of `scheduleRetry`.
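A sketch of that dedicated reset, assuming the repository's `JdbcTemplate` and the enum names visible elsewhere in this diff:

```java
// Sketch only: resets a FAILED notification so the dispatcher claims it fresh.
public void resetForRetry(UUID id, Instant nextAttemptAt) {
    jdbcTemplate.update(
        "UPDATE alert_notifications SET attempts = 0, " +
        "status = 'PENDING'::notification_status_enum, " +
        "next_attempt_at = ?, claimed_by = NULL, claimed_until = NULL " +
        "WHERE id = ? AND status = 'FAILED'::notification_status_enum",
        Timestamp.from(nextAttemptAt), id);
}
```

Guarding on `status = 'FAILED'` makes the endpoint idempotent and prevents resetting a notification that is mid-flight.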

---

### I-2: No UNIQUE partial index on `alert_instances(rule_id)` WHERE open — two replicas can create duplicate FIRING rows

**File:** `cameleer-server-app/src/main/resources/db/migration/V12__alerting_tables.sql:68`

**Impact:** `findOpenForRule` is a plain SELECT (not inside a lock), followed by `instanceRepo.save()`. Two evaluator replicas claiming different rule batches won't conflict on the claim (SKIP LOCKED protects that). But `applyResult` calls `findOpenForRule` after claiming — if two replicas claim the same rule in back-to-back windows (claim TTL 30s, min rule interval 5s), the second will also see no open instance (the first is still PENDING, not yet visible if on a different connection) and create a second FIRING row. There is no `UNIQUE (rule_id) WHERE state IN ('PENDING','FIRING','ACKNOWLEDGED')` to block this. In a single-replica setup this is harmless; in HA it causes duplicate alerts.

**Fix:** Add `CREATE UNIQUE INDEX alert_instances_open_rule_uq ON alert_instances (rule_id) WHERE rule_id IS NOT NULL AND state IN ('PENDING','FIRING','ACKNOWLEDGED');` and handle the unique-violation in `save()` (log + skip).
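The save-side handling might look like the following, assuming Spring's exception translation is active so the unique violation surfaces as `DuplicateKeyException`; the surrounding names are illustrative:

```java
// Illustrative sketch: the evaluator's save call, tolerating the race
// that the new partial unique index closes.
try {
    instanceRepo.save(newInstance);
} catch (org.springframework.dao.DuplicateKeyException e) {
    // Another replica created the open instance first; log and skip.
    log.info("open alert_instance already exists for rule {}, skipping", rule.id());
}
```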

---

### I-3: SSRF guard absent on `OutboundConnection.url` (Plan 01 scope, flagged here)

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/outbound/dto/OutboundConnectionRequest.java` and `OutboundConnectionAdminController`

**Impact:** The URL constraint is `@Pattern("^https://.+")` — it accepts `https://169.254.169.254/` (AWS metadata), `https://10.0.0.1/internal`, and `https://localhost/`. An ADMIN user can configure a connection pointing to cloud metadata or internal services; the dispatcher will POST to it. In the SaaS multi-tenant context this is a server-side request forgery risk. Plan 01 scope — not blocking Plan 02 merge — but must be resolved before this feature is exposed in SaaS.

**Suggested fix:** At service-layer save time, resolve the URL's hostname and reject RFC-1918, loopback, link-local, and unroutable addresses. The Apache HttpClient already enforces the TLS handshake, which limits practical exploit, but the URL-level guard should be explicit.
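A minimal sketch of that service-layer check (`WebhookUrlGuard` is a hypothetical name, not an existing class in this branch):

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

public final class WebhookUrlGuard {
    private WebhookUrlGuard() {}

    /** Rejects URLs whose host resolves to loopback, link-local, RFC-1918, or unroutable space. */
    public static boolean isSafe(String url) {
        try {
            String host = URI.create(url).getHost();
            if (host == null) {
                return false;
            }
            InetAddress addr = InetAddress.getByName(host);
            return !(addr.isLoopbackAddress()
                  || addr.isLinkLocalAddress()   // 169.254.0.0/16, incl. cloud metadata endpoints
                  || addr.isSiteLocalAddress()   // RFC-1918: 10/8, 172.16/12, 192.168/16
                  || addr.isAnyLocalAddress()
                  || addr.isMulticastAddress());
        } catch (IllegalArgumentException | UnknownHostException e) {
            return false; // unparseable URL or unresolvable host: reject
        }
    }
}
```

Note that a resolve-at-save-time check alone does not stop DNS rebinding; for full coverage the dispatcher would also need to pin the validated address when making the actual request.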
|
||||
|
||||
---
|
||||
|
||||
### I-4: `alerting_notifications_total` metric (`notificationOutcome`) is never called
|
||||
|
||||
**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/metrics/AlertingMetrics.java:141` and `notify/NotificationDispatchJob.java`
|
||||
|
||||
**Impact:** `AlertingMetrics.notificationOutcome(status)` is defined but `NotificationDispatchJob.processOne()` never calls it after `markDelivered`, `markFailed`, or `scheduleRetry`. The `alerting_notifications_total` counter will always read 0, making the metric useless for dashboards/alerts.
|
||||
|
||||
**Fix:** Call `metrics.notificationOutcome(NotificationStatus.DELIVERED)` / `FAILED` at the three outcome branches in `processOne()`. Requires injecting `AlertingMetrics` into `NotificationDispatchJob`.
|
||||
|
||||
---
|
||||
|
||||
## NIT findings
|
||||
|
||||
### N-1: `P95_LATENCY_MS` silently falls back to `avgDurationMs`
|
||||
|
||||
**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/RouteMetricEvaluator.java:52` and `cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/RouteMetric.java`
|
||||
|
||||
`ExecutionStats` has no p95 field (only `p99LatencyMs`). The evaluator handles `P95_LATENCY_MS` by returning `avgDurationMs` with a code comment acknowledging the substitution. This is misleading to operators who configure a threshold expecting p95 semantics. Recommend either removing `P95_LATENCY_MS` from the enum or renaming it `AVG_DURATION_MS` before GA.
---

### N-2: `withTargets()` IN-clause uses string interpolation with UUIDs

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/storage/PostgresAlertRuleRepository.java:124-127`

`inClause` is built by string-joining rule UUIDs. The UUIDs come from the database (not user input), so SQL injection is not a realistic risk here, but the pattern is fragile and inconsistent with the rest of the codebase, which uses parameterized queries. If `batchSize` ever grows, a single `claimDueRules` call with 20 rules generates a 20-UUID IN clause that Postgres has to parse and plan each time. Use `= ANY(?)` with a UUID array instead (this matches the pattern already used in `PostgresAlertInstanceRepository`).
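For illustration, the fragile interpolated form next to the parameterized `= ANY(?)` form. The table and column names are taken from the finding; the JDBC binding shown in the comment is the standard `Connection.createArrayOf` pattern, not code from this repository.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

final class ClaimQueries {
    // Fragile: inlines each UUID, so every distinct batch size produces a
    // new statement text for Postgres to parse and plan.
    static String interpolatedIn(List<UUID> ids) {
        return "SELECT * FROM alert_rules WHERE id IN ("
                + ids.stream().map(id -> "'" + id + "'").collect(Collectors.joining(", "))
                + ")";
    }

    // Preferred: one parameterized statement regardless of batch size.
    // Bind with: ps.setArray(1, conn.createArrayOf("uuid", ids.toArray()));
    static final String ANY_FORM = "SELECT * FROM alert_rules WHERE id = ANY(?)";
}
```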
---

### N-3: `AlertingMetrics` gauge queries hit Postgres on every Micrometer scrape — no caching

**File:** `cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/metrics/AlertingMetrics.java:153-174`

The default scrape interval is typically 60s per Prometheus config, so this adds six COUNT(*) queries per minute in total. At current scale this is fine. If the scrape interval is tightened (e.g. for alerting rules) or tenant count grows, these gauges add visible Postgres load. A 30s in-memory cache (e.g. an `AtomicReference<CachedValue>` with expiry) would eliminate the concern. Low priority — leave as a documented follow-up.
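The suggested 30s cache can be sketched as follows. `CachedGauge` is a hypothetical helper, not an existing class, and the loader/TTL wiring is an assumption about how `AlertingMetrics` would use it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

final class CachedGauge {
    private record Cached(long value, long loadedAtMillis) {}

    private final AtomicReference<Cached> cache = new AtomicReference<>();
    private final LongSupplier loader; // in AlertingMetrics this would run the COUNT(*) query
    private final long ttlMillis;

    CachedGauge(LongSupplier loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    /** Serve the cached value while fresh; reload roughly once per TTL. */
    long value() {
        long now = System.currentTimeMillis();
        Cached c = cache.get();
        if (c != null && now - c.loadedAtMillis < ttlMillis) {
            return c.value; // fresh: no database round-trip
        }
        long v = loader.getAsLong(); // may run twice under contention; harmless for a gauge
        cache.set(new Cached(v, now));
        return v;
    }
}
```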
---

## Notable strengths

- **Security gate is airtight for the injection surface:** The `ATTR_KEY` regex is applied on both create (`@PostMapping`) and update (`@PutMapping`) paths, validated before any persistence. Attribute values, log patterns, JVM metric names, and logger names all go through parameterized queries — only keys are inlined, and only after regex validation.
- **Claim-polling concurrency model:** Both `claimDueRules` and `claimDueNotifications` use the correct `UPDATE … WHERE id IN (SELECT … FOR UPDATE SKIP LOCKED) RETURNING *` pattern. The subquery lock does not re-scan the outer table; rows are locked, updated, and returned atomically, which is exactly what multi-replica claim-polling requires.
- **Target population from rule on FIRING:** `AlertStateTransitions.newInstance()` correctly copies USER/GROUP/ROLE targets from the rule at fire time, so inbox queries work correctly once B-1 is fixed.
- **Rule snapshot is frozen on creation and never re-written on state transitions:** `withRuleSnapshot()` is only called in `applyResult` and `applyBatchFiring` before the first `instanceRepo.save()`, and the ON CONFLICT UPDATE clause on `alert_instances` intentionally does not include `rule_snapshot`. History survives rule deletion correctly.
- **Test coverage is substantive:** The lifecycle IT (`AlertingFullLifecycleIT`) verifies fire→dispatch→ack→silence→rule-delete end-to-end with real Postgres, real ClickHouse, and WireMock. The webhook body assertion (step 2) confirms the rule name is present in the payload, not just that one POST arrived.
- **ClickHouse test bootstrap is production-identical:** `ClickHouseTestHelper` runs the same `clickhouse/init.sql` as `ClickHouseSchemaInitializer`; no schema drift between test and prod paths.

---
## Open questions

1. **Target persistence design intent:** Was `alert_rule_targets` always intended to be managed by a separate `saveTargets()` call that was accidentally omitted, or was there a plan to store targets as JSONB in the `alert_rules` row (which would be simpler and avoid the separate table)? The migration creates the table and the evaluator reads from it, but the write path is absent. Clarify before fixing.
2. **Re-notification: evaluator vs dispatcher responsibility:** Should the re-notification enqueue happen in the evaluator (on each tick when the instance is still FIRING and the cadence has elapsed) or in the dispatcher (after delivery, schedule a future notification)? The evaluator has the rule and instance context; the dispatcher has the outcome timing. Spec §7 is silent on which component owns this — confirm before implementing.
3. **HA deployment intent:** Is the alerting subsystem expected to run on multiple replicas in the current release? If single-replica only, the UNIQUE index for open instances (I-2) can be deferred; if HA is in scope for this release, it should be fixed now.
4. **`P95_LATENCY_MS` enum removal:** Removing it from the enum is a breaking API change if any rules using `P95_LATENCY_MS` exist in production (unlikely at launch, but confirm). Renaming it to `AVG_DURATION_MS` also requires a migration to update existing `condition` JSONB values and the `condition_kind_enum` type.
@@ -31,7 +31,7 @@ Fires when a computed route metric crosses a threshold over a rolling window.
 }
 ```
 
-Available metrics: `ERROR_RATE`, `THROUGHPUT`, `MEAN_PROCESSING_MS`, `P95_PROCESSING_MS`.
+Available metrics: `ERROR_RATE`, `THROUGHPUT`, `AVG_DURATION_MS`, `P99_LATENCY_MS`, `ERROR_COUNT`.
 Comparators: `GT`, `GTE`, `LT`, `LTE`, `EQ`.
 
 ### EXCHANGE_MATCH