perf: batch processor and log inserts to reduce ClickHouse part creation
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m7s
CI / docker (push) Successful in 39s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 1m2s
SonarQube / sonarqube (push) Failing after 1m58s

Diagnostics showed ~3,200 tiny inserts per 5 minutes:
- processor_executions: 2,376 inserts (14 rows avg) — one per chunk
- logs: 803 inserts (5 rows avg) — synchronous in HTTP handler

Fix 1: Consolidate processor inserts — new insertProcessorBatches() method
flattens all ProcessorBatch records into a single INSERT per flush cycle.

Fix 2: Buffer log inserts — route through WriteBuffer<BufferedLogEntry>,
flushed on the same 5s interval as executions. LogIngestionController now
pushes to buffer instead of inserting directly.

Also reverts async_insert config (doesn't work with JDBC inline VALUES).

Expected: ~3,200 inserts/5min → ~160 (20x reduction in part creation,
MV triggers, and background merge work).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-03 22:48:04 +02:00
parent e0aac4bf0a
commit 633a61d89d
8 changed files with 148 additions and 30 deletions

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.config; package com.cameleer3.server.app.config;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.ingestion.WriteBuffer;
@@ -34,4 +35,10 @@ public class IngestionBeanConfig {
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) { public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity()); return new WriteBuffer<>(config.getBufferCapacity());
} }
/**
 * Bounded in-memory buffer for incoming application log entries.
 *
 * <p>The log ingestion HTTP handler pushes entries here instead of inserting them
 * into ClickHouse synchronously; a scheduled flush drains the buffer and writes
 * one batched INSERT per cycle, reducing ClickHouse part creation.
 *
 * @param config supplies the buffer capacity (shared with the other write buffers)
 * @return a capacity-bounded {@code WriteBuffer} for {@code BufferedLogEntry} records
 */
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<BufferedLogEntry> logBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
} }

View File

@@ -15,6 +15,7 @@ import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler;
import com.cameleer3.server.app.search.ClickHouseSearchIndex; import com.cameleer3.server.app.search.ClickHouseSearchIndex;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore; import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
@@ -96,11 +97,13 @@ public class StorageBeanConfig {
public ExecutionFlushScheduler executionFlushScheduler( public ExecutionFlushScheduler executionFlushScheduler(
WriteBuffer<MergedExecution> executionBuffer, WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer, WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
WriteBuffer<BufferedLogEntry> logBuffer,
ClickHouseExecutionStore executionStore, ClickHouseExecutionStore executionStore,
ClickHouseLogStore logStore,
ChunkAccumulator accumulator, ChunkAccumulator accumulator,
IngestionConfig config) { IngestionConfig config) {
return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
executionStore, accumulator, config); logBuffer, executionStore, logStore, accumulator, config);
} }
@Bean @Bean
@@ -136,7 +139,7 @@ public class StorageBeanConfig {
// ── ClickHouse Log Store ────────────────────────────────────────── // ── ClickHouse Log Store ──────────────────────────────────────────
@Bean @Bean
public LogIndex clickHouseLogStore( public ClickHouseLogStore clickHouseLogStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseLogStore(clickHouseJdbc); return new ClickHouseLogStore(clickHouseJdbc);
} }

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.app.controller; package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.LogBatch; import com.cameleer3.common.model.LogBatch;
import com.cameleer3.server.core.storage.LogIndex; import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentRegistryService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@@ -24,18 +25,18 @@ public class LogIngestionController {
private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class); private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class);
private final LogIndex logIndex; private final WriteBuffer<BufferedLogEntry> logBuffer;
private final AgentRegistryService registryService; private final AgentRegistryService registryService;
public LogIngestionController(LogIndex logIndex, public LogIngestionController(WriteBuffer<BufferedLogEntry> logBuffer,
AgentRegistryService registryService) { AgentRegistryService registryService) {
this.logIndex = logIndex; this.logBuffer = logBuffer;
this.registryService = registryService; this.registryService = registryService;
} }
@PostMapping("/logs") @PostMapping("/logs")
@Operation(summary = "Ingest application log entries", @Operation(summary = "Ingest application log entries",
description = "Accepts a batch of log entries from an agent. Entries are stored in the configured log store.") description = "Accepts a batch of log entries from an agent. Entries are buffered and flushed periodically.")
@ApiResponse(responseCode = "202", description = "Logs accepted for indexing") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing")
public ResponseEntity<Void> ingestLogs(@RequestBody LogBatch batch) { public ResponseEntity<Void> ingestLogs(@RequestBody LogBatch batch) {
String instanceId = extractAgentId(); String instanceId = extractAgentId();
@@ -43,7 +44,9 @@ public class LogIngestionController {
if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { if (batch.getEntries() != null && !batch.getEntries().isEmpty()) {
log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId);
logIndex.indexBatch(instanceId, applicationId, batch.getEntries()); for (var entry : batch.getEntries()) {
logBuffer.offerOrWarn(new BufferedLogEntry(instanceId, applicationId, entry));
}
} }
return ResponseEntity.accepted().build(); return ResponseEntity.accepted().build();

View File

@@ -1,7 +1,9 @@
package com.cameleer3.server.app.ingestion; package com.cameleer3.server.app.ingestion;
import com.cameleer3.server.app.config.IngestionConfig; import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.app.search.ClickHouseLogStore;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore; import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.ingestion.WriteBuffer;
@@ -27,19 +29,25 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
private final WriteBuffer<MergedExecution> executionBuffer; private final WriteBuffer<MergedExecution> executionBuffer;
private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer; private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
private final WriteBuffer<BufferedLogEntry> logBuffer;
private final ClickHouseExecutionStore executionStore; private final ClickHouseExecutionStore executionStore;
private final ClickHouseLogStore logStore;
private final ChunkAccumulator accumulator; private final ChunkAccumulator accumulator;
private final int batchSize; private final int batchSize;
private volatile boolean running = false; private volatile boolean running = false;
public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer, public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer, WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
WriteBuffer<BufferedLogEntry> logBuffer,
ClickHouseExecutionStore executionStore, ClickHouseExecutionStore executionStore,
ClickHouseLogStore logStore,
ChunkAccumulator accumulator, ChunkAccumulator accumulator,
IngestionConfig config) { IngestionConfig config) {
this.executionBuffer = executionBuffer; this.executionBuffer = executionBuffer;
this.processorBuffer = processorBuffer; this.processorBuffer = processorBuffer;
this.logBuffer = logBuffer;
this.executionStore = executionStore; this.executionStore = executionStore;
this.logStore = logStore;
this.accumulator = accumulator; this.accumulator = accumulator;
this.batchSize = config.getBatchSize(); this.batchSize = config.getBatchSize();
} }
@@ -58,21 +66,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
try { try {
List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize); List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
for (ChunkAccumulator.ProcessorBatch batch : batches) {
executionStore.insertProcessorBatch(
batch.tenantId(),
batch.executionId(),
batch.routeId(),
batch.applicationId(),
batch.execStartTime(),
batch.processors());
}
if (!batches.isEmpty()) { if (!batches.isEmpty()) {
executionStore.insertProcessorBatches(batches);
log.debug("Flushed {} processor batches to ClickHouse", batches.size()); log.debug("Flushed {} processor batches to ClickHouse", batches.size());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to flush processor batches", e); log.error("Failed to flush processor batches", e);
} }
try {
List<BufferedLogEntry> logEntries = logBuffer.drain(batchSize);
if (!logEntries.isEmpty()) {
logStore.insertBufferedBatch(logEntries);
log.debug("Flushed {} log entries to ClickHouse", logEntries.size());
}
} catch (Exception e) {
log.error("Failed to flush log entries", e);
}
} }
@Scheduled(fixedDelay = 60_000) @Scheduled(fixedDelay = 60_000)
@@ -107,20 +117,23 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize); List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
if (batches.isEmpty()) break; if (batches.isEmpty()) break;
try { try {
for (ChunkAccumulator.ProcessorBatch batch : batches) { executionStore.insertProcessorBatches(batches);
executionStore.insertProcessorBatch(
batch.tenantId(),
batch.executionId(),
batch.routeId(),
batch.applicationId(),
batch.execStartTime(),
batch.processors());
}
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to flush processor batches during shutdown", e); log.error("Failed to flush processor batches during shutdown", e);
break; break;
} }
} }
// Drain remaining log entries on shutdown
while (logBuffer.size() > 0) {
List<BufferedLogEntry> entries = logBuffer.drain(batchSize);
if (entries.isEmpty()) break;
try {
logStore.insertBufferedBatch(entries);
} catch (Exception e) {
log.error("Failed to flush log entries during shutdown", e);
break;
}
}
running = false; running = false;
} }

View File

@@ -1,6 +1,7 @@
package com.cameleer3.server.app.search; package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry; import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.search.LogSearchRequest; import com.cameleer3.server.core.search.LogSearchRequest;
import com.cameleer3.server.core.search.LogSearchResponse; import com.cameleer3.server.core.search.LogSearchResponse;
import com.cameleer3.server.core.storage.LogEntryResult; import com.cameleer3.server.core.storage.LogEntryResult;
@@ -67,6 +68,34 @@ public class ClickHouseLogStore implements LogIndex {
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
} }
/**
 * Writes a drained batch of buffered log entries to the ClickHouse {@code logs}
 * table as a single multi-row INSERT. A no-op for an empty batch.
 *
 * <p>String columns are written null-safe (missing values become {@code ""}),
 * and {@code tenant_id} is currently hard-wired to {@code 'default'} in the SQL.
 *
 * @param entries log entries paired with their agent metadata; may be empty
 */
public void insertBufferedBatch(List<BufferedLogEntry> entries) {
    if (entries.isEmpty()) {
        return;
    }
    String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " +
            "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
            "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    jdbc.batchUpdate(sql, entries, entries.size(), (ps, buffered) -> {
        LogEntry logEntry = buffered.entry();
        // Fall back to "now" when the agent did not stamp the entry.
        Instant when = logEntry.getTimestamp();
        if (when == null) {
            when = Instant.now();
        }
        ps.setTimestamp(1, Timestamp.from(when));
        ps.setString(2, buffered.applicationId());
        ps.setString(3, buffered.instanceId());
        // Substitute empty strings so we never bind SQL NULL for these columns.
        ps.setString(4, logEntry.getLevel() == null ? "" : logEntry.getLevel());
        ps.setString(5, logEntry.getLoggerName() == null ? "" : logEntry.getLoggerName());
        ps.setString(6, logEntry.getMessage() == null ? "" : logEntry.getMessage());
        ps.setString(7, logEntry.getThreadName() == null ? "" : logEntry.getThreadName());
        ps.setString(8, logEntry.getStackTrace() == null ? "" : logEntry.getStackTrace());
        Map<String, String> mdc = logEntry.getMdc() == null ? Collections.emptyMap() : logEntry.getMdc();
        // exchange_id is denormalized out of the MDC for direct filtering.
        ps.setString(9, mdc.getOrDefault("camel.exchangeId", ""));
        // NOTE(review): assumes the ClickHouse JDBC driver binds a Java Map to a Map column — confirm.
        ps.setObject(10, mdc);
    });
    log.debug("Flushed {} buffered log entries to ClickHouse", entries.size());
}
@Override @Override
public LogSearchResponse search(LogSearchRequest request) { public LogSearchResponse search(LogSearchRequest request) {
// Build shared WHERE conditions (used by both data and count queries) // Build shared WHERE conditions (used by both data and count queries)

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.storage; package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.common.model.FlatProcessorRecord; import com.cameleer3.common.model.FlatProcessorRecord;
@@ -132,6 +133,60 @@ public class ClickHouseExecutionStore implements ExecutionStore {
}).toList()); }).toList());
} }
/**
 * Flattens a list of per-chunk {@code ProcessorBatch} records into a single
 * multi-row INSERT against {@code processor_executions}. Replaces the previous
 * one-INSERT-per-batch pattern to cut ClickHouse part creation.
 *
 * <p>Each processor in each batch becomes one row; the Object[] element order
 * below MUST stay aligned with the 31-column list in the INSERT statement.
 *
 * @param batches drained processor batches; may be empty (no-op)
 */
public void insertProcessorBatches(List<ChunkAccumulator.ProcessorBatch> batches) {
    if (batches.isEmpty()) return; // nothing to write; skip the JDBC round-trip
    // Flatten: one row per processor, carrying the batch-level identifiers on every row.
    List<Object[]> allRows = batches.stream()
            .flatMap(batch -> batch.processors().stream().map(p -> new Object[]{
                    nullToEmpty(batch.tenantId()),
                    nullToEmpty(batch.executionId()),
                    p.getSeq(),
                    p.getParentSeq(),
                    nullToEmpty(p.getParentProcessorId()),
                    nullToEmpty(p.getProcessorId()),
                    nullToEmpty(p.getProcessorType()),
                    // Processor start falls back to the execution start when unset.
                    // NOTE(review): NPE if both are null — confirm execStartTime() is always set.
                    Timestamp.from(p.getStartTime() != null ? p.getStartTime() : batch.execStartTime()),
                    nullToEmpty(batch.routeId()),
                    nullToEmpty(batch.applicationId()),
                    p.getIteration(),
                    p.getIterationSize(),
                    p.getStatus() != null ? p.getStatus().name() : "",
                    computeEndTime(p.getStartTime(), p.getDurationMs()),
                    p.getDurationMs(),
                    nullToEmpty(p.getErrorMessage()),
                    nullToEmpty(p.getErrorStackTrace()),
                    nullToEmpty(p.getErrorType()),
                    nullToEmpty(p.getErrorCategory()),
                    nullToEmpty(p.getRootCauseType()),
                    nullToEmpty(p.getRootCauseMessage()),
                    nullToEmpty(p.getInputBody()),
                    nullToEmpty(p.getOutputBody()),
                    mapToJson(p.getInputHeaders()),
                    mapToJson(p.getOutputHeaders()),
                    mapToJson(p.getAttributes()),
                    nullToEmpty(p.getResolvedEndpointUri()),
                    nullToEmpty(p.getCircuitBreakerState()),
                    boolOrFalse(p.getFallbackTriggered()),
                    boolOrFalse(p.getFilterMatched()),
                    boolOrFalse(p.getDuplicateMessage())
            }))
            .toList();
    // Single batched statement for all rows of all batches in this flush cycle.
    jdbc.batchUpdate("""
        INSERT INTO processor_executions (
        tenant_id, execution_id, seq, parent_seq, parent_processor_id,
        processor_id, processor_type, start_time, route_id, application_id,
        iteration, iteration_size, status, end_time, duration_ms,
        error_message, error_stacktrace, error_type, error_category,
        root_cause_type, root_cause_message,
        input_body, output_body, input_headers, output_headers, attributes,
        resolved_endpoint_uri, circuit_breaker_state,
        fallback_triggered, filter_matched, duplicate_message
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, allRows);
}
// --- ExecutionStore interface: read methods --- // --- ExecutionStore interface: read methods ---
@Override @Override

View File

@@ -0,0 +1,12 @@
package com.cameleer3.server.core.ingestion;

import com.cameleer3.common.model.LogEntry;

/**
 * A log entry paired with its agent metadata, ready for buffered ClickHouse insertion.
 *
 * <p>Produced by the log ingestion HTTP handler and drained by the flush
 * scheduler, so the agent identifiers travel with the entry through the buffer.
 *
 * @param instanceId    identifier of the agent instance that sent the entry
 * @param applicationId identifier of the application the entry belongs to
 * @param entry         the raw log entry payload as received from the agent
 */
public record BufferedLogEntry(
        String instanceId,
        String applicationId,
        LogEntry entry
) {}

View File

@@ -182,10 +182,6 @@ data:
<max_block_size>8192</max_block_size> <max_block_size>8192</max_block_size>
<queue_max_wait_ms>1000</queue_max_wait_ms> <queue_max_wait_ms>1000</queue_max_wait_ms>
<max_execution_time>600</max_execution_time> <max_execution_time>600</max_execution_time>
<!-- Buffer small inserts server-side before creating parts -->
<async_insert>1</async_insert>
<wait_for_async_insert>0</wait_for_async_insert>
<async_insert_busy_timeout_ms>5000</async_insert_busy_timeout_ms>
<!-- Disable parallel parse/format to reduce per-query memory --> <!-- Disable parallel parse/format to reduce per-query memory -->
<input_format_parallel_parsing>0</input_format_parallel_parsing> <input_format_parallel_parsing>0</input_format_parallel_parsing>
<output_format_parallel_formatting>0</output_format_parallel_formatting> <output_format_parallel_formatting>0</output_format_parallel_formatting>