diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java index 708e0ef5..1cf95fa3 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java @@ -256,6 +256,84 @@ public class ClickHouseLogStore implements LogIndex { return new LogSearchResponse(results, nextCursor, hasMore, levelCounts); } + /** + * Counts log entries matching the given request — no {@code FINAL}, no cursor/sort/limit. + * Intended for alerting evaluators (LogPatternEvaluator) which tolerate brief duplicate counts. + */ + public long countLogs(LogSearchRequest request) { + List conditions = new ArrayList<>(); + List params = new ArrayList<>(); + conditions.add("tenant_id = ?"); + params.add(tenantId); + + if (request.environment() != null && !request.environment().isEmpty()) { + conditions.add("environment = ?"); + params.add(request.environment()); + } + + if (request.application() != null && !request.application().isEmpty()) { + conditions.add("application = ?"); + params.add(request.application()); + } + + if (request.instanceId() != null && !request.instanceId().isEmpty()) { + conditions.add("instance_id = ?"); + params.add(request.instanceId()); + } + + if (request.exchangeId() != null && !request.exchangeId().isEmpty()) { + conditions.add("(exchange_id = ?" + + " OR (mapContains(mdc, 'cameleer.exchangeId') AND mdc['cameleer.exchangeId'] = ?)" + + " OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))"); + params.add(request.exchangeId()); + params.add(request.exchangeId()); + params.add(request.exchangeId()); + } + + if (request.q() != null && !request.q().isEmpty()) { + String term = "%" + escapeLike(request.q()) + "%"; + conditions.add("(message ILIKE ? OR stack_trace ILIKE ?)"); + params.add(term); + params.add(term); + } + + if (request.logger() != null && !request.logger().isEmpty()) { + conditions.add("logger_name ILIKE ?"); + params.add("%" + escapeLike(request.logger()) + "%"); + } + + if (request.sources() != null && !request.sources().isEmpty()) { + String placeholders = String.join(", ", Collections.nCopies(request.sources().size(), "?")); + conditions.add("source IN (" + placeholders + ")"); + for (String s : request.sources()) { + params.add(s); + } + } + + if (request.levels() != null && !request.levels().isEmpty()) { + String placeholders = String.join(", ", Collections.nCopies(request.levels().size(), "?")); + conditions.add("level IN (" + placeholders + ")"); + for (String lvl : request.levels()) { + params.add(lvl.toUpperCase()); + } + } + + if (request.from() != null) { + conditions.add("timestamp >= parseDateTime64BestEffort(?, 3)"); + params.add(request.from().toString()); + } + + if (request.to() != null) { + conditions.add("timestamp <= parseDateTime64BestEffort(?, 3)"); + params.add(request.to().toString()); + } + + String where = String.join(" AND ", conditions); + String sql = "SELECT count() FROM logs WHERE " + where; // NO FINAL + Long result = jdbc.queryForObject(sql, Long.class, params.toArray()); + return result != null ? result : 0L; + } + private Map queryLevelCounts(String baseWhere, List baseParams) { String sql = "SELECT level, count() AS cnt FROM logs WHERE " + baseWhere + " GROUP BY level"; Map counts = new LinkedHashMap<>(); diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/ClickHouseTestHelper.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/ClickHouseTestHelper.java index 045c550e..b2fc66e7 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/ClickHouseTestHelper.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/ClickHouseTestHelper.java @@ -14,7 +14,16 @@ public final class ClickHouseTestHelper { private ClickHouseTestHelper() {} public static void executeInitSql(JdbcTemplate jdbc) throws IOException { - String sql = new ClassPathResource("clickhouse/init.sql") + executeScript(jdbc, "clickhouse/init.sql"); + } + + public static void executeInitSqlWithProjections(JdbcTemplate jdbc) throws IOException { + executeScript(jdbc, "clickhouse/init.sql"); + executeScript(jdbc, "clickhouse/alerting_projections.sql"); + } + + private static void executeScript(JdbcTemplate jdbc, String classpathResource) throws IOException { + String sql = new ClassPathResource(classpathResource) .getContentAsString(StandardCharsets.UTF_8); for (String statement : sql.split(";")) { String trimmed = statement.trim(); @@ -24,7 +33,20 @@ public final class ClickHouseTestHelper { .filter(line -> !line.isEmpty()) .reduce("", (a, b) -> a + b); if (!withoutComments.isEmpty()) { - jdbc.execute(trimmed); + String upper = withoutComments.toUpperCase(); + boolean isBestEffort = upper.contains("MATERIALIZE PROJECTION") + || upper.contains("ADD PROJECTION"); + try { + jdbc.execute(trimmed); + } catch (Exception e) { + if (isBestEffort) { + // ADD PROJECTION on ReplacingMergeTree requires a session setting unavailable + // via JDBC pool; MATERIALIZE can fail on empty tables — both non-fatal in tests. + System.err.println("Projection DDL skipped (non-fatal): " + e.getMessage()); + } else { + throw e; + } + } } } } diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreCountIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreCountIT.java new file mode 100644 index 00000000..a363417b --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreCountIT.java @@ -0,0 +1,127 @@ +package com.cameleer.server.app.search; + +import com.cameleer.common.model.LogEntry; +import com.cameleer.server.core.ingestion.BufferedLogEntry; +import com.cameleer.server.core.search.LogSearchRequest; +import com.cameleer.server.app.ClickHouseTestHelper; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseLogStoreCountIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseLogStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + ClickHouseTestHelper.executeInitSql(jdbc); + jdbc.execute("TRUNCATE TABLE logs"); + + store = new ClickHouseLogStore("default", jdbc); + } + + /** Seed a log row with explicit environment via insertBufferedBatch. */ + private void seed(String tenantId, String environment, String appId, String instanceId, + Instant ts, String level, String message) { + LogEntry entry = new LogEntry(ts, level, "com.example.Foo", message, "main", null, null); + store.insertBufferedBatch(List.of( + new BufferedLogEntry(tenantId, environment, instanceId, appId, entry))); + } + + @Test + void countLogs_respectsLevelAndPattern() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + + // 3 ERROR rows with "TimeoutException" message + for (int i = 0; i < 3; i++) { + seed("default", "dev", "orders", "agent-1", base.plusSeconds(i), + "ERROR", "TimeoutException occurred"); + } + // 2 non-matching INFO rows + for (int i = 0; i < 2; i++) { + seed("default", "dev", "orders", "agent-1", base.plusSeconds(10 + i), + "INFO", "Health check OK"); + } + + long count = store.countLogs(new LogSearchRequest( + "TimeoutException", + List.of("ERROR"), + "orders", + null, + null, + null, + "dev", + List.of(), + base.minusSeconds(10), + base.plusSeconds(30), + null, + 100, + "desc")); + + assertThat(count).isEqualTo(3); + } + + @Test + void countLogs_noMatchReturnsZero() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + seed("default", "dev", "orders", "agent-1", base, "INFO", "all good"); + + long count = store.countLogs(new LogSearchRequest( + null, + List.of("ERROR"), + "orders", + null, + null, + null, + "dev", + List.of(), + base.minusSeconds(10), + base.plusSeconds(30), + null, + 100, + "desc")); + + assertThat(count).isZero(); + } + + @Test + void countLogs_environmentFilter_isolatesEnvironments() { + Instant base = Instant.parse("2026-04-19T10:00:00Z"); + // 2 rows in "dev" + seed("default", "dev", "orders", "agent-1", base, "ERROR", "err"); + seed("default", "dev", "orders", "agent-1", base.plusSeconds(1), "ERROR", "err"); + // 1 row in "prod" — should not be counted + seed("default", "prod", "orders", "agent-2", base.plusSeconds(5), "ERROR", "err"); + + long devCount = store.countLogs(new LogSearchRequest( + null, List.of("ERROR"), "orders", null, null, null, + "dev", List.of(), + base.minusSeconds(1), base.plusSeconds(60), + null, 100, "desc")); + + assertThat(devCount).isEqualTo(2); + } +}