From 31f7113b3fd6f897e0a3a111fa3ee4eafed5d95b Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:21:19 +0200 Subject: [PATCH] feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/config/IngestionBeanConfig.java | 15 +++++++ .../server/app/config/StorageBeanConfig.java | 44 +++++++++++++++++++ .../server/app/search/OpenSearchIndex.java | 2 + .../src/main/resources/application.yml | 1 + deploy/base/server.yaml | 2 + 5 files changed, 64 insertions(+) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java index c0d3a479..3f38c4b6 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java @@ -1,7 +1,10 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -19,4 +22,16 @@ public class IngestionBeanConfig { public WriteBuffer metricsBuffer(IngestionConfig config) { return new WriteBuffer<>(config.getBufferCapacity()); } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer executionBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public WriteBuffer processorBatchBuffer(IngestionConfig config) { + return new WriteBuffer<>(config.getBufferCapacity()); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 71b5bf7d..ab733408 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -8,7 +8,12 @@ import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.indexing.SearchIndexer; +import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; +import com.cameleer3.server.app.search.ClickHouseSearchIndex; +import com.cameleer3.server.app.storage.ClickHouseExecutionStore; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; +import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; import com.cameleer3.server.core.storage.model.MetricsSnapshot; @@ -74,4 +79,43 @@ public class StorageBeanConfig { public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { return new PostgresMetricsQueryStore(jdbc); } + + // ── ClickHouse Execution Store ────────────────────────────────────── + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ClickHouseExecutionStore clickHouseExecutionStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseExecutionStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ChunkAccumulator chunkAccumulator( + WriteBuffer executionBuffer, + WriteBuffer processorBatchBuffer) { + return new ChunkAccumulator( + executionBuffer::offer, + processorBatchBuffer::offer, + java.time.Duration.ofMinutes(5)); + } + + @Bean + @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") + public ExecutionFlushScheduler executionFlushScheduler( + WriteBuffer executionBuffer, + WriteBuffer processorBatchBuffer, + ClickHouseExecutionStore executionStore, + ChunkAccumulator accumulator, + IngestionConfig config) { + return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer, + executionStore, accumulator, config); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse") + public SearchIndex clickHouseSearchIndex( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseSearchIndex(clickHouseJdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 2421d7da..d63f90f9 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -20,6 +20,7 @@ import org.opensearch.client.opensearch.indices.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Repository; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.*; import java.util.stream.Collectors; @Repository +@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true) public class OpenSearchIndex implements SearchIndex { private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index abab463e..55cd27e1 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -50,6 +50,7 @@ cameleer: retention-days: ${CAMELEER_RETENTION_DAYS:30} storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} + search: ${CAMELEER_STORAGE_SEARCH:opensearch} security: access-token-expiry-ms: 3600000 diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 79228066..06c131a3 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -91,6 +91,8 @@ spec: key: CLICKHOUSE_PASSWORD - name: CAMELEER_STORAGE_METRICS value: "postgres" + - name: CAMELEER_STORAGE_SEARCH + value: "opensearch" resources: requests: