diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
new file mode 100644
index 00000000..e33e5648
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
@@ -0,0 +1,46 @@
+package com.cameleer3.server.app.config;
+
+import com.cameleer3.common.graph.RouteGraph;
+import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.core.ingestion.IngestionService;
+import com.cameleer3.server.core.ingestion.WriteBuffer;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Creates the write buffer and ingestion service beans.
+ * <p>
+ * The {@link WriteBuffer} instances are shared between the
+ * {@link IngestionService} (producer side) and the flush scheduler (consumer side).
+ * Each buffer is constructed with {@link IngestionConfig#getBufferCapacity()}.
+ */
+@Configuration
+public class IngestionBeanConfig {
+
+    /** Buffer holding route executions until the flush scheduler writes them. */
+    @Bean
+    public WriteBuffer<RouteExecution> executionBuffer(IngestionConfig config) {
+        return new WriteBuffer<>(config.getBufferCapacity());
+    }
+
+    /** Buffer holding route diagrams until the flush scheduler writes them. */
+    @Bean
+    public WriteBuffer<RouteGraph> diagramBuffer(IngestionConfig config) {
+        return new WriteBuffer<>(config.getBufferCapacity());
+    }
+
+    /** Buffer holding metrics snapshots until the flush scheduler writes them. */
+    @Bean
+    public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
+        return new WriteBuffer<>(config.getBufferCapacity());
+    }
+
+    /** Wires the three shared buffers into the producer-side {@link IngestionService}. */
+    @Bean
+    public IngestionService ingestionService(WriteBuffer<RouteExecution> executionBuffer,
+                                             WriteBuffer<RouteGraph> diagramBuffer,
+                                             WriteBuffer<MetricsSnapshot> metricsBuffer) {
+        return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer);
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
new file mode 100644
index 00000000..aa42083f
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
@@ -0,0 +1,168 @@
+package com.cameleer3.server.app.ingestion;
+
+import com.cameleer3.common.graph.RouteGraph;
+import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.app.config.IngestionConfig;
+import com.cameleer3.server.core.ingestion.WriteBuffer;
+import com.cameleer3.server.core.storage.DiagramRepository;
+import com.cameleer3.server.core.storage.ExecutionRepository;
+import com.cameleer3.server.core.storage.MetricsRepository;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Scheduled task that drains the write buffers and batch-inserts into ClickHouse.
+ * <p>
+ * Implements {@link SmartLifecycle} to ensure all remaining buffered data is
+ * flushed on application shutdown.
+ * <p>
+ * A batch is taken out of its buffer before it is written and is not put back
+ * when the write fails; each failure is logged together with the number of
+ * items affected.
+ */
+@Component
+public class ClickHouseFlushScheduler implements SmartLifecycle {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseFlushScheduler.class);
+
+    private final WriteBuffer<RouteExecution> executionBuffer;
+    private final WriteBuffer<RouteGraph> diagramBuffer;
+    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
+    private final ExecutionRepository executionRepository;
+    private final DiagramRepository diagramRepository;
+    private final MetricsRepository metricsRepository;
+    private final int batchSize;
+
+    private volatile boolean running = false;
+
+    public ClickHouseFlushScheduler(WriteBuffer<RouteExecution> executionBuffer,
+                                    WriteBuffer<RouteGraph> diagramBuffer,
+                                    WriteBuffer<MetricsSnapshot> metricsBuffer,
+                                    ExecutionRepository executionRepository,
+                                    DiagramRepository diagramRepository,
+                                    MetricsRepository metricsRepository,
+                                    IngestionConfig config) {
+        this.executionBuffer = executionBuffer;
+        this.diagramBuffer = diagramBuffer;
+        this.metricsBuffer = metricsBuffer;
+        this.executionRepository = executionRepository;
+        this.diagramRepository = diagramRepository;
+        this.metricsRepository = metricsRepository;
+        this.batchSize = config.getBatchSize();
+    }
+
+    /**
+     * Flushes one batch from each buffer. Scheduled with a fixed delay read from
+     * {@code ingestion.flush-interval-ms} (default 1000).
+     */
+    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
+    public void flushAll() {
+        flushBatch("executions", executionBuffer, executionRepository::insertBatch);
+        flushBatch("diagrams", diagramBuffer, this::storeDiagrams);
+        flushBatch("metrics", metricsBuffer, metricsRepository::insertBatch);
+    }
+
+    /**
+     * Drains one batch from {@code buffer} and passes it to {@code inserter}.
+     * Exceptions are logged rather than rethrown, so a failure for one buffer
+     * does not prevent the remaining buffers from being flushed.
+     */
+    private <T> void flushBatch(String name, WriteBuffer<T> buffer, Consumer<List<T>> inserter) {
+        List<T> batch = List.of();
+        try {
+            batch = buffer.drain(batchSize);
+            if (!batch.isEmpty()) {
+                inserter.accept(batch);
+                log.debug("Flushed {} {} to ClickHouse", batch.size(), name);
+            }
+        } catch (Exception e) {
+            // The batch has already left the buffer, so say how much is affected.
+            log.error("Failed to flush {} to ClickHouse; {} drained item(s) were lost or only partially written",
+                    name, batch.size(), e);
+        }
+    }
+
+    /**
+     * Stores the diagrams of one batch individually via {@link DiagramRepository#store}.
+     * The first exception aborts the rest of the batch and propagates to the caller.
+     */
+    private void storeDiagrams(List<RouteGraph> batch) {
+        for (RouteGraph graph : batch) {
+            diagramRepository.store(graph);
+        }
+    }
+
+    // SmartLifecycle -- flush remaining data on shutdown
+
+    @Override
+    public void start() {
+        running = true;
+        log.info("ClickHouseFlushScheduler started");
+    }
+
+    @Override
+    public void stop() {
+        log.info("ClickHouseFlushScheduler stopping -- flushing remaining data");
+        try {
+            drainAll();
+        } finally {
+            // Always report "stopped", even if draining fails unexpectedly.
+            running = false;
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    @Override
+    public int getPhase() {
+        // Run after most beans but before DataSource shutdown
+        // NOTE(review): confirm request handling has already stopped when this phase
+        // stops; data buffered after the final drain would not be flushed.
+        return Integer.MAX_VALUE - 1;
+    }
+
+    /**
+     * Drain all buffers completely (loop until empty).
+     */
+    private void drainAll() {
+        drainBufferCompletely("executions", executionBuffer, executionRepository::insertBatch);
+        drainBufferCompletely("diagrams", diagramBuffer, this::storeDiagrams);
+        drainBufferCompletely("metrics", metricsBuffer, metricsRepository::insertBatch);
+    }
+
+    /**
+     * Drains {@code buffer} batch by batch until it is empty or a write fails.
+     * On failure the loop ends, and the log states how many items were drained
+     * but not written and how many are still buffered.
+     */
+    private <T> void drainBufferCompletely(String name, WriteBuffer<T> buffer, Consumer<List<T>> inserter) {
+        int total = 0;
+        while (buffer.size() > 0) {
+            List<T> batch = buffer.drain(batchSize);
+            if (batch.isEmpty()) {
+                break;
+            }
+            try {
+                inserter.accept(batch);
+                total += batch.size();
+            } catch (Exception e) {
+                log.error("Failed to flush remaining {} during shutdown; {} drained and {} still-buffered item(s) were not written",
+                        name, batch.size(), buffer.size(), e);
+                break;
+            }
+        }
+        if (total > 0) {
+            log.info("Flushed {} remaining {} during shutdown", total, name);
+        }
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java
new file mode 100644
index 00000000..ff1fd28b
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java
@@ -0,0 +1,127 @@
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.common.graph.RouteGraph;
+import com.cameleer3.server.core.storage.DiagramRepository;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * ClickHouse implementation of {@link DiagramRepository}.
+ * <p>
+ * Stores route graphs as JSON with SHA-256 content-hash deduplication.
+ * The underlying table uses ReplacingMergeTree keyed on content_hash.
+ */
+@Repository
+public class ClickHouseDiagramRepository implements DiagramRepository {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramRepository.class);
+
+    private static final String INSERT_SQL = """
+            INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
+            VALUES (?, ?, ?, ?)
+            """;
+
+    private static final String SELECT_BY_HASH = """
+            SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1
+            """;
+
+    private static final String SELECT_HASH_FOR_ROUTE = """
+            SELECT content_hash FROM route_diagrams
+            WHERE route_id = ? AND agent_id = ?
+            ORDER BY created_at DESC LIMIT 1
+            """;
+
+    private final JdbcTemplate jdbcTemplate;
+    private final ObjectMapper objectMapper;
+
+    public ClickHouseDiagramRepository(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new JavaTimeModule());
+    }
+
+    /**
+     * Serializes the graph to JSON and inserts it keyed by the SHA-256 of that JSON.
+     *
+     * @throws IllegalArgumentException if the graph cannot be serialized to JSON
+     */
+    @Override
+    public void store(RouteGraph graph) {
+        try {
+            String json = objectMapper.writeValueAsString(graph);
+            String contentHash = sha256Hex(json);
+            String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
+            // agent_id is not part of RouteGraph -- set empty, controllers can enrich
+            String agentId = "";
+
+            jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json);
+            log.debug("Stored diagram for route={} with hash={}", routeId, contentHash);
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException("Failed to serialize RouteGraph to JSON", e);
+        }
+    }
+
+    /**
+     * Looks up a diagram by content hash.
+     *
+     * @return the deserialized graph, or empty if no row matches or the stored
+     *         JSON cannot be deserialized (the latter is logged as an error)
+     */
+    @Override
+    public Optional<RouteGraph> findByContentHash(String contentHash) {
+        List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash);
+        if (rows.isEmpty()) {
+            return Optional.empty();
+        }
+        String json = (String) rows.get(0).get("definition");
+        try {
+            return Optional.of(objectMapper.readValue(json, RouteGraph.class));
+        } catch (JsonProcessingException e) {
+            log.error("Failed to deserialize RouteGraph from ClickHouse", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Finds the content hash of the most recently created diagram for a route and agent.
+     * <p>
+     * NOTE(review): {@link #store} always writes an empty {@code agent_id}, so rows
+     * written by this class only match {@code agentId = ""} -- confirm this is intended.
+     *
+     * @return the content hash, or empty if no row matches
+     */
+    @Override
+    public Optional<String> findContentHashForRoute(String routeId, String agentId) {
+        List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId);
+        if (rows.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of((String) rows.get(0).get("content_hash"));
+    }
+
+    /**
+     * @return the lowercase hex SHA-256 digest of the UTF-8 bytes of {@code input}
+     */
+    static String sha256Hex(String input) {
+        try {
+            MessageDigest digest = MessageDigest.getInstance("SHA-256");
+            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
+            return HexFormat.of().formatHex(hash);
+        } catch (NoSuchAlgorithmException e) {
+            throw new IllegalStateException("SHA-256 not available", e);
+        }
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java
new file mode 100644
index 00000000..0049a0f8
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java
@@ -0,0 +1,129 @@
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.common.model.ProcessorExecution;
+import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.core.storage.ExecutionRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * ClickHouse implementation of {@link ExecutionRepository}.
+ * <p>
+ * Performs batch inserts into the {@code route_executions} table.
+ * Processor executions are flattened into parallel arrays.
+ */
+@Repository
+public class ClickHouseExecutionRepository implements ExecutionRepository {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
+
+    /** Status written when an execution or processor carries no status. */
+    private static final String DEFAULT_STATUS = "RUNNING";
+
+    private static final String INSERT_SQL = """
+            INSERT INTO route_executions (
+                execution_id, route_id, agent_id, status, start_time, end_time,
+                duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
+                processor_ids, processor_types, processor_starts, processor_ends,
+                processor_durations, processor_statuses
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """;
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public ClickHouseExecutionRepository(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    /**
+     * Inserts all executions with a single JDBC batch; an empty list is a no-op.
+     * Each row gets a freshly generated random {@code execution_id}; null strings
+     * are written as {@code ""} and null timestamps as the epoch.
+     */
+    @Override
+    public void insertBatch(List<RouteExecution> executions) {
+        if (executions.isEmpty()) {
+            return;
+        }
+
+        jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
+            @Override
+            public void setValues(PreparedStatement ps, int i) throws SQLException {
+                RouteExecution exec = executions.get(i);
+                List<ProcessorExecution> processors = flattenProcessors(exec.getProcessors());
+
+                ps.setString(1, UUID.randomUUID().toString());
+                ps.setString(2, nullSafe(exec.getRouteId()));
+                ps.setString(3, ""); // agent_id set by controller header or empty
+                ps.setString(4, exec.getStatus() != null ? exec.getStatus().name() : DEFAULT_STATUS);
+                ps.setObject(5, toTimestamp(exec.getStartTime()));
+                ps.setObject(6, toTimestamp(exec.getEndTime()));
+                ps.setLong(7, exec.getDurationMs());
+                ps.setString(8, nullSafe(exec.getCorrelationId()));
+                ps.setString(9, nullSafe(exec.getExchangeId()));
+                ps.setString(10, nullSafe(exec.getErrorMessage()));
+                ps.setString(11, nullSafe(exec.getErrorStackTrace()));
+
+                // Parallel arrays for processor executions
+                ps.setObject(12, processors.stream().map(p -> nullSafe(p.getProcessorId())).toArray(String[]::new));
+                ps.setObject(13, processors.stream().map(p -> nullSafe(p.getProcessorType())).toArray(String[]::new));
+                ps.setObject(14, processors.stream().map(p -> toTimestamp(p.getStartTime())).toArray(Timestamp[]::new));
+                ps.setObject(15, processors.stream().map(p -> toTimestamp(p.getEndTime())).toArray(Timestamp[]::new));
+                ps.setObject(16, processors.stream().mapToLong(ProcessorExecution::getDurationMs).boxed().toArray(Long[]::new));
+                ps.setObject(17, processors.stream().map(p -> p.getStatus() != null ? p.getStatus().name() : DEFAULT_STATUS).toArray(String[]::new));
+            }
+
+            @Override
+            public int getBatchSize() {
+                return executions.size();
+            }
+        });
+
+        log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
+    }
+
+    /**
+     * Flatten the processor tree into a flat list (depth-first).
+     */
+    private List<ProcessorExecution> flattenProcessors(List<ProcessorExecution> processors) {
+        if (processors == null || processors.isEmpty()) {
+            return List.of();
+        }
+        List<ProcessorExecution> result = new ArrayList<>();
+        for (ProcessorExecution p : processors) {
+            flatten(p, result);
+        }
+        return result;
+    }
+
+    /** Appends {@code processor} and then, recursively, its children to {@code result}. */
+    private void flatten(ProcessorExecution processor, List<ProcessorExecution> result) {
+        result.add(processor);
+        if (processor.getChildren() != null) {
+            for (ProcessorExecution child : processor.getChildren()) {
+                flatten(child, result);
+            }
+        }
+    }
+
+    /** Replaces {@code null} with the empty string. */
+    private static String nullSafe(String value) {
+        return value != null ? value : "";
+    }
+
+    /** Converts to a SQL timestamp, mapping {@code null} to the epoch. */
+    private static Timestamp toTimestamp(Instant instant) {
+        return instant != null ? Timestamp.from(instant) : Timestamp.from(Instant.EPOCH);
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java
new file mode 100644
index 00000000..a72ea26d
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java
@@ -0,0 +1,71 @@
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.storage.MetricsRepository;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClickHouse implementation of {@link MetricsRepository}.
+ * <p>
+ * Performs batch inserts into the {@code agent_metrics} table.
+ */
+@Repository
+public class ClickHouseMetricsRepository implements MetricsRepository {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseMetricsRepository.class);
+
+    private static final String INSERT_SQL = """
+            INSERT INTO agent_metrics (agent_id, collected_at, metric_name, metric_value, tags)
+            VALUES (?, ?, ?, ?, ?)
+            """;
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public ClickHouseMetricsRepository(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    /**
+     * Inserts all snapshots with a single JDBC batch; an empty list is a no-op.
+     * Null fields are replaced by defaults: {@code ""}, the epoch, or an empty map.
+     */
+    @Override
+    public void insertBatch(List<MetricsSnapshot> metrics) {
+        if (metrics.isEmpty()) {
+            return;
+        }
+
+        jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
+            @Override
+            public void setValues(PreparedStatement ps, int i) throws SQLException {
+                MetricsSnapshot m = metrics.get(i);
+                ps.setString(1, m.agentId() != null ? m.agentId() : "");
+                ps.setObject(2, m.collectedAt() != null ? Timestamp.from(m.collectedAt()) : Timestamp.from(Instant.EPOCH));
+                ps.setString(3, m.metricName() != null ? m.metricName() : "");
+                ps.setDouble(4, m.metricValue());
+                // ClickHouse Map(String, String) -- pass as a java.util.Map
+                Map<String, String> tags = m.tags() != null ? m.tags() : new HashMap<>();
+                ps.setObject(5, tags);
+            }
+
+            @Override
+            public int getBatchSize() {
+                return metrics.size();
+            }
+        });
+
+        log.debug("Inserted batch of {} metrics into ClickHouse", metrics.size());
+    }
+}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
new file mode 100644
index 00000000..8ed9afba
--- /dev/null
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
@@ -0,0 +1,115 @@
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.common.graph.RouteGraph;
+import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+
+import java.util.List;
+
+/**
+ * Routes incoming data to the appropriate {@link WriteBuffer} instances.
+ * <p>
+ * This is a plain class (no Spring annotations) -- it lives in the core module
+ * and is wired as a bean by the app module configuration.
+ */
+public class IngestionService {
+
+    private final WriteBuffer<RouteExecution> executionBuffer;
+    private final WriteBuffer<RouteGraph> diagramBuffer;
+    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
+
+    public IngestionService(WriteBuffer<RouteExecution> executionBuffer,
+                            WriteBuffer<RouteGraph> diagramBuffer,
+                            WriteBuffer<MetricsSnapshot> metricsBuffer) {
+        this.executionBuffer = executionBuffer;
+        this.diagramBuffer = diagramBuffer;
+        this.metricsBuffer = metricsBuffer;
+    }
+
+    /**
+     * Accept a batch of route executions into the buffer.
+     *
+     * @return true if all items were buffered, false if buffer is full (backpressure)
+     */
+    public boolean acceptExecutions(List<RouteExecution> executions) {
+        return executionBuffer.offerBatch(executions);
+    }
+
+    /**
+     * Accept a single route execution into the buffer.
+     *
+     * @return true if the item was buffered, false if buffer is full (backpressure)
+     */
+    public boolean acceptExecution(RouteExecution execution) {
+        return executionBuffer.offer(execution);
+    }
+
+    /**
+     * Accept a single route diagram into the buffer.
+     *
+     * @return true if the item was buffered, false if buffer is full (backpressure)
+     */
+    public boolean acceptDiagram(RouteGraph graph) {
+        return diagramBuffer.offer(graph);
+    }
+
+    /**
+     * Accept a batch of route diagrams into the buffer.
+     *
+     * @return true if all items were buffered, false if buffer is full (backpressure)
+     */
+    public boolean acceptDiagrams(List<RouteGraph> graphs) {
+        return diagramBuffer.offerBatch(graphs);
+    }
+
+    /**
+     * Accept a batch of metrics snapshots into the buffer.
+     *
+     * @return true if all items were buffered, false if buffer is full (backpressure)
+     */
+    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
+        return metricsBuffer.offerBatch(metrics);
+    }
+
+    /**
+     * @return current number of items in the execution buffer
+     */
+    public int getExecutionBufferDepth() {
+        return executionBuffer.size();
+    }
+
+    /**
+     * @return current number of items in the diagram buffer
+     */
+    public int getDiagramBufferDepth() {
+        return diagramBuffer.size();
+    }
+
+    /**
+     * @return current number of items in the metrics buffer
+     */
+    public int getMetricsBufferDepth() {
+        return metricsBuffer.size();
+    }
+
+    /**
+     * @return the execution write buffer (for use by flush scheduler)
+     */
+    public WriteBuffer<RouteExecution> getExecutionBuffer() {
+        return executionBuffer;
+    }
+
+    /**
+     * @return the diagram write buffer (for use by flush scheduler)
+     */
+    public WriteBuffer<RouteGraph> getDiagramBuffer() {
+        return diagramBuffer;
+    }
+
+    /**
+     * @return the metrics write buffer (for use by flush scheduler)
+     */
+    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
+        return metricsBuffer;
+    }
+}