diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
index 83507e16..c0d3a479 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
@@ -1,41 +1,22 @@
package com.cameleer3.server.app.config;
-import com.cameleer3.server.core.ingestion.IngestionService;
-import com.cameleer3.server.core.ingestion.TaggedDiagram;
-import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- * Creates the write buffer and ingestion service beans.
+ * Creates the write buffer bean for metrics.
*
- * The {@link WriteBuffer} instances are shared between the
- * {@link IngestionService} (producer side) and the flush scheduler (consumer side).
+ * The {@link WriteBuffer} instance is shared between the
+ * {@link com.cameleer3.server.core.ingestion.IngestionService} (producer side)
+ * and the flush scheduler (consumer side).
*/
@Configuration
public class IngestionBeanConfig {
- @Bean
- public WriteBuffer executionBuffer(IngestionConfig config) {
- return new WriteBuffer<>(config.getBufferCapacity());
- }
-
- @Bean
- public WriteBuffer diagramBuffer(IngestionConfig config) {
- return new WriteBuffer<>(config.getBufferCapacity());
- }
-
@Bean
public WriteBuffer metricsBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
-
- @Bean
- public IngestionService ingestionService(WriteBuffer executionBuffer,
- WriteBuffer diagramBuffer,
- WriteBuffer metricsBuffer) {
- return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer);
- }
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
index debc1e8b..e722f0a8 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
@@ -1,32 +1,19 @@
package com.cameleer3.server.app.config;
-import com.cameleer3.server.app.search.ClickHouseSearchEngine;
-import com.cameleer3.server.core.detail.DetailService;
-import com.cameleer3.server.core.search.SearchEngine;
import com.cameleer3.server.core.search.SearchService;
-import com.cameleer3.server.core.storage.ExecutionRepository;
+import com.cameleer3.server.core.storage.SearchIndex;
+import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.jdbc.core.JdbcTemplate;
/**
- * Creates beans for the search and detail layers.
+ * Creates beans for the search layer.
*/
@Configuration
public class SearchBeanConfig {
@Bean
- public SearchEngine searchEngine(JdbcTemplate jdbcTemplate) {
- return new ClickHouseSearchEngine(jdbcTemplate);
- }
-
- @Bean
- public SearchService searchService(SearchEngine searchEngine) {
- return new SearchService(searchEngine);
- }
-
- @Bean
- public DetailService detailService(ExecutionRepository executionRepository) {
- return new DetailService(executionRepository);
+ public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
+ return new SearchService(searchIndex, statsStore);
}
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
new file mode 100644
index 00000000..92f34943
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
@@ -0,0 +1,37 @@
+package com.cameleer3.server.app.config;
+
+import com.cameleer3.server.core.detail.DetailService;
+import com.cameleer3.server.core.indexing.SearchIndexer;
+import com.cameleer3.server.core.ingestion.IngestionService;
+import com.cameleer3.server.core.ingestion.WriteBuffer;
+import com.cameleer3.server.core.storage.*;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class StorageBeanConfig {
+
+ @Bean
+ public DetailService detailService(ExecutionStore executionStore) {
+ return new DetailService(executionStore);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
+ @Value("${opensearch.debounce-ms:2000}") long debounceMs,
+ @Value("${opensearch.queue-size:10000}") int queueSize) {
+ return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
+ }
+
+ @Bean
+ public IngestionService ingestionService(ExecutionStore executionStore,
+ DiagramStore diagramStore,
+ WriteBuffer metricsBuffer,
+ SearchIndexer searchIndexer,
+ @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
+ return new IngestionService(executionStore, diagramStore, metricsBuffer,
+ searchIndexer::onExecutionUpdated, bodySizeLimit);
+ }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
new file mode 100644
index 00000000..1479c762
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
@@ -0,0 +1,59 @@
+package com.cameleer3.server.app.ingestion;
+
+import com.cameleer3.server.app.config.IngestionConfig;
+import com.cameleer3.server.core.ingestion.WriteBuffer;
+import com.cameleer3.server.core.storage.MetricsStore;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public class MetricsFlushScheduler implements SmartLifecycle {
+
+ private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);
+
+ private final WriteBuffer metricsBuffer;
+ private final MetricsStore metricsStore;
+ private final int batchSize;
+ private volatile boolean running = false;
+
+ public MetricsFlushScheduler(WriteBuffer metricsBuffer,
+ MetricsStore metricsStore,
+ IngestionConfig config) {
+ this.metricsBuffer = metricsBuffer;
+ this.metricsStore = metricsStore;
+ this.batchSize = config.getBatchSize();
+ }
+
+ @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
+ public void flush() {
+ try {
+ List batch = metricsBuffer.drain(batchSize);
+ if (!batch.isEmpty()) {
+ metricsStore.insertBatch(batch);
+ log.debug("Flushed {} metrics to PostgreSQL", batch.size());
+ }
+ } catch (Exception e) {
+ log.error("Failed to flush metrics", e);
+ }
+ }
+
+ @Override public void start() { running = true; }
+ @Override public void stop() {
+ // Drain remaining on shutdown
+ while (metricsBuffer.size() > 0) {
+ List batch = metricsBuffer.drain(batchSize);
+ if (batch.isEmpty()) break;
+ try { metricsStore.insertBatch(batch); }
+ catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
+ }
+ running = false;
+ }
+ @Override public boolean isRunning() { return running; }
+ @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
new file mode 100644
index 00000000..152bb1c9
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
@@ -0,0 +1,48 @@
+package com.cameleer3.server.app.retention;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Component
+public class RetentionScheduler {
+
+ private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);
+
+ private final JdbcTemplate jdbc;
+ private final int retentionDays;
+
+ public RetentionScheduler(JdbcTemplate jdbc,
+ @Value("${cameleer.retention-days:30}") int retentionDays) {
+ this.jdbc = jdbc;
+ this.retentionDays = retentionDays;
+ }
+
+ @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
+ public void dropExpiredChunks() {
+ String interval = retentionDays + " days";
+ try {
+ // Raw data
+ jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
+ jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
+ jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");
+
+ // Continuous aggregates (keep 3x longer)
+ String caggInterval = (retentionDays * 3) + " days";
+ jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
+ jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
+ jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
+ jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");
+
+ log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
+ retentionDays, retentionDays * 3);
+ } catch (Exception e) {
+ log.error("Retention job failed", e);
+ }
+ }
+ // Note: OpenSearch daily index deletion should be handled via ILM policy
+ // configured at deployment time, not in application code.
+}