From 7b79d3aa6422019bf3e23b9b1b52f65a0ded4c4e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 19 Apr 2026 19:18:49 +0200 Subject: [PATCH] feat(alerting): countExecutionsForAlerting for exchange-match evaluator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds AlertMatchSpec record (core) and ClickHouseSearchIndex.countExecutionsForAlerting — no FINAL, no text subqueries. Filters by tenant, env, app, route, status, time window, and optional after-cursor. Attributes (JSON string column) use inlined JSONExtractString key literals since ClickHouse JDBC does not bind ? placeholders inside JSON functions. Co-Authored-By: Claude Sonnet 4.6 --- .../app/search/ClickHouseSearchIndex.java | 49 ++++++ .../ClickHouseSearchIndexAlertingCountIT.java | 146 ++++++++++++++++++ .../server/core/alerting/AlertMatchSpec.java | 25 +++ 3 files changed, 220 insertions(+) create mode 100644 cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexAlertingCountIT.java create mode 100644 cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertMatchSpec.java diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java index d23eef3f..ce550495 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchIndex.java @@ -1,5 +1,6 @@ package com.cameleer.server.app.search; +import com.cameleer.server.core.alerting.AlertMatchSpec; import com.cameleer.server.core.search.ExecutionSummary; import com.cameleer.server.core.search.SearchRequest; import com.cameleer.server.core.search.SearchResult; @@ -317,6 +318,54 @@ public class ClickHouseSearchIndex implements SearchIndex { .replace("_", "\\_"); } + /** + * Counts executions matching the given alerting spec — no {@code FINAL}, no text subqueries. + * Attributes are stored as a JSON string column; use {@code JSONExtractString} for key=value filters. + */ + public long countExecutionsForAlerting(AlertMatchSpec spec) { + List conditions = new ArrayList<>(); + List args = new ArrayList<>(); + + conditions.add("tenant_id = ?"); + args.add(spec.tenantId()); + conditions.add("environment = ?"); + args.add(spec.environment()); + conditions.add("start_time >= ?"); + args.add(Timestamp.from(spec.from())); + conditions.add("start_time <= ?"); + args.add(Timestamp.from(spec.to())); + + if (spec.applicationId() != null) { + conditions.add("application_id = ?"); + args.add(spec.applicationId()); + } + if (spec.routeId() != null) { + conditions.add("route_id = ?"); + args.add(spec.routeId()); + } + if (spec.status() != null) { + conditions.add("status = ?"); + args.add(spec.status()); + } + if (spec.after() != null) { + conditions.add("start_time > ?"); + args.add(Timestamp.from(spec.after())); + } + + // attributes is a JSON String column. JSONExtractString does not accept a ? placeholder for + // the key argument via ClickHouse JDBC — inline the key as a single-quoted literal. + // Keys originate from internal AlertMatchSpec (evaluator-constructed, not user HTTP input). + for (Map.Entry entry : spec.attributes().entrySet()) { + String escapedKey = entry.getKey().replace("'", "\\'"); + conditions.add("JSONExtractString(attributes, '" + escapedKey + "') = ?"); + args.add(entry.getValue()); + } + + String sql = "SELECT count() FROM executions WHERE " + String.join(" AND ", conditions); // NO FINAL + Long result = jdbc.queryForObject(sql, Long.class, args.toArray()); + return result != null ? result : 0L; + } + @Override public List distinctAttributeKeys(String environment) { try { diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexAlertingCountIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexAlertingCountIT.java new file mode 100644 index 00000000..67fc97e1 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexAlertingCountIT.java @@ -0,0 +1,146 @@ +package com.cameleer.server.app.search; + +import com.cameleer.server.app.ClickHouseTestHelper; +import com.cameleer.server.app.storage.ClickHouseExecutionStore; +import com.cameleer.server.core.alerting.AlertMatchSpec; +import com.cameleer.server.core.ingestion.MergedExecution; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseSearchIndexAlertingCountIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseSearchIndex searchIndex; + private ClickHouseExecutionStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + ClickHouseTestHelper.executeInitSql(jdbc); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore("default", jdbc); + searchIndex = new ClickHouseSearchIndex("default", jdbc); + } + + private MergedExecution exec(String id, String status, String appId, String routeId, String attributes, Instant start) { + return new MergedExecution( + "default", 1L, id, routeId, "agent-1", appId, "prod", + status, "", "exchange-" + id, + start, start.plusMillis(100), 100L, + "", "", "", "", "", "", // errorMessage..rootCauseMessage + "", "FULL", // diagramContentHash, engineLevel + "", "", "", "", "", "", // inputBody, outputBody, inputHeaders, outputHeaders, inputProperties, outputProperties + attributes, // attributes (JSON string) + "", "", // traceId, spanId + false, false, + null, null + ); + } + + @Test + void countExecutionsForAlerting_byStatus() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + store.insertExecutionBatch(List.of( + exec("e1", "FAILED", "orders", "route-a", "{}", base), + exec("e2", "FAILED", "orders", "route-a", "{}", base.plusSeconds(1)), + exec("e3", "COMPLETED", "orders", "route-a", "{}", base.plusSeconds(2)) + )); + + AlertMatchSpec spec = new AlertMatchSpec( + "default", "prod", "orders", null, "FAILED", + null, + base.minusSeconds(10), base.plusSeconds(60), null); + + assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2); + } + + @Test + void countExecutionsForAlerting_byRouteId() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + store.insertExecutionBatch(List.of( + exec("e1", "FAILED", "orders", "route-a", "{}", base), + exec("e2", "FAILED", "orders", "route-b", "{}", base.plusSeconds(1)), + exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(2)) + )); + + AlertMatchSpec spec = new AlertMatchSpec( + "default", "prod", null, "route-a", null, + null, + base.minusSeconds(10), base.plusSeconds(60), null); + + assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2); + } + + @Test + void countExecutionsForAlerting_withAttributes() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + store.insertExecutionBatch(List.of( + exec("e1", "FAILED", "orders", "route-a", "{\"region\":\"eu\",\"priority\":\"high\"}", base), + exec("e2", "FAILED", "orders", "route-a", "{\"region\":\"us\"}", base.plusSeconds(1)), + exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(2)) + )); + + AlertMatchSpec spec = new AlertMatchSpec( + "default", "prod", null, null, null, + Map.of("region", "eu"), + base.minusSeconds(10), base.plusSeconds(60), null); + + assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(1); + } + + @Test + void countExecutionsForAlerting_afterCursor() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + store.insertExecutionBatch(List.of( + exec("e1", "FAILED", "orders", "route-a", "{}", base), + exec("e2", "FAILED", "orders", "route-a", "{}", base.plusSeconds(5)), + exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(10)) + )); + + // after = base+2s, so only e2 and e3 should count + AlertMatchSpec spec = new AlertMatchSpec( + "default", "prod", null, null, null, + null, + base.minusSeconds(1), base.plusSeconds(60), base.plusSeconds(2)); + + assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2); + } + + @Test + void countExecutionsForAlerting_noMatchReturnsZero() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + store.insertExecutionBatch(List.of( + exec("e1", "COMPLETED", "orders", "route-a", "{}", base) + )); + + AlertMatchSpec spec = new AlertMatchSpec( + "default", "prod", null, null, "FAILED", + null, + base.minusSeconds(10), base.plusSeconds(60), null); + + assertThat(searchIndex.countExecutionsForAlerting(spec)).isZero(); + } +} diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertMatchSpec.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertMatchSpec.java new file mode 100644 index 00000000..9d3c78be --- /dev/null +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/alerting/AlertMatchSpec.java @@ -0,0 +1,25 @@ +package com.cameleer.server.core.alerting; + +import java.time.Instant; +import java.util.Map; + +/** + * Specification for alerting-specific execution counting. + * Distinct from {@code SearchRequest}: no text-in-body subqueries, no cursor, no {@code FINAL}. + * All fields except {@code tenantId}, {@code environment}, {@code from}, and {@code to} are nullable filters. + */ +public record AlertMatchSpec( + String tenantId, + String environment, + String applicationId, // nullable — omit to match all apps + String routeId, // nullable — omit to match all routes + String status, // "FAILED" / "COMPLETED" / null for any + Map attributes, // exact match on execution attribute key=value; empty = no filter + Instant from, + Instant to, + Instant after // nullable; used by PER_EXCHANGE mode to advance cursor past last seen +) { + public AlertMatchSpec { + attributes = attributes == null ? Map.of() : Map.copyOf(attributes); + } +}