feat: log forwarding v2 — accept List<LogEntry>, add source field

Replace LogBatch wrapper with raw List<LogEntry> on the ingestion endpoint.
Add source column to ClickHouse logs table and propagate it through the
storage, search, and HTTP layers (LogSearchRequest, LogEntryResult,
LogEntryResponse, LogQueryController).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-12 10:25:46 +02:00
parent 4b18579b11
commit b03dfee4f3
7 changed files with 33 additions and 15 deletions

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.app.controller; package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.LogBatch; import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import java.util.List;
import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentRegistryService;
@@ -42,14 +43,14 @@ public class LogIngestionController {
@Operation(summary = "Ingest application log entries", @Operation(summary = "Ingest application log entries",
description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.") description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.")
@ApiResponse(responseCode = "202", description = "Logs accepted for indexing") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing")
public ResponseEntity<Void> ingestLogs(@RequestBody LogBatch batch) { public ResponseEntity<Void> ingestLogs(@RequestBody List<LogEntry> entries) {
String instanceId = extractAgentId(); String instanceId = extractAgentId();
String applicationId = resolveApplicationId(instanceId); String applicationId = resolveApplicationId(instanceId);
if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { if (entries != null && !entries.isEmpty()) {
log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); log.debug("Received {} log entries from instance={}, app={}", entries.size(), instanceId, applicationId);
String environment = resolveEnvironment(instanceId); String environment = resolveEnvironment(instanceId);
for (var entry : batch.getEntries()) { for (var entry : entries) {
logBuffer.offerOrWarn(new BufferedLogEntry( logBuffer.offerOrWarn(new BufferedLogEntry(
tenantProperties.getId(), environment, instanceId, applicationId, entry)); tenantProperties.getId(), environment, instanceId, applicationId, entry));
} }

View File

@@ -41,6 +41,7 @@ public class LogQueryController {
@RequestParam(required = false) String exchangeId, @RequestParam(required = false) String exchangeId,
@RequestParam(required = false) String logger, @RequestParam(required = false) String logger,
@RequestParam(required = false) String environment, @RequestParam(required = false) String environment,
@RequestParam(required = false) String source,
@RequestParam(required = false) String from, @RequestParam(required = false) String from,
@RequestParam(required = false) String to, @RequestParam(required = false) String to,
@RequestParam(required = false) String cursor, @RequestParam(required = false) String cursor,
@@ -64,7 +65,7 @@ public class LogQueryController {
LogSearchRequest request = new LogSearchRequest( LogSearchRequest request = new LogSearchRequest(
searchText, levels, application, instanceId, exchangeId, searchText, levels, application, instanceId, exchangeId,
logger, environment, fromInstant, toInstant, cursor, limit, sort); logger, environment, source, fromInstant, toInstant, cursor, limit, sort);
LogSearchResponse result = logIndex.search(request); LogSearchResponse result = logIndex.search(request);
@@ -73,7 +74,7 @@ public class LogQueryController {
r.timestamp(), r.level(), r.loggerName(), r.timestamp(), r.level(), r.loggerName(),
r.message(), r.threadName(), r.stackTrace(), r.message(), r.threadName(), r.stackTrace(),
r.exchangeId(), r.instanceId(), r.application(), r.exchangeId(), r.instanceId(), r.application(),
r.mdc())) r.mdc(), r.source()))
.toList(); .toList();
return ResponseEntity.ok(new LogSearchPageResponse( return ResponseEntity.ok(new LogSearchPageResponse(

View File

@@ -15,5 +15,6 @@ public record LogEntryResponse(
@Schema(description = "Camel exchange ID (if present)") String exchangeId, @Schema(description = "Camel exchange ID (if present)") String exchangeId,
@Schema(description = "Agent instance ID") String instanceId, @Schema(description = "Agent instance ID") String instanceId,
@Schema(description = "Application ID") String application, @Schema(description = "Application ID") String application,
@Schema(description = "MDC context map") Map<String, String> mdc @Schema(description = "MDC context map") Map<String, String> mdc,
@Schema(description = "Log source: app or agent") String source
) {} ) {}

View File

@@ -46,8 +46,8 @@ public class ClickHouseLogStore implements LogIndex {
} }
String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
@@ -65,6 +65,7 @@ public class ClickHouseLogStore implements LogIndex {
String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
ps.setString(10, exchangeId); ps.setString(10, exchangeId);
ps.setObject(11, mdc); ps.setObject(11, mdc);
ps.setString(12, entry.getSource() != null ? entry.getSource() : "app");
}); });
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
@@ -74,8 +75,8 @@ public class ClickHouseLogStore implements LogIndex {
if (entries.isEmpty()) return; if (entries.isEmpty()) return;
String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " + String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> {
LogEntry entry = ble.entry(); LogEntry entry = ble.entry();
@@ -95,6 +96,7 @@ public class ClickHouseLogStore implements LogIndex {
String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
ps.setString(11, exchangeId); ps.setString(11, exchangeId);
ps.setObject(12, mdc); ps.setObject(12, mdc);
ps.setString(13, entry.getSource() != null ? entry.getSource() : "app");
}); });
log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); log.debug("Flushed {} buffered log entries to ClickHouse", entries.size());
@@ -141,6 +143,11 @@ public class ClickHouseLogStore implements LogIndex {
baseParams.add("%" + escapeLike(request.logger()) + "%"); baseParams.add("%" + escapeLike(request.logger()) + "%");
} }
if (request.source() != null && !request.source().isEmpty()) {
baseConditions.add("source = ?");
baseParams.add(request.source());
}
if (request.from() != null) { if (request.from() != null) {
baseConditions.add("timestamp >= ?"); baseConditions.add("timestamp >= ?");
baseParams.add(Timestamp.from(request.from())); baseParams.add(Timestamp.from(request.from()));
@@ -182,7 +189,7 @@ public class ClickHouseLogStore implements LogIndex {
int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore
String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " + String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " +
"exchange_id, instance_id, application, mdc " + "exchange_id, instance_id, application, mdc, source " +
"FROM logs WHERE " + dataWhere + "FROM logs WHERE " + dataWhere +
" ORDER BY timestamp " + orderDir + " LIMIT ?"; " ORDER BY timestamp " + orderDir + " LIMIT ?";
dataParams.add(fetchLimit); dataParams.add(fetchLimit);
@@ -197,6 +204,8 @@ public class ClickHouseLogStore implements LogIndex {
Map<String, String> mdc = (Map<String, String>) rs.getObject("mdc"); Map<String, String> mdc = (Map<String, String>) rs.getObject("mdc");
if (mdc == null) mdc = Collections.emptyMap(); if (mdc == null) mdc = Collections.emptyMap();
String source = rs.getString("source");
return new LogEntryResult( return new LogEntryResult(
timestampStr, timestampStr,
rs.getString("level"), rs.getString("level"),
@@ -207,7 +216,8 @@ public class ClickHouseLogStore implements LogIndex {
rs.getString("exchange_id"), rs.getString("exchange_id"),
rs.getString("instance_id"), rs.getString("instance_id"),
rs.getString("application"), rs.getString("application"),
mdc mdc,
source
); );
}); });

View File

@@ -357,6 +357,9 @@ ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
-- Add source column for log forwarding v2 (app vs agent logs)
ALTER TABLE logs ADD COLUMN IF NOT EXISTS source LowCardinality(String) DEFAULT 'app';
-- ── Usage Events ──────────────────────────────────────────────────────── -- ── Usage Events ────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS usage_events ( CREATE TABLE IF NOT EXISTS usage_events (

View File

@@ -13,6 +13,7 @@ import java.util.List;
* @param exchangeId Camel exchange ID filter * @param exchangeId Camel exchange ID filter
* @param logger logger name substring filter * @param logger logger name substring filter
* @param environment optional environment filter (e.g. "dev", "staging", "prod") * @param environment optional environment filter (e.g. "dev", "staging", "prod")
* @param source optional source filter: "app" or "agent"
* @param from inclusive start of time range (required) * @param from inclusive start of time range (required)
* @param to inclusive end of time range (required) * @param to inclusive end of time range (required)
* @param cursor ISO timestamp cursor for keyset pagination * @param cursor ISO timestamp cursor for keyset pagination
@@ -27,6 +28,7 @@ public record LogSearchRequest(
String exchangeId, String exchangeId,
String logger, String logger,
String environment, String environment,
String source,
Instant from, Instant from,
Instant to, Instant to,
String cursor, String cursor,

View File

@@ -5,4 +5,4 @@ import java.util.Map;
public record LogEntryResult(String timestamp, String level, String loggerName, public record LogEntryResult(String timestamp, String level, String loggerName,
String message, String threadName, String stackTrace, String message, String threadName, String stackTrace,
String exchangeId, String instanceId, String application, String exchangeId, String instanceId, String application,
Map<String, String> mdc) {} Map<String, String> mdc, String source) {}