package com.cameleer3.server.app.controller;

import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * Ingestion endpoint for execution chunk data (ClickHouse pipeline).
 *
 * <p>Accepts single or array {@link ExecutionChunk} payloads and feeds them
 * into the {@link ChunkAccumulator}. Only active when the accumulator bean is
 * present (i.e. {@code clickhouse.enabled=true} — NOTE(review): inferred from
 * the conditional-on-bean wiring; confirm against StorageBeanConfig).
 */
@RestController
@RequestMapping("/api/v1/data")
@ConditionalOnBean(ChunkAccumulator.class)
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ChunkIngestionController {

    private static final Logger log = LoggerFactory.getLogger(ChunkIngestionController.class);

    private final ChunkAccumulator accumulator;

    // Locally-configured mapper: the raw String body is parsed by hand (to
    // support both single-object and array payloads), so Spring's
    // message-converter ObjectMapper is not used. JavaTimeModule is required
    // for java.time fields on ExecutionChunk.
    private final ObjectMapper objectMapper;

    public ChunkIngestionController(ChunkAccumulator accumulator) {
        this.accumulator = accumulator;
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    /**
     * Ingests one {@link ExecutionChunk} or a JSON array of them.
     *
     * <p>Parsing is deliberately separated from ingestion: a malformed payload
     * yields {@code 400 Bad Request}, while a failure inside the accumulator
     * propagates and surfaces as a server error instead of being misreported
     * as a client error (the original wrapped both in one catch, so an
     * accumulator exception thrown mid-list returned 400 even though some
     * chunks had already been ingested).
     *
     * @param body raw JSON: either a single chunk object or an array of chunks
     * @return {@code 202 Accepted} on success, {@code 400} on a parse failure
     */
    @PostMapping("/chunks")
    @Operation(summary = "Ingest execution chunk")
    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
        List<ExecutionChunk> chunks;
        try {
            String trimmed = body.strip();
            if (trimmed.startsWith("[")) {
                chunks = objectMapper.readValue(
                        trimmed, new TypeReference<List<ExecutionChunk>>() {});
            } else {
                chunks = List.of(objectMapper.readValue(trimmed, ExecutionChunk.class));
            }
        } catch (Exception e) {
            // Keep the full stack trace; message-only logging made parse
            // failures hard to diagnose.
            log.warn("Failed to parse execution chunk payload", e);
            return ResponseEntity.badRequest().build();
        }

        // Ingestion errors are NOT swallowed here — they propagate to the
        // framework's default error handling (5xx), which is the honest status
        // for an internal failure.
        for (ExecutionChunk chunk : chunks) {
            accumulator.onChunk(chunk);
        }

        return ResponseEntity.accepted().build();
    }
}
package com.cameleer3.server.app.ingestion;

import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

/**
 * Scheduled flush task for ClickHouse execution and processor write buffers.
 *
 * <p>Drains both buffers on a fixed interval and delegates batch inserts to
 * {@link ClickHouseExecutionStore}. Also periodically sweeps stale exchanges
 * from the {@link ChunkAccumulator}.
 *
 * <p>Not a {@code @Component} — instantiated as a {@code @Bean} in StorageBeanConfig.
 */
public class ExecutionFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);

    private final WriteBuffer<MergedExecution> executionBuffer;
    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
    private final ClickHouseExecutionStore executionStore;
    private final ChunkAccumulator accumulator;
    // Maximum items drained per pass; a fuller buffer waits for the next tick.
    private final int batchSize;
    private volatile boolean running = false;

    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
                                   ClickHouseExecutionStore executionStore,
                                   ChunkAccumulator accumulator,
                                   IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.processorBuffer = processorBuffer;
        this.executionStore = executionStore;
        this.accumulator = accumulator;
        this.batchSize = config.getBatchSize();
    }

    /**
     * Periodic flush: one drain pass per buffer. A failure on one buffer is
     * logged and does not prevent the other buffer from being flushed.
     */
    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        try {
            int flushed = flushExecutionsOnce();
            if (flushed > 0) {
                log.debug("Flushed {} executions to ClickHouse", flushed);
            }
        } catch (Exception e) {
            log.error("Failed to flush executions", e);
        }

        try {
            int flushed = flushProcessorBatchesOnce();
            if (flushed > 0) {
                log.debug("Flushed {} processor batches to ClickHouse", flushed);
            }
        } catch (Exception e) {
            log.error("Failed to flush processor batches", e);
        }
    }

    /** Evicts exchanges that never completed so the accumulator cannot grow unbounded. */
    @Scheduled(fixedDelay = 60_000)
    public void sweepStale() {
        try {
            accumulator.sweepStale();
        } catch (Exception e) {
            log.error("Failed to sweep stale exchanges", e);
        }
    }

    /** Drains up to {@code batchSize} executions and inserts them; returns the count drained. */
    private int flushExecutionsOnce() {
        List<MergedExecution> executions = executionBuffer.drain(batchSize);
        if (!executions.isEmpty()) {
            executionStore.insertExecutionBatch(executions);
        }
        return executions.size();
    }

    /** Drains up to {@code batchSize} processor batches and inserts each; returns the count drained. */
    private int flushProcessorBatchesOnce() {
        List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
        for (ChunkAccumulator.ProcessorBatch batch : batches) {
            executionStore.insertProcessorBatch(
                    batch.tenantId(),
                    batch.executionId(),
                    batch.routeId(),
                    batch.applicationName(),
                    batch.execStartTime(),
                    batch.processors());
        }
        return batches.size();
    }

    @Override
    public void start() {
        running = true;
    }

    /**
     * Drains both buffers to completion on shutdown so buffered data is not
     * lost. An insert failure aborts the drain of that buffer (logged and
     * broken out of, matching the periodic path's log-and-move-on policy);
     * items already drained from the buffer in the failing pass are lost.
     */
    @Override
    public void stop() {
        while (executionBuffer.size() > 0) {
            try {
                if (flushExecutionsOnce() == 0) {
                    break; // another drainer raced us; nothing left to do
                }
            } catch (Exception e) {
                log.error("Failed to flush executions during shutdown", e);
                break;
            }
        }
        while (processorBuffer.size() > 0) {
            try {
                if (flushProcessorBatchesOnce() == 0) {
                    break;
                }
            } catch (Exception e) {
                log.error("Failed to flush processor batches during shutdown", e);
                break;
            }
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    /**
     * Near-maximal phase: this bean starts after, and stops before, beans with
     * lower phases — presumably so the final drain runs while the ClickHouse
     * store is still up (NOTE(review): confirm the store bean's phase).
     */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 1;
    }
}