From b03dfee4f35405d2be207c07d5aaee6e27bec47c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sun, 12 Apr 2026 10:25:46 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20log=20forwarding=20v2=20=E2=80=94=20acc?= =?UTF-8?q?ept=20List,=20add=20source=20field?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace LogBatch wrapper with raw List on the ingestion endpoint. Add source column to ClickHouse logs table and propagate it through the storage, search, and HTTP layers (LogSearchRequest, LogEntryResult, LogEntryResponse, LogQueryController). Co-Authored-By: Claude Sonnet 4.6 --- .../controller/LogIngestionController.java | 11 +++++----- .../app/controller/LogQueryController.java | 5 +++-- .../server/app/dto/LogEntryResponse.java | 3 ++- .../server/app/search/ClickHouseLogStore.java | 22 ++++++++++++++----- .../src/main/resources/clickhouse/init.sql | 3 +++ .../server/core/search/LogSearchRequest.java | 2 ++ .../server/core/storage/LogEntryResult.java | 2 +- 7 files changed, 33 insertions(+), 15 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 8b9b9b7d..3ba060b0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.common.model.LogBatch; +import com.cameleer3.common.model.LogEntry; import com.cameleer3.server.core.ingestion.BufferedLogEntry; +import java.util.List; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; @@ -42,14 +43,14 @@ public class LogIngestionController { @Operation(summary = "Ingest application log entries", description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") - public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { + public ResponseEntity ingestLogs(@RequestBody List entries) { String instanceId = extractAgentId(); String applicationId = resolveApplicationId(instanceId); - if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { - log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); + if (entries != null && !entries.isEmpty()) { + log.debug("Received {} log entries from instance={}, app={}", entries.size(), instanceId, applicationId); String environment = resolveEnvironment(instanceId); - for (var entry : batch.getEntries()) { + for (var entry : entries) { logBuffer.offerOrWarn(new BufferedLogEntry( tenantProperties.getId(), environment, instanceId, applicationId, entry)); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java index b7a6abb6..46ff9100 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java @@ -41,6 +41,7 @@ public class LogQueryController { @RequestParam(required = false) String exchangeId, @RequestParam(required = false) String logger, @RequestParam(required = false) String environment, + @RequestParam(required = false) String source, @RequestParam(required = false) String from, @RequestParam(required = false) String to, @RequestParam(required = false) String cursor, @@ -64,7 +65,7 @@ public class LogQueryController { LogSearchRequest request = new LogSearchRequest( searchText, levels, application, instanceId, exchangeId, - logger, environment, fromInstant, toInstant, cursor, limit, sort); + logger, environment, source, fromInstant, toInstant, cursor, limit, sort); LogSearchResponse result = logIndex.search(request); @@ -73,7 +74,7 @@ public class LogQueryController { r.timestamp(), r.level(), r.loggerName(), r.message(), r.threadName(), r.stackTrace(), r.exchangeId(), r.instanceId(), r.application(), - r.mdc())) + r.mdc(), r.source())) .toList(); return ResponseEntity.ok(new LogSearchPageResponse( diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java index 0f16764a..17ba4eda 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java @@ -15,5 +15,6 @@ public record LogEntryResponse( @Schema(description = "Camel exchange ID (if present)") String exchangeId, @Schema(description = "Agent instance ID") String instanceId, @Schema(description = "Application ID") String application, - @Schema(description = "MDC context map") Map mdc + @Schema(description = "MDC context map") Map mdc, + @Schema(description = "Log source: app or agent") String source ) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java index 50dfbfc1..1a51fd19 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -46,8 +46,8 @@ public class ClickHouseLogStore implements LogIndex { } String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + - "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); @@ -65,6 +65,7 @@ public class ClickHouseLogStore implements LogIndex { String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); ps.setString(10, exchangeId); ps.setObject(11, mdc); + ps.setString(12, entry.getSource() != null ? entry.getSource() : "app"); }); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); @@ -74,8 +75,8 @@ public class ClickHouseLogStore implements LogIndex { if (entries.isEmpty()) return; String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " + - "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { LogEntry entry = ble.entry(); @@ -95,6 +96,7 @@ public class ClickHouseLogStore implements LogIndex { String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); ps.setString(11, exchangeId); ps.setObject(12, mdc); + ps.setString(13, entry.getSource() != null ? entry.getSource() : "app"); }); log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); @@ -141,6 +143,11 @@ public class ClickHouseLogStore implements LogIndex { baseParams.add("%" + escapeLike(request.logger()) + "%"); } + if (request.source() != null && !request.source().isEmpty()) { + baseConditions.add("source = ?"); + baseParams.add(request.source()); + } + if (request.from() != null) { baseConditions.add("timestamp >= ?"); baseParams.add(Timestamp.from(request.from())); @@ -182,7 +189,7 @@ public class ClickHouseLogStore implements LogIndex { int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " + - "exchange_id, instance_id, application, mdc " + + "exchange_id, instance_id, application, mdc, source " + "FROM logs WHERE " + dataWhere + " ORDER BY timestamp " + orderDir + " LIMIT ?"; dataParams.add(fetchLimit); @@ -197,6 +204,8 @@ public class ClickHouseLogStore implements LogIndex { Map mdc = (Map) rs.getObject("mdc"); if (mdc == null) mdc = Collections.emptyMap(); + String source = rs.getString("source"); + return new LogEntryResult( timestampStr, rs.getString("level"), @@ -207,7 +216,8 @@ public class ClickHouseLogStore implements LogIndex { rs.getString("exchange_id"), rs.getString("instance_id"), rs.getString("application"), - mdc + mdc, + source ); }); diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index f6e848c9..735d1526 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -357,6 +357,9 @@ ORDER BY (tenant_id, timestamp, environment, application, instance_id) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; +-- Add source column for log forwarding v2 (app vs agent logs) +ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app'; + -- ── Usage Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS usage_events ( diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java index e8372a50..0235ccd8 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/LogSearchRequest.java @@ -13,6 +13,7 @@ import java.util.List; * @param exchangeId Camel exchange ID filter * @param logger logger name substring filter * @param environment optional environment filter (e.g. "dev", "staging", "prod") + * @param source optional source filter: "app" or "agent" * @param from inclusive start of time range (required) * @param to inclusive end of time range (required) * @param cursor ISO timestamp cursor for keyset pagination @@ -27,6 +28,7 @@ public record LogSearchRequest( String exchangeId, String logger, String environment, + String source, Instant from, Instant to, String cursor, diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java index 2eba2415..0e5bd585 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/LogEntryResult.java @@ -5,4 +5,4 @@ import java.util.Map; public record LogEntryResult(String timestamp, String level, String loggerName, String message, String threadName, String stackTrace, String exchangeId, String instanceId, String application, - Map mdc) {} + Map mdc, String source) {}