feat(alerting): countExecutionsForAlerting for exchange-match evaluator
Adds AlertMatchSpec record (core) and ClickHouseSearchIndex.countExecutionsForAlerting — no FINAL, no text subqueries. Filters by tenant, env, app, route, status, time window, and optional after-cursor. Attributes (JSON string column) use inlined JSONExtractString key literals since ClickHouse JDBC does not bind ? placeholders inside JSON functions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
package com.cameleer.server.app.search;
|
package com.cameleer.server.app.search;
|
||||||
|
|
||||||
|
import com.cameleer.server.core.alerting.AlertMatchSpec;
|
||||||
import com.cameleer.server.core.search.ExecutionSummary;
|
import com.cameleer.server.core.search.ExecutionSummary;
|
||||||
import com.cameleer.server.core.search.SearchRequest;
|
import com.cameleer.server.core.search.SearchRequest;
|
||||||
import com.cameleer.server.core.search.SearchResult;
|
import com.cameleer.server.core.search.SearchResult;
|
||||||
@@ -317,6 +318,54 @@ public class ClickHouseSearchIndex implements SearchIndex {
|
|||||||
.replace("_", "\\_");
|
.replace("_", "\\_");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Counts executions matching the given alerting spec — no {@code FINAL}, no text subqueries.
|
||||||
|
* Attributes are stored as a JSON string column; use {@code JSONExtractString} for key=value filters.
|
||||||
|
*/
|
||||||
|
public long countExecutionsForAlerting(AlertMatchSpec spec) {
|
||||||
|
List<String> conditions = new ArrayList<>();
|
||||||
|
List<Object> args = new ArrayList<>();
|
||||||
|
|
||||||
|
conditions.add("tenant_id = ?");
|
||||||
|
args.add(spec.tenantId());
|
||||||
|
conditions.add("environment = ?");
|
||||||
|
args.add(spec.environment());
|
||||||
|
conditions.add("start_time >= ?");
|
||||||
|
args.add(Timestamp.from(spec.from()));
|
||||||
|
conditions.add("start_time <= ?");
|
||||||
|
args.add(Timestamp.from(spec.to()));
|
||||||
|
|
||||||
|
if (spec.applicationId() != null) {
|
||||||
|
conditions.add("application_id = ?");
|
||||||
|
args.add(spec.applicationId());
|
||||||
|
}
|
||||||
|
if (spec.routeId() != null) {
|
||||||
|
conditions.add("route_id = ?");
|
||||||
|
args.add(spec.routeId());
|
||||||
|
}
|
||||||
|
if (spec.status() != null) {
|
||||||
|
conditions.add("status = ?");
|
||||||
|
args.add(spec.status());
|
||||||
|
}
|
||||||
|
if (spec.after() != null) {
|
||||||
|
conditions.add("start_time > ?");
|
||||||
|
args.add(Timestamp.from(spec.after()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// attributes is a JSON String column. JSONExtractString does not accept a ? placeholder for
|
||||||
|
// the key argument via ClickHouse JDBC — inline the key as a single-quoted literal.
|
||||||
|
// Keys originate from internal AlertMatchSpec (evaluator-constructed, not user HTTP input).
|
||||||
|
for (Map.Entry<String, String> entry : spec.attributes().entrySet()) {
|
||||||
|
String escapedKey = entry.getKey().replace("'", "\\'");
|
||||||
|
conditions.add("JSONExtractString(attributes, '" + escapedKey + "') = ?");
|
||||||
|
args.add(entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = "SELECT count() FROM executions WHERE " + String.join(" AND ", conditions); // NO FINAL
|
||||||
|
Long result = jdbc.queryForObject(sql, Long.class, args.toArray());
|
||||||
|
return result != null ? result : 0L;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> distinctAttributeKeys(String environment) {
|
public List<String> distinctAttributeKeys(String environment) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -0,0 +1,146 @@
|
|||||||
|
package com.cameleer.server.app.search;
|
||||||
|
|
||||||
|
import com.cameleer.server.app.ClickHouseTestHelper;
|
||||||
|
import com.cameleer.server.app.storage.ClickHouseExecutionStore;
|
||||||
|
import com.cameleer.server.core.alerting.AlertMatchSpec;
|
||||||
|
import com.cameleer.server.core.ingestion.MergedExecution;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseSearchIndexAlertingCountIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseSearchIndex searchIndex;
|
||||||
|
private ClickHouseExecutionStore store;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
ClickHouseTestHelper.executeInitSql(jdbc);
|
||||||
|
jdbc.execute("TRUNCATE TABLE executions");
|
||||||
|
jdbc.execute("TRUNCATE TABLE processor_executions");
|
||||||
|
|
||||||
|
store = new ClickHouseExecutionStore("default", jdbc);
|
||||||
|
searchIndex = new ClickHouseSearchIndex("default", jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MergedExecution exec(String id, String status, String appId, String routeId, String attributes, Instant start) {
|
||||||
|
return new MergedExecution(
|
||||||
|
"default", 1L, id, routeId, "agent-1", appId, "prod",
|
||||||
|
status, "", "exchange-" + id,
|
||||||
|
start, start.plusMillis(100), 100L,
|
||||||
|
"", "", "", "", "", "", // errorMessage..rootCauseMessage
|
||||||
|
"", "FULL", // diagramContentHash, engineLevel
|
||||||
|
"", "", "", "", "", "", // inputBody, outputBody, inputHeaders, outputHeaders, inputProperties, outputProperties
|
||||||
|
attributes, // attributes (JSON string)
|
||||||
|
"", "", // traceId, spanId
|
||||||
|
false, false,
|
||||||
|
null, null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countExecutionsForAlerting_byStatus() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
store.insertExecutionBatch(List.of(
|
||||||
|
exec("e1", "FAILED", "orders", "route-a", "{}", base),
|
||||||
|
exec("e2", "FAILED", "orders", "route-a", "{}", base.plusSeconds(1)),
|
||||||
|
exec("e3", "COMPLETED", "orders", "route-a", "{}", base.plusSeconds(2))
|
||||||
|
));
|
||||||
|
|
||||||
|
AlertMatchSpec spec = new AlertMatchSpec(
|
||||||
|
"default", "prod", "orders", null, "FAILED",
|
||||||
|
null,
|
||||||
|
base.minusSeconds(10), base.plusSeconds(60), null);
|
||||||
|
|
||||||
|
assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countExecutionsForAlerting_byRouteId() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
store.insertExecutionBatch(List.of(
|
||||||
|
exec("e1", "FAILED", "orders", "route-a", "{}", base),
|
||||||
|
exec("e2", "FAILED", "orders", "route-b", "{}", base.plusSeconds(1)),
|
||||||
|
exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(2))
|
||||||
|
));
|
||||||
|
|
||||||
|
AlertMatchSpec spec = new AlertMatchSpec(
|
||||||
|
"default", "prod", null, "route-a", null,
|
||||||
|
null,
|
||||||
|
base.minusSeconds(10), base.plusSeconds(60), null);
|
||||||
|
|
||||||
|
assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countExecutionsForAlerting_withAttributes() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
store.insertExecutionBatch(List.of(
|
||||||
|
exec("e1", "FAILED", "orders", "route-a", "{\"region\":\"eu\",\"priority\":\"high\"}", base),
|
||||||
|
exec("e2", "FAILED", "orders", "route-a", "{\"region\":\"us\"}", base.plusSeconds(1)),
|
||||||
|
exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(2))
|
||||||
|
));
|
||||||
|
|
||||||
|
AlertMatchSpec spec = new AlertMatchSpec(
|
||||||
|
"default", "prod", null, null, null,
|
||||||
|
Map.of("region", "eu"),
|
||||||
|
base.minusSeconds(10), base.plusSeconds(60), null);
|
||||||
|
|
||||||
|
assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countExecutionsForAlerting_afterCursor() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
store.insertExecutionBatch(List.of(
|
||||||
|
exec("e1", "FAILED", "orders", "route-a", "{}", base),
|
||||||
|
exec("e2", "FAILED", "orders", "route-a", "{}", base.plusSeconds(5)),
|
||||||
|
exec("e3", "FAILED", "orders", "route-a", "{}", base.plusSeconds(10))
|
||||||
|
));
|
||||||
|
|
||||||
|
// after = base+2s, so only e2 and e3 should count
|
||||||
|
AlertMatchSpec spec = new AlertMatchSpec(
|
||||||
|
"default", "prod", null, null, null,
|
||||||
|
null,
|
||||||
|
base.minusSeconds(1), base.plusSeconds(60), base.plusSeconds(2));
|
||||||
|
|
||||||
|
assertThat(searchIndex.countExecutionsForAlerting(spec)).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countExecutionsForAlerting_noMatchReturnsZero() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
store.insertExecutionBatch(List.of(
|
||||||
|
exec("e1", "COMPLETED", "orders", "route-a", "{}", base)
|
||||||
|
));
|
||||||
|
|
||||||
|
AlertMatchSpec spec = new AlertMatchSpec(
|
||||||
|
"default", "prod", null, null, "FAILED",
|
||||||
|
null,
|
||||||
|
base.minusSeconds(10), base.plusSeconds(60), null);
|
||||||
|
|
||||||
|
assertThat(searchIndex.countExecutionsForAlerting(spec)).isZero();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package com.cameleer.server.core.alerting;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specification for alerting-specific execution counting.
|
||||||
|
* Distinct from {@code SearchRequest}: no text-in-body subqueries, no cursor, no {@code FINAL}.
|
||||||
|
* All fields except {@code tenantId}, {@code environment}, {@code from}, and {@code to} are nullable filters.
|
||||||
|
*/
|
||||||
|
public record AlertMatchSpec(
|
||||||
|
String tenantId,
|
||||||
|
String environment,
|
||||||
|
String applicationId, // nullable — omit to match all apps
|
||||||
|
String routeId, // nullable — omit to match all routes
|
||||||
|
String status, // "FAILED" / "COMPLETED" / null for any
|
||||||
|
Map<String, String> attributes, // exact match on execution attribute key=value; empty = no filter
|
||||||
|
Instant from,
|
||||||
|
Instant to,
|
||||||
|
Instant after // nullable; used by PER_EXCHANGE mode to advance cursor past last seen
|
||||||
|
) {
|
||||||
|
public AlertMatchSpec {
|
||||||
|
attributes = attributes == null ? Map.of() : Map.copyOf(attributes);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user