feat(clickhouse): add ClickHouseLogStore with LogIndex interface

Extract LogIndex interface from OpenSearchLogIndex. Both ClickHouseLogStore
and OpenSearchLogIndex implement it. Controllers now inject LogIndex.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 23:42:07 +02:00
parent c73e4abf68
commit 7d7eb52afb
7 changed files with 351 additions and 14 deletions

View File

@@ -1,7 +1,7 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.LogBatch;
import com.cameleer3.server.app.search.OpenSearchLogIndex;
import com.cameleer3.server.core.storage.LogIndex;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import io.swagger.v3.oas.annotations.Operation;
@@ -24,10 +24,10 @@ public class LogIngestionController {
private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class);
private final OpenSearchLogIndex logIndex;
private final LogIndex logIndex;
private final AgentRegistryService registryService;
public LogIngestionController(OpenSearchLogIndex logIndex,
public LogIngestionController(LogIndex logIndex,
AgentRegistryService registryService) {
this.logIndex = logIndex;
this.registryService = registryService;

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.dto.LogEntryResponse;
import com.cameleer3.server.app.search.OpenSearchLogIndex;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.core.storage.LogIndex;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
@@ -18,9 +19,9 @@ import java.util.List;
@Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch")
public class LogQueryController {
private final OpenSearchLogIndex logIndex;
private final LogIndex logIndex;
public LogQueryController(OpenSearchLogIndex logIndex) {
public LogQueryController(LogIndex logIndex) {
this.logIndex = logIndex;
}
@@ -42,9 +43,14 @@ public class LogQueryController {
Instant fromInstant = from != null ? Instant.parse(from) : null;
Instant toInstant = to != null ? Instant.parse(to) : null;
List<LogEntryResponse> entries = logIndex.search(
List<LogEntryResult> results = logIndex.search(
application, agentId, level, query, exchangeId, fromInstant, toInstant, limit);
List<LogEntryResponse> entries = results.stream()
.map(r -> new LogEntryResponse(r.timestamp(), r.level(), r.loggerName(),
r.message(), r.threadName(), r.stackTrace()))
.toList();
return ResponseEntity.ok(entries);
}
}

View File

@@ -0,0 +1,129 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.core.storage.LogIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* ClickHouse-backed implementation of {@link LogIndex}.
* Stores application logs in the {@code logs} MergeTree table with
* ngram bloom-filter indexes for efficient substring search.
*/
public class ClickHouseLogStore implements LogIndex {

    private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class);

    /** Single-tenant deployments store every row under this tenant id. */
    private static final String TENANT = "default";

    private final JdbcTemplate jdbc;

    public ClickHouseLogStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /**
     * Inserts a batch of log entries for one agent/application pair.
     *
     * <p>Null entry fields are normalized to empty strings so the (presumably
     * non-nullable — confirm against the DDL) ClickHouse String columns accept
     * them; a missing timestamp falls back to {@link Instant#now()}.
     *
     * @param agentId     id of the agent that shipped the batch
     * @param application application the entries belong to
     * @param entries     entries to persist; {@code null} or empty is a no-op
     */
    @Override
    public void indexBatch(String agentId, String application, List<LogEntry> entries) {
        if (entries == null || entries.isEmpty()) {
            return;
        }
        String sql = "INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, " +
                "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
            Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
            ps.setString(1, TENANT);
            ps.setTimestamp(2, Timestamp.from(ts));
            ps.setString(3, application);
            ps.setString(4, agentId);
            ps.setString(5, entry.getLevel() != null ? entry.getLevel() : "");
            ps.setString(6, entry.getLoggerName() != null ? entry.getLoggerName() : "");
            ps.setString(7, entry.getMessage() != null ? entry.getMessage() : "");
            ps.setString(8, entry.getThreadName() != null ? entry.getThreadName() : "");
            ps.setString(9, entry.getStackTrace() != null ? entry.getStackTrace() : "");
            // Promote camel.exchangeId from the MDC into a dedicated top-level
            // column so exchange lookups don't have to probe the Map column.
            Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap();
            ps.setString(10, mdc.getOrDefault("camel.exchangeId", ""));
            // ClickHouse JDBC handles java.util.Map natively for Map columns.
            ps.setObject(11, mdc);
        });
        log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application);
    }

    /**
     * Searches stored log entries, newest first.
     *
     * @param application required application name
     * @param agentId     optional agent filter ({@code null}/empty to skip)
     * @param level       optional level filter, matched case-insensitively
     *                    against the upper-cased stored value
     * @param query       optional literal substring of the message; LIKE
     *                    metacharacters in it are escaped
     * @param exchangeId  optional Camel exchange id, matched against the
     *                    promoted column or the raw MDC entry
     * @param from        optional inclusive lower bound on timestamp
     * @param to          optional inclusive upper bound on timestamp
     * @param limit       maximum number of rows returned
     * @return matching entries ordered by timestamp descending
     */
    @Override
    public List<LogEntryResult> search(String application, String agentId, String level,
                                       String query, String exchangeId,
                                       Instant from, Instant to, int limit) {
        StringBuilder sql = new StringBuilder(
                "SELECT timestamp, level, logger_name, message, thread_name, stack_trace " +
                "FROM logs WHERE tenant_id = ? AND application = ?");
        List<Object> params = new ArrayList<>();
        params.add(TENANT);
        params.add(application);
        if (agentId != null && !agentId.isEmpty()) {
            sql.append(" AND agent_id = ?");
            params.add(agentId);
        }
        if (level != null && !level.isEmpty()) {
            sql.append(" AND level = ?");
            // Locale.ROOT avoids locale-dependent case mapping (e.g. Turkish
            // dotless-i turning "info" into "İNFO").
            params.add(level.toUpperCase(Locale.ROOT));
        }
        if (exchangeId != null && !exchangeId.isEmpty()) {
            sql.append(" AND (exchange_id = ? OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))");
            params.add(exchangeId);
            params.add(exchangeId);
        }
        if (query != null && !query.isEmpty()) {
            sql.append(" AND message LIKE ?");
            // Escape LIKE metacharacters so user input matches literally.
            params.add("%" + escapeLike(query) + "%");
        }
        if (from != null) {
            sql.append(" AND timestamp >= ?");
            params.add(Timestamp.from(from));
        }
        if (to != null) {
            sql.append(" AND timestamp <= ?");
            params.add(Timestamp.from(to));
        }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);
        // Non-deprecated query overload (Spring 5.3+): RowMapper before args.
        return jdbc.query(sql.toString(), (rs, rowNum) -> {
            Timestamp ts = rs.getTimestamp("timestamp");
            String timestampStr = ts != null
                    ? ts.toInstant().atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)
                    : null;
            return new LogEntryResult(
                    timestampStr,
                    rs.getString("level"),
                    rs.getString("logger_name"),
                    rs.getString("message"),
                    rs.getString("thread_name"),
                    rs.getString("stack_trace"));
        }, params.toArray());
    }

    /**
     * Escapes ClickHouse LIKE metacharacters ({@code \}, {@code %}, {@code _})
     * so a user-supplied substring is matched literally.
     */
    private static String escapeLike(String s) {
        return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
    }
}

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.app.dto.LogEntryResponse;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.core.storage.LogIndex;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
@@ -30,7 +31,7 @@ import java.util.List;
import java.util.Map;
@Repository
public class OpenSearchLogIndex {
public class OpenSearchLogIndex implements LogIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class);
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
@@ -100,9 +101,10 @@ public class OpenSearchLogIndex {
}
}
public List<LogEntryResponse> search(String application, String agentId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
@Override
public List<LogEntryResult> search(String application, String agentId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
try {
BoolQuery.Builder bool = new BoolQuery.Builder();
bool.must(Query.of(q -> q.term(t -> t.field("application").value(FieldValue.of(application)))));
@@ -137,12 +139,12 @@ public class OpenSearchLogIndex {
.sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
.size(limit), Map.class);
List<LogEntryResponse> results = new ArrayList<>();
List<LogEntryResult> results = new ArrayList<>();
for (var hit : response.hits().hits()) {
@SuppressWarnings("unchecked")
Map<String, Object> src = (Map<String, Object>) hit.source();
if (src == null) continue;
results.add(new LogEntryResponse(
results.add(new LogEntryResult(
str(src, "@timestamp"),
str(src, "level"),
str(src, "loggerName"),
@@ -162,6 +164,7 @@ public class OpenSearchLogIndex {
return v != null ? v.toString() : null;
}
@Override
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return;

View File

@@ -0,0 +1,178 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseLogStoreIT {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    // Shared across all tests in the class: the original code created a new
    // HikariDataSource per test and never closed it, leaking a connection
    // pool per test method. Pool and schema are now initialized once.
    private static JdbcTemplate jdbc;

    private ClickHouseLogStore store;

    @BeforeEach
    void setUp() throws Exception {
        if (jdbc == null) {
            HikariDataSource ds = new HikariDataSource();
            ds.setJdbcUrl(clickhouse.getJdbcUrl());
            ds.setUsername(clickhouse.getUsername());
            ds.setPassword(clickhouse.getPassword());
            jdbc = new JdbcTemplate(ds);
            // Apply the production DDL once against the fresh container.
            String ddl = new ClassPathResource("clickhouse/V8__logs.sql")
                    .getContentAsString(StandardCharsets.UTF_8);
            jdbc.execute(ddl);
        }
        // Every test starts from an empty table.
        jdbc.execute("TRUNCATE TABLE logs");
        store = new ClickHouseLogStore(jdbc);
    }

    // ── Helpers ──────────────────────────────────────────────────────────

    /** Builds a LogEntry with the given fields; nulls are passed through. */
    private LogEntry entry(Instant ts, String level, String logger, String message,
                           String thread, String stackTrace, Map<String, String> mdc) {
        return new LogEntry(ts, level, logger, message, thread, stackTrace, mdc);
    }

    // ── Tests ─────────────────────────────────────────────────────────────

    @Test
    void indexBatch_writesLogs() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        List<LogEntry> entries = List.of(
                entry(now, "INFO", "com.example.Foo", "Hello world", "main", null, null),
                entry(now.plusSeconds(1), "ERROR", "com.example.Bar", "Something failed", "worker-1", "stack...", null)
        );
        store.indexBatch("agent-1", "my-app", entries);
        Long count = jdbc.queryForObject("SELECT count() FROM logs WHERE application = 'my-app'", Long.class);
        assertThat(count).isEqualTo(2);
    }

    @Test
    void search_byApplication_returnsLogs() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        store.indexBatch("agent-1", "app-a", List.of(
                entry(now, "INFO", "logger", "msg-a", "t1", null, null)
        ));
        store.indexBatch("agent-2", "app-b", List.of(
                entry(now, "INFO", "logger", "msg-b", "t1", null, null)
        ));
        List<LogEntryResult> results = store.search("app-a", null, null, null, null, null, null, 100);
        assertThat(results).hasSize(1);
        assertThat(results.get(0).message()).isEqualTo("msg-a");
    }

    @Test
    void search_byLevel_filtersCorrectly() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        store.indexBatch("agent-1", "my-app", List.of(
                entry(now, "INFO", "logger", "info message", "t1", null, null),
                entry(now.plusSeconds(1), "ERROR", "logger", "error message", "t1", null, null)
        ));
        List<LogEntryResult> results = store.search("my-app", null, "ERROR", null, null, null, null, 100);
        assertThat(results).hasSize(1);
        assertThat(results.get(0).level()).isEqualTo("ERROR");
        assertThat(results.get(0).message()).isEqualTo("error message");
    }

    @Test
    void search_byQuery_usesLikeSearch() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        store.indexBatch("agent-1", "my-app", List.of(
                entry(now, "INFO", "logger", "Processing order #12345", "t1", null, null),
                entry(now.plusSeconds(1), "INFO", "logger", "Health check OK", "t1", null, null)
        ));
        List<LogEntryResult> results = store.search("my-app", null, null, "order #12345", null, null, null, 100);
        assertThat(results).hasSize(1);
        assertThat(results.get(0).message()).contains("order #12345");
    }

    @Test
    void search_byExchangeId_matchesTopLevelAndMdc() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        Map<String, String> mdc = Map.of("camel.exchangeId", "exchange-abc");
        store.indexBatch("agent-1", "my-app", List.of(
                entry(now, "INFO", "logger", "msg with exchange", "t1", null, mdc),
                entry(now.plusSeconds(1), "INFO", "logger", "msg without exchange", "t1", null, null)
        ));
        List<LogEntryResult> results = store.search("my-app", null, null, null, "exchange-abc", null, null, 100);
        assertThat(results).hasSize(1);
        assertThat(results.get(0).message()).isEqualTo("msg with exchange");
    }

    @Test
    void search_byTimeRange_filtersCorrectly() {
        Instant t1 = Instant.parse("2026-03-31T10:00:00Z");
        Instant t2 = Instant.parse("2026-03-31T12:00:00Z");
        Instant t3 = Instant.parse("2026-03-31T14:00:00Z");
        store.indexBatch("agent-1", "my-app", List.of(
                entry(t1, "INFO", "logger", "morning", "t1", null, null),
                entry(t2, "INFO", "logger", "noon", "t1", null, null),
                entry(t3, "INFO", "logger", "afternoon", "t1", null, null)
        ));
        // Query only the noon window
        Instant from = Instant.parse("2026-03-31T11:00:00Z");
        Instant to = Instant.parse("2026-03-31T13:00:00Z");
        List<LogEntryResult> results = store.search("my-app", null, null, null, null, from, to, 100);
        assertThat(results).hasSize(1);
        assertThat(results.get(0).message()).isEqualTo("noon");
    }

    @Test
    void indexBatch_storesMdc() {
        Instant now = Instant.parse("2026-03-31T12:00:00Z");
        Map<String, String> mdc = Map.of(
                "camel.exchangeId", "ex-123",
                "custom.key", "custom-value"
        );
        store.indexBatch("agent-1", "my-app", List.of(
                entry(now, "INFO", "logger", "msg", "t1", null, mdc)
        ));
        // Verify MDC is stored by querying raw data
        String exchangeId = jdbc.queryForObject(
                "SELECT exchange_id FROM logs WHERE application = 'my-app' LIMIT 1",
                String.class);
        assertThat(exchangeId).isEqualTo("ex-123");
        // Verify MDC map contains custom key
        String customVal = jdbc.queryForObject(
                "SELECT mdc['custom.key'] FROM logs WHERE application = 'my-app' LIMIT 1",
                String.class);
        assertThat(customVal).isEqualTo("custom-value");
    }
}

View File

@@ -0,0 +1,6 @@
package com.cameleer3.server.core.storage;
import java.time.Instant;
/**
 * Backend-neutral projection of one stored log entry returned by
 * {@code LogIndex.search}.
 *
 * <p>The timestamp is carried as a String (presumably ISO-8601, as produced
 * by the ClickHouse and OpenSearch implementations — confirm before parsing).
 * Any field may be {@code null} when the backing store has no value for it.
 */
public record LogEntryResult(String timestamp, String level, String loggerName,
                             String message, String threadName, String stackTrace) {}

View File

@@ -0,0 +1,15 @@
package com.cameleer3.server.core.storage;
import com.cameleer3.common.model.LogEntry;
import java.time.Instant;
import java.util.List;
/**
 * Storage abstraction for application logs. Implemented by both the
 * OpenSearch- and ClickHouse-backed stores so controllers can depend on
 * this interface instead of a concrete backend.
 */
public interface LogIndex {

    /**
     * Searches stored log entries.
     *
     * @param application application name to search within (required)
     * @param agentId     optional agent filter; null/empty to skip
     * @param level       optional log-level filter; null/empty to skip
     * @param query       optional free-text/substring filter on the message
     * @param exchangeId  optional Camel exchange-id filter
     * @param from        optional inclusive lower time bound
     * @param to          optional inclusive upper time bound
     * @param limit       maximum number of results to return
     * @return matching entries; implementations return newest first
     */
    List<LogEntryResult> search(String application, String agentId, String level,
                                String query, String exchangeId,
                                Instant from, Instant to, int limit);

    /**
     * Persists a batch of log entries shipped by one agent for one application.
     * A null or empty batch is a no-op.
     */
    void indexBatch(String agentId, String application, List<LogEntry> entries);
}