From 7d7eb52afbd4ddca1034c15ff8c03bc91cfcbb67 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 23:42:07 +0200 Subject: [PATCH] feat(clickhouse): add ClickHouseLogStore with LogIndex interface Extract LogIndex interface from OpenSearchLogIndex. Both ClickHouseLogStore and OpenSearchLogIndex implement it. Controllers now inject LogIndex. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controller/LogIngestionController.java | 6 +- .../app/controller/LogQueryController.java | 14 +- .../server/app/search/ClickHouseLogStore.java | 129 +++++++++++++ .../server/app/search/OpenSearchLogIndex.java | 17 +- .../app/search/ClickHouseLogStoreIT.java | 178 ++++++++++++++++++ .../server/core/storage/LogEntryResult.java | 6 + .../server/core/storage/LogIndex.java | 15 ++ 7 files changed, 351 insertions(+), 14 deletions(-) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogIndex.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 480ac951..628cc794 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -1,7 +1,7 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.LogBatch; -import com.cameleer3.server.app.search.OpenSearchLogIndex; +import com.cameleer3.server.core.storage.LogIndex; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import io.swagger.v3.oas.annotations.Operation; @@ -24,10 +24,10 @@ public class LogIngestionController { private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class); - private final OpenSearchLogIndex logIndex; + private final LogIndex logIndex; private final AgentRegistryService registryService; - public LogIngestionController(OpenSearchLogIndex logIndex, + public LogIngestionController(LogIndex logIndex, AgentRegistryService registryService) { this.logIndex = logIndex; this.registryService = registryService; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java index 5def039e..d734946f 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.dto.LogEntryResponse; -import com.cameleer3.server.app.search.OpenSearchLogIndex; +import com.cameleer3.server.core.storage.LogEntryResult; +import com.cameleer3.server.core.storage.LogIndex; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.http.ResponseEntity; @@ -18,9 +19,9 @@ import java.util.List; @Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch") public class LogQueryController { - private final OpenSearchLogIndex logIndex; + private final LogIndex logIndex; - public LogQueryController(OpenSearchLogIndex logIndex) { + public LogQueryController(LogIndex logIndex) { this.logIndex = logIndex; } @@ -42,9 +43,14 @@ public class LogQueryController { Instant fromInstant = from != null ? Instant.parse(from) : null; Instant toInstant = to != null ? Instant.parse(to) : null; - List entries = logIndex.search( + List results = logIndex.search( application, agentId, level, query, exchangeId, fromInstant, toInstant, limit); + List entries = results.stream() + .map(r -> new LogEntryResponse(r.timestamp(), r.level(), r.loggerName(), + r.message(), r.threadName(), r.stackTrace())) + .toList(); + return ResponseEntity.ok(entries); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java new file mode 100644 index 00000000..52adb6d4 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -0,0 +1,129 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.core.storage.LogEntryResult; +import com.cameleer3.server.core.storage.LogIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * ClickHouse-backed implementation of {@link LogIndex}. + * Stores application logs in the {@code logs} MergeTree table with + * ngram bloom-filter indexes for efficient substring search. + */ +public class ClickHouseLogStore implements LogIndex { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class); + private static final String TENANT = "default"; + private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT; + + private final JdbcTemplate jdbc; + + public ClickHouseLogStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void indexBatch(String agentId, String application, List entries) { + if (entries == null || entries.isEmpty()) { + return; + } + + String sql = "INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, " + + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { + Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); + ps.setTimestamp(1, Timestamp.from(ts)); + ps.setString(2, application); + ps.setString(3, agentId); + ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); + ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); + ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); + ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); + ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); + + // Extract camel.exchangeId from MDC into top-level column + Map mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); + String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); + ps.setString(9, exchangeId); + + // ClickHouse JDBC handles java.util.Map natively for Map columns + ps.setObject(10, mdc); + }); + + log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application); + } + + @Override + public List search(String application, String agentId, String level, + String query, String exchangeId, + Instant from, Instant to, int limit) { + StringBuilder sql = new StringBuilder( + "SELECT timestamp, level, logger_name, message, thread_name, stack_trace " + + "FROM logs WHERE tenant_id = 'default' AND application = ?"); + List params = new ArrayList<>(); + params.add(application); + + if (agentId != null && !agentId.isEmpty()) { + sql.append(" AND agent_id = ?"); + params.add(agentId); + } + + if (level != null && !level.isEmpty()) { + sql.append(" AND level = ?"); + params.add(level.toUpperCase()); + } + + if (exchangeId != null && !exchangeId.isEmpty()) { + sql.append(" AND (exchange_id = ? OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))"); + params.add(exchangeId); + params.add(exchangeId); + } + + if (query != null && !query.isEmpty()) { + sql.append(" AND message LIKE ?"); + params.add("%" + query + "%"); + } + + if (from != null) { + sql.append(" AND timestamp >= ?"); + params.add(Timestamp.from(from)); + } + + if (to != null) { + sql.append(" AND timestamp <= ?"); + params.add(Timestamp.from(to)); + } + + sql.append(" ORDER BY timestamp DESC LIMIT ?"); + params.add(limit); + + return jdbc.query(sql.toString(), params.toArray(), (rs, rowNum) -> { + Timestamp ts = rs.getTimestamp("timestamp"); + String timestampStr = ts != null + ? ts.toInstant().atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT) + : null; + return new LogEntryResult( + timestampStr, + rs.getString("level"), + rs.getString("logger_name"), + rs.getString("message"), + rs.getString("thread_name"), + rs.getString("stack_trace") + ); + }); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java index cf1f7628..46550010 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.search; import com.cameleer3.common.model.LogEntry; -import com.cameleer3.server.app.dto.LogEntryResponse; +import com.cameleer3.server.core.storage.LogEntryResult; +import com.cameleer3.server.core.storage.LogIndex; import jakarta.annotation.PostConstruct; import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.OpenSearchClient; @@ -30,7 +31,7 @@ import java.util.List; import java.util.Map; @Repository -public class OpenSearchLogIndex { +public class OpenSearchLogIndex implements LogIndex { private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class); private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") @@ -100,9 +101,10 @@ public class OpenSearchLogIndex { } } - public List search(String application, String agentId, String level, - String query, String exchangeId, - Instant from, Instant to, int limit) { + @Override + public List search(String application, String agentId, String level, + String query, String exchangeId, + Instant from, Instant to, int limit) { try { BoolQuery.Builder bool = new BoolQuery.Builder(); bool.must(Query.of(q -> q.term(t -> t.field("application").value(FieldValue.of(application))))); @@ -137,12 +139,12 @@ public class OpenSearchLogIndex { .sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc))) .size(limit), Map.class); - List results = new ArrayList<>(); + List results = new ArrayList<>(); for (var hit : response.hits().hits()) { @SuppressWarnings("unchecked") Map src = (Map) hit.source(); if (src == null) continue; - results.add(new LogEntryResponse( + results.add(new LogEntryResult( str(src, "@timestamp"), str(src, "level"), str(src, "loggerName"), @@ -162,6 +164,7 @@ public class OpenSearchLogIndex { return v != null ? v.toString() : null; } + @Override public void indexBatch(String agentId, String application, List entries) { if (entries == null || entries.isEmpty()) { return; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java new file mode 100644 index 00000000..b53e55b0 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java @@ -0,0 +1,178 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.core.storage.LogEntryResult; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseLogStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseLogStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + String ddl = new ClassPathResource("clickhouse/V8__logs.sql") + .getContentAsString(StandardCharsets.UTF_8); + jdbc.execute(ddl); + jdbc.execute("TRUNCATE TABLE logs"); + + store = new ClickHouseLogStore(jdbc); + } + + // ── Helpers ────────────────────────────────────────────────────────── + + private LogEntry entry(Instant ts, String level, String logger, String message, + String thread, String stackTrace, Map mdc) { + return new LogEntry(ts, level, logger, message, thread, stackTrace, mdc); + } + + // ── Tests ───────────────────────────────────────────────────────────── + + @Test + void indexBatch_writesLogs() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + List entries = List.of( + entry(now, "INFO", "com.example.Foo", "Hello world", "main", null, null), + entry(now.plusSeconds(1), "ERROR", "com.example.Bar", "Something failed", "worker-1", "stack...", null) + ); + + store.indexBatch("agent-1", "my-app", entries); + + Long count = jdbc.queryForObject("SELECT count() FROM logs WHERE application = 'my-app'", Long.class); + assertThat(count).isEqualTo(2); + } + + @Test + void search_byApplication_returnsLogs() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + store.indexBatch("agent-1", "app-a", List.of( + entry(now, "INFO", "logger", "msg-a", "t1", null, null) + )); + store.indexBatch("agent-2", "app-b", List.of( + entry(now, "INFO", "logger", "msg-b", "t1", null, null) + )); + + List results = store.search("app-a", null, null, null, null, null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).message()).isEqualTo("msg-a"); + } + + @Test + void search_byLevel_filtersCorrectly() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + store.indexBatch("agent-1", "my-app", List.of( + entry(now, "INFO", "logger", "info message", "t1", null, null), + entry(now.plusSeconds(1), "ERROR", "logger", "error message", "t1", null, null) + )); + + List results = store.search("my-app", null, "ERROR", null, null, null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).level()).isEqualTo("ERROR"); + assertThat(results.get(0).message()).isEqualTo("error message"); + } + + @Test + void search_byQuery_usesLikeSearch() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + store.indexBatch("agent-1", "my-app", List.of( + entry(now, "INFO", "logger", "Processing order #12345", "t1", null, null), + entry(now.plusSeconds(1), "INFO", "logger", "Health check OK", "t1", null, null) + )); + + List results = store.search("my-app", null, null, "order #12345", null, null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).message()).contains("order #12345"); + } + + @Test + void search_byExchangeId_matchesTopLevelAndMdc() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + Map mdc = Map.of("camel.exchangeId", "exchange-abc"); + + store.indexBatch("agent-1", "my-app", List.of( + entry(now, "INFO", "logger", "msg with exchange", "t1", null, mdc), + entry(now.plusSeconds(1), "INFO", "logger", "msg without exchange", "t1", null, null) + )); + + List results = store.search("my-app", null, null, null, "exchange-abc", null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).message()).isEqualTo("msg with exchange"); + } + + @Test + void search_byTimeRange_filtersCorrectly() { + Instant t1 = Instant.parse("2026-03-31T10:00:00Z"); + Instant t2 = Instant.parse("2026-03-31T12:00:00Z"); + Instant t3 = Instant.parse("2026-03-31T14:00:00Z"); + + store.indexBatch("agent-1", "my-app", List.of( + entry(t1, "INFO", "logger", "morning", "t1", null, null), + entry(t2, "INFO", "logger", "noon", "t1", null, null), + entry(t3, "INFO", "logger", "afternoon", "t1", null, null) + )); + + // Query only the noon window + Instant from = Instant.parse("2026-03-31T11:00:00Z"); + Instant to = Instant.parse("2026-03-31T13:00:00Z"); + + List results = store.search("my-app", null, null, null, null, from, to, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).message()).isEqualTo("noon"); + } + + @Test + void indexBatch_storesMdc() { + Instant now = Instant.parse("2026-03-31T12:00:00Z"); + Map mdc = Map.of( + "camel.exchangeId", "ex-123", + "custom.key", "custom-value" + ); + + store.indexBatch("agent-1", "my-app", List.of( + entry(now, "INFO", "logger", "msg", "t1", null, mdc) + )); + + // Verify MDC is stored by querying raw data + String exchangeId = jdbc.queryForObject( + "SELECT exchange_id FROM logs WHERE application = 'my-app' LIMIT 1", + String.class); + assertThat(exchangeId).isEqualTo("ex-123"); + + // Verify MDC map contains custom key + String customVal = jdbc.queryForObject( + "SELECT mdc['custom.key'] FROM logs WHERE application = 'my-app' LIMIT 1", + String.class); + assertThat(customVal).isEqualTo("custom-value"); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java new file mode 100644 index 00000000..b13912b9 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java @@ -0,0 +1,6 @@ +package com.cameleer3.server.core.storage; + +import java.time.Instant; + +public record LogEntryResult(String timestamp, String level, String loggerName, + String message, String threadName, String stackTrace) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogIndex.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogIndex.java new file mode 100644 index 00000000..afe98ed8 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogIndex.java @@ -0,0 +1,15 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.common.model.LogEntry; + +import java.time.Instant; +import java.util.List; + +public interface LogIndex { + + List search(String application, String agentId, String level, + String query, String exchangeId, + Instant from, Instant to, int limit); + + void indexBatch(String agentId, String application, List entries); +}