feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController

ExecutionFlushScheduler drains MergedExecution and ProcessorBatch write
buffers on a fixed interval and delegates batch inserts to
ClickHouseExecutionStore. Also sweeps stale exchanges every 60s.

ChunkIngestionController exposes POST /api/v1/data/chunks, accepts
single or array ExecutionChunk payloads, and feeds them into the
ChunkAccumulator. Conditional on ChunkAccumulator bean (clickhouse.enabled).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:12:38 +02:00
parent 62420cf0c2
commit 776f2ce90d
2 changed files with 204 additions and 0 deletions

View File

@@ -0,0 +1,68 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* Ingestion endpoint for execution chunk data (ClickHouse pipeline).
* <p>
* Accepts single or array {@link ExecutionChunk} payloads and feeds them
* into the {@link ChunkAccumulator}. Only active when
* {@code clickhouse.enabled=true} (conditional on the accumulator bean).
*/
@RestController
@RequestMapping("/api/v1/data")
@ConditionalOnBean(ChunkAccumulator.class)
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {

    private static final Logger log = LoggerFactory.getLogger(ChunkIngestionController.class);

    private final ChunkAccumulator accumulator;
    private final ObjectMapper objectMapper;

    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
        // Dedicated mapper with java.time support so chunk timestamps parse
        // regardless of the application-wide ObjectMapper configuration.
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    /**
     * Ingests one or more execution chunks.
     * <p>
     * The raw body is accepted as a {@code String} so that both a single
     * {@link ExecutionChunk} object and a JSON array of chunks can be handled
     * by the same endpoint.
     *
     * @param body raw JSON payload — a single chunk object or an array of chunks
     * @return 202 Accepted when all chunks were handed to the accumulator;
     *         400 Bad Request when the payload is not valid chunk JSON
     */
    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        List<ExecutionChunk> chunks;
        try {
            chunks = parseChunks(body);
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse execution chunk payload: {}", e.getMessage());
            return ResponseEntity.badRequest().build();
        }
        // Deliberately outside the try-block: a failure inside the accumulator
        // is a server-side problem and must not be misreported as a 400
        // client error (the previous broad catch conflated the two).
        for (ExecutionChunk chunk : chunks) {
            accumulator.onChunk(chunk);
        }
        return ResponseEntity.accepted().build();
    }

    /**
     * Parses the payload into a chunk list, accepting either a JSON array or
     * a single JSON object (wrapped into a singleton list).
     *
     * @throws JsonProcessingException if the payload is not valid chunk JSON
     */
    private List<ExecutionChunk> parseChunks(String body) throws JsonProcessingException {
        String trimmed = body.strip();
        if (trimmed.startsWith("[")) {
            return objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
        }
        return List.of(objectMapper.readValue(trimmed, ExecutionChunk.class));
    }
}

View File

@@ -0,0 +1,136 @@
package com.cameleer3.server.app.ingestion;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
/**
* Scheduled flush task for ClickHouse execution and processor write buffers.
* <p>
* Drains both buffers on a fixed interval and delegates batch inserts to
* {@link ClickHouseExecutionStore}. Also periodically sweeps stale exchanges
* from the {@link ChunkAccumulator}.
* <p>
* Not a {@code @Component} — instantiated as a {@code @Bean} in StorageBeanConfig.
*/
public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    // Upper bound on elements drained from each buffer per pass.
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                   ClickHouseExecutionStore executionStore,
                                   ChunkAccumulator accumulator,
                                   IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    /**
     * Scheduled flush pass: drains both write buffers (one batch each) and
     * inserts the drained data into ClickHouse. Each buffer is flushed
     * independently so a failure in one does not starve the other.
     */
    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        flushExecutions();
        flushProcessorBatches();
    }

    /** Drains up to {@code batchSize} merged executions and inserts them as one batch. */
    private void flushExecutions() {
        try {
            List<MergedExecution> executions = executionBuffer.drain(batchSize);
            if (!executions.isEmpty()) {
                executionStore.insertExecutionBatch(executions);
                log.debug("Flushed {} executions to ClickHouse", executions.size());
            }
        } catch (Exception e) {
            // Drained items are lost on failure; log and wait for the next pass.
            log.error("Failed to flush executions", e);
        }
    }

    /** Drains up to {@code batchSize} processor batches and inserts them one by one. */
    private void flushProcessorBatches() {
        try {
            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
            for (ChunkAccumulator.ProcessorBatch batch : batches) {
                insertProcessorBatch(batch);
            }
            if (!batches.isEmpty()) {
                log.debug("Flushed {} processor batches to ClickHouse", batches.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush processor batches", e);
        }
    }

    /**
     * Unpacks a {@link ChunkAccumulator.ProcessorBatch} into the store's insert
     * signature. Single point of truth for the argument order — used by both
     * the scheduled flush and the shutdown drain (previously duplicated).
     */
    private void insertProcessorBatch(ChunkAccumulator.ProcessorBatch batch) {
        executionStore.insertProcessorBatch(
                batch.tenantId(),
                batch.executionId(),
                batch.routeId(),
                batch.applicationName(),
                batch.execStartTime(),
                batch.processors());
    }

    /** Periodic sweep of stale, never-completed exchanges in the accumulator. */
    @Scheduled(fixedDelay = 60_000)
    public void sweepStale() {
        try {
            accumulator.sweepStale();
        } catch (Exception e) {
            log.error("Failed to sweep stale exchanges", e);
        }
    }

    @Override
    public void start() {
        running = true;
    }

    /**
     * Shutdown hook: drains whatever remains in both buffers so in-flight data
     * is not lost when the context closes. Each loop stops on the first insert
     * failure to avoid spinning against a dead store.
     */
    @Override
    public void stop() {
        // Drain remaining executions on shutdown
        while (executionBuffer.size() > 0) {
            List<MergedExecution> batch = executionBuffer.drain(batchSize);
            if (batch.isEmpty()) break;
            try {
                executionStore.insertExecutionBatch(batch);
            } catch (Exception e) {
                log.error("Failed to flush executions during shutdown", e);
                break;
            }
        }
        // Drain remaining processor batches on shutdown
        while (processorBuffer.size() > 0) {
            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
            if (batches.isEmpty()) break;
            try {
                for (ChunkAccumulator.ProcessorBatch batch : batches) {
                    insertProcessorBatch(batch);
                }
            } catch (Exception e) {
                log.error("Failed to flush processor batches during shutdown", e);
                break;
            }
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    /**
     * Runs late in startup / early in shutdown so the buffers are drained
     * before downstream infrastructure (e.g. the ClickHouse client) stops.
     */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 1;
    }
}