diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index 3f38c4b6..dc90e8eb 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; @@ -34,4 +35,10 @@ public class IngestionBeanConfig { public WriteBuffer processorBatchBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer logBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 4ca84bb0..913637ff 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -15,6 +15,7 @@ import com.cameleer3.server.core.indexing.SearchIndexer; import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; import com.cameleer3.server.app.search.ClickHouseSearchIndex; import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.MergedExecution; @@ -96,11 +97,13 @@ public class StorageBeanConfig { public ExecutionFlushScheduler executionFlushScheduler( WriteBuffer executionBuffer, WriteBuffer processorBatchBuffer, + WriteBuffer logBuffer, ClickHouseExecutionStore executionStore, + ClickHouseLogStore logStore, ChunkAccumulator accumulator, IngestionConfig config) { return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, - executionStore, accumulator, config); + logBuffer, executionStore, logStore, accumulator, config); } @Bean @@ -136,7 +139,7 @@ public class StorageBeanConfig { // ── ClickHouse Log Store ────────────────────────────────────────── @Bean - public LogIndex clickHouseLogStore( + public ClickHouseLogStore clickHouseLogStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseLogStore(clickHouseJdbc); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 5fe718d5..94092d9a 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.LogBatch; -import com.cameleer3.server.core.storage.LogIndex; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; +import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import io.swagger.v3.oas.annotations.Operation; @@ -24,18 +25,18 @@ public class LogIngestionController { private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class); - private final LogIndex logIndex; + private final WriteBuffer logBuffer; private final AgentRegistryService registryService; - public LogIngestionController(LogIndex logIndex, + public LogIngestionController(WriteBuffer logBuffer, AgentRegistryService registryService) { - this.logIndex = logIndex; + this.logBuffer = logBuffer; this.registryService = registryService; } @PostMapping("/logs") @Operation(summary = "Ingest application log entries", - description = "Accepts a batch of log entries from an agent. Entries are stored in the configured log store.") + description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { String instanceId = extractAgentId(); @@ -43,7 +44,9 @@ public class LogIngestionController { if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); - logIndex.indexBatch(instanceId, applicationId, batch.getEntries()); + for (var entry : batch.getEntries()) { + logBuffer.offerOrWarn(new BufferedLogEntry(instanceId, applicationId, entry)); + } } return ResponseEntity.accepted().build(); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java index 17ec91e5..a9813508 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java @@ -1,7 +1,9 @@ package com.cameleer3.server.app.ingestion; import com.cameleer3.server.app.config.IngestionConfig; +import com.cameleer3.server.app.search.ClickHouseLogStore; import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; @@ -27,19 +29,25 @@ public class ExecutionFlushScheduler implements SmartLifecycle { private final WriteBuffer executionBuffer; private final WriteBuffer processorBuffer; + private final WriteBuffer logBuffer; private final ClickHouseExecutionStore executionStore; + private final ClickHouseLogStore logStore; private final ChunkAccumulator accumulator; private final int batchSize; private volatile boolean running = false; public ExecutionFlushScheduler(WriteBuffer executionBuffer, WriteBuffer processorBuffer, + WriteBuffer logBuffer, ClickHouseExecutionStore executionStore, + ClickHouseLogStore logStore, ChunkAccumulator accumulator, IngestionConfig config) { this.executionBuffer = executionBuffer; this.processorBuffer = processorBuffer; + this.logBuffer = logBuffer; this.executionStore = executionStore; + this.logStore = logStore; this.accumulator = accumulator; this.batchSize = config.getBatchSize(); } @@ -58,21 +66,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle { try { List batches = processorBuffer.drain(batchSize); - for (ChunkAccumulator.ProcessorBatch batch : batches) { - executionStore.insertProcessorBatch( - batch.tenantId(), - batch.executionId(), - batch.routeId(), - batch.applicationId(), - batch.execStartTime(), - batch.processors()); - } if (!batches.isEmpty()) { + executionStore.insertProcessorBatches(batches); log.debug("Flushed {} processor batches to ClickHouse", batches.size()); } } catch (Exception e) { log.error("Failed to flush processor batches", e); } + + try { + List logEntries = logBuffer.drain(batchSize); + if (!logEntries.isEmpty()) { + logStore.insertBufferedBatch(logEntries); + log.debug("Flushed {} log entries to ClickHouse", logEntries.size()); + } + } catch (Exception e) { + log.error("Failed to flush log entries", e); + } } @Scheduled(fixedDelay = 60_000) @@ -107,20 +117,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle { List batches = processorBuffer.drain(batchSize); if (batches.isEmpty()) break; try { - for (ChunkAccumulator.ProcessorBatch batch : batches) { - executionStore.insertProcessorBatch( - batch.tenantId(), - batch.executionId(), - batch.routeId(), - batch.applicationId(), - batch.execStartTime(), - batch.processors()); - } + executionStore.insertProcessorBatches(batches); } catch (Exception e) { log.error("Failed to flush processor batches during shutdown", e); break; } } + // Drain remaining log entries on shutdown + while (logBuffer.size() > 0) { + List entries = logBuffer.drain(batchSize); + if (entries.isEmpty()) break; + try { + logStore.insertBufferedBatch(entries); + } catch (Exception e) { + log.error("Failed to flush log entries during shutdown", e); + break; + } + } running = false; } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java index c87f41a3..5353c5e0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -1,6 +1,7 @@ package com.cameleer3.server.app.search; import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.search.LogSearchRequest; import com.cameleer3.server.core.search.LogSearchResponse; import com.cameleer3.server.core.storage.LogEntryResult; @@ -67,6 +68,34 @@ public class ClickHouseLogStore implements LogIndex { log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); } + public void insertBufferedBatch(List entries) { + if (entries.isEmpty()) return; + + String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { + LogEntry entry = ble.entry(); + Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); + ps.setTimestamp(1, Timestamp.from(ts)); + ps.setString(2, ble.applicationId()); + ps.setString(3, ble.instanceId()); + ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); + ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); + ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); + ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); + ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); + + Map mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); + String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); + ps.setString(9, exchangeId); + ps.setObject(10, mdc); + }); + + log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); + } + @Override public LogSearchResponse search(LogSearchRequest request) { // Build shared WHERE conditions (used by both data and count queries) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index fa0c0295..061f2597 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.storage; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.common.model.FlatProcessorRecord; @@ -132,6 +133,60 @@ public class ClickHouseExecutionStore implements ExecutionStore { }).toList()); } + public void insertProcessorBatches(List batches) { + if (batches.isEmpty()) return; + + List allRows = batches.stream() + .flatMap(batch -> batch.processors().stream().map(p -> new Object[]{ + nullToEmpty(batch.tenantId()), + nullToEmpty(batch.executionId()), + p.getSeq(), + p.getParentSeq(), + nullToEmpty(p.getParentProcessorId()), + nullToEmpty(p.getProcessorId()), + nullToEmpty(p.getProcessorType()), + Timestamp.from(p.getStartTime() != null ? p.getStartTime() : batch.execStartTime()), + nullToEmpty(batch.routeId()), + nullToEmpty(batch.applicationId()), + p.getIteration(), + p.getIterationSize(), + p.getStatus() != null ? p.getStatus().name() : "", + computeEndTime(p.getStartTime(), p.getDurationMs()), + p.getDurationMs(), + nullToEmpty(p.getErrorMessage()), + nullToEmpty(p.getErrorStackTrace()), + nullToEmpty(p.getErrorType()), + nullToEmpty(p.getErrorCategory()), + nullToEmpty(p.getRootCauseType()), + nullToEmpty(p.getRootCauseMessage()), + nullToEmpty(p.getInputBody()), + nullToEmpty(p.getOutputBody()), + mapToJson(p.getInputHeaders()), + mapToJson(p.getOutputHeaders()), + mapToJson(p.getAttributes()), + nullToEmpty(p.getResolvedEndpointUri()), + nullToEmpty(p.getCircuitBreakerState()), + boolOrFalse(p.getFallbackTriggered()), + boolOrFalse(p.getFilterMatched()), + boolOrFalse(p.getDuplicateMessage()) + })) + .toList(); + + jdbc.batchUpdate(""" + INSERT INTO processor_executions ( + tenant_id, execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_id, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, allRows); + } + // --- ExecutionStore interface: read methods --- @Override diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java new file mode 100644 index 00000000..721ff86f --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java @@ -0,0 +1,12 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.common.model.LogEntry; + +/** + * A log entry paired with its agent metadata, ready for buffered ClickHouse insertion. + */ +public record BufferedLogEntry( + String instanceId, + String applicationId, + LogEntry entry +) {} diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml index d11dea02..cb19e974 100644 --- a/deploy/clickhouse.yaml +++ b/deploy/clickhouse.yaml @@ -182,10 +182,6 @@ data: 8192 1000 600 - - 1 - 0 - 5000 0 0