feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:21:19 +02:00
parent 6052407c82
commit 31f7113b3f
5 changed files with 64 additions and 0 deletions

View File

@@ -1,7 +1,10 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -19,4 +22,16 @@ public class IngestionBeanConfig {
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
}

View File

@@ -8,7 +8,12 @@ import com.cameleer3.server.core.admin.AuditRepository;
import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler;
import com.cameleer3.server.app.search.ClickHouseSearchIndex;
import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
@@ -74,4 +79,43 @@ public class StorageBeanConfig {
public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
return new PostgresMetricsQueryStore(jdbc);
}
// ── ClickHouse Execution Store ──────────────────────────────────────
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseExecutionStore clickHouseExecutionStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseExecutionStore(clickHouseJdbc);
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
return new ChunkAccumulator(
executionBuffer::offer,
processorBatchBuffer::offer,
java.time.Duration.ofMinutes(5));
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ExecutionFlushScheduler executionFlushScheduler(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
ClickHouseExecutionStore executionStore,
ChunkAccumulator accumulator,
IngestionConfig config) {
return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
executionStore, accumulator, config);
}
@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
public SearchIndex clickHouseSearchIndex(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseSearchIndex(clickHouseJdbc);
}
}

View File

@@ -20,6 +20,7 @@ import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Repository;
import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.*;
import java.util.stream.Collectors;
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true)
public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);

View File

@@ -50,6 +50,7 @@ cameleer:
retention-days: ${CAMELEER_RETENTION_DAYS:30}
storage:
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
security:
access-token-expiry-ms: 3600000

View File

@@ -91,6 +91,8 @@ spec:
key: CLICKHOUSE_PASSWORD
- name: CAMELEER_STORAGE_METRICS
value: "postgres"
- name: CAMELEER_STORAGE_SEARCH
value: "opensearch"
resources:
requests: