diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 8b9b9b7d..3ba060b0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.common.model.LogBatch; +import com.cameleer3.common.model.LogEntry; import com.cameleer3.server.core.ingestion.BufferedLogEntry; +import java.util.List; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; @@ -42,14 +43,14 @@ public class LogIngestionController { @Operation(summary = "Ingest application log entries", description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") - public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { + public ResponseEntity ingestLogs(@RequestBody List entries) { String instanceId = extractAgentId(); String applicationId = resolveApplicationId(instanceId); - if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { - log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); + if (entries != null && !entries.isEmpty()) { + log.debug("Received {} log entries from instance={}, app={}", entries.size(), instanceId, applicationId); String environment = resolveEnvironment(instanceId); - for (var entry : batch.getEntries()) { + for (var entry : entries) { logBuffer.offerOrWarn(new BufferedLogEntry( tenantProperties.getId(), environment, instanceId, applicationId, entry)); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java index b7a6abb6..46ff9100 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java @@ -41,6 +41,7 @@ public class LogQueryController { @RequestParam(required = false) String exchangeId, @RequestParam(required = false) String logger, @RequestParam(required = false) String environment, + @RequestParam(required = false) String source, @RequestParam(required = false) String from, @RequestParam(required = false) String to, @RequestParam(required = false) String cursor, @@ -64,7 +65,7 @@ public class LogQueryController { LogSearchRequest request = new LogSearchRequest( searchText, levels, application, instanceId, exchangeId, - logger, environment, fromInstant, toInstant, cursor, limit, sort); + logger, environment, source, fromInstant, toInstant, cursor, limit, sort); LogSearchResponse result = logIndex.search(request); @@ -73,7 +74,7 @@ public class LogQueryController { r.timestamp(), r.level(), r.loggerName(), r.message(), r.threadName(), r.stackTrace(), r.exchangeId(), r.instanceId(), r.application(), - r.mdc())) + r.mdc(), r.source())) .toList(); return ResponseEntity.ok(new LogSearchPageResponse( diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java index 0f16764a..17ba4eda 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java @@ -15,5 +15,6 @@ public record LogEntryResponse( @Schema(description = "Camel exchange ID (if present)") String exchangeId, @Schema(description = "Agent instance ID") String instanceId, @Schema(description = "Application ID") String application, - @Schema(description = "MDC context map") Map mdc + @Schema(description = "MDC context map") Map mdc, + @Schema(description = "Log source: app or agent") String source ) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java index 50dfbfc1..1a51fd19 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -46,8 +46,8 @@ public class ClickHouseLogStore implements LogIndex { } String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + - "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); @@ -65,6 +65,7 @@ public class ClickHouseLogStore implements LogIndex { String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); ps.setString(10, exchangeId); ps.setObject(11, mdc); + ps.setString(12, entry.getSource() != null ? entry.getSource() : "app"); }); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); @@ -74,8 +75,8 @@ public class ClickHouseLogStore implements LogIndex { if (entries.isEmpty()) return; String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " + - "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { LogEntry entry = ble.entry(); @@ -95,6 +96,7 @@ public class ClickHouseLogStore implements LogIndex { String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); ps.setString(11, exchangeId); ps.setObject(12, mdc); + ps.setString(13, entry.getSource() != null ? entry.getSource() : "app"); }); log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); @@ -141,6 +143,11 @@ public class ClickHouseLogStore implements LogIndex { baseParams.add("%" + escapeLike(request.logger()) + "%"); } + if (request.source() != null && !request.source().isEmpty()) { + baseConditions.add("source = ?"); + baseParams.add(request.source()); + } + if (request.from() != null) { baseConditions.add("timestamp >= ?"); baseParams.add(Timestamp.from(request.from())); @@ -182,7 +189,7 @@ public class ClickHouseLogStore implements LogIndex { int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " + - "exchange_id, instance_id, application, mdc " + + "exchange_id, instance_id, application, mdc, source " + "FROM logs WHERE " + dataWhere + " ORDER BY timestamp " + orderDir + " LIMIT ?"; dataParams.add(fetchLimit); @@ -197,6 +204,8 @@ public class ClickHouseLogStore implements LogIndex { Map mdc = (Map) rs.getObject("mdc"); if (mdc == null) mdc = Collections.emptyMap(); + String source = rs.getString("source"); + return new LogEntryResult( timestampStr, rs.getString("level"), @@ -207,7 +216,8 @@ public class ClickHouseLogStore implements LogIndex { rs.getString("exchange_id"), rs.getString("instance_id"), rs.getString("application"), - mdc + mdc, + source ); }); diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index f6e848c9..735d1526 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -357,6 +357,9 @@ ORDER BY (tenant_id, timestamp, environment, application, instance_id) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; +-- Add source column for log forwarding v2 (app vs agent logs) +ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app'; + -- ── Usage Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS usage_events ( diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java index e8372a50..0235ccd8 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java @@ -13,6 +13,7 @@ import java.util.List; * @param exchangeId Camel exchange ID filter * @param logger logger name substring filter * @param environment optional environment filter (e.g. "dev", "staging", "prod") + * @param source optional source filter: "app" or "agent" * @param from inclusive start of time range (required) * @param to inclusive end of time range (required) * @param cursor ISO timestamp cursor for keyset pagination @@ -27,6 +28,7 @@ public record LogSearchRequest( String exchangeId, String logger, String environment, + String source, Instant from, Instant to, String cursor, diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java index 2eba2415..0e5bd585 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java @@ -5,4 +5,4 @@ import java.util.Map; public record LogEntryResult(String timestamp, String level, String loggerName, String message, String threadName, String stackTrace, String exchangeId, String instanceId, String application, - Map mdc) {} + Map mdc, String source) {}