feat(01-01): add WriteBuffer, repository interfaces, and config classes

- WriteBuffer<T> with offer/offerBatch/drain and backpressure (all tests green)
- ExecutionRepository, DiagramRepository, MetricsRepository interfaces
- MetricsSnapshot record for agent metrics data
- IngestionConfig for buffer-capacity/batch-size/flush-interval-ms properties
- ClickHouseConfig exposing JdbcTemplate bean

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 11:49:25 +01:00
parent f37009e380
commit cc1c082adb
7 changed files with 219 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
package com.cameleer3.server.app.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
/**
* ClickHouse configuration.
* <p>
* Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties.
* This class exposes a JdbcTemplate bean for repository implementations.
*/
@Configuration
public class ClickHouseConfig {
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}

View File

@@ -0,0 +1,41 @@
package com.cameleer3.server.app.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* Configuration properties for the ingestion write buffer.
* Bound from the {@code ingestion.*} namespace in application.yml.
*/
@Configuration
@ConfigurationProperties(prefix = "ingestion")
public class IngestionConfig {
private int bufferCapacity = 50_000;
private int batchSize = 5_000;
private long flushIntervalMs = 1_000;
public int getBufferCapacity() {
return bufferCapacity;
}
public void setBufferCapacity(int bufferCapacity) {
this.bufferCapacity = bufferCapacity;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public long getFlushIntervalMs() {
return flushIntervalMs;
}
public void setFlushIntervalMs(long flushIntervalMs) {
this.flushIntervalMs = flushIntervalMs;
}
}

View File

@@ -0,0 +1,80 @@
package com.cameleer3.server.core.ingestion;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Bounded write buffer that decouples HTTP ingestion from ClickHouse batch inserts.
* <p>
* Items are offered to the buffer by controllers and drained in batches by a
* scheduled flush task. When the buffer is full, {@link #offer} returns false,
* signaling the caller to apply backpressure (HTTP 503).
*
* @param <T> the type of items buffered
*/
public class WriteBuffer<T> {
private final BlockingQueue<T> queue;
private final int capacity;
public WriteBuffer(int capacity) {
this.capacity = capacity;
this.queue = new ArrayBlockingQueue<>(capacity);
}
/**
* Offer a single item to the buffer.
*
* @return true if the item was added, false if the buffer is full
*/
public boolean offer(T item) {
return queue.offer(item);
}
/**
* Offer a batch of items with all-or-nothing semantics.
* If the buffer does not have enough remaining capacity for the entire batch,
* no items are added and false is returned.
*
* @return true if all items were added, false if insufficient capacity
*/
public boolean offerBatch(List<T> items) {
if (queue.remainingCapacity() < items.size()) {
return false;
}
for (T item : items) {
queue.offer(item);
}
return true;
}
/**
* Drain up to {@code maxBatch} items from the buffer.
* Called by the scheduled flush task.
*
* @return list of drained items (may be empty)
*/
public List<T> drain(int maxBatch) {
List<T> batch = new ArrayList<>(maxBatch);
queue.drainTo(batch, maxBatch);
return batch;
}
public int size() {
return queue.size();
}
public int capacity() {
return capacity;
}
public boolean isFull() {
return queue.remainingCapacity() == 0;
}
public int remainingCapacity() {
return queue.remainingCapacity();
}
}

View File

@@ -0,0 +1,26 @@
package com.cameleer3.server.core.storage;
import com.cameleer3.common.graph.RouteGraph;
import java.util.Optional;
/**
* Repository for route diagram storage with content-hash deduplication.
*/
public interface DiagramRepository {
/**
* Store a route graph. Uses content-hash deduplication via ReplacingMergeTree.
*/
void store(RouteGraph graph);
/**
* Find a route graph by its content hash.
*/
Optional<RouteGraph> findByContentHash(String contentHash);
/**
* Find the content hash for the latest diagram of a given route and agent.
*/
Optional<String> findContentHashForRoute(String routeId, String agentId);
}

View File

@@ -0,0 +1,17 @@
package com.cameleer3.server.core.storage;
import com.cameleer3.common.model.RouteExecution;
import java.util.List;
/**
* Repository for route execution batch inserts into ClickHouse.
*/
public interface ExecutionRepository {
/**
* Insert a batch of route executions.
* Implementations must perform a single batch insert for efficiency.
*/
void insertBatch(List<RouteExecution> executions);
}

View File

@@ -0,0 +1,17 @@
package com.cameleer3.server.core.storage;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.List;
/**
* Repository for agent metrics batch inserts into ClickHouse.
*/
public interface MetricsRepository {
/**
* Insert a batch of metrics snapshots.
* Implementations must perform a single batch insert for efficiency.
*/
void insertBatch(List<MetricsSnapshot> metrics);
}

View File

@@ -0,0 +1,16 @@
package com.cameleer3.server.core.storage.model;
import java.time.Instant;
import java.util.Map;
/**
* A single metrics data point from an agent.
*/
public record MetricsSnapshot(
String agentId,
Instant collectedAt,
String metricName,
double metricValue,
Map<String, String> tags
) {
}