feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-16 18:27:58 +01:00
parent f7d7302694
commit 7dbfaf0932
5 changed files with 153 additions and 41 deletions

View File

@@ -1,41 +1,22 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Creates the write buffer and ingestion service beans.
* Creates the write buffer bean for metrics.
* <p>
* The {@link WriteBuffer} instances are shared between the
* {@link IngestionService} (producer side) and the flush scheduler (consumer side).
* The {@link WriteBuffer} instance is shared between the
* {@link com.cameleer3.server.core.ingestion.IngestionService} (producer side)
* and the flush scheduler (consumer side).
*/
@Configuration
public class IngestionBeanConfig {
// NOTE(review): this span is a rendered commit diff with removed and added lines
// merged without +/- markers. The hunk header (-1,41 +1,22) says the file shrank,
// so some beans below likely belong to the PRE-change file — verify against the repo.
/** Bounded buffer for execution records; capacity comes from IngestionConfig. */
@Bean
public WriteBuffer<TaggedExecution> executionBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
/** Bounded buffer for diagram records, sized like the execution buffer. */
@Bean
public WriteBuffer<TaggedDiagram> diagramBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
/** Bounded buffer for metrics snapshots consumed by the flush scheduler. */
@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
/**
 * Producer-side service writing into the three buffers above.
 * NOTE(review): the new StorageBeanConfig in this same commit also declares an
 * ingestionService bean with a different signature — presumably this one was
 * deleted by the commit; confirm before reusing.
 */
@Bean
public IngestionService ingestionService(WriteBuffer<TaggedExecution> executionBuffer,
WriteBuffer<TaggedDiagram> diagramBuffer,
WriteBuffer<MetricsSnapshot> metricsBuffer) {
return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer);
}
}

View File

@@ -1,32 +1,19 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.app.search.ClickHouseSearchEngine;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.search.SearchEngine;
import com.cameleer3.server.core.search.SearchService;
import com.cameleer3.server.core.storage.ExecutionRepository;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
/**
* Creates beans for the search and detail layers.
* Creates beans for the search layer.
*/
@Configuration
public class SearchBeanConfig {
// NOTE(review): merged diff residue — removed and added lines are interleaved
// without markers, and the span below is not valid Java as displayed.
/** ClickHouse-backed engine; presumably removed by this commit — verify. */
@Bean
public SearchEngine searchEngine(JdbcTemplate jdbcTemplate) {
return new ClickHouseSearchEngine(jdbcTemplate);
}
/** Old searchService signature (SearchEngine); replaced below — verify. */
@Bean
public SearchService searchService(SearchEngine searchEngine) {
return new SearchService(searchEngine);
}
// NOTE(review): the next four lines overlap two method definitions: the removed
// detailService bean (moved to StorageBeanConfig in this commit) and the new
// searchService(SearchIndex, StatsStore) signature. Reconstruct from the repo.
@Bean
public DetailService detailService(ExecutionRepository executionRepository) {
return new DetailService(executionRepository);
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
return new SearchService(searchIndex, statsStore);
}
}

View File

@@ -0,0 +1,37 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Wires the storage-backed service beans: detail lookup, search indexing,
 * and ingestion.
 */
@Configuration
public class StorageBeanConfig {

    /** Detail lookups read directly from the execution store. */
    @Bean
    public DetailService detailService(ExecutionStore executionStore) {
        return new DetailService(executionStore);
    }

    /**
     * Indexer pushing execution updates into the search index; debounce window
     * and queue capacity are configurable. Spring invokes {@code shutdown} on
     * container close via the destroyMethod attribute.
     */
    @Bean(destroyMethod = "shutdown")
    public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                                       @Value("${opensearch.debounce-ms:2000}") long debounceMs,
                                       @Value("${opensearch.queue-size:10000}") int queueSize) {
        SearchIndexer indexer = new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
        return indexer;
    }

    /**
     * Ingestion entry point. Execution updates are forwarded to the indexer
     * through a method-reference callback; bodies larger than the configured
     * limit are handled by the service itself.
     */
    @Bean
    public IngestionService ingestionService(ExecutionStore executionStore,
                                             DiagramStore diagramStore,
                                             WriteBuffer<MetricsSnapshot> metricsBuffer,
                                             SearchIndexer searchIndexer,
                                             @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
        IngestionService service = new IngestionService(executionStore, diagramStore, metricsBuffer,
                searchIndexer::onExecutionUpdated, bodySizeLimit);
        return service;
    }
}

View File

@@ -0,0 +1,59 @@
package com.cameleer3.server.app.ingestion;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class MetricsFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);

    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final MetricsStore metricsStore;
    private final int batchSize;

    /** Lifecycle flag: scheduled flushes run only between start() and stop(). */
    private volatile boolean running = false;

    public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
                                 MetricsStore metricsStore,
                                 IngestionConfig config) {
        this.metricsBuffer = metricsBuffer;
        this.metricsStore = metricsStore;
        this.batchSize = config.getBatchSize();
    }

    /**
     * Periodically drains up to {@code batchSize} snapshots from the buffer and
     * persists them. Exceptions are logged, not rethrown, so one failed flush
     * does not cancel the scheduled task.
     */
    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        // Gate on the lifecycle flag: without this check a scheduled invocation
        // firing during shutdown would race with the final drain in stop().
        if (!running) {
            return;
        }
        try {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                metricsStore.insertBatch(batch);
                log.debug("Flushed {} metrics to PostgreSQL", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush metrics", e);
        }
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        // Clear the flag FIRST so in-flight scheduled flushes bail out before
        // we perform the final drain — the original order (drain, then clear)
        // left a window where both paths drained concurrently.
        running = false;
        // Drain remaining on shutdown; stop on the first persistent failure to
        // avoid spinning if the store is already down.
        while (metricsBuffer.size() > 0) {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (batch.isEmpty()) {
                break;
            }
            try {
                metricsStore.insertBatch(batch);
            } catch (Exception e) {
                log.error("Failed to flush metrics during shutdown", e);
                break;
            }
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    /** Stop late / start early relative to most beans so no metrics are lost. */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 1;
    }
}

View File

@@ -0,0 +1,48 @@
package com.cameleer3.server.app.retention;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class RetentionScheduler {

    private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);

    private final JdbcTemplate jdbc;
    private final int retentionDays;

    public RetentionScheduler(JdbcTemplate jdbc,
                              @Value("${cameleer.retention-days:30}") int retentionDays) {
        this.jdbc = jdbc;
        this.retentionDays = retentionDays;
    }

    /**
     * Drops TimescaleDB chunks older than the retention window. Raw hypertables
     * use {@code retentionDays}; continuous aggregates are kept 3x longer.
     * Each table is handled independently so one failure (e.g. a missing
     * hypertable) no longer aborts retention for the remaining tables.
     */
    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
    public void dropExpiredChunks() {
        // Raw data
        dropChunks("executions", retentionDays);
        dropChunks("processor_executions", retentionDays);
        dropChunks("agent_metrics", retentionDays);
        // Continuous aggregates (keep 3x longer)
        int caggDays = retentionDays * 3;
        dropChunks("stats_1m_all", caggDays);
        dropChunks("stats_1m_app", caggDays);
        dropChunks("stats_1m_route", caggDays);
        dropChunks("stats_1m_processor", caggDays);
        log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
                retentionDays, retentionDays * 3);
    }

    /**
     * Invokes TimescaleDB's {@code drop_chunks} for one hypertable, using bind
     * parameters instead of string-concatenated SQL. {@code drop_chunks} is a
     * set-returning function, so it must be SELECTed; queryForList consumes and
     * discards the returned chunk names.
     */
    private void dropChunks(String hypertable, int days) {
        try {
            jdbc.queryForList(
                    "SELECT drop_chunks(?::regclass, make_interval(days => ?))",
                    hypertable, days);
        } catch (Exception e) {
            log.error("Retention: failed to drop chunks for table {}", hypertable, e);
        }
    }

    // Note: OpenSearch daily index deletion should be handled via ILM policy
    // configured at deployment time, not in application code.
}