Compare commits
5 Commits
af080337f5
...
968117c41a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
968117c41a | ||
|
|
7d7eb52afb | ||
|
|
c73e4abf68 | ||
|
|
cd63d300b3 | ||
|
|
f7daadaaa9 |
@@ -1,5 +1,8 @@
|
|||||||
package com.cameleer3.server.app.config;
|
package com.cameleer3.server.app.config;
|
||||||
|
|
||||||
|
import com.cameleer3.server.app.search.ClickHouseLogStore;
|
||||||
|
import com.cameleer3.server.app.storage.ClickHouseAgentEventRepository;
|
||||||
|
import com.cameleer3.server.app.storage.ClickHouseDiagramStore;
|
||||||
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
|
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
|
||||||
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
|
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
|
||||||
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
|
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
|
||||||
@@ -7,6 +10,7 @@ import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
|
|||||||
import com.cameleer3.server.app.storage.PostgresMetricsStore;
|
import com.cameleer3.server.app.storage.PostgresMetricsStore;
|
||||||
import com.cameleer3.server.core.admin.AuditRepository;
|
import com.cameleer3.server.core.admin.AuditRepository;
|
||||||
import com.cameleer3.server.core.admin.AuditService;
|
import com.cameleer3.server.core.admin.AuditService;
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRepository;
|
||||||
import com.cameleer3.server.core.detail.DetailService;
|
import com.cameleer3.server.core.detail.DetailService;
|
||||||
import com.cameleer3.server.core.indexing.SearchIndexer;
|
import com.cameleer3.server.core.indexing.SearchIndexer;
|
||||||
import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler;
|
import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler;
|
||||||
@@ -17,6 +21,7 @@ import com.cameleer3.server.core.ingestion.IngestionService;
|
|||||||
import com.cameleer3.server.core.ingestion.MergedExecution;
|
import com.cameleer3.server.core.ingestion.MergedExecution;
|
||||||
import com.cameleer3.server.core.ingestion.WriteBuffer;
|
import com.cameleer3.server.core.ingestion.WriteBuffer;
|
||||||
import com.cameleer3.server.core.storage.*;
|
import com.cameleer3.server.core.storage.*;
|
||||||
|
import com.cameleer3.server.core.storage.LogIndex;
|
||||||
import com.cameleer3.server.core.storage.StatsStore;
|
import com.cameleer3.server.core.storage.StatsStore;
|
||||||
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
@@ -129,4 +134,31 @@ public class StorageBeanConfig {
|
|||||||
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
||||||
return new ClickHouseStatsStore(clickHouseJdbc);
|
return new ClickHouseStatsStore(clickHouseJdbc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── ClickHouse Diagram Store ──────────────────────────────────────
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "clickhouse", matchIfMissing = true)
|
||||||
|
public DiagramStore clickHouseDiagramStore(
|
||||||
|
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
||||||
|
return new ClickHouseDiagramStore(clickHouseJdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── ClickHouse Agent Event Repository ─────────────────────────────
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "clickhouse", matchIfMissing = true)
|
||||||
|
public AgentEventRepository clickHouseAgentEventRepository(
|
||||||
|
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
||||||
|
return new ClickHouseAgentEventRepository(clickHouseJdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── ClickHouse Log Store ──────────────────────────────────────────
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "clickhouse", matchIfMissing = true)
|
||||||
|
public LogIndex clickHouseLogStore(
|
||||||
|
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
||||||
|
return new ClickHouseLogStore(clickHouseJdbc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package com.cameleer3.server.app.controller;
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
import com.cameleer3.common.model.LogBatch;
|
import com.cameleer3.common.model.LogBatch;
|
||||||
import com.cameleer3.server.app.search.OpenSearchLogIndex;
|
import com.cameleer3.server.core.storage.LogIndex;
|
||||||
import com.cameleer3.server.core.agent.AgentInfo;
|
import com.cameleer3.server.core.agent.AgentInfo;
|
||||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
@@ -24,10 +24,10 @@ public class LogIngestionController {
|
|||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class);
|
private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class);
|
||||||
|
|
||||||
private final OpenSearchLogIndex logIndex;
|
private final LogIndex logIndex;
|
||||||
private final AgentRegistryService registryService;
|
private final AgentRegistryService registryService;
|
||||||
|
|
||||||
public LogIngestionController(OpenSearchLogIndex logIndex,
|
public LogIngestionController(LogIndex logIndex,
|
||||||
AgentRegistryService registryService) {
|
AgentRegistryService registryService) {
|
||||||
this.logIndex = logIndex;
|
this.logIndex = logIndex;
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
package com.cameleer3.server.app.controller;
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
import com.cameleer3.server.app.dto.LogEntryResponse;
|
import com.cameleer3.server.app.dto.LogEntryResponse;
|
||||||
import com.cameleer3.server.app.search.OpenSearchLogIndex;
|
import com.cameleer3.server.core.storage.LogEntryResult;
|
||||||
|
import com.cameleer3.server.core.storage.LogIndex;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
@@ -18,9 +19,9 @@ import java.util.List;
|
|||||||
@Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch")
|
@Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch")
|
||||||
public class LogQueryController {
|
public class LogQueryController {
|
||||||
|
|
||||||
private final OpenSearchLogIndex logIndex;
|
private final LogIndex logIndex;
|
||||||
|
|
||||||
public LogQueryController(OpenSearchLogIndex logIndex) {
|
public LogQueryController(LogIndex logIndex) {
|
||||||
this.logIndex = logIndex;
|
this.logIndex = logIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -42,9 +43,14 @@ public class LogQueryController {
|
|||||||
Instant fromInstant = from != null ? Instant.parse(from) : null;
|
Instant fromInstant = from != null ? Instant.parse(from) : null;
|
||||||
Instant toInstant = to != null ? Instant.parse(to) : null;
|
Instant toInstant = to != null ? Instant.parse(to) : null;
|
||||||
|
|
||||||
List<LogEntryResponse> entries = logIndex.search(
|
List<LogEntryResult> results = logIndex.search(
|
||||||
application, agentId, level, query, exchangeId, fromInstant, toInstant, limit);
|
application, agentId, level, query, exchangeId, fromInstant, toInstant, limit);
|
||||||
|
|
||||||
|
List<LogEntryResponse> entries = results.stream()
|
||||||
|
.map(r -> new LogEntryResponse(r.timestamp(), r.level(), r.loggerName(),
|
||||||
|
r.message(), r.threadName(), r.stackTrace()))
|
||||||
|
.toList();
|
||||||
|
|
||||||
return ResponseEntity.ok(entries);
|
return ResponseEntity.ok(entries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
package com.cameleer3.server.app.search;
|
||||||
|
|
||||||
|
import com.cameleer3.common.model.LogEntry;
|
||||||
|
import com.cameleer3.server.core.storage.LogEntryResult;
|
||||||
|
import com.cameleer3.server.core.storage.LogIndex;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClickHouse-backed implementation of {@link LogIndex}.
|
||||||
|
* Stores application logs in the {@code logs} MergeTree table with
|
||||||
|
* ngram bloom-filter indexes for efficient substring search.
|
||||||
|
*/
|
||||||
|
public class ClickHouseLogStore implements LogIndex {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class);
|
||||||
|
private static final String TENANT = "default";
|
||||||
|
private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT;
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbc;
|
||||||
|
|
||||||
|
public ClickHouseLogStore(JdbcTemplate jdbc) {
|
||||||
|
this.jdbc = jdbc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
|
||||||
|
if (entries == null || entries.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = "INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, " +
|
||||||
|
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
|
||||||
|
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
|
jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
|
||||||
|
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
|
||||||
|
ps.setTimestamp(1, Timestamp.from(ts));
|
||||||
|
ps.setString(2, application);
|
||||||
|
ps.setString(3, agentId);
|
||||||
|
ps.setString(4, entry.getLevel() != null ? entry.getLevel() : "");
|
||||||
|
ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : "");
|
||||||
|
ps.setString(6, entry.getMessage() != null ? entry.getMessage() : "");
|
||||||
|
ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : "");
|
||||||
|
ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : "");
|
||||||
|
|
||||||
|
// Extract camel.exchangeId from MDC into top-level column
|
||||||
|
Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap();
|
||||||
|
String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
|
||||||
|
ps.setString(9, exchangeId);
|
||||||
|
|
||||||
|
// ClickHouse JDBC handles java.util.Map natively for Map columns
|
||||||
|
ps.setObject(10, mdc);
|
||||||
|
});
|
||||||
|
|
||||||
|
log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<LogEntryResult> search(String application, String agentId, String level,
|
||||||
|
String query, String exchangeId,
|
||||||
|
Instant from, Instant to, int limit) {
|
||||||
|
StringBuilder sql = new StringBuilder(
|
||||||
|
"SELECT timestamp, level, logger_name, message, thread_name, stack_trace " +
|
||||||
|
"FROM logs WHERE tenant_id = 'default' AND application = ?");
|
||||||
|
List<Object> params = new ArrayList<>();
|
||||||
|
params.add(application);
|
||||||
|
|
||||||
|
if (agentId != null && !agentId.isEmpty()) {
|
||||||
|
sql.append(" AND agent_id = ?");
|
||||||
|
params.add(agentId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (level != null && !level.isEmpty()) {
|
||||||
|
sql.append(" AND level = ?");
|
||||||
|
params.add(level.toUpperCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (exchangeId != null && !exchangeId.isEmpty()) {
|
||||||
|
sql.append(" AND (exchange_id = ? OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))");
|
||||||
|
params.add(exchangeId);
|
||||||
|
params.add(exchangeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query != null && !query.isEmpty()) {
|
||||||
|
sql.append(" AND message LIKE ?");
|
||||||
|
params.add("%" + query + "%");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (from != null) {
|
||||||
|
sql.append(" AND timestamp >= ?");
|
||||||
|
params.add(Timestamp.from(from));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (to != null) {
|
||||||
|
sql.append(" AND timestamp <= ?");
|
||||||
|
params.add(Timestamp.from(to));
|
||||||
|
}
|
||||||
|
|
||||||
|
sql.append(" ORDER BY timestamp DESC LIMIT ?");
|
||||||
|
params.add(limit);
|
||||||
|
|
||||||
|
return jdbc.query(sql.toString(), params.toArray(), (rs, rowNum) -> {
|
||||||
|
Timestamp ts = rs.getTimestamp("timestamp");
|
||||||
|
String timestampStr = ts != null
|
||||||
|
? ts.toInstant().atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)
|
||||||
|
: null;
|
||||||
|
return new LogEntryResult(
|
||||||
|
timestampStr,
|
||||||
|
rs.getString("level"),
|
||||||
|
rs.getString("logger_name"),
|
||||||
|
rs.getString("message"),
|
||||||
|
rs.getString("thread_name"),
|
||||||
|
rs.getString("stack_trace")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,7 +1,8 @@
|
|||||||
package com.cameleer3.server.app.search;
|
package com.cameleer3.server.app.search;
|
||||||
|
|
||||||
import com.cameleer3.common.model.LogEntry;
|
import com.cameleer3.common.model.LogEntry;
|
||||||
import com.cameleer3.server.app.dto.LogEntryResponse;
|
import com.cameleer3.server.core.storage.LogEntryResult;
|
||||||
|
import com.cameleer3.server.core.storage.LogIndex;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import org.opensearch.client.json.JsonData;
|
import org.opensearch.client.json.JsonData;
|
||||||
import org.opensearch.client.opensearch.OpenSearchClient;
|
import org.opensearch.client.opensearch.OpenSearchClient;
|
||||||
@@ -18,6 +19,7 @@ import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -30,7 +32,8 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
public class OpenSearchLogIndex {
|
@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")
|
||||||
|
public class OpenSearchLogIndex implements LogIndex {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class);
|
private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class);
|
||||||
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
|
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
|
||||||
@@ -100,9 +103,10 @@ public class OpenSearchLogIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<LogEntryResponse> search(String application, String agentId, String level,
|
@Override
|
||||||
String query, String exchangeId,
|
public List<LogEntryResult> search(String application, String agentId, String level,
|
||||||
Instant from, Instant to, int limit) {
|
String query, String exchangeId,
|
||||||
|
Instant from, Instant to, int limit) {
|
||||||
try {
|
try {
|
||||||
BoolQuery.Builder bool = new BoolQuery.Builder();
|
BoolQuery.Builder bool = new BoolQuery.Builder();
|
||||||
bool.must(Query.of(q -> q.term(t -> t.field("application").value(FieldValue.of(application)))));
|
bool.must(Query.of(q -> q.term(t -> t.field("application").value(FieldValue.of(application)))));
|
||||||
@@ -137,12 +141,12 @@ public class OpenSearchLogIndex {
|
|||||||
.sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
|
.sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
|
||||||
.size(limit), Map.class);
|
.size(limit), Map.class);
|
||||||
|
|
||||||
List<LogEntryResponse> results = new ArrayList<>();
|
List<LogEntryResult> results = new ArrayList<>();
|
||||||
for (var hit : response.hits().hits()) {
|
for (var hit : response.hits().hits()) {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, Object> src = (Map<String, Object>) hit.source();
|
Map<String, Object> src = (Map<String, Object>) hit.source();
|
||||||
if (src == null) continue;
|
if (src == null) continue;
|
||||||
results.add(new LogEntryResponse(
|
results.add(new LogEntryResult(
|
||||||
str(src, "@timestamp"),
|
str(src, "@timestamp"),
|
||||||
str(src, "level"),
|
str(src, "level"),
|
||||||
str(src, "loggerName"),
|
str(src, "loggerName"),
|
||||||
@@ -162,6 +166,7 @@ public class OpenSearchLogIndex {
|
|||||||
return v != null ? v.toString() : null;
|
return v != null ? v.toString() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
|
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
|
||||||
if (entries == null || entries.isEmpty()) {
|
if (entries == null || entries.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -0,0 +1,73 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRecord;
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRepository;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClickHouse implementation of {@link AgentEventRepository}.
|
||||||
|
* <p>
|
||||||
|
* The ClickHouse table has no {@code id} column (no BIGSERIAL equivalent),
|
||||||
|
* so all returned {@link AgentEventRecord} instances have {@code id = 0}.
|
||||||
|
*/
|
||||||
|
public class ClickHouseAgentEventRepository implements AgentEventRepository {
|
||||||
|
|
||||||
|
private static final String TENANT = "default";
|
||||||
|
|
||||||
|
private static final String INSERT_SQL =
|
||||||
|
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
|
private static final String SELECT_BASE =
|
||||||
|
"SELECT 0 AS id, agent_id, app_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbc;
|
||||||
|
|
||||||
|
public ClickHouseAgentEventRepository(JdbcTemplate jdbc) {
|
||||||
|
this.jdbc = jdbc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void insert(String agentId, String appId, String eventType, String detail) {
|
||||||
|
jdbc.update(INSERT_SQL, TENANT, agentId, appId, eventType, detail);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AgentEventRecord> query(String appId, String agentId, Instant from, Instant to, int limit) {
|
||||||
|
var sql = new StringBuilder(SELECT_BASE);
|
||||||
|
var params = new ArrayList<Object>();
|
||||||
|
params.add(TENANT);
|
||||||
|
|
||||||
|
if (appId != null) {
|
||||||
|
sql.append(" AND app_id = ?");
|
||||||
|
params.add(appId);
|
||||||
|
}
|
||||||
|
if (agentId != null) {
|
||||||
|
sql.append(" AND agent_id = ?");
|
||||||
|
params.add(agentId);
|
||||||
|
}
|
||||||
|
if (from != null) {
|
||||||
|
sql.append(" AND timestamp >= ?");
|
||||||
|
params.add(Timestamp.from(from));
|
||||||
|
}
|
||||||
|
if (to != null) {
|
||||||
|
sql.append(" AND timestamp < ?");
|
||||||
|
params.add(Timestamp.from(to));
|
||||||
|
}
|
||||||
|
sql.append(" ORDER BY timestamp DESC LIMIT ?");
|
||||||
|
params.add(limit);
|
||||||
|
|
||||||
|
return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord(
|
||||||
|
rs.getLong("id"),
|
||||||
|
rs.getString("agent_id"),
|
||||||
|
rs.getString("app_id"),
|
||||||
|
rs.getString("event_type"),
|
||||||
|
rs.getString("detail"),
|
||||||
|
rs.getTimestamp("timestamp").toInstant()
|
||||||
|
), params.toArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,193 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.common.graph.RouteGraph;
|
||||||
|
import com.cameleer3.common.graph.RouteNode;
|
||||||
|
import com.cameleer3.server.core.ingestion.TaggedDiagram;
|
||||||
|
import com.cameleer3.server.core.storage.DiagramStore;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HexFormat;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClickHouse implementation of {@link DiagramStore}.
|
||||||
|
* <p>
|
||||||
|
* Stores route graphs as JSON with SHA-256 content-hash deduplication.
|
||||||
|
* Uses ReplacingMergeTree — duplicate inserts are deduplicated on merge.
|
||||||
|
* <p>
|
||||||
|
* {@code findProcessorRouteMapping} fetches all definitions for the application
|
||||||
|
* and deserializes them in Java because ClickHouse has no equivalent of
|
||||||
|
* PostgreSQL's {@code jsonb_array_elements()}.
|
||||||
|
*/
|
||||||
|
public class ClickHouseDiagramStore implements DiagramStore {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class);
|
||||||
|
|
||||||
|
private static final String TENANT = "default";
|
||||||
|
|
||||||
|
private static final String INSERT_SQL = """
|
||||||
|
INSERT INTO route_diagrams
|
||||||
|
(tenant_id, content_hash, route_id, agent_id, application_name, definition, created_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""";
|
||||||
|
|
||||||
|
private static final String SELECT_BY_HASH = """
|
||||||
|
SELECT definition FROM route_diagrams
|
||||||
|
WHERE tenant_id = ? AND content_hash = ?
|
||||||
|
LIMIT 1
|
||||||
|
""";
|
||||||
|
|
||||||
|
private static final String SELECT_HASH_FOR_ROUTE = """
|
||||||
|
SELECT content_hash FROM route_diagrams
|
||||||
|
WHERE tenant_id = ? AND route_id = ? AND agent_id = ?
|
||||||
|
ORDER BY created_at DESC LIMIT 1
|
||||||
|
""";
|
||||||
|
|
||||||
|
private static final String SELECT_DEFINITIONS_FOR_APP = """
|
||||||
|
SELECT DISTINCT route_id, definition FROM route_diagrams
|
||||||
|
WHERE tenant_id = ? AND application_name = ?
|
||||||
|
""";
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbc;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public ClickHouseDiagramStore(JdbcTemplate jdbc) {
|
||||||
|
this.jdbc = jdbc;
|
||||||
|
this.objectMapper = new ObjectMapper();
|
||||||
|
this.objectMapper.registerModule(new JavaTimeModule());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void store(TaggedDiagram diagram) {
|
||||||
|
try {
|
||||||
|
RouteGraph graph = diagram.graph();
|
||||||
|
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
|
||||||
|
String applicationName = diagram.applicationName() != null ? diagram.applicationName() : "";
|
||||||
|
String json = objectMapper.writeValueAsString(graph);
|
||||||
|
String contentHash = sha256Hex(json);
|
||||||
|
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
|
||||||
|
|
||||||
|
jdbc.update(INSERT_SQL,
|
||||||
|
TENANT,
|
||||||
|
contentHash,
|
||||||
|
routeId,
|
||||||
|
agentId,
|
||||||
|
applicationName,
|
||||||
|
json,
|
||||||
|
Timestamp.from(Instant.now()));
|
||||||
|
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RouteGraph> findByContentHash(String contentHash) {
|
||||||
|
List<Map<String, Object>> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash);
|
||||||
|
if (rows.isEmpty()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
String json = (String) rows.get(0).get("definition");
|
||||||
|
try {
|
||||||
|
return Optional.of(objectMapper.readValue(json, RouteGraph.class));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
log.error("Failed to deserialize RouteGraph from ClickHouse", e);
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<String> findContentHashForRoute(String routeId, String agentId) {
|
||||||
|
List<Map<String, Object>> rows = jdbc.queryForList(
|
||||||
|
SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId);
|
||||||
|
if (rows.isEmpty()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds) {
|
||||||
|
if (agentIds == null || agentIds.isEmpty()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
|
||||||
|
String sql = "SELECT content_hash FROM route_diagrams " +
|
||||||
|
"WHERE tenant_id = ? AND route_id = ? AND agent_id IN (" + placeholders + ") " +
|
||||||
|
"ORDER BY created_at DESC LIMIT 1";
|
||||||
|
var params = new ArrayList<Object>();
|
||||||
|
params.add(TENANT);
|
||||||
|
params.add(routeId);
|
||||||
|
params.addAll(agentIds);
|
||||||
|
List<Map<String, Object>> rows = jdbc.queryForList(sql, params.toArray());
|
||||||
|
if (rows.isEmpty()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> findProcessorRouteMapping(String applicationName) {
|
||||||
|
Map<String, String> mapping = new HashMap<>();
|
||||||
|
List<Map<String, Object>> rows = jdbc.queryForList(
|
||||||
|
SELECT_DEFINITIONS_FOR_APP, TENANT, applicationName);
|
||||||
|
for (Map<String, Object> row : rows) {
|
||||||
|
String routeId = (String) row.get("route_id");
|
||||||
|
String json = (String) row.get("definition");
|
||||||
|
if (json == null || routeId == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
RouteGraph graph = objectMapper.readValue(json, RouteGraph.class);
|
||||||
|
collectNodeIds(graph.getRoot(), routeId, mapping);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
log.warn("Failed to deserialize RouteGraph for route={} app={}", routeId, applicationName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mapping;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively walks the RouteNode tree and maps each node ID to the given routeId.
|
||||||
|
*/
|
||||||
|
private void collectNodeIds(RouteNode node, String routeId, Map<String, String> mapping) {
|
||||||
|
if (node == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String id = node.getId();
|
||||||
|
if (id != null && !id.isEmpty()) {
|
||||||
|
mapping.put(id, routeId);
|
||||||
|
}
|
||||||
|
List<RouteNode> children = node.getChildren();
|
||||||
|
if (children != null) {
|
||||||
|
for (RouteNode child : children) {
|
||||||
|
collectNodeIds(child, routeId, mapping);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static String sha256Hex(String input) {
|
||||||
|
try {
|
||||||
|
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||||
|
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
|
||||||
|
return HexFormat.of().formatHex(hash);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new RuntimeException("SHA-256 not available", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package com.cameleer3.server.app.storage;
|
|||||||
|
|
||||||
import com.cameleer3.server.core.agent.AgentEventRecord;
|
import com.cameleer3.server.core.agent.AgentEventRecord;
|
||||||
import com.cameleer3.server.core.agent.AgentEventRepository;
|
import com.cameleer3.server.core.agent.AgentEventRepository;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
@@ -11,6 +12,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
|
@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")
|
||||||
public class PostgresAgentEventRepository implements AgentEventRepository {
|
public class PostgresAgentEventRepository implements AgentEventRepository {
|
||||||
|
|
||||||
private final JdbcTemplate jdbc;
|
private final JdbcTemplate jdbc;
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
@@ -29,6 +30,7 @@ import java.util.Optional;
|
|||||||
* Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts.
|
* Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts.
|
||||||
*/
|
*/
|
||||||
@Repository
|
@Repository
|
||||||
|
@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")
|
||||||
public class PostgresDiagramStore implements DiagramStore {
|
public class PostgresDiagramStore implements DiagramStore {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);
|
private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);
|
||||||
|
|||||||
@@ -52,6 +52,9 @@ cameleer:
|
|||||||
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
|
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
|
||||||
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
|
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
|
||||||
stats: ${CAMELEER_STORAGE_STATS:clickhouse}
|
stats: ${CAMELEER_STORAGE_STATS:clickhouse}
|
||||||
|
diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
|
||||||
|
events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
|
||||||
|
logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
|
||||||
|
|
||||||
security:
|
security:
|
||||||
access-token-expiry-ms: 3600000
|
access-token-expiry-ms: 3600000
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS route_diagrams (
|
||||||
|
tenant_id LowCardinality(String) DEFAULT 'default',
|
||||||
|
content_hash String,
|
||||||
|
route_id LowCardinality(String),
|
||||||
|
agent_id LowCardinality(String),
|
||||||
|
application_name LowCardinality(String),
|
||||||
|
definition String,
|
||||||
|
created_at DateTime64(3) DEFAULT now64(3)
|
||||||
|
)
|
||||||
|
ENGINE = ReplacingMergeTree(created_at)
|
||||||
|
ORDER BY (tenant_id, content_hash)
|
||||||
|
SETTINGS index_granularity = 8192;
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS agent_events (
|
||||||
|
tenant_id LowCardinality(String) DEFAULT 'default',
|
||||||
|
timestamp DateTime64(3) DEFAULT now64(3),
|
||||||
|
agent_id LowCardinality(String),
|
||||||
|
app_id LowCardinality(String),
|
||||||
|
event_type LowCardinality(String),
|
||||||
|
detail String DEFAULT ''
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
PARTITION BY (tenant_id, toYYYYMM(timestamp))
|
||||||
|
ORDER BY (tenant_id, app_id, agent_id, timestamp)
|
||||||
|
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS logs (
|
||||||
|
tenant_id LowCardinality(String) DEFAULT 'default',
|
||||||
|
timestamp DateTime64(3),
|
||||||
|
application LowCardinality(String),
|
||||||
|
agent_id LowCardinality(String),
|
||||||
|
level LowCardinality(String),
|
||||||
|
logger_name LowCardinality(String) DEFAULT '',
|
||||||
|
message String,
|
||||||
|
thread_name LowCardinality(String) DEFAULT '',
|
||||||
|
stack_trace String DEFAULT '',
|
||||||
|
exchange_id String DEFAULT '',
|
||||||
|
mdc Map(String, String) DEFAULT map(),
|
||||||
|
|
||||||
|
INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
||||||
|
INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
||||||
|
INDEX idx_level level TYPE set(10) GRANULARITY 1
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
PARTITION BY (tenant_id, toYYYYMM(timestamp))
|
||||||
|
ORDER BY (tenant_id, application, timestamp)
|
||||||
|
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
|
||||||
|
SETTINGS index_granularity = 8192;
|
||||||
@@ -0,0 +1,178 @@
|
|||||||
|
package com.cameleer3.server.app.search;
|
||||||
|
|
||||||
|
import com.cameleer3.common.model.LogEntry;
|
||||||
|
import com.cameleer3.server.core.storage.LogEntryResult;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseLogStoreIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseLogStore store;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
|
||||||
|
String ddl = new ClassPathResource("clickhouse/V8__logs.sql")
|
||||||
|
.getContentAsString(StandardCharsets.UTF_8);
|
||||||
|
jdbc.execute(ddl);
|
||||||
|
jdbc.execute("TRUNCATE TABLE logs");
|
||||||
|
|
||||||
|
store = new ClickHouseLogStore(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
private LogEntry entry(Instant ts, String level, String logger, String message,
|
||||||
|
String thread, String stackTrace, Map<String, String> mdc) {
|
||||||
|
return new LogEntry(ts, level, logger, message, thread, stackTrace, mdc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void indexBatch_writesLogs() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
List<LogEntry> entries = List.of(
|
||||||
|
entry(now, "INFO", "com.example.Foo", "Hello world", "main", null, null),
|
||||||
|
entry(now.plusSeconds(1), "ERROR", "com.example.Bar", "Something failed", "worker-1", "stack...", null)
|
||||||
|
);
|
||||||
|
|
||||||
|
store.indexBatch("agent-1", "my-app", entries);
|
||||||
|
|
||||||
|
Long count = jdbc.queryForObject("SELECT count() FROM logs WHERE application = 'my-app'", Long.class);
|
||||||
|
assertThat(count).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_byApplication_returnsLogs() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
store.indexBatch("agent-1", "app-a", List.of(
|
||||||
|
entry(now, "INFO", "logger", "msg-a", "t1", null, null)
|
||||||
|
));
|
||||||
|
store.indexBatch("agent-2", "app-b", List.of(
|
||||||
|
entry(now, "INFO", "logger", "msg-b", "t1", null, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
List<LogEntryResult> results = store.search("app-a", null, null, null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).message()).isEqualTo("msg-a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_byLevel_filtersCorrectly() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
store.indexBatch("agent-1", "my-app", List.of(
|
||||||
|
entry(now, "INFO", "logger", "info message", "t1", null, null),
|
||||||
|
entry(now.plusSeconds(1), "ERROR", "logger", "error message", "t1", null, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
List<LogEntryResult> results = store.search("my-app", null, "ERROR", null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).level()).isEqualTo("ERROR");
|
||||||
|
assertThat(results.get(0).message()).isEqualTo("error message");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_byQuery_usesLikeSearch() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
store.indexBatch("agent-1", "my-app", List.of(
|
||||||
|
entry(now, "INFO", "logger", "Processing order #12345", "t1", null, null),
|
||||||
|
entry(now.plusSeconds(1), "INFO", "logger", "Health check OK", "t1", null, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
List<LogEntryResult> results = store.search("my-app", null, null, "order #12345", null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).message()).contains("order #12345");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_byExchangeId_matchesTopLevelAndMdc() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
Map<String, String> mdc = Map.of("camel.exchangeId", "exchange-abc");
|
||||||
|
|
||||||
|
store.indexBatch("agent-1", "my-app", List.of(
|
||||||
|
entry(now, "INFO", "logger", "msg with exchange", "t1", null, mdc),
|
||||||
|
entry(now.plusSeconds(1), "INFO", "logger", "msg without exchange", "t1", null, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
List<LogEntryResult> results = store.search("my-app", null, null, null, "exchange-abc", null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).message()).isEqualTo("msg with exchange");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_byTimeRange_filtersCorrectly() {
|
||||||
|
Instant t1 = Instant.parse("2026-03-31T10:00:00Z");
|
||||||
|
Instant t2 = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
Instant t3 = Instant.parse("2026-03-31T14:00:00Z");
|
||||||
|
|
||||||
|
store.indexBatch("agent-1", "my-app", List.of(
|
||||||
|
entry(t1, "INFO", "logger", "morning", "t1", null, null),
|
||||||
|
entry(t2, "INFO", "logger", "noon", "t1", null, null),
|
||||||
|
entry(t3, "INFO", "logger", "afternoon", "t1", null, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
// Query only the noon window
|
||||||
|
Instant from = Instant.parse("2026-03-31T11:00:00Z");
|
||||||
|
Instant to = Instant.parse("2026-03-31T13:00:00Z");
|
||||||
|
|
||||||
|
List<LogEntryResult> results = store.search("my-app", null, null, null, null, from, to, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).message()).isEqualTo("noon");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void indexBatch_storesMdc() {
|
||||||
|
Instant now = Instant.parse("2026-03-31T12:00:00Z");
|
||||||
|
Map<String, String> mdc = Map.of(
|
||||||
|
"camel.exchangeId", "ex-123",
|
||||||
|
"custom.key", "custom-value"
|
||||||
|
);
|
||||||
|
|
||||||
|
store.indexBatch("agent-1", "my-app", List.of(
|
||||||
|
entry(now, "INFO", "logger", "msg", "t1", null, mdc)
|
||||||
|
));
|
||||||
|
|
||||||
|
// Verify MDC is stored by querying raw data
|
||||||
|
String exchangeId = jdbc.queryForObject(
|
||||||
|
"SELECT exchange_id FROM logs WHERE application = 'my-app' LIMIT 1",
|
||||||
|
String.class);
|
||||||
|
assertThat(exchangeId).isEqualTo("ex-123");
|
||||||
|
|
||||||
|
// Verify MDC map contains custom key
|
||||||
|
String customVal = jdbc.queryForObject(
|
||||||
|
"SELECT mdc['custom.key'] FROM logs WHERE application = 'my-app' LIMIT 1",
|
||||||
|
String.class);
|
||||||
|
assertThat(customVal).isEqualTo("custom-value");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,158 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRecord;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseAgentEventRepositoryIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseAgentEventRepository repo;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
|
||||||
|
String ddl = new ClassPathResource("clickhouse/V7__agent_events.sql")
|
||||||
|
.getContentAsString(StandardCharsets.UTF_8);
|
||||||
|
jdbc.execute(ddl);
|
||||||
|
jdbc.execute("TRUNCATE TABLE agent_events");
|
||||||
|
|
||||||
|
repo = new ClickHouseAgentEventRepository(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Helpers ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert a row with an explicit timestamp so tests can control ordering and ranges.
|
||||||
|
*/
|
||||||
|
private void insertAt(String agentId, String appId, String eventType, String detail, Instant ts) {
|
||||||
|
jdbc.update(
|
||||||
|
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
"default", agentId, appId, eventType, detail, Timestamp.from(ts));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insert_writesEvent() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "agent came online");
|
||||||
|
|
||||||
|
Long count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM agent_events WHERE agent_id = 'agent-1'",
|
||||||
|
Long.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byAppId_filtersCorrectly() {
|
||||||
|
repo.insert("agent-1", "app-x", "CONNECTED", "");
|
||||||
|
repo.insert("agent-2", "app-y", "DISCONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query("app-x", null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).appId()).isEqualTo("app-x");
|
||||||
|
assertThat(results.get(0).agentId()).isEqualTo("agent-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byAgentId_filtersCorrectly() {
|
||||||
|
repo.insert("agent-alpha", "app-shared", "CONNECTED", "");
|
||||||
|
repo.insert("agent-beta", "app-shared", "CONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, "agent-alpha", null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).agentId()).isEqualTo("agent-alpha");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byTimeRange_filtersCorrectly() {
|
||||||
|
Instant t1 = Instant.parse("2026-01-01T10:00:00Z");
|
||||||
|
Instant t2 = Instant.parse("2026-01-01T11:00:00Z");
|
||||||
|
Instant t3 = Instant.parse("2026-01-01T12:00:00Z");
|
||||||
|
|
||||||
|
insertAt("agent-1", "app-a", "CONNECTED", "early", t1);
|
||||||
|
insertAt("agent-1", "app-a", "HEARTBEAT", "mid", t2);
|
||||||
|
insertAt("agent-1", "app-a", "DISCONNECTED", "late", t3);
|
||||||
|
|
||||||
|
// Query [t2, t3) — should return only the middle event
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, t2, t3, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).eventType()).isEqualTo("HEARTBEAT");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_respectsLimit() {
|
||||||
|
Instant base = Instant.parse("2026-02-01T00:00:00Z");
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
insertAt("agent-1", "app-a", "HEARTBEAT", "beat-" + i, base.plusSeconds(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 3);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_returnsZeroId() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 10);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).id()).isEqualTo(0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_noFilters_returnsAllEvents() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "");
|
||||||
|
repo.insert("agent-2", "app-b", "DISCONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_resultsOrderedByTimestampDesc() {
|
||||||
|
Instant t1 = Instant.parse("2026-03-01T08:00:00Z");
|
||||||
|
Instant t2 = Instant.parse("2026-03-01T09:00:00Z");
|
||||||
|
Instant t3 = Instant.parse("2026-03-01T10:00:00Z");
|
||||||
|
|
||||||
|
insertAt("agent-1", "app-a", "FIRST", "", t1);
|
||||||
|
insertAt("agent-1", "app-a", "SECOND", "", t2);
|
||||||
|
insertAt("agent-1", "app-a", "THIRD", "", t3);
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results.get(0).eventType()).isEqualTo("THIRD");
|
||||||
|
assertThat(results.get(1).eventType()).isEqualTo("SECOND");
|
||||||
|
assertThat(results.get(2).eventType()).isEqualTo("FIRST");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,213 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.common.graph.NodeType;
|
||||||
|
import com.cameleer3.common.graph.RouteGraph;
|
||||||
|
import com.cameleer3.common.graph.RouteNode;
|
||||||
|
import com.cameleer3.server.core.ingestion.TaggedDiagram;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseDiagramStoreIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseDiagramStore store;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
|
||||||
|
String ddl = new ClassPathResource("clickhouse/V6__route_diagrams.sql")
|
||||||
|
.getContentAsString(StandardCharsets.UTF_8);
|
||||||
|
jdbc.execute(ddl);
|
||||||
|
jdbc.execute("TRUNCATE TABLE route_diagrams");
|
||||||
|
|
||||||
|
store = new ClickHouseDiagramStore(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
private RouteGraph buildGraph(String routeId, String... nodeIds) {
|
||||||
|
RouteGraph graph = new RouteGraph(routeId);
|
||||||
|
if (nodeIds.length > 0) {
|
||||||
|
RouteNode root = new RouteNode(nodeIds[0], NodeType.ENDPOINT, "from:" + nodeIds[0]);
|
||||||
|
for (int i = 1; i < nodeIds.length; i++) {
|
||||||
|
root.addChild(new RouteNode(nodeIds[i], NodeType.PROCESSOR, "proc:" + nodeIds[i]));
|
||||||
|
}
|
||||||
|
graph.setRoot(root);
|
||||||
|
}
|
||||||
|
return graph;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaggedDiagram tagged(String agentId, String appName, RouteGraph graph) {
|
||||||
|
return new TaggedDiagram(agentId, appName, graph);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void store_insertsNewDiagram() {
|
||||||
|
RouteGraph graph = buildGraph("route-1", "node-a", "node-b");
|
||||||
|
store.store(tagged("agent-1", "my-app", graph));
|
||||||
|
|
||||||
|
// Allow ReplacingMergeTree to settle
|
||||||
|
jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL");
|
||||||
|
|
||||||
|
long count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM route_diagrams WHERE route_id = 'route-1'",
|
||||||
|
Long.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void store_duplicateHashIgnored() {
|
||||||
|
RouteGraph graph = buildGraph("route-1", "node-a");
|
||||||
|
TaggedDiagram diagram = tagged("agent-1", "my-app", graph);
|
||||||
|
|
||||||
|
store.store(diagram);
|
||||||
|
store.store(diagram); // same graph → same hash
|
||||||
|
|
||||||
|
jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL");
|
||||||
|
|
||||||
|
long count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM route_diagrams FINAL WHERE route_id = 'route-1'",
|
||||||
|
Long.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findByContentHash_returnsGraph() {
|
||||||
|
RouteGraph graph = buildGraph("route-2", "node-x");
|
||||||
|
graph.setDescription("Test route");
|
||||||
|
TaggedDiagram diagram = tagged("agent-2", "app-a", graph);
|
||||||
|
store.store(diagram);
|
||||||
|
|
||||||
|
// Compute the expected hash
|
||||||
|
String hash = store.findContentHashForRoute("route-2", "agent-2")
|
||||||
|
.orElseThrow(() -> new AssertionError("No hash found for route-2/agent-2"));
|
||||||
|
|
||||||
|
Optional<RouteGraph> result = store.findByContentHash(hash);
|
||||||
|
|
||||||
|
assertThat(result).isPresent();
|
||||||
|
assertThat(result.get().getRouteId()).isEqualTo("route-2");
|
||||||
|
assertThat(result.get().getDescription()).isEqualTo("Test route");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findByContentHash_returnsEmptyForUnknownHash() {
|
||||||
|
Optional<RouteGraph> result = store.findByContentHash("nonexistent-hash-000");
|
||||||
|
assertThat(result).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findContentHashForRoute_returnsMostRecent() throws InterruptedException {
|
||||||
|
RouteGraph graphV1 = buildGraph("route-3", "node-1");
|
||||||
|
graphV1.setDescription("v1");
|
||||||
|
RouteGraph graphV2 = buildGraph("route-3", "node-1", "node-2");
|
||||||
|
graphV2.setDescription("v2");
|
||||||
|
|
||||||
|
store.store(tagged("agent-1", "my-app", graphV1));
|
||||||
|
// Small delay to ensure different created_at timestamps
|
||||||
|
Thread.sleep(10);
|
||||||
|
store.store(tagged("agent-1", "my-app", graphV2));
|
||||||
|
|
||||||
|
Optional<String> hashOpt = store.findContentHashForRoute("route-3", "agent-1");
|
||||||
|
assertThat(hashOpt).isPresent();
|
||||||
|
|
||||||
|
// The hash should correspond to graphV2 (the most recent)
|
||||||
|
String expectedHash = ClickHouseDiagramStore.sha256Hex(
|
||||||
|
store.findByContentHash(hashOpt.get())
|
||||||
|
.map(g -> {
|
||||||
|
try {
|
||||||
|
return new com.fasterxml.jackson.databind.ObjectMapper()
|
||||||
|
.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule())
|
||||||
|
.writeValueAsString(g);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.orElseThrow());
|
||||||
|
|
||||||
|
assertThat(hashOpt.get()).isEqualTo(expectedHash);
|
||||||
|
|
||||||
|
// Verify retrieved graph has v2's content
|
||||||
|
RouteGraph retrieved = store.findByContentHash(hashOpt.get()).orElseThrow();
|
||||||
|
assertThat(retrieved.getDescription()).isEqualTo("v2");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findContentHashForRouteByAgents_returnsHash() {
|
||||||
|
RouteGraph graph = buildGraph("route-4", "node-z");
|
||||||
|
store.store(tagged("agent-10", "app-b", graph));
|
||||||
|
store.store(tagged("agent-20", "app-b", graph));
|
||||||
|
|
||||||
|
Optional<String> result = store.findContentHashForRouteByAgents(
|
||||||
|
"route-4", java.util.List.of("agent-10", "agent-20"));
|
||||||
|
|
||||||
|
assertThat(result).isPresent();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findContentHashForRouteByAgents_emptyListReturnsEmpty() {
|
||||||
|
Optional<String> result = store.findContentHashForRouteByAgents("route-x", java.util.List.of());
|
||||||
|
assertThat(result).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findProcessorRouteMapping_extractsMapping() {
|
||||||
|
// Build a graph with 3 nodes: root + 2 children
|
||||||
|
RouteGraph graph = buildGraph("route-5", "proc-from-1", "proc-to-2", "proc-log-3");
|
||||||
|
store.store(tagged("agent-1", "app-mapping", graph));
|
||||||
|
|
||||||
|
jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL");
|
||||||
|
|
||||||
|
Map<String, String> mapping = store.findProcessorRouteMapping("app-mapping");
|
||||||
|
|
||||||
|
assertThat(mapping).containsEntry("proc-from-1", "route-5");
|
||||||
|
assertThat(mapping).containsEntry("proc-to-2", "route-5");
|
||||||
|
assertThat(mapping).containsEntry("proc-log-3", "route-5");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findProcessorRouteMapping_multipleRoutes() {
|
||||||
|
RouteGraph graphA = buildGraph("route-a", "proc-a1", "proc-a2");
|
||||||
|
RouteGraph graphB = buildGraph("route-b", "proc-b1");
|
||||||
|
store.store(tagged("agent-1", "multi-app", graphA));
|
||||||
|
store.store(tagged("agent-1", "multi-app", graphB));
|
||||||
|
|
||||||
|
jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL");
|
||||||
|
|
||||||
|
Map<String, String> mapping = store.findProcessorRouteMapping("multi-app");
|
||||||
|
|
||||||
|
assertThat(mapping).containsEntry("proc-a1", "route-a");
|
||||||
|
assertThat(mapping).containsEntry("proc-a2", "route-a");
|
||||||
|
assertThat(mapping).containsEntry("proc-b1", "route-b");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void findProcessorRouteMapping_unknownAppReturnsEmpty() {
|
||||||
|
Map<String, String> mapping = store.findProcessorRouteMapping("nonexistent-app");
|
||||||
|
assertThat(mapping).isEmpty();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
package com.cameleer3.server.core.storage;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
public record LogEntryResult(String timestamp, String level, String loggerName,
|
||||||
|
String message, String threadName, String stackTrace) {}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package com.cameleer3.server.core.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.common.model.LogEntry;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface LogIndex {
|
||||||
|
|
||||||
|
List<LogEntryResult> search(String application, String agentId, String level,
|
||||||
|
String query, String exchangeId,
|
||||||
|
Instant from, Instant to, int limit);
|
||||||
|
|
||||||
|
void indexBatch(String agentId, String application, List<LogEntry> entries);
|
||||||
|
}
|
||||||
@@ -95,6 +95,12 @@ spec:
|
|||||||
value: "clickhouse"
|
value: "clickhouse"
|
||||||
- name: CAMELEER_STORAGE_STATS
|
- name: CAMELEER_STORAGE_STATS
|
||||||
value: "clickhouse"
|
value: "clickhouse"
|
||||||
|
- name: CAMELEER_STORAGE_DIAGRAMS
|
||||||
|
value: "clickhouse"
|
||||||
|
- name: CAMELEER_STORAGE_EVENTS
|
||||||
|
value: "clickhouse"
|
||||||
|
- name: CAMELEER_STORAGE_LOGS
|
||||||
|
value: "clickhouse"
|
||||||
|
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
|
|||||||
Reference in New Issue
Block a user