feat(01-02): add IngestionService, ClickHouse repositories, and flush scheduler

- IngestionService routes data to WriteBuffer instances (core module, plain class)
- ClickHouseExecutionRepository: batch insert with parallel processor arrays
- ClickHouseDiagramRepository: JSON storage with SHA-256 content-hash dedup
- ClickHouseMetricsRepository: batch insert for agent_metrics table
- ClickHouseFlushScheduler: scheduled drain with SmartLifecycle shutdown flush
- IngestionBeanConfig: wires WriteBuffer and IngestionService beans

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 12:08:36 +01:00
parent ff0af0ef2f
commit 17a18cf6da
6 changed files with 604 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
package com.cameleer3.server.app.config;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Wires the ingestion write path: one {@link WriteBuffer} per data kind plus the
 * {@link IngestionService} that feeds them.
 * <p>
 * Each buffer is shared between the {@link IngestionService} (producer side) and
 * the flush scheduler (consumer side), which is why the buffers are exposed as
 * individual beans rather than created privately.
 */
@Configuration
public class IngestionBeanConfig {

    /** Buffer holding route executions until the next batch insert. */
    @Bean
    public WriteBuffer<RouteExecution> executionBuffer(IngestionConfig config) {
        int capacity = config.getBufferCapacity();
        return new WriteBuffer<>(capacity);
    }

    /** Buffer holding route diagrams until the next flush. */
    @Bean
    public WriteBuffer<RouteGraph> diagramBuffer(IngestionConfig config) {
        int capacity = config.getBufferCapacity();
        return new WriteBuffer<>(capacity);
    }

    /** Buffer holding agent metrics snapshots until the next batch insert. */
    @Bean
    public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
        int capacity = config.getBufferCapacity();
        return new WriteBuffer<>(capacity);
    }

    /** Producer-side facade that routes incoming data into the three buffers. */
    @Bean
    public IngestionService ingestionService(WriteBuffer<RouteExecution> executionBuffer,
                                             WriteBuffer<RouteGraph> diagramBuffer,
                                             WriteBuffer<MetricsSnapshot> metricsBuffer) {
        return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer);
    }
}

View File

@@ -0,0 +1,159 @@
package com.cameleer3.server.app.ingestion;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.DiagramRepository;
import com.cameleer3.server.core.storage.ExecutionRepository;
import com.cameleer3.server.core.storage.MetricsRepository;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Scheduled task that drains the write buffers and batch-inserts into ClickHouse.
 * <p>
 * Implements {@link SmartLifecycle} to ensure all remaining buffered data is
 * flushed on application shutdown.
 * <p>
 * Each data kind (executions, diagrams, metrics) is flushed under its own
 * try/catch, so a storage failure for one kind is logged and does not block
 * the others.
 */
@Component
public class ClickHouseFlushScheduler implements SmartLifecycle {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseFlushScheduler.class);
    // Producer side fills these; this scheduler is the sole consumer.
    private final WriteBuffer<RouteExecution> executionBuffer;
    private final WriteBuffer<RouteGraph> diagramBuffer;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final ExecutionRepository executionRepository;
    private final DiagramRepository diagramRepository;
    private final MetricsRepository metricsRepository;
    // Maximum items drained per flush cycle, taken from IngestionConfig.
    private final int batchSize;
    // volatile: read by the lifecycle container thread, written by start()/stop().
    private volatile boolean running = false;

    public ClickHouseFlushScheduler(WriteBuffer<RouteExecution> executionBuffer,
                                    WriteBuffer<RouteGraph> diagramBuffer,
                                    WriteBuffer<MetricsSnapshot> metricsBuffer,
                                    ExecutionRepository executionRepository,
                                    DiagramRepository diagramRepository,
                                    MetricsRepository metricsRepository,
                                    IngestionConfig config) {
        this.executionBuffer = executionBuffer;
        this.diagramBuffer = diagramBuffer;
        this.metricsBuffer = metricsBuffer;
        this.executionRepository = executionRepository;
        this.diagramRepository = diagramRepository;
        this.metricsRepository = metricsRepository;
        this.batchSize = config.getBatchSize();
    }

    /**
     * Periodic flush of all three buffers, at most {@code batchSize} items each.
     * Interval is configurable via {@code ingestion.flush-interval-ms} (default 1s).
     */
    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flushAll() {
        flushExecutions();
        flushDiagrams();
        flushMetrics();
    }

    /**
     * Drains up to one batch of executions and inserts it.
     * NOTE(review): if insertBatch throws, the drained batch is lost — the items
     * are not re-offered to the buffer. Confirm this at-most-once behavior is intended.
     */
    private void flushExecutions() {
        try {
            List<RouteExecution> batch = executionBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                executionRepository.insertBatch(batch);
                log.debug("Flushed {} executions to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush executions to ClickHouse", e);
        }
    }

    /**
     * Drains up to one batch of diagrams and stores them one at a time
     * (DiagramRepository has no batch API).
     * NOTE(review): a failure mid-loop drops the remaining drained diagrams.
     */
    private void flushDiagrams() {
        try {
            List<RouteGraph> batch = diagramBuffer.drain(batchSize);
            for (RouteGraph graph : batch) {
                diagramRepository.store(graph);
            }
            if (!batch.isEmpty()) {
                log.debug("Flushed {} diagrams to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush diagrams to ClickHouse", e);
        }
    }

    /** Drains up to one batch of metrics snapshots and batch-inserts them. */
    private void flushMetrics() {
        try {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                metricsRepository.insertBatch(batch);
                log.debug("Flushed {} metrics to ClickHouse", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush metrics to ClickHouse", e);
        }
    }

    // SmartLifecycle -- flush remaining data on shutdown
    @Override
    public void start() {
        running = true;
        log.info("ClickHouseFlushScheduler started");
    }

    /** Called by the container on shutdown; drains everything still buffered. */
    @Override
    public void stop() {
        log.info("ClickHouseFlushScheduler stopping -- flushing remaining data");
        drainAll();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        // Run after most beans but before DataSource shutdown
        // (higher phase = started later, stopped earlier).
        return Integer.MAX_VALUE - 1;
    }

    /**
     * Drain all buffers completely (loop until empty).
     */
    private void drainAll() {
        drainBufferCompletely("executions", executionBuffer, batch -> executionRepository.insertBatch(batch));
        drainBufferCompletely("diagrams", diagramBuffer, batch -> {
            // No batch API on DiagramRepository; store each graph individually.
            for (RouteGraph g : batch) {
                diagramRepository.store(g);
            }
        });
        drainBufferCompletely("metrics", metricsBuffer, batch -> metricsRepository.insertBatch(batch));
    }

    /**
     * Repeatedly drains {@code buffer} in batchSize chunks and hands each chunk
     * to {@code inserter} until the buffer is empty or an insert fails.
     * On failure the loop stops (rather than retrying) so shutdown is never
     * blocked by a dead ClickHouse connection; remaining items are dropped.
     */
    private <T> void drainBufferCompletely(String name, WriteBuffer<T> buffer, java.util.function.Consumer<List<T>> inserter) {
        int total = 0;
        while (buffer.size() > 0) {
            List<T> batch = buffer.drain(batchSize);
            if (batch.isEmpty()) {
                // Another thread emptied the buffer between size() and drain().
                break;
            }
            try {
                inserter.accept(batch);
                total += batch.size();
            } catch (Exception e) {
                log.error("Failed to flush remaining {} during shutdown", name, e);
                break;
            }
        }
        if (total > 0) {
            log.info("Flushed {} remaining {} during shutdown", total, name);
        }
    }
}

View File

@@ -0,0 +1,105 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.storage.DiagramRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
 * ClickHouse implementation of {@link DiagramRepository}.
 * <p>
 * Stores route graphs as JSON with SHA-256 content-hash deduplication.
 * The underlying table uses ReplacingMergeTree keyed on content_hash, so
 * re-inserting an identical definition is merged away by ClickHouse.
 */
@Repository
public class ClickHouseDiagramRepository implements DiagramRepository {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramRepository.class);
    private static final String INSERT_SQL = """
            INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
            VALUES (?, ?, ?, ?)
            """;
    private static final String SELECT_BY_HASH = """
            SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1
            """;
    private static final String SELECT_HASH_FOR_ROUTE = """
            SELECT content_hash FROM route_diagrams
            WHERE route_id = ? AND agent_id = ?
            ORDER BY created_at DESC LIMIT 1
            """;
    private final JdbcTemplate jdbcTemplate;
    // Configured once and reused; ObjectMapper is thread-safe after configuration.
    private final ObjectMapper objectMapper;

    public ClickHouseDiagramRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = new ObjectMapper();
        // JavaTimeModule is required so java.time fields in RouteGraph serialize.
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    /**
     * Serializes the graph to JSON and inserts it keyed by the SHA-256 of that JSON.
     *
     * @param graph the route graph to persist
     * @throws IllegalStateException if the graph cannot be serialized to JSON
     *         (a programming error in the model, not a recoverable condition)
     */
    @Override
    public void store(RouteGraph graph) {
        try {
            String json = objectMapper.writeValueAsString(graph);
            String contentHash = sha256Hex(json);
            String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
            // agent_id is not part of RouteGraph -- set empty, controllers can enrich
            String agentId = "";
            jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json);
            log.debug("Stored diagram for route={} with hash={}", routeId, contentHash);
        } catch (JsonProcessingException e) {
            // Standard unchecked type instead of a raw RuntimeException; still
            // backward compatible for callers catching RuntimeException.
            throw new IllegalStateException("Failed to serialize RouteGraph to JSON", e);
        }
    }

    /**
     * Looks up a stored diagram by its content hash.
     *
     * @return the deserialized graph, or empty if no row matches or the stored
     *         JSON cannot be parsed (logged and treated as absent — best effort)
     */
    @Override
    public Optional<RouteGraph> findByContentHash(String contentHash) {
        // Typed single-column query avoids Map-based row handling and unchecked casts.
        List<String> definitions = jdbcTemplate.queryForList(SELECT_BY_HASH, String.class, contentHash);
        if (definitions.isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(objectMapper.readValue(definitions.get(0), RouteGraph.class));
        } catch (JsonProcessingException e) {
            // Deliberate best-effort read: a corrupt row is logged, not rethrown.
            log.error("Failed to deserialize RouteGraph from ClickHouse", e);
            return Optional.empty();
        }
    }

    /**
     * Returns the content hash of the most recently stored diagram for the
     * given route/agent pair, or empty when none exists.
     */
    @Override
    public Optional<String> findContentHashForRoute(String routeId, String agentId) {
        List<String> hashes = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, String.class, routeId, agentId);
        return hashes.isEmpty() ? Optional.empty() : Optional.of(hashes.get(0));
    }

    /**
     * Lowercase hex SHA-256 of the UTF-8 bytes of {@code input}.
     * Package-private for testing.
     */
    static String sha256Hex(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandated for every JVM, so this is effectively unreachable.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}

View File

@@ -0,0 +1,117 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.storage.ExecutionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
/**
 * ClickHouse implementation of {@link ExecutionRepository}.
 * <p>
 * Performs batch inserts into the {@code route_executions} table.
 * Processor executions are flattened depth-first into parallel arrays
 * (ClickHouse Array columns), one array element per processor.
 */
@Repository
public class ClickHouseExecutionRepository implements ExecutionRepository {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
    private static final String INSERT_SQL = """
            INSERT INTO route_executions (
                execution_id, route_id, agent_id, status, start_time, end_time,
                duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
                processor_ids, processor_types, processor_starts, processor_ends,
                processor_durations, processor_statuses
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """;
    private final JdbcTemplate jdbcTemplate;

    public ClickHouseExecutionRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Inserts all executions as a single JDBC batch. A no-op for an empty list.
     *
     * @param executions route executions to persist
     */
    @Override
    public void insertBatch(List<RouteExecution> executions) {
        if (executions.isEmpty()) {
            return;
        }
        jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                RouteExecution exec = executions.get(i);
                List<ProcessorExecution> processors = flattenProcessors(exec.getProcessors());
                ps.setString(1, UUID.randomUUID().toString());
                ps.setString(2, nullSafe(exec.getRouteId()));
                ps.setString(3, ""); // agent_id set by controller header or empty
                ps.setString(4, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
                ps.setObject(5, toTimestamp(exec.getStartTime()));
                ps.setObject(6, toTimestamp(exec.getEndTime()));
                ps.setLong(7, exec.getDurationMs());
                ps.setString(8, nullSafe(exec.getCorrelationId()));
                ps.setString(9, nullSafe(exec.getExchangeId()));
                ps.setString(10, nullSafe(exec.getErrorMessage()));
                ps.setString(11, nullSafe(exec.getErrorStackTrace()));
                // Parallel arrays for processor executions -- built in a single pass
                // instead of six separate stream traversals of the same list.
                int n = processors.size();
                String[] ids = new String[n];
                String[] types = new String[n];
                Timestamp[] starts = new Timestamp[n];
                Timestamp[] ends = new Timestamp[n];
                Long[] durations = new Long[n];
                String[] statuses = new String[n];
                for (int p = 0; p < n; p++) {
                    ProcessorExecution proc = processors.get(p);
                    ids[p] = nullSafe(proc.getProcessorId());
                    types[p] = nullSafe(proc.getProcessorType());
                    starts[p] = toTimestamp(proc.getStartTime());
                    ends[p] = toTimestamp(proc.getEndTime());
                    durations[p] = proc.getDurationMs();
                    statuses[p] = proc.getStatus() != null ? proc.getStatus().name() : "RUNNING";
                }
                ps.setObject(12, ids);
                ps.setObject(13, types);
                ps.setObject(14, starts);
                ps.setObject(15, ends);
                ps.setObject(16, durations);
                ps.setObject(17, statuses);
            }

            @Override
            public int getBatchSize() {
                return executions.size();
            }
        });
        log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
    }

    /**
     * Flatten the processor tree into a flat list (depth-first).
     * Returns an immutable empty list for null or empty input.
     */
    private List<ProcessorExecution> flattenProcessors(List<ProcessorExecution> processors) {
        if (processors == null || processors.isEmpty()) {
            return List.of();
        }
        var result = new java.util.ArrayList<ProcessorExecution>();
        for (ProcessorExecution p : processors) {
            flatten(p, result);
        }
        return result;
    }

    /** Appends {@code processor} and then all descendants, depth-first. */
    private void flatten(ProcessorExecution processor, List<ProcessorExecution> result) {
        result.add(processor);
        if (processor.getChildren() != null) {
            for (ProcessorExecution child : processor.getChildren()) {
                flatten(child, result);
            }
        }
    }

    /** Maps null to the empty string (ClickHouse String columns are non-nullable here). */
    private static String nullSafe(String value) {
        return value != null ? value : "";
    }

    /** Maps null to the Unix epoch as a sentinel for "not set". */
    private static Timestamp toTimestamp(Instant instant) {
        return instant != null ? Timestamp.from(instant) : Timestamp.from(Instant.EPOCH);
    }
}

View File

@@ -0,0 +1,67 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsRepository;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * ClickHouse implementation of {@link MetricsRepository}.
 * <p>
 * Writes metric snapshots into the {@code agent_metrics} table using a single
 * JDBC batch per call.
 */
@Repository
public class ClickHouseMetricsRepository implements MetricsRepository {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseMetricsRepository.class);
    private static final String INSERT_SQL = """
            INSERT INTO agent_metrics (agent_id, collected_at, metric_name, metric_value, tags)
            VALUES (?, ?, ?, ?, ?)
            """;
    private final JdbcTemplate jdbcTemplate;

    public ClickHouseMetricsRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Batch-inserts the given snapshots; does nothing when the list is empty.
     * Null string fields are stored as empty strings, a null timestamp as the
     * Unix epoch, and null tags as an empty map.
     */
    @Override
    public void insertBatch(List<MetricsSnapshot> metrics) {
        if (metrics.isEmpty()) {
            return;
        }
        jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int rowIndex) throws SQLException {
                MetricsSnapshot snapshot = metrics.get(rowIndex);
                ps.setString(1, orEmpty(snapshot.agentId()));
                Instant collectedAt = snapshot.collectedAt();
                ps.setObject(2, Timestamp.from(collectedAt != null ? collectedAt : Instant.EPOCH));
                ps.setString(3, orEmpty(snapshot.metricName()));
                ps.setDouble(4, snapshot.metricValue());
                // ClickHouse Map(String, String) -- pass as a java.util.Map
                Map<String, String> tags = snapshot.tags();
                ps.setObject(5, tags != null ? tags : new HashMap<String, String>());
            }

            @Override
            public int getBatchSize() {
                return metrics.size();
            }
        });
        log.debug("Inserted batch of {} metrics into ClickHouse", metrics.size());
    }

    /** Maps null to the empty string for non-nullable String columns. */
    private static String orEmpty(String value) {
        return value != null ? value : "";
    }
}

View File

@@ -0,0 +1,115 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.List;
/**
 * Routes incoming data to the appropriate {@link WriteBuffer} instances.
 * <p>
 * Deliberately free of Spring annotations: the class lives in the core module
 * and is registered as a bean by the app module's configuration. All accept
 * methods apply backpressure by returning {@code false} when the target buffer
 * is full; nothing here blocks.
 */
public class IngestionService {
    private final WriteBuffer<RouteExecution> executionBuffer;
    private final WriteBuffer<RouteGraph> diagramBuffer;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;

    public IngestionService(WriteBuffer<RouteExecution> executionBuffer,
                            WriteBuffer<RouteGraph> diagramBuffer,
                            WriteBuffer<MetricsSnapshot> metricsBuffer) {
        this.executionBuffer = executionBuffer;
        this.diagramBuffer = diagramBuffer;
        this.metricsBuffer = metricsBuffer;
    }

    // ---- producer side -------------------------------------------------

    /**
     * Buffers one route execution.
     *
     * @return {@code true} when buffered, {@code false} on a full buffer (backpressure)
     */
    public boolean acceptExecution(RouteExecution execution) {
        return executionBuffer.offer(execution);
    }

    /**
     * Buffers a batch of route executions.
     *
     * @return {@code true} when every item was buffered, {@code false} on a full buffer
     */
    public boolean acceptExecutions(List<RouteExecution> executions) {
        return executionBuffer.offerBatch(executions);
    }

    /**
     * Buffers one route diagram.
     *
     * @return {@code true} when buffered, {@code false} on a full buffer (backpressure)
     */
    public boolean acceptDiagram(RouteGraph graph) {
        return diagramBuffer.offer(graph);
    }

    /**
     * Buffers a batch of route diagrams.
     *
     * @return {@code true} when every item was buffered, {@code false} on a full buffer
     */
    public boolean acceptDiagrams(List<RouteGraph> graphs) {
        return diagramBuffer.offerBatch(graphs);
    }

    /**
     * Buffers a batch of metrics snapshots.
     *
     * @return {@code true} when every item was buffered, {@code false} on a full buffer
     */
    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    // ---- observability -------------------------------------------------

    /** @return number of route executions currently buffered */
    public int getExecutionBufferDepth() {
        return executionBuffer.size();
    }

    /** @return number of route diagrams currently buffered */
    public int getDiagramBufferDepth() {
        return diagramBuffer.size();
    }

    /** @return number of metrics snapshots currently buffered */
    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    // ---- consumer-side access (flush scheduler) ------------------------

    /** @return the execution write buffer */
    public WriteBuffer<RouteExecution> getExecutionBuffer() {
        return executionBuffer;
    }

    /** @return the diagram write buffer */
    public WriteBuffer<RouteGraph> getDiagramBuffer() {
        return diagramBuffer;
    }

    /** @return the metrics write buffer */
    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }
}