feat(alerting): ClickHouseLogStore.countLogs for log-pattern evaluator
Adds countLogs(LogSearchRequest) — no FINAL, no cursor/sort/limit — reusing the same WHERE-clause logic as search() for tenant, env, app, level, q, logger, source, exchangeId, and time-range filters. Also extends ClickHouseTestHelper with executeInitSqlWithProjections() and makes the script runner non-fatal for ADD/MATERIALIZE PROJECTION. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -256,6 +256,84 @@ public class ClickHouseLogStore implements LogIndex {
|
|||||||
return new LogSearchResponse(results, nextCursor, hasMore, levelCounts);
|
return new LogSearchResponse(results, nextCursor, hasMore, levelCounts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Counts log entries matching the given request — no {@code FINAL}, no cursor/sort/limit.
|
||||||
|
* Intended for alerting evaluators (LogPatternEvaluator) which tolerate brief duplicate counts.
|
||||||
|
*/
|
||||||
|
public long countLogs(LogSearchRequest request) {
|
||||||
|
List<String> conditions = new ArrayList<>();
|
||||||
|
List<Object> params = new ArrayList<>();
|
||||||
|
conditions.add("tenant_id = ?");
|
||||||
|
params.add(tenantId);
|
||||||
|
|
||||||
|
if (request.environment() != null && !request.environment().isEmpty()) {
|
||||||
|
conditions.add("environment = ?");
|
||||||
|
params.add(request.environment());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.application() != null && !request.application().isEmpty()) {
|
||||||
|
conditions.add("application = ?");
|
||||||
|
params.add(request.application());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.instanceId() != null && !request.instanceId().isEmpty()) {
|
||||||
|
conditions.add("instance_id = ?");
|
||||||
|
params.add(request.instanceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.exchangeId() != null && !request.exchangeId().isEmpty()) {
|
||||||
|
conditions.add("(exchange_id = ?" +
|
||||||
|
" OR (mapContains(mdc, 'cameleer.exchangeId') AND mdc['cameleer.exchangeId'] = ?)" +
|
||||||
|
" OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))");
|
||||||
|
params.add(request.exchangeId());
|
||||||
|
params.add(request.exchangeId());
|
||||||
|
params.add(request.exchangeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.q() != null && !request.q().isEmpty()) {
|
||||||
|
String term = "%" + escapeLike(request.q()) + "%";
|
||||||
|
conditions.add("(message ILIKE ? OR stack_trace ILIKE ?)");
|
||||||
|
params.add(term);
|
||||||
|
params.add(term);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.logger() != null && !request.logger().isEmpty()) {
|
||||||
|
conditions.add("logger_name ILIKE ?");
|
||||||
|
params.add("%" + escapeLike(request.logger()) + "%");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.sources() != null && !request.sources().isEmpty()) {
|
||||||
|
String placeholders = String.join(", ", Collections.nCopies(request.sources().size(), "?"));
|
||||||
|
conditions.add("source IN (" + placeholders + ")");
|
||||||
|
for (String s : request.sources()) {
|
||||||
|
params.add(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.levels() != null && !request.levels().isEmpty()) {
|
||||||
|
String placeholders = String.join(", ", Collections.nCopies(request.levels().size(), "?"));
|
||||||
|
conditions.add("level IN (" + placeholders + ")");
|
||||||
|
for (String lvl : request.levels()) {
|
||||||
|
params.add(lvl.toUpperCase());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.from() != null) {
|
||||||
|
conditions.add("timestamp >= parseDateTime64BestEffort(?, 3)");
|
||||||
|
params.add(request.from().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.to() != null) {
|
||||||
|
conditions.add("timestamp <= parseDateTime64BestEffort(?, 3)");
|
||||||
|
params.add(request.to().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
String where = String.join(" AND ", conditions);
|
||||||
|
String sql = "SELECT count() FROM logs WHERE " + where; // NO FINAL
|
||||||
|
Long result = jdbc.queryForObject(sql, Long.class, params.toArray());
|
||||||
|
return result != null ? result : 0L;
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Long> queryLevelCounts(String baseWhere, List<Object> baseParams) {
|
private Map<String, Long> queryLevelCounts(String baseWhere, List<Object> baseParams) {
|
||||||
String sql = "SELECT level, count() AS cnt FROM logs WHERE " + baseWhere + " GROUP BY level";
|
String sql = "SELECT level, count() AS cnt FROM logs WHERE " + baseWhere + " GROUP BY level";
|
||||||
Map<String, Long> counts = new LinkedHashMap<>();
|
Map<String, Long> counts = new LinkedHashMap<>();
|
||||||
|
|||||||
@@ -14,7 +14,16 @@ public final class ClickHouseTestHelper {
|
|||||||
private ClickHouseTestHelper() {}
|
private ClickHouseTestHelper() {}
|
||||||
|
|
||||||
public static void executeInitSql(JdbcTemplate jdbc) throws IOException {
|
public static void executeInitSql(JdbcTemplate jdbc) throws IOException {
|
||||||
String sql = new ClassPathResource("clickhouse/init.sql")
|
executeScript(jdbc, "clickhouse/init.sql");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void executeInitSqlWithProjections(JdbcTemplate jdbc) throws IOException {
|
||||||
|
executeScript(jdbc, "clickhouse/init.sql");
|
||||||
|
executeScript(jdbc, "clickhouse/alerting_projections.sql");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void executeScript(JdbcTemplate jdbc, String classpathResource) throws IOException {
|
||||||
|
String sql = new ClassPathResource(classpathResource)
|
||||||
.getContentAsString(StandardCharsets.UTF_8);
|
.getContentAsString(StandardCharsets.UTF_8);
|
||||||
for (String statement : sql.split(";")) {
|
for (String statement : sql.split(";")) {
|
||||||
String trimmed = statement.trim();
|
String trimmed = statement.trim();
|
||||||
@@ -24,7 +33,20 @@ public final class ClickHouseTestHelper {
|
|||||||
.filter(line -> !line.isEmpty())
|
.filter(line -> !line.isEmpty())
|
||||||
.reduce("", (a, b) -> a + b);
|
.reduce("", (a, b) -> a + b);
|
||||||
if (!withoutComments.isEmpty()) {
|
if (!withoutComments.isEmpty()) {
|
||||||
jdbc.execute(trimmed);
|
String upper = withoutComments.toUpperCase();
|
||||||
|
boolean isBestEffort = upper.contains("MATERIALIZE PROJECTION")
|
||||||
|
|| upper.contains("ADD PROJECTION");
|
||||||
|
try {
|
||||||
|
jdbc.execute(trimmed);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (isBestEffort) {
|
||||||
|
// ADD PROJECTION on ReplacingMergeTree requires a session setting unavailable
|
||||||
|
// via JDBC pool; MATERIALIZE can fail on empty tables — both non-fatal in tests.
|
||||||
|
System.err.println("Projection DDL skipped (non-fatal): " + e.getMessage());
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,127 @@
|
|||||||
|
package com.cameleer.server.app.search;
|
||||||
|
|
||||||
|
import com.cameleer.common.model.LogEntry;
|
||||||
|
import com.cameleer.server.core.ingestion.BufferedLogEntry;
|
||||||
|
import com.cameleer.server.core.search.LogSearchRequest;
|
||||||
|
import com.cameleer.server.app.ClickHouseTestHelper;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseLogStoreCountIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseLogStore store;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
ClickHouseTestHelper.executeInitSql(jdbc);
|
||||||
|
jdbc.execute("TRUNCATE TABLE logs");
|
||||||
|
|
||||||
|
store = new ClickHouseLogStore("default", jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Seed a log row with explicit environment via insertBufferedBatch. */
|
||||||
|
private void seed(String tenantId, String environment, String appId, String instanceId,
|
||||||
|
Instant ts, String level, String message) {
|
||||||
|
LogEntry entry = new LogEntry(ts, level, "com.example.Foo", message, "main", null, null);
|
||||||
|
store.insertBufferedBatch(List.of(
|
||||||
|
new BufferedLogEntry(tenantId, environment, instanceId, appId, entry)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countLogs_respectsLevelAndPattern() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
|
||||||
|
// 3 ERROR rows with "TimeoutException" message
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
seed("default", "dev", "orders", "agent-1", base.plusSeconds(i),
|
||||||
|
"ERROR", "TimeoutException occurred");
|
||||||
|
}
|
||||||
|
// 2 non-matching INFO rows
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
seed("default", "dev", "orders", "agent-1", base.plusSeconds(10 + i),
|
||||||
|
"INFO", "Health check OK");
|
||||||
|
}
|
||||||
|
|
||||||
|
long count = store.countLogs(new LogSearchRequest(
|
||||||
|
"TimeoutException",
|
||||||
|
List.of("ERROR"),
|
||||||
|
"orders",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
"dev",
|
||||||
|
List.of(),
|
||||||
|
base.minusSeconds(10),
|
||||||
|
base.plusSeconds(30),
|
||||||
|
null,
|
||||||
|
100,
|
||||||
|
"desc"));
|
||||||
|
|
||||||
|
assertThat(count).isEqualTo(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countLogs_noMatchReturnsZero() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
seed("default", "dev", "orders", "agent-1", base, "INFO", "all good");
|
||||||
|
|
||||||
|
long count = store.countLogs(new LogSearchRequest(
|
||||||
|
null,
|
||||||
|
List.of("ERROR"),
|
||||||
|
"orders",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
"dev",
|
||||||
|
List.of(),
|
||||||
|
base.minusSeconds(10),
|
||||||
|
base.plusSeconds(30),
|
||||||
|
null,
|
||||||
|
100,
|
||||||
|
"desc"));
|
||||||
|
|
||||||
|
assertThat(count).isZero();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void countLogs_environmentFilter_isolatesEnvironments() {
|
||||||
|
Instant base = Instant.parse("2026-04-19T10:00:00Z");
|
||||||
|
// 2 rows in "dev"
|
||||||
|
seed("default", "dev", "orders", "agent-1", base, "ERROR", "err");
|
||||||
|
seed("default", "dev", "orders", "agent-1", base.plusSeconds(1), "ERROR", "err");
|
||||||
|
// 1 row in "prod" — should not be counted
|
||||||
|
seed("default", "prod", "orders", "agent-2", base.plusSeconds(5), "ERROR", "err");
|
||||||
|
|
||||||
|
long devCount = store.countLogs(new LogSearchRequest(
|
||||||
|
null, List.of("ERROR"), "orders", null, null, null,
|
||||||
|
"dev", List.of(),
|
||||||
|
base.minusSeconds(1), base.plusSeconds(60),
|
||||||
|
null, 100, "desc"));
|
||||||
|
|
||||||
|
assertThat(devCount).isEqualTo(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user