From cc1c082adbe3aaf204b0f6c43027c32dfab52488 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 11 Mar 2026 11:49:25 +0100 Subject: [PATCH] feat(01-01): add WriteBuffer, repository interfaces, and config classes - WriteBuffer with offer/offerBatch/drain and backpressure (all tests green) - ExecutionRepository, DiagramRepository, MetricsRepository interfaces - MetricsSnapshot record for agent metrics data - IngestionConfig for buffer-capacity/batch-size/flush-interval-ms properties - ClickHouseConfig exposing JdbcTemplate bean Co-Authored-By: Claude Opus 4.6 --- .../server/app/config/ClickHouseConfig.java | 22 +++++ .../server/app/config/IngestionConfig.java | 41 ++++++++++ .../server/core/ingestion/WriteBuffer.java | 80 +++++++++++++++++++ .../core/storage/DiagramRepository.java | 26 ++++++ .../core/storage/ExecutionRepository.java | 17 ++++ .../core/storage/MetricsRepository.java | 17 ++++ .../core/storage/model/MetricsSnapshot.java | 16 ++++ 7 files changed, 219 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionConfig.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricsSnapshot.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java new file mode 100644 index 00000000..46406d95 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java @@ -0,0 +1,22 @@ +package com.cameleer3.server.app.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +/** + * ClickHouse configuration. + *

+ * Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties. + * This class exposes a JdbcTemplate bean for repository implementations. + */ +@Configuration +public class ClickHouseConfig { + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionConfig.java new file mode 100644 index 00000000..6292eb5e --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionConfig.java @@ -0,0 +1,41 @@ +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * Configuration properties for the ingestion write buffer. + * Bound from the {@code ingestion.*} namespace in application.yml. + */ +@Configuration +@ConfigurationProperties(prefix = "ingestion") +public class IngestionConfig { + + private int bufferCapacity = 50_000; + private int batchSize = 5_000; + private long flushIntervalMs = 1_000; + + public int getBufferCapacity() { + return bufferCapacity; + } + + public void setBufferCapacity(int bufferCapacity) { + this.bufferCapacity = bufferCapacity; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public long getFlushIntervalMs() { + return flushIntervalMs; + } + + public void setFlushIntervalMs(long flushIntervalMs) { + this.flushIntervalMs = flushIntervalMs; + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java new file mode 100644 index 00000000..267de43c --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java @@ -0,0 +1,80 @@ +package com.cameleer3.server.core.ingestion; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * Bounded write buffer that decouples HTTP ingestion from ClickHouse batch inserts. + *

+ * Items are offered to the buffer by controllers and drained in batches by a + * scheduled flush task. When the buffer is full, {@link #offer} returns false, + * signaling the caller to apply backpressure (HTTP 503). + * + * @param the type of items buffered + */ +public class WriteBuffer { + + private final BlockingQueue queue; + private final int capacity; + + public WriteBuffer(int capacity) { + this.capacity = capacity; + this.queue = new ArrayBlockingQueue<>(capacity); + } + + /** + * Offer a single item to the buffer. + * + * @return true if the item was added, false if the buffer is full + */ + public boolean offer(T item) { + return queue.offer(item); + } + + /** + * Offer a batch of items with all-or-nothing semantics. + * If the buffer does not have enough remaining capacity for the entire batch, + * no items are added and false is returned. + * + * @return true if all items were added, false if insufficient capacity + */ + public boolean offerBatch(List items) { + if (queue.remainingCapacity() < items.size()) { + return false; + } + for (T item : items) { + queue.offer(item); + } + return true; + } + + /** + * Drain up to {@code maxBatch} items from the buffer. + * Called by the scheduled flush task. + * + * @return list of drained items (may be empty) + */ + public List drain(int maxBatch) { + List batch = new ArrayList<>(maxBatch); + queue.drainTo(batch, maxBatch); + return batch; + } + + public int size() { + return queue.size(); + } + + public int capacity() { + return capacity; + } + + public boolean isFull() { + return queue.remainingCapacity() == 0; + } + + public int remainingCapacity() { + return queue.remainingCapacity(); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java new file mode 100644 index 00000000..57aede79 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java @@ -0,0 +1,26 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.common.graph.RouteGraph; + +import java.util.Optional; + +/** + * Repository for route diagram storage with content-hash deduplication. + */ +public interface DiagramRepository { + + /** + * Store a route graph. Uses content-hash deduplication via ReplacingMergeTree. + */ + void store(RouteGraph graph); + + /** + * Find a route graph by its content hash. + */ + Optional findByContentHash(String contentHash); + + /** + * Find the content hash for the latest diagram of a given route and agent. + */ + Optional findContentHashForRoute(String routeId, String agentId); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java new file mode 100644 index 00000000..249bef3a --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java @@ -0,0 +1,17 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.common.model.RouteExecution; + +import java.util.List; + +/** + * Repository for route execution batch inserts into ClickHouse. + */ +public interface ExecutionRepository { + + /** + * Insert a batch of route executions. + * Implementations must perform a single batch insert for efficiency. + */ + void insertBatch(List executions); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java new file mode 100644 index 00000000..ad15ef0a --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java @@ -0,0 +1,17 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.storage.model.MetricsSnapshot; + +import java.util.List; + +/** + * Repository for agent metrics batch inserts into ClickHouse. + */ +public interface MetricsRepository { + + /** + * Insert a batch of metrics snapshots. + * Implementations must perform a single batch insert for efficiency. + */ + void insertBatch(List metrics); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricsSnapshot.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricsSnapshot.java new file mode 100644 index 00000000..4e2101fe --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricsSnapshot.java @@ -0,0 +1,16 @@ +package com.cameleer3.server.core.storage.model; + +import java.time.Instant; +import java.util.Map; + +/** + * A single metrics data point from an agent. + */ +public record MetricsSnapshot( + String agentId, + Instant collectedAt, + String metricName, + double metricValue, + Map tags +) { +}