From 633a61d89d19dde28b6e162233b9b366110408dc Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Fri, 3 Apr 2026 22:48:04 +0200 Subject: [PATCH] perf: batch processor and log inserts to reduce ClickHouse part creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Diagnostics showed ~3,200 tiny inserts per 5 minutes: - processor_executions: 2,376 inserts (14 rows avg) — one per chunk - logs: 803 inserts (5 rows avg) — synchronous in HTTP handler Fix 1: Consolidate processor inserts — new insertProcessorBatches() method flattens all ProcessorBatch records into a single INSERT per flush cycle. Fix 2: Buffer log inserts — route through WriteBuffer, flushed on the same 5s interval as executions. LogIngestionController now pushes to buffer instead of inserting directly. Also reverts async_insert config (doesn't work with JDBC inline VALUES). Expected: ~3,200 inserts/5min → ~160 (20x reduction in part creation, MV triggers, and background merge work). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/config/IngestionBeanConfig.java | 7 +++ .../server/app/config/StorageBeanConfig.java | 7 ++- .../controller/LogIngestionController.java | 15 +++-- .../ingestion/ExecutionFlushScheduler.java | 49 +++++++++++------ .../server/app/search/ClickHouseLogStore.java | 29 ++++++++++ .../app/storage/ClickHouseExecutionStore.java | 55 +++++++++++++++++++ .../core/ingestion/BufferedLogEntry.java | 12 ++++ deploy/clickhouse.yaml | 4 -- 8 files changed, 148 insertions(+), 30 deletions(-) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index 3f38c4b6..dc90e8eb 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; @@ -34,4 +35,10 @@ public class IngestionBeanConfig { public WriteBuffer processorBatchBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer logBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 4ca84bb0..913637ff 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -15,6 +15,7 @@ import com.cameleer3.server.core.indexing.SearchIndexer; import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; import com.cameleer3.server.app.search.ClickHouseSearchIndex; import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.MergedExecution; @@ -96,11 +97,13 @@ public class StorageBeanConfig { public ExecutionFlushScheduler executionFlushScheduler( WriteBuffer executionBuffer, WriteBuffer processorBatchBuffer, + WriteBuffer logBuffer, ClickHouseExecutionStore executionStore, + ClickHouseLogStore logStore, ChunkAccumulator accumulator, IngestionConfig config) { return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, - executionStore, accumulator, config); + logBuffer, executionStore, logStore, accumulator, config); } @Bean @@ -136,7 +139,7 @@ public class StorageBeanConfig { // ── ClickHouse Log Store ────────────────────────────────────────── @Bean - public LogIndex clickHouseLogStore( + public ClickHouseLogStore clickHouseLogStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseLogStore(clickHouseJdbc); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 5fe718d5..94092d9a 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.LogBatch; -import com.cameleer3.server.core.storage.LogIndex; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; +import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import io.swagger.v3.oas.annotations.Operation; @@ -24,18 +25,18 @@ public class LogIngestionController { private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class); - private final LogIndex logIndex; + private final WriteBuffer logBuffer; private final AgentRegistryService registryService; - public LogIngestionController(LogIndex logIndex, + public LogIngestionController(WriteBuffer logBuffer, AgentRegistryService registryService) { - this.logIndex = logIndex; + this.logBuffer = logBuffer; this.registryService = registryService; } @PostMapping("/logs") @Operation(summary = "Ingest application log entries", - description = "Accepts a batch of log entries from an agent. Entries are stored in the configured log store.") + description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { String instanceId = extractAgentId(); @@ -43,7 +44,9 @@ public class LogIngestionController { if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); - logIndex.indexBatch(instanceId, applicationId, batch.getEntries()); + for (var entry : batch.getEntries()) { + logBuffer.offerOrWarn(new BufferedLogEntry(instanceId, applicationId, entry)); + } } return ResponseEntity.accepted().build(); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java index 17ec91e5..a9813508 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java @@ -1,7 +1,9 @@ package com.cameleer3.server.app.ingestion; import com.cameleer3.server.app.config.IngestionConfig; +import com.cameleer3.server.app.search.ClickHouseLogStore; import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; @@ -27,19 +29,25 @@ public class ExecutionFlushScheduler implements SmartLifecycle { private final WriteBuffer executionBuffer; private final WriteBuffer processorBuffer; + private final WriteBuffer logBuffer; private final ClickHouseExecutionStore executionStore; + private final ClickHouseLogStore logStore; private final ChunkAccumulator accumulator; private final int batchSize; private volatile boolean running = false; public ExecutionFlushScheduler(WriteBuffer executionBuffer, WriteBuffer processorBuffer, + WriteBuffer logBuffer, ClickHouseExecutionStore executionStore, + ClickHouseLogStore logStore, ChunkAccumulator accumulator, IngestionConfig config) { this.executionBuffer = executionBuffer; this.processorBuffer = processorBuffer; + this.logBuffer = logBuffer; this.executionStore = executionStore; + this.logStore = logStore; this.accumulator = accumulator; this.batchSize = config.getBatchSize(); } @@ -58,21 +66,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle { try { List batches = processorBuffer.drain(batchSize); - for (ChunkAccumulator.ProcessorBatch batch : batches) { - executionStore.insertProcessorBatch( - batch.tenantId(), - batch.executionId(), - batch.routeId(), - batch.applicationId(), - batch.execStartTime(), - batch.processors()); - } if (!batches.isEmpty()) { + executionStore.insertProcessorBatches(batches); log.debug("Flushed {} processor batches to ClickHouse", batches.size()); } } catch (Exception e) { log.error("Failed to flush processor batches", e); } + + try { + List logEntries = logBuffer.drain(batchSize); + if (!logEntries.isEmpty()) { + logStore.insertBufferedBatch(logEntries); + log.debug("Flushed {} log entries to ClickHouse", logEntries.size()); + } + } catch (Exception e) { + log.error("Failed to flush log entries", e); + } } @Scheduled(fixedDelay = 60_000) @@ -107,20 +117,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle { List batches = processorBuffer.drain(batchSize); if (batches.isEmpty()) break; try { - for (ChunkAccumulator.ProcessorBatch batch : batches) { - executionStore.insertProcessorBatch( - batch.tenantId(), - batch.executionId(), - batch.routeId(), - batch.applicationId(), - batch.execStartTime(), - batch.processors()); - } + executionStore.insertProcessorBatches(batches); } catch (Exception e) { log.error("Failed to flush processor batches during shutdown", e); break; } } + // Drain remaining log entries on shutdown + while (logBuffer.size() > 0) { + List entries = logBuffer.drain(batchSize); + if (entries.isEmpty()) break; + try { + logStore.insertBufferedBatch(entries); + } catch (Exception e) { + log.error("Failed to flush log entries during shutdown", e); + break; + } + } running = false; } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java index c87f41a3..5353c5e0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -1,6 +1,7 @@ package com.cameleer3.server.app.search; import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; import com.cameleer3.server.core.search.LogSearchRequest; import com.cameleer3.server.core.search.LogSearchResponse; import com.cameleer3.server.core.storage.LogEntryResult; @@ -67,6 +68,34 @@ public class ClickHouseLogStore implements LogIndex { log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); } + public void insertBufferedBatch(List entries) { + if (entries.isEmpty()) return; + + String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { + LogEntry entry = ble.entry(); + Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); + ps.setTimestamp(1, Timestamp.from(ts)); + ps.setString(2, ble.applicationId()); + ps.setString(3, ble.instanceId()); + ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); + ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); + ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); + ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); + ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); + + Map mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); + String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); + ps.setString(9, exchangeId); + ps.setObject(10, mdc); + }); + + log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); + } + @Override public LogSearchResponse search(LogSearchRequest request) { // Build shared WHERE conditions (used by both data and count queries) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index fa0c0295..061f2597 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.storage; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.common.model.FlatProcessorRecord; @@ -132,6 +133,60 @@ public class ClickHouseExecutionStore implements ExecutionStore { }).toList()); } + public void insertProcessorBatches(List batches) { + if (batches.isEmpty()) return; + + List allRows = batches.stream() + .flatMap(batch -> batch.processors().stream().map(p -> new Object[]{ + nullToEmpty(batch.tenantId()), + nullToEmpty(batch.executionId()), + p.getSeq(), + p.getParentSeq(), + nullToEmpty(p.getParentProcessorId()), + nullToEmpty(p.getProcessorId()), + nullToEmpty(p.getProcessorType()), + Timestamp.from(p.getStartTime() != null ? p.getStartTime() : batch.execStartTime()), + nullToEmpty(batch.routeId()), + nullToEmpty(batch.applicationId()), + p.getIteration(), + p.getIterationSize(), + p.getStatus() != null ? p.getStatus().name() : "", + computeEndTime(p.getStartTime(), p.getDurationMs()), + p.getDurationMs(), + nullToEmpty(p.getErrorMessage()), + nullToEmpty(p.getErrorStackTrace()), + nullToEmpty(p.getErrorType()), + nullToEmpty(p.getErrorCategory()), + nullToEmpty(p.getRootCauseType()), + nullToEmpty(p.getRootCauseMessage()), + nullToEmpty(p.getInputBody()), + nullToEmpty(p.getOutputBody()), + mapToJson(p.getInputHeaders()), + mapToJson(p.getOutputHeaders()), + mapToJson(p.getAttributes()), + nullToEmpty(p.getResolvedEndpointUri()), + nullToEmpty(p.getCircuitBreakerState()), + boolOrFalse(p.getFallbackTriggered()), + boolOrFalse(p.getFilterMatched()), + boolOrFalse(p.getDuplicateMessage()) + })) + .toList(); + + jdbc.batchUpdate(""" + INSERT INTO processor_executions ( + tenant_id, execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_id, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, allRows); + } + // --- ExecutionStore interface: read methods --- @Override diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java new file mode 100644 index 00000000..721ff86f --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java @@ -0,0 +1,12 @@ +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.common.model.LogEntry; + +/** + * A log entry paired with its agent metadata, ready for buffered ClickHouse insertion. + */ +public record BufferedLogEntry( + String instanceId, + String applicationId, + LogEntry entry +) {} diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml index d11dea02..cb19e974 100644 --- a/deploy/clickhouse.yaml +++ b/deploy/clickhouse.yaml @@ -182,10 +182,6 @@ data: 8192 1000 600 - - 1 - 0 - 5000 0 0