From 7dbfaf0932f63860fc6da0fd0cd61f0a0e3bc7f5 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:27:58 +0100 Subject: [PATCH] feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/config/IngestionBeanConfig.java | 27 ++------- .../server/app/config/SearchBeanConfig.java | 23 ++------ .../server/app/config/StorageBeanConfig.java | 37 ++++++++++++ .../app/ingestion/MetricsFlushScheduler.java | 59 +++++++++++++++++++ .../app/retention/RetentionScheduler.java | 48 +++++++++++++++ 5 files changed, 153 insertions(+), 41 deletions(-) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index 83507e16..c0d3a479 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,41 +1,22 @@ package com.cameleer3.server.app.config; -import com.cameleer3.server.core.ingestion.IngestionService; -import com.cameleer3.server.core.ingestion.TaggedDiagram; -import com.cameleer3.server.core.ingestion.TaggedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.model.MetricsSnapshot; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * Creates the write buffer and ingestion service beans. + * Creates the write buffer bean for metrics. *

- * The {@link WriteBuffer} instances are shared between the - * {@link IngestionService} (producer side) and the flush scheduler (consumer side). + * The {@link WriteBuffer} instance is shared between the + * {@link com.cameleer3.server.core.ingestion.IngestionService} (producer side) + * and the flush scheduler (consumer side). */ @Configuration public class IngestionBeanConfig { - @Bean - public WriteBuffer executionBuffer(IngestionConfig config) { - return new WriteBuffer<>(config.getBufferCapacity()); - } - - @Bean - public WriteBuffer diagramBuffer(IngestionConfig config) { - return new WriteBuffer<>(config.getBufferCapacity()); - } - @Bean public WriteBuffer metricsBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } - - @Bean - public IngestionService ingestionService(WriteBuffer executionBuffer, - WriteBuffer diagramBuffer, - WriteBuffer metricsBuffer) { - return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer); - } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java index debc1e8b..e722f0a8 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java @@ -1,32 +1,19 @@ package com.cameleer3.server.app.config; -import com.cameleer3.server.app.search.ClickHouseSearchEngine; -import com.cameleer3.server.core.detail.DetailService; -import com.cameleer3.server.core.search.SearchEngine; import com.cameleer3.server.core.search.SearchService; -import com.cameleer3.server.core.storage.ExecutionRepository; +import com.cameleer3.server.core.storage.SearchIndex; +import com.cameleer3.server.core.storage.StatsStore; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.core.JdbcTemplate; /** - * Creates beans for the search and detail layers. + * Creates beans for the search layer. */ @Configuration public class SearchBeanConfig { @Bean - public SearchEngine searchEngine(JdbcTemplate jdbcTemplate) { - return new ClickHouseSearchEngine(jdbcTemplate); - } - - @Bean - public SearchService searchService(SearchEngine searchEngine) { - return new SearchService(searchEngine); - } - - @Bean - public DetailService detailService(ExecutionRepository executionRepository) { - return new DetailService(executionRepository); + public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) { + return new SearchService(searchIndex, statsStore); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java new file mode 100644 index 00000000..92f34943 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -0,0 +1,37 @@ +package com.cameleer3.server.app.config; + +import com.cameleer3.server.core.detail.DetailService; +import com.cameleer3.server.core.indexing.SearchIndexer; +import com.cameleer3.server.core.ingestion.IngestionService; +import com.cameleer3.server.core.ingestion.WriteBuffer; +import com.cameleer3.server.core.storage.*; +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class StorageBeanConfig { + + @Bean + public DetailService detailService(ExecutionStore executionStore) { + return new DetailService(executionStore); + } + + @Bean(destroyMethod = "shutdown") + public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex, + @Value("${opensearch.debounce-ms:2000}") long debounceMs, + @Value("${opensearch.queue-size:10000}") int queueSize) { + return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize); + } + + @Bean + public IngestionService ingestionService(ExecutionStore executionStore, + DiagramStore diagramStore, + WriteBuffer metricsBuffer, + SearchIndexer searchIndexer, + @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) { + return new IngestionService(executionStore, diagramStore, metricsBuffer, + searchIndexer::onExecutionUpdated, bodySizeLimit); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java new file mode 100644 index 00000000..1479c762 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java @@ -0,0 +1,59 @@ +package com.cameleer3.server.app.ingestion; + +import com.cameleer3.server.app.config.IngestionConfig; +import com.cameleer3.server.core.ingestion.WriteBuffer; +import com.cameleer3.server.core.storage.MetricsStore; +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class MetricsFlushScheduler implements SmartLifecycle { + + private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class); + + private final WriteBuffer metricsBuffer; + private final MetricsStore metricsStore; + private final int batchSize; + private volatile boolean running = false; + + public MetricsFlushScheduler(WriteBuffer metricsBuffer, + MetricsStore metricsStore, + IngestionConfig config) { + this.metricsBuffer = metricsBuffer; + this.metricsStore = metricsStore; + this.batchSize = config.getBatchSize(); + } + + @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}") + public void flush() { + try { + List batch = metricsBuffer.drain(batchSize); + if (!batch.isEmpty()) { + metricsStore.insertBatch(batch); + log.debug("Flushed {} metrics to PostgreSQL", batch.size()); + } + } catch (Exception e) { + log.error("Failed to flush metrics", e); + } + } + + @Override public void start() { running = true; } + @Override public void stop() { + // Drain remaining on shutdown + while (metricsBuffer.size() > 0) { + List batch = metricsBuffer.drain(batchSize); + if (batch.isEmpty()) break; + try { metricsStore.insertBatch(batch); } + catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; } + } + running = false; + } + @Override public boolean isRunning() { return running; } + @Override public int getPhase() { return Integer.MAX_VALUE - 1; } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java new file mode 100644 index 00000000..152bb1c9 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java @@ -0,0 +1,48 @@ +package com.cameleer3.server.app.retention; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +public class RetentionScheduler { + + private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class); + + private final JdbcTemplate jdbc; + private final int retentionDays; + + public RetentionScheduler(JdbcTemplate jdbc, + @Value("${cameleer.retention-days:30}") int retentionDays) { + this.jdbc = jdbc; + this.retentionDays = retentionDays; + } + + @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC + public void dropExpiredChunks() { + String interval = retentionDays + " days"; + try { + // Raw data + jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')"); + jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')"); + jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')"); + + // Continuous aggregates (keep 3x longer) + String caggInterval = (retentionDays * 3) + " days"; + jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')"); + jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')"); + jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')"); + jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')"); + + log.info("Retention: dropped chunks older than {} days (aggregates: {} days)", + retentionDays, retentionDays * 3); + } catch (Exception e) { + log.error("Retention job failed", e); + } + } + // Note: OpenSearch daily index deletion should be handled via ILM policy + // configured at deployment time, not in application code. +}