diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index ad105920..0bd33bea 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -228,6 +228,9 @@ jobs: kubectl apply -f deploy/opensearch.yaml kubectl -n cameleer rollout status statefulset/opensearch --timeout=180s + kubectl apply -f deploy/clickhouse.yaml + kubectl -n cameleer rollout status statefulset/clickhouse --timeout=180s + kubectl apply -f deploy/authentik.yaml kubectl -n cameleer rollout status deployment/authentik-server --timeout=180s diff --git a/cameleer3-server-app/pom.xml b/cameleer3-server-app/pom.xml index 4738822d..4691e969 100644 --- a/cameleer3-server-app/pom.xml +++ b/cameleer3-server-app/pom.xml @@ -57,6 +57,12 @@ opensearch-rest-client 2.19.0 + + com.clickhouse + clickhouse-jdbc + 0.9.7 + all + org.springdoc springdoc-openapi-starter-webmvc-ui @@ -126,6 +132,11 @@ 2.1.1 test + + org.testcontainers + testcontainers-clickhouse + test + org.awaitility awaitility diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java new file mode 100644 index 00000000..413102df --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java @@ -0,0 +1,34 @@ +package com.cameleer3.server.app.config; + +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +@Configuration +@EnableConfigurationProperties(ClickHouseProperties.class) +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = 
"true") +public class ClickHouseConfig { + + @Bean(name = "clickHouseDataSource") + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + ds.setPoolName("clickhouse-pool"); + return ds; + } + + @Bean(name = "clickHouseJdbcTemplate") + public JdbcTemplate clickHouseJdbcTemplate( + @Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java new file mode 100644 index 00000000..461a8b42 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java @@ -0,0 +1,20 @@ +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "clickhouse") +public class ClickHouseProperties { + + private String url = "jdbc:clickhouse://localhost:8123/cameleer"; + private String username = "default"; + private String password = ""; + + public String getUrl() { return url; } + public void setUrl(String url) { this.url = url; } + + public String getUsername() { return username; } + public void setUsername(String username) { this.username = username; } + + public String getPassword() { return password; } + public void setPassword(String password) { this.password = password; } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java new file mode 100644 index 00000000..a2a5f720 --- /dev/null +++ 
b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java @@ -0,0 +1,52 @@ +package com.cameleer3.server.app.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; + +@Component +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseSchemaInitializer { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class); + + private final JdbcTemplate clickHouseJdbc; + + public ClickHouseSchemaInitializer( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + this.clickHouseJdbc = clickHouseJdbc; + } + + @EventListener(ApplicationReadyEvent.class) + public void initializeSchema() throws IOException { + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql"); + + Arrays.sort(scripts, Comparator.comparing(Resource::getFilename)); + + for (Resource script : scripts) { + String sql = script.getContentAsString(StandardCharsets.UTF_8); + log.info("Executing ClickHouse schema script: {}", script.getFilename()); + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + if (!trimmed.isEmpty()) { + clickHouseJdbc.execute(trimmed); + } + } + } + + log.info("ClickHouse schema 
initialization complete ({} scripts)", scripts.length); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 9a971357..71b5bf7d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -1,5 +1,9 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; +import com.cameleer3.server.app.storage.ClickHouseMetricsStore; +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.app.storage.PostgresMetricsStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; @@ -8,9 +12,12 @@ import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; @Configuration public class StorageBeanConfig { @@ -41,4 +48,30 @@ public class StorageBeanConfig { return new IngestionService(executionStore, diagramStore, metricsBuffer, searchIndexer::onExecutionUpdated, bodySizeLimit); } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") + public MetricsStore clickHouseMetricsStore( + @Qualifier("clickHouseJdbcTemplate") 
JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) + public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) { + return new PostgresMetricsStore(jdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") + public MetricsQueryStore clickHouseMetricsQueryStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsQueryStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) + public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java index 032cfea1..78edd94f 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java @@ -2,22 +2,23 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.dto.AgentMetricsResponse; import com.cameleer3.server.app.dto.MetricBucket; -import org.springframework.jdbc.core.JdbcTemplate; +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; import org.springframework.web.bind.annotation.*; -import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.stream.Collectors; @RestController @RequestMapping("/api/v1/agents/{agentId}/metrics") public class AgentMetricsController { - private final JdbcTemplate jdbc; + 
private final MetricsQueryStore metricsQueryStore; - public AgentMetricsController(JdbcTemplate jdbc) { - this.jdbc = jdbc; + public AgentMetricsController(MetricsQueryStore metricsQueryStore) { + this.metricsQueryStore = metricsQueryStore; } @GetMapping @@ -32,34 +33,18 @@ public class AgentMetricsController { if (to == null) to = Instant.now(); List metricNames = Arrays.asList(names.split(",")); - long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); - String intervalStr = intervalMs + " milliseconds"; - Map> result = new LinkedHashMap<>(); - for (String name : metricNames) { - result.put(name.trim(), new ArrayList<>()); - } + Map> raw = + metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets); - String sql = """ - SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, - metric_name, - AVG(metric_value) AS avg_value - FROM agent_metrics - WHERE agent_id = ? - AND collected_at >= ? AND collected_at < ? - AND metric_name = ANY(?) 
- GROUP BY bucket, metric_name - ORDER BY bucket - """; - - String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); - jdbc.query(sql, rs -> { - String metricName = rs.getString("metric_name"); - Instant bucket = rs.getTimestamp("bucket").toInstant(); - double value = rs.getDouble("avg_value"); - result.computeIfAbsent(metricName, k -> new ArrayList<>()) - .add(new MetricBucket(bucket, value)); - }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + Map> result = raw.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream() + .map(b -> new MetricBucket(b.time(), b.value())) + .toList(), + (a, b) -> a, + LinkedHashMap::new)); return new AgentMetricsResponse(result); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java new file mode 100644 index 00000000..0ee53b42 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java @@ -0,0 +1,66 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.*; + +public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalSeconds = Math.max(60, + (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1)); + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new 
ArrayList<>()); + } + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + + // ClickHouse JDBC doesn't support array params with IN (?). + // Build the IN clause with properly escaped values. + StringBuilder inClause = new StringBuilder(); + for (int i = 0; i < namesArray.length; i++) { + if (i > 0) inClause.append(", "); + inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'"); + } + + String finalSql = """ + SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket, + metric_name, + avg(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? + AND collected_at < ? + AND metric_name IN (%s) + GROUP BY bucket, metric_name + ORDER BY bucket + """.formatted(intervalSeconds, inClause); + + jdbc.query(finalSql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, agentId, + java.sql.Timestamp.from(from), + java.sql.Timestamp.from(to)); + + return result; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java new file mode 100644 index 00000000..8d1d8645 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java @@ -0,0 +1,41 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsStore; +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickHouseMetricsStore implements MetricsStore { + + private final 
JdbcTemplate jdbc; + + public ClickHouseMetricsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void insertBatch(List snapshots) { + if (snapshots.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at) + VALUES (?, ?, ?, ?, ?) + """, + snapshots.stream().map(s -> new Object[]{ + s.agentId(), + s.metricName(), + s.metricValue(), + tagsToClickHouseMap(s.tags()), + Timestamp.from(s.collectedAt()) + }).toList()); + } + + private Map tagsToClickHouseMap(Map tags) { + if (tags == null || tags.isEmpty()) return new HashMap<>(); + return new HashMap<>(tags); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java new file mode 100644 index 00000000..bcd5d2bc --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java @@ -0,0 +1,55 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +public class PostgresMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public PostgresMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); + String intervalStr = intervalMs + " milliseconds"; + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String sql = """ + SELECT time_bucket(CAST(? 
AS interval), collected_at) AS bucket, + metric_name, + AVG(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? AND collected_at < ? + AND metric_name = ANY(?) + GROUP BY bucket, metric_name + ORDER BY bucket + """; + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + jdbc.query(sql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + + return result; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java index 8b8fed63..9d63e638 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java @@ -5,12 +5,10 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; import java.sql.Timestamp; import java.util.List; -@Repository public class PostgresMetricsStore implements MetricsStore { private static final ObjectMapper MAPPER = new ObjectMapper(); diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index f15d93c9..145ca9be 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -48,6 +48,8 @@ opensearch: cameleer: 
body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} retention-days: ${CAMELEER_RETENTION_DAYS:30} + storage: + metrics: ${CAMELEER_STORAGE_METRICS:postgres} security: access-token-expiry-ms: 3600000 @@ -66,6 +68,12 @@ springdoc: swagger-ui: path: /api/v1/swagger-ui +clickhouse: + enabled: ${CLICKHOUSE_ENABLED:false} + url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0} + username: ${CLICKHOUSE_USERNAME:default} + password: ${CLICKHOUSE_PASSWORD:} + management: endpoints: web: diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql b/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql new file mode 100644 index 00000000..807e882c --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index d9c38f83..cf7d8c38 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -7,6 +7,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.DynamicPropertyRegistry; import 
org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; @@ -20,6 +21,7 @@ public abstract class AbstractPostgresIT { static final PostgreSQLContainer postgres; static final OpensearchContainer opensearch; + static final ClickHouseContainer clickhouse; static { postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE) @@ -30,6 +32,9 @@ public abstract class AbstractPostgresIT { opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); opensearch.start(); + + clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + clickhouse.start(); } @Autowired @@ -46,5 +51,9 @@ public abstract class AbstractPostgresIT { registry.add("spring.flyway.user", postgres::getUsername); registry.add("spring.flyway.password", postgres::getPassword); registry.add("opensearch.url", opensearch::getHttpHostAddress); + registry.add("clickhouse.enabled", () -> "true"); + registry.add("clickhouse.url", clickhouse::getJdbcUrl); + registry.add("clickhouse.username", clickhouse::getUsername); + registry.add("clickhouse.password", clickhouse::getPassword); } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java new file mode 100644 index 00000000..54a9846d --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java @@ -0,0 +1,114 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import 
org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsQueryStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsQueryStore queryStore; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + // Seed test data: 6 data points across 1 hour for two metrics + Instant base = Instant.parse("2026-03-31T10:00:00Z"); + for (int i = 0; i < 6; i++) { + Instant ts = base.plusSeconds(i * 600); // every 10 minutes + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts)); + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); + } + + queryStore = new ClickHouseMetricsQueryStore(jdbc); + } + + @Test + void 
queryTimeSeries_returnsDataGroupedByMetric() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6); + + assertThat(result).containsKeys("cpu.usage", "memory.free"); + assertThat(result.get("cpu.usage")).isNotEmpty(); + assertThat(result.get("memory.free")).isNotEmpty(); + } + + @Test + void queryTimeSeries_bucketsAverageCorrectly() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + // 1 bucket for the entire hour = average of all 6 values + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1); + + assertThat(result.get("cpu.usage")).hasSize(1); + // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5 + assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1)); + } + + @Test + void queryTimeSeries_noData_returnsEmptyLists() { + Instant from = Instant.parse("2025-01-01T00:00:00Z"); + Instant to = Instant.parse("2025-01-01T01:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } + + @Test + void queryTimeSeries_unknownAgent_returnsEmpty() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java new file mode 100644 index 00000000..b87022f9 --- /dev/null +++ 
b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java @@ -0,0 +1,108 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import com.zaxxer.hikari.HikariDataSource; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsStore store; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + store = new ClickHouseMetricsStore(jdbc); + } + + @Test + void insertBatch_writesMetricsToClickHouse() { + List batch = List.of( + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 75.5, Map.of("host", "server-1")), + new MetricsSnapshot("agent-1", 
Instant.parse("2026-03-31T10:00:01Z"), + "memory.free", 1024.0, null) + ); + + store.insertBatch(batch); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'", + Integer.class); + assertThat(count).isEqualTo(2); + } + + @Test + void insertBatch_storesTags() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"), + "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4")) + )); + + // Just verify we can read back the row with tags + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertBatch_emptyList_doesNothing() { + store.insertBatch(List.of()); + + Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertBatch_nullTags_defaultsToEmptyMap() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 50.0, null) + )); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'", + Integer.class); + assertThat(count).isEqualTo(1); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java new file mode 100644 index 00000000..650e5b3b --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java @@ -0,0 +1,14 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public interface MetricsQueryStore { + + Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets); +} diff --git 
a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java new file mode 100644 index 00000000..6107fa0e --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java @@ -0,0 +1,9 @@ +package com.cameleer3.server.core.storage.model; + +import java.time.Instant; +import java.util.List; + +public record MetricTimeSeries(String metricName, List buckets) { + + public record Bucket(Instant time, double value) {} +} diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 218c0af4..d0b00b97 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -75,6 +75,12 @@ spec: name: cameleer-auth key: CAMELEER_JWT_SECRET optional: true + - name: CLICKHOUSE_ENABLED + value: "true" + - name: CLICKHOUSE_URL + value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0" + - name: CAMELEER_STORAGE_METRICS + value: "postgres" resources: requests: diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml new file mode 100644 index 00000000..f9347556 --- /dev/null +++ b/deploy/clickhouse.yaml @@ -0,0 +1,90 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse + namespace: cameleer +spec: + serviceName: clickhouse + replicas: 1 + selector: + matchLabels: + app: clickhouse + template: + metadata: + labels: + app: clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24.12 + ports: + - containerPort: 8123 + name: http + - containerPort: 9000 + name: native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + - name: initdb + mountPath: /docker-entrypoint-initdb.d + resources: + requests: + memory: "2Gi" + cpu: "500m" + limits: + memory: "4Gi" + cpu: "2000m" + livenessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 10 + 
periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + volumes: + - name: initdb + configMap: + name: clickhouse-initdb + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 50Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: clickhouse + namespace: cameleer +spec: + clusterIP: None + selector: + app: clickhouse + ports: + - port: 8123 + targetPort: 8123 + name: http + - port: 9000 + targetPort: 9000 + name: native +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: clickhouse-initdb + namespace: cameleer +data: + 01-create-database.sql: | + CREATE DATABASE IF NOT EXISTS cameleer; diff --git a/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md b/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md new file mode 100644 index 00000000..6e84d575 --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md @@ -0,0 +1,1193 @@ +# ClickHouse Migration Phase 1: Foundation + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add ClickHouse as a second data store alongside PostgreSQL, validated with the metrics pipeline end-to-end (write + read). Feature-flagged so either PG or CH serves metrics. + +**Architecture:** Add `clickhouse-jdbc` dependency, a second `DataSource`/`JdbcTemplate` bean qualified as `@ClickHouse`, an idempotent schema initializer that creates ClickHouse tables on startup, and a feature flag (`cameleer.storage.metrics=clickhouse|postgres`) that selects the active `MetricsStore` implementation. 
The metrics read path (`AgentMetricsController`) is refactored behind a new `MetricsQueryStore` interface so it too can be swapped. + +**Tech Stack:** ClickHouse JDBC 0.9.x, Spring Boot 3.4.3, JdbcTemplate, Testcontainers (clickhouse module) + +**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` + +--- + +## File Map + +| Action | File | Responsibility | +|--------|------|----------------| +| Modify | `cameleer3-server-app/pom.xml` | Add clickhouse-jdbc + testcontainers-clickhouse dependencies | +| Modify | `cameleer3-server-app/src/main/resources/application.yml` | Add `clickhouse.*` and `cameleer.storage.metrics` config keys | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java` | ClickHouse DataSource + JdbcTemplate beans | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java` | Config properties class for `clickhouse.*` prefix | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java` | Runs idempotent DDL on startup | +| Create | `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` | DDL for `agent_metrics` table | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java` | `MetricsStore` impl writing to ClickHouse | +| Create | `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java` | Interface: query metrics time-series | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java` | PG impl of MetricsQueryStore (extracted from controller) | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java` | CH impl of MetricsQueryStore | +| Modify | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java` | Use
MetricsQueryStore instead of raw JdbcTemplate | +| Modify | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` | Conditional beans for PG vs CH metrics stores | +| Create | `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java` | Unit test | +| Create | `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java` | Unit test | +| Create | `deploy/clickhouse.yaml` | K8s StatefulSet + Service for ClickHouse | +| Modify | `deploy/base/server.yaml` | Add CLICKHOUSE_URL env var | + +--- + +### Task 1: Add ClickHouse Dependencies + +**Files:** +- Modify: `cameleer3-server-app/pom.xml` + +- [ ] **Step 1: Add clickhouse-jdbc and testcontainers-clickhouse to app POM** + +In `cameleer3-server-app/pom.xml`, add these two dependencies. Place the runtime dependency after the opensearch dependencies (around line 59), and the test dependency after the opensearch-testcontainers dependency (around line 128): + +```xml +<!-- runtime: ClickHouse JDBC driver --> +<dependency> + <groupId>com.clickhouse</groupId> + <artifactId>clickhouse-jdbc</artifactId> + <version>0.9.7</version> + <classifier>all</classifier> +</dependency> + +<!-- test: Testcontainers ClickHouse module --> +<dependency> + <groupId>org.testcontainers</groupId> + <artifactId>clickhouse</artifactId> + <scope>test</scope> +</dependency> +``` + +Note: The `all` classifier bundles the HTTP client, avoiding transitive dependency conflicts. The testcontainers `clickhouse` module version is managed by the `testcontainers.version` property in the parent POM.
+ +- [ ] **Step 2: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/pom.xml +git commit -m "build: add clickhouse-jdbc and testcontainers-clickhouse dependencies" +``` + +--- + +### Task 2: ClickHouse Configuration + +**Files:** +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java` +- Modify: `cameleer3-server-app/src/main/resources/application.yml` + +- [ ] **Step 1: Create ClickHouseProperties** + +```java +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "clickhouse") +public class ClickHouseProperties { + + private String url = "jdbc:clickhouse://localhost:8123/cameleer"; + private String username = "default"; + private String password = ""; + + public String getUrl() { return url; } + public void setUrl(String url) { this.url = url; } + + public String getUsername() { return username; } + public void setUsername(String username) { this.username = username; } + + public String getPassword() { return password; } + public void setPassword(String password) { this.password = password; } +} +``` + +- [ ] **Step 2: Create ClickHouseConfig** + +```java +package com.cameleer3.server.app.config; + +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +@Configuration 
+@EnableConfigurationProperties(ClickHouseProperties.class) +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseConfig { + + @Bean(name = "clickHouseDataSource") + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + ds.setPoolName("clickhouse-pool"); + return ds; + } + + @Bean(name = "clickHouseJdbcTemplate") + public JdbcTemplate clickHouseJdbcTemplate( + @Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} +``` + +- [ ] **Step 3: Add configuration to application.yml** + +Add at the end of `application.yml`: + +```yaml +clickhouse: + enabled: ${CLICKHOUSE_ENABLED:false} + url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0} + username: ${CLICKHOUSE_USERNAME:default} + password: ${CLICKHOUSE_PASSWORD:} + +cameleer: + storage: + metrics: ${CAMELEER_STORAGE_METRICS:postgres} +``` + +Note: The existing `cameleer:` block already has `body-size-limit` and `retention-days`. Add the `storage.metrics` key under it. 
+ +- [ ] **Step 4: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java +git add cameleer3-server-app/src/main/resources/application.yml +git commit -m "feat: add ClickHouse DataSource and JdbcTemplate configuration" +``` + +--- + +### Task 3: Schema Initializer + DDL Script + +**Files:** +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java` + +- [ ] **Step 1: Create DDL script** + +File: `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` + +```sql +CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +- [ ] **Step 2: Create ClickHouseSchemaInitializer** + +```java +package com.cameleer3.server.app.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.Resource; +import 
org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; + +@Component +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseSchemaInitializer { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class); + + private final JdbcTemplate clickHouseJdbc; + + public ClickHouseSchemaInitializer( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + this.clickHouseJdbc = clickHouseJdbc; + } + + @EventListener(ApplicationReadyEvent.class) + public void initializeSchema() throws IOException { + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql"); + + Arrays.sort(scripts, Comparator.comparing(Resource::getFilename)); + + for (Resource script : scripts) { + String sql = script.getContentAsString(StandardCharsets.UTF_8); + log.info("Executing ClickHouse schema script: {}", script.getFilename()); + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + if (!trimmed.isEmpty()) { + clickHouseJdbc.execute(trimmed); + } + } + } + + log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length); + } +} +``` + +- [ ] **Step 3: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 4: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java +git commit -m "feat: add ClickHouse schema initializer with agent_metrics DDL" +``` + +--- + +### Task 4: ClickHouseMetricsStore 
(TDD) + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import com.zaxxer.hikari.HikariDataSource; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsStore store; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + store = new ClickHouseMetricsStore(jdbc); + } + + @Test + void insertBatch_writesMetricsToClickHouse() { + List batch = 
List.of( + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 75.5, Map.of("host", "server-1")), + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"), + "memory.free", 1024.0, null) + ); + + store.insertBatch(batch); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'", + Integer.class); + assertThat(count).isEqualTo(2); + } + + @Test + void insertBatch_storesTags() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"), + "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4")) + )); + + Map tags = jdbc.queryForObject( + "SELECT tags FROM agent_metrics WHERE agent_id = 'agent-2'", + (rs, n) -> { + // ClickHouse returns Map as a map-like structure via JDBC + @SuppressWarnings("unchecked") + Map m = (Map) rs.getObject("tags"); + return m; + }); + assertThat(tags).containsEntry("mount", "/data").containsEntry("fs", "ext4"); + } + + @Test + void insertBatch_emptyList_doesNothing() { + store.insertBatch(List.of()); + + Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertBatch_nullTags_defaultsToEmptyMap() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 50.0, null) + )); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'", + Integer.class); + assertThat(count).isEqualTo(1); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsStoreTest -DfailIfNoTests=false` +Expected: FAIL — `ClickHouseMetricsStore` class does not exist + +- [ ] **Step 3: Write ClickHouseMetricsStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsStore; +import 
com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickHouseMetricsStore implements MetricsStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void insertBatch(List snapshots) { + if (snapshots.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at) + VALUES (?, ?, ?, ?, ?) + """, + snapshots.stream().map(s -> new Object[]{ + s.agentId(), + s.metricName(), + s.metricValue(), + tagsToClickHouseMap(s.tags()), + Timestamp.from(s.collectedAt()) + }).toList()); + } + + private Map tagsToClickHouseMap(Map tags) { + if (tags == null || tags.isEmpty()) return new HashMap<>(); + return tags; + } +} +``` + +Note: ClickHouse JDBC driver handles `Map` natively for `Map(String, String)` columns. No JSON serialization needed (unlike the PG implementation that converts to JSONB string). + +- [ ] **Step 4: Run test to verify it passes** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsStoreTest` +Expected: PASS — all 4 tests green + +If the tags test fails due to JDBC Map handling, adjust `tagsToClickHouseMap` to return a `java.util.HashMap` (ClickHouse JDBC requires a mutable map). If needed, wrap: `return new HashMap<>(tags)`. + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java +git commit -m "feat: add ClickHouseMetricsStore with batch insert" +``` + +--- + +### Task 5: MetricsQueryStore Interface + PostgreSQL Implementation (TDD) + +Extract the query logic from `AgentMetricsController` into an interface. 
+ +**Files:** +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java` +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java` + +- [ ] **Step 1: Create MetricsQueryStore interface** + +```java +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public interface MetricsQueryStore { + + /** + * Query time-bucketed metrics for an agent. + * + * @param agentId the agent identifier + * @param metricNames list of metric names to query + * @param from start of time range (inclusive) + * @param to end of time range (exclusive) + * @param buckets number of time buckets to divide the range into + * @return map of metric name to list of (time, value) pairs + */ + Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets); +} +``` + +- [ ] **Step 2: Create MetricTimeSeries.Bucket record** + +```java +package com.cameleer3.server.core.storage.model; + +import java.time.Instant; +import java.util.List; + +public record MetricTimeSeries(String metricName, List buckets) { + + public record Bucket(Instant time, double value) {} +} +``` + +- [ ] **Step 3: Create PostgresMetricsQueryStore (extract from controller)** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +public class PostgresMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public PostgresMetricsQueryStore(JdbcTemplate jdbc) { + 
this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); + String intervalStr = intervalMs + " milliseconds"; + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String sql = """ + SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, + metric_name, + AVG(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? AND collected_at < ? + AND metric_name = ANY(?) + GROUP BY bucket, metric_name + ORDER BY bucket + """; + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + jdbc.query(sql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + + return result; + } +} +``` + +- [ ] **Step 4: Refactor AgentMetricsController to use MetricsQueryStore** + +Replace the entire `AgentMetricsController.java` content: + +```java +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.dto.AgentMetricsResponse; +import com.cameleer3.server.app.dto.MetricBucket; +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.web.bind.annotation.*; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.stream.Collectors; + +@RestController +@RequestMapping("/api/v1/agents/{agentId}/metrics") +public class AgentMetricsController { + + private final MetricsQueryStore metricsQueryStore; + + 
public AgentMetricsController(MetricsQueryStore metricsQueryStore) { + this.metricsQueryStore = metricsQueryStore; + } + + @GetMapping + public AgentMetricsResponse getMetrics( + @PathVariable String agentId, + @RequestParam String names, + @RequestParam(required = false) Instant from, + @RequestParam(required = false) Instant to, + @RequestParam(defaultValue = "60") int buckets) { + + if (from == null) from = Instant.now().minus(1, ChronoUnit.HOURS); + if (to == null) to = Instant.now(); + + List metricNames = Arrays.asList(names.split(",")); + + Map> raw = + metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets); + + // Convert to existing DTO format + Map> result = raw.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream() + .map(b -> new MetricBucket(b.time(), b.value())) + .toList(), + (a, b) -> a, + LinkedHashMap::new)); + + return new AgentMetricsResponse(result); + } +} +``` + +- [ ] **Step 5: Add PostgresMetricsQueryStore bean to StorageBeanConfig** + +The refactored `AgentMetricsController` needs a `MetricsQueryStore` bean. Add this to `StorageBeanConfig.java`: + +```java +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.core.storage.MetricsQueryStore; + +// Add this bean method: +@Bean +public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); +} +``` + +This creates the PG implementation as the default. Task 7 will replace this with conditional beans for PG vs CH. 
+ +- [ ] **Step 6: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 7: Commit** + +```bash +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +git commit -m "refactor: extract MetricsQueryStore interface from AgentMetricsController" +``` + +--- + +### Task 6: ClickHouseMetricsQueryStore (TDD) + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsQueryStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsQueryStore queryStore; + 
+ @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + // Seed test data: 6 data points across 1 hour for two metrics + Instant base = Instant.parse("2026-03-31T10:00:00Z"); + for (int i = 0; i < 6; i++) { + Instant ts = base.plusSeconds(i * 600); // every 10 minutes + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts)); + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); + } + + queryStore = new ClickHouseMetricsQueryStore(jdbc); + } + + @Test + void queryTimeSeries_returnsDataGroupedByMetric() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6); + + assertThat(result).containsKeys("cpu.usage", "memory.free"); + assertThat(result.get("cpu.usage")).isNotEmpty(); + assertThat(result.get("memory.free")).isNotEmpty(); + } + + @Test + void queryTimeSeries_bucketsAverageCorrectly() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = 
Instant.parse("2026-03-31T11:00:00Z"); + + // 1 bucket for the entire hour = average of all 6 values + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1); + + assertThat(result.get("cpu.usage")).hasSize(1); + // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5 + assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1)); + } + + @Test + void queryTimeSeries_noData_returnsEmptyLists() { + Instant from = Instant.parse("2025-01-01T00:00:00Z"); + Instant to = Instant.parse("2025-01-01T01:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } + + @Test + void queryTimeSeries_unknownAgent_returnsEmpty() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsQueryStoreTest -DfailIfNoTests=false` +Expected: FAIL — `ClickHouseMetricsQueryStore` class does not exist + +- [ ] **Step 3: Write ClickHouseMetricsQueryStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.*; + +public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalSeconds = Math.max(60, + 
(to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1)); + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + + String sql = """ + SELECT toStartOfInterval(collected_at, INTERVAL ? SECOND) AS bucket, + metric_name, + avg(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? + AND collected_at < ? + AND metric_name IN (?) + GROUP BY bucket, metric_name + ORDER BY bucket + """; + + // ClickHouse JDBC doesn't support array params with IN (?). + // Build the IN clause manually with safe values (metric names are validated by controller). + StringBuilder inClause = new StringBuilder(); + for (int i = 0; i < namesArray.length; i++) { + if (i > 0) inClause.append(", "); + inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'"); + } + + String finalSql = """ + SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket, + metric_name, + avg(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? + AND collected_at < ? 
+ AND metric_name IN (%s) + GROUP BY bucket, metric_name + ORDER BY bucket + """.formatted(intervalSeconds, inClause); + + jdbc.query(finalSql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, agentId, + java.sql.Timestamp.from(from), + java.sql.Timestamp.from(to)); + + return result; + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsQueryStoreTest` +Expected: PASS — all 4 tests green + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java +git commit -m "feat: add ClickHouseMetricsQueryStore with time-bucketed queries" +``` + +--- + +### Task 7: Wire Feature Flag in StorageBeanConfig + +**Files:** +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` + +- [ ] **Step 1: Replace simple beans with conditional MetricsStore and MetricsQueryStore beans** + +In `StorageBeanConfig.java`, **remove** the unconditional `metricsQueryStore` bean added in Task 5. Replace with these conditional bean definitions: + +```java +import com.cameleer3.server.app.storage.ClickHouseMetricsStore; +import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.core.storage.MetricsQueryStore; + +// ... existing beans ... 
+ +@Bean +@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") +public MetricsStore clickHouseMetricsStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsStore(clickHouseJdbc); +} + +@Bean +@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) +public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) { + return new PostgresMetricsStore(jdbc); +} + +@Bean +@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") +public MetricsQueryStore clickHouseMetricsQueryStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsQueryStore(clickHouseJdbc); +} + +@Bean +@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) +public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); +} +``` + +Also remove the `@Repository` annotation from `PostgresMetricsStore.java` since the bean is now created explicitly in config. If `PostgresMetricsStore` has `@Repository`, remove it. Check the class — it does have `@Repository`, so remove it. + +Add the necessary imports to `StorageBeanConfig.java`: +```java +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.beans.factory.annotation.Qualifier; +import com.cameleer3.server.core.storage.MetricsStore; +``` + +- [ ] **Step 2: Remove @Repository from PostgresMetricsStore** + +In `PostgresMetricsStore.java`, remove the `@Repository` annotation from the class declaration. The bean is now created in `StorageBeanConfig`. 
+ +- [ ] **Step 3: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 4: Run existing tests to verify no regression** + +Run: `mvn test -pl cameleer3-server-app` +Expected: All existing tests pass. The default config (`cameleer.storage.metrics=postgres`) means the PG beans are created, ClickHouse beans are not (ClickHouse DataSource not needed). + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java +git commit -m "feat: wire MetricsStore and MetricsQueryStore with feature flag" +``` + +--- + +### Task 8: ClickHouse Kubernetes Deployment + +**Files:** +- Create: `deploy/clickhouse.yaml` +- Modify: `deploy/base/server.yaml` + +- [ ] **Step 1: Create ClickHouse StatefulSet + Service manifest** + +File: `deploy/clickhouse.yaml` + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse + namespace: cameleer +spec: + serviceName: clickhouse + replicas: 1 + selector: + matchLabels: + app: clickhouse + template: + metadata: + labels: + app: clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24.12 + ports: + - containerPort: 8123 + name: http + - containerPort: 9000 + name: native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + resources: + requests: + memory: "2Gi" + cpu: "500m" + limits: + memory: "4Gi" + cpu: "2000m" + livenessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 50Gi +--- 
+apiVersion: v1 +kind: Service +metadata: + name: clickhouse + namespace: cameleer +spec: + clusterIP: None + selector: + app: clickhouse + ports: + - port: 8123 + targetPort: 8123 + name: http + - port: 9000 + targetPort: 9000 + name: native +``` + +- [ ] **Step 2: Add ClickHouse env vars to server.yaml** + +Add these env vars to the `cameleer3-server` container in `deploy/base/server.yaml`, after the existing env vars (before `resources:`): + +```yaml + - name: CLICKHOUSE_ENABLED + value: "true" + - name: CLICKHOUSE_URL + value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0" + - name: CAMELEER_STORAGE_METRICS + value: "postgres" +``` + +Note: `CAMELEER_STORAGE_METRICS` defaults to `postgres`. Set to `clickhouse` when ready to switch. + +- [ ] **Step 3: Commit** + +```bash +git add deploy/clickhouse.yaml +git add deploy/base/server.yaml +git commit -m "deploy: add ClickHouse StatefulSet and server env vars" +``` + +--- + +### Task 9: Integration Smoke Test + +**Files:** +- Modify: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java` (add ClickHouse container) + +- [ ] **Step 1: Update AbstractPostgresIT to optionally start ClickHouse** + +This is a non-breaking addition. The existing integration tests continue to use PG. A future ClickHouse-specific IT can extend this base. 
+ +Add the ClickHouse container to `AbstractPostgresIT`: + +```java +import org.testcontainers.clickhouse.ClickHouseContainer; + +// Add after opensearch static field: +static final ClickHouseContainer clickhouse; + +// In the static initializer, add after opensearch.start(): +clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); +clickhouse.start(); + +// In configureProperties, add: +registry.add("clickhouse.enabled", () -> "true"); +registry.add("clickhouse.url", clickhouse::getJdbcUrl); +registry.add("clickhouse.username", clickhouse::getUsername); +registry.add("clickhouse.password", clickhouse::getPassword); +``` + +- [ ] **Step 2: Run all integration tests** + +Run: `mvn verify -pl cameleer3-server-app` +Expected: All ITs pass. ClickHouse schema initializer runs and creates the `agent_metrics` table. The `MetricsControllerIT` still uses PG (default storage flag is `postgres`). + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +git commit -m "test: add ClickHouse testcontainer to integration test base" +``` + +--- + +### Task 10: Final Verification + +- [ ] **Step 1: Run full build** + +Run: `mvn clean verify` +Expected: BUILD SUCCESS (unit tests + integration tests all pass) + +- [ ] **Step 2: Verify ClickHouse switch works locally (manual)** + +Start ClickHouse locally: +```bash +docker run -d --name clickhouse-test -p 8123:8123 clickhouse/clickhouse-server:24.12 +``` + +Create the database: +```bash +curl "http://localhost:8123/" --data "CREATE DATABASE IF NOT EXISTS cameleer" +``` + +Start the server with ClickHouse enabled: +```bash +CLICKHOUSE_ENABLED=true CAMELEER_STORAGE_METRICS=clickhouse java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar +``` + +Verify schema initialization in the logs: `Executing ClickHouse schema script: V1__agent_metrics.sql` + +POST a metric: +```bash +curl -X POST 
http://localhost:8081/api/v1/data/metrics \
+ -H "Content-Type: application/json" \
+ -H "Authorization: Bearer <token>" \
+ -d '[{"agentId":"test","collectedAt":"2026-03-31T10:00:00Z","metricName":"cpu","metricValue":75.0,"tags":{}}]'
+
+Verify data in ClickHouse:
+```bash
+curl "http://localhost:8123/" --data "SELECT * FROM cameleer.agent_metrics FORMAT Pretty"
+```
+
+Stop and clean up:
+```bash
+docker stop clickhouse-test && docker rm clickhouse-test
+```
+
+- [ ] **Step 3: Final commit (if any fixes were needed)**
+
+```bash
+git add -A
+git commit -m "fix: address issues found during manual verification"
+```
diff --git a/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md b/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md
new file mode 100644
index 00000000..431f1c00
--- /dev/null
+++ b/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md
@@ -0,0 +1,146 @@
+# Append-Only Execution Data Protocol
+
+A reference document for redesigning the Cameleer agent's data reporting to be append-only,
+eliminating the need for upserts in the storage layer.
+
+## Problem
+
+The current protocol sends execution data in two phases:
+
+1. **RUNNING phase**: Agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info.
+2. **COMPLETED/FAILED phase**: Agent sends an enriched record when execution finishes (duration, output body, headers, errors, processor tree).
+
+The server uses `INSERT ... ON CONFLICT DO UPDATE SET COALESCE(...)` to merge these into a single row. This works in PostgreSQL but creates problems for append-only stores like ClickHouse, Kafka topics, or any event-sourced architecture.
+
+### Why This Matters
+
+- **ClickHouse**: No native upsert. Must use ReplacingMergeTree (eventual consistency, FINAL overhead) or application-side buffering.
+- **Event streaming**: Kafka/Pulsar topics are append-only. 
Two-phase lifecycle requires a stateful stream processor to merge. +- **Data lakes**: Parquet files are immutable. Updates require read-modify-write of entire files. +- **Materialized views**: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both RUNNING and COMPLETED inserts for the same execution. + +## Proposed Protocol Change + +### Option A: Single-Phase Reporting (Recommended) + +The agent buffers the execution locally and sends a **single, complete record** only when the execution reaches a terminal state (COMPLETED or FAILED). + +``` +Current: Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert) +Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append) +``` + +**What changes in the agent:** +- `RouteExecutionTracker` holds in-flight executions in a local `ConcurrentHashMap` +- On route start: create tracker entry with start_time, route_id, etc. +- On route complete: enrich tracker entry with duration, bodies, errors, processor tree +- On report: send the complete record in one HTTP POST +- On timeout (configurable, e.g., 5 minutes): flush as RUNNING (for visibility of stuck routes) + +**What changes in the server:** +- Storage becomes pure append: `INSERT INTO executions VALUES (...)` — no upsert, no COALESCE +- No `SearchIndexer` / `ExecutionAccumulator` needed — the server just writes what it receives +- Materialized views count correctly (one insert = one execution) +- Works with any append-only store (ClickHouse, Kafka, S3/Parquet) + +**Trade-offs:** +- RUNNING executions are not visible on the server until they complete (or timeout-flush) +- "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows +- If the agent crashes, in-flight executions are lost (same as current behavior — RUNNING rows become orphans anyway) + +### Option B: Event Log with Reconstruction + +Send both phases as separate **events** (not records), and let 
the server reconstruct the current state. + +``` +Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...} +Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]} +``` + +**Server-side:** +- Store raw events in an append-only log table +- Reconstruct current state via `SELECT argMax(field, event_time) FROM events WHERE execution_id = ? GROUP BY execution_id` +- Or: use a materialized view with `AggregatingMergeTree` + `argMaxState` to maintain a "latest state" table + +**Trade-offs:** +- More complex server-side reconstruction +- Higher storage (two rows per execution instead of one) +- More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED) +- Natural fit for event sourcing architectures + +### Option C: Hybrid (Current Cameleer3-Server Approach) + +Keep the two-phase protocol but handle merging at the server application layer. This is what cameleer3-server implements today with the `ExecutionAccumulator`: + +- RUNNING POST -> hold in `ConcurrentHashMap` (no DB write) +- COMPLETED POST -> merge with RUNNING in-memory -> single INSERT to DB +- Timeout sweep -> flush stale RUNNING entries for visibility + +**Trade-offs:** +- No agent changes required +- Server must be stateful (in-memory accumulator) +- Crash window: active executions lost if server restarts +- Adds complexity to the server that wouldn't exist with Option A + +## Recommendation + +**Option A (single-phase reporting)** is the strongest choice for a new protocol version: + +1. **Simplest server implementation**: Pure append, no state, no merging +2. **Works everywhere**: ClickHouse, Kafka, S3, any append-only store +3. **Correct by construction**: MVs, aggregations, and stream processing all see one event per execution +4. 
**Agent is the natural place to buffer**: The agent already tracks in-flight executions for instrumentation — it just needs to hold the report until completion +5. **Minimal data loss risk**: Agent crash loses in-flight data regardless of protocol — this doesn't make it worse + +### Migration Strategy + +1. Add `protocol_version` field to agent registration +2. v1 agents: server uses `ExecutionAccumulator` (current behavior) +3. v2 agents: server does pure append (no accumulator needed for v2 data) +4. Both can coexist — the server checks protocol version per agent + +### Fields for Single-Phase Record + +The complete record sent by a v2 agent: + +```json +{ + "executionId": "uuid", + "routeId": "myRoute", + "agentId": "agent-1", + "applicationName": "my-app", + "correlationId": "corr-123", + "exchangeId": "exchange-456", + "status": "COMPLETED", + "startTime": "2026-03-31T10:00:00.000Z", + "endTime": "2026-03-31T10:00:00.250Z", + "durationMs": 250, + "errorMessage": null, + "errorStackTrace": null, + "errorType": null, + "errorCategory": null, + "rootCauseType": null, + "rootCauseMessage": null, + "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}}, + "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}}, + "attributes": {"key": "value"}, + "traceId": "otel-trace-id", + "spanId": "otel-span-id", + "replayExchangeId": null, + "processors": [ + { + "processorId": "proc-1", + "processorType": "to", + "status": "COMPLETED", + "startTime": "...", + "endTime": "...", + "durationMs": 120, + "inputBody": "...", + "outputBody": "...", + "children": [] + } + ] +} +``` + +All fields populated. No second POST needed. Server does a single INSERT. 
diff --git a/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md b/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md new file mode 100644 index 00000000..6fb29e8d --- /dev/null +++ b/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md @@ -0,0 +1,916 @@ +# ClickHouse Migration Design + +Replace PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS for all observability data. +PostgreSQL retained only for RBAC, config, and audit log. + +## Context + +Cameleer3-server currently uses three storage systems: + +- **PostgreSQL/TimescaleDB**: executions, processor_executions, agent_metrics (hypertables), agent_events, route_diagrams, plus RBAC/config/audit tables. Continuous aggregates for dashboard statistics. +- **OpenSearch**: executions-YYYY-MM-DD indices (full-text search on bodies/headers/errors), logs-YYYY-MM-DD indices (application log storage with 7-day retention). +- **Dual-write pattern**: PG is source of truth, OpenSearch is async-indexed via debounced `SearchIndexer`. + +This architecture has scaling limits: three systems to operate, data duplication between PG and OpenSearch, TimescaleDB continuous aggregates with limited flexibility, and no multitenancy support. + +**Goal**: Consolidate to ClickHouse OSS (self-hosted) for all observability data. Add multitenancy with custom per-tenant, per-document-type retention. Support billions of documents, terabytes of data, sub-second wildcard search. + +## Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Deployment | Self-hosted ClickHouse OSS on k3s | All needed features available in OSS. Fits existing infra. | +| Execution lifecycle | Approach B: Application-side accumulator | Merges RUNNING+COMPLETED in-memory, writes one row. Avoids upsert problem. | +| Table engine (executions) | ReplacingMergeTree | Handles rare late corrections via version column. Normal flow writes once. 
| +| Table engine (all others) | MergeTree | Append-only data, no dedup needed. | +| Client | JDBC + JdbcTemplate | Familiar pattern, matches current PG code. Async inserts via JDBC URL settings. | +| Multitenancy | Shared tables + tenant_id column | Row policies for defense-in-depth. Application-layer WHERE for primary enforcement. | +| Retention | Application-driven scheduler | Per-tenant, per-document-type. Config in PG, execution via ALTER TABLE DELETE. | +| Search | Ngram bloom filter indexes | Sub-second wildcard search. Materialized `_search_text` column for cross-field search. | +| Highlighting | Application-side in Java | Extract 120-char fragment around match from returned fields. | +| Storage tiering | Local SSD only (initially) | S3/MinIO tiering can be added later via TTL MOVE rules. | + +## ClickHouse OSS Constraints + +These are features NOT available in the open-source version: + +| Constraint | Impact on Cameleer3 | +|------------|---------------------| +| No SharedMergeTree | No elastic compute scaling; must size nodes up-front. Acceptable for self-hosted. | +| No BM25 relevance scoring | Search returns matches without ranking. Acceptable for observability (want all matches, not ranked). | +| No search highlighting | Replaced by application-side highlighting in Java. | +| No fuzzy/typo-tolerant search | Must match exact tokens or use ngram index for substring match. Acceptable. | +| No ClickPipes | Must build own ingestion pipeline. Already exists (agents push via HTTP POST). | +| No managed backups | Must configure `clickhouse-backup` (Altinity, open-source) or built-in BACKUP SQL. | +| No auto-scaling | Manual capacity planning. Single node handles 14+ TiB, sufficient for initial scale. | + +General ClickHouse constraints (apply to both OSS and Cloud): + +| Constraint | Mitigation | +|------------|------------| +| ORDER BY is immutable | Careful upfront schema design. Documented below. | +| No transactions | Single-table INSERT atomic per block. 
No cross-table atomicity needed. | +| Mutations are expensive | Avoid ALTER UPDATE/DELETE. Use ReplacingMergeTree for corrections, append-only for everything else. | +| Row policies skip mutations | Application-layer WHERE on mutations. Mutations are rare (retention scheduler only). | +| No JPA/Hibernate | Use JdbcTemplate (already the pattern for PG). | +| JSON max_dynamic_paths | Store attributes as flattened String, not JSON type. Use ngram index for search. | +| Text indexes can't index JSON subcolumns | Extract searchable text into materialized String columns. | +| MVs only process new inserts | Historical data backfill writes through MV pipeline. | +| MV errors block source inserts | Careful MV design. Test thoroughly before production. | +| ReplacingMergeTree eventual consistency | Use FINAL on queries that need latest version. | + +## What Stays in PostgreSQL + +| Table | Reason | +|-------|--------| +| `users`, `roles`, `groups`, `user_groups`, `user_roles`, `group_roles` | RBAC with relational joins, foreign keys, transactions | +| `server_config` | Global config, low volume, needs transactions | +| `application_config` | Per-app observability settings | +| `app_settings` | Per-app SLA thresholds | +| `audit_log` | Security compliance, needs transactions, joins with RBAC tables | +| OIDC config | Auth provider config | +| `tenant_retention_config` (new) | Per-tenant retention settings, referenced by scheduler | + +## What Moves to ClickHouse + +| Data | Current Location | ClickHouse Table | +|------|-----------------|------------------| +| Route executions | PG `executions` hypertable + OpenSearch `executions-*` | `executions` | +| Processor executions | PG `processor_executions` hypertable | `processor_executions` | +| Agent metrics | PG `agent_metrics` hypertable | `agent_metrics` | +| Agent events | PG `agent_events` | `agent_events` | +| Route diagrams | PG `route_diagrams` | `route_diagrams` | +| Application logs | OpenSearch `logs-*` | `logs` | +| 
Dashboard statistics | PG continuous aggregates (`stats_1m_*`) | ClickHouse materialized views (`stats_1m_*`) | + +## Table Schemas + +### executions + +```sql +CREATE TABLE executions ( + tenant_id LowCardinality(String), + execution_id String, + start_time DateTime64(3), + _version UInt64 DEFAULT 1, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + status LowCardinality(String), + correlation_id String DEFAULT '', + exchange_id String DEFAULT '', + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + diagram_content_hash String DEFAULT '', + engine_level LowCardinality(String) DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + trace_id String DEFAULT '', + span_id String DEFAULT '', + processors_json String DEFAULT '', + has_trace_data Bool DEFAULT false, + is_replay Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, + ' ', output_headers, ' ', root_cause_message), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_status status TYPE set(10) GRANULARITY 1, + INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(_version) 
+PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) +TTL start_time + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +Design rationale: +- **ORDER BY** `(tenant_id, start_time, application_name, route_id, execution_id)`: Matches UI query pattern (tenant -> time range -> app -> route). Time before application because observability queries almost always include a time range. +- **PARTITION BY** `(tenant_id, toYYYYMM(start_time))`: Enables per-tenant partition drops for retention. Monthly granularity balances partition count vs drop efficiency. +- **ReplacingMergeTree(_version)**: Normal flow writes once (version 1). Late corrections write version 2+. Background merges keep latest version. +- **`_search_text` materialized column**: Computed at insert time. Concatenates all searchable fields for cross-field wildcard search. +- **`ngrambf_v1(3, 256, 2, 0)`**: 3-char ngrams in a 256-byte bloom filter with 2 hash functions. Prunes most granules for `LIKE '%term%'` queries. The bloom filter size (256 bytes) is a starting point — increase to 4096-8192 if false positive rates are too high for long text fields. Tune after benchmarking with real data. +- **`LowCardinality(String)`**: Dictionary encoding for columns with few distinct values. Major compression improvement. +- **TTL 365 days**: Safety net. Application-driven scheduler handles per-tenant retention at finer granularity. 
+ +### processor_executions + +```sql +CREATE TABLE processor_executions ( + tenant_id LowCardinality(String), + execution_id String, + processor_id String, + start_time DateTime64(3), + route_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + parent_processor_id String DEFAULT '', + depth UInt16 DEFAULT 0, + status LowCardinality(String), + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + loop_index Nullable(Int32), + loop_size Nullable(Int32), + split_index Nullable(Int32), + split_size Nullable(Int32), + multicast_index Nullable(Int32), + resolved_endpoint_uri String DEFAULT '', + error_handler_type LowCardinality(String) DEFAULT '', + circuit_breaker_state LowCardinality(String) DEFAULT '', + fallback_triggered Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, processor_id) +TTL start_time + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### logs + +```sql +CREATE TABLE logs ( + tenant_id LowCardinality(String), + timestamp DateTime64(3), + application LowCardinality(String), + agent_id LowCardinality(String), + 
level LowCardinality(String), + logger_name LowCardinality(String) DEFAULT '', + message String, + thread_name LowCardinality(String) DEFAULT '', + stack_trace String DEFAULT '', + exchange_id String DEFAULT '', + mdc Map(String, String) DEFAULT map(), + + INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_level level TYPE set(10) GRANULARITY 1 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, application, timestamp) +TTL timestamp + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### agent_metrics + +```sql +CREATE TABLE agent_metrics ( + tenant_id LowCardinality(String), + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### agent_events + +```sql +CREATE TABLE agent_events ( + tenant_id LowCardinality(String), + timestamp DateTime64(3) DEFAULT now64(3), + agent_id LowCardinality(String), + app_id LowCardinality(String), + event_type LowCardinality(String), + detail String DEFAULT '' +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, app_id, agent_id, timestamp) +TTL timestamp + INTERVAL 365 DAY DELETE; +``` + +### route_diagrams + +```sql +CREATE TABLE route_diagrams ( + tenant_id LowCardinality(String), + content_hash String, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + definition String, + created_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = ReplacingMergeTree(created_at) +ORDER BY (tenant_id, 
content_hash) +SETTINGS index_granularity = 8192; +``` + +## Materialized Views (Stats) + +Replace TimescaleDB continuous aggregates. ClickHouse MVs trigger on INSERT and store aggregate states in target tables. + +### stats_1m_all (global) + +```sql +CREATE TABLE stats_1m_all ( + tenant_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_all_mv TO stats_1m_all AS +SELECT + tenant_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, bucket; +``` + +### stats_1m_app (per-application) + +```sql +CREATE TABLE stats_1m_app ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_app_mv 
TO stats_1m_app AS +SELECT + tenant_id, + application_name, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, bucket; +``` + +### stats_1m_route (per-route) + +```sql +CREATE TABLE stats_1m_route ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_route_mv TO stats_1m_route AS +SELECT + tenant_id, + application_name, + route_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, route_id, bucket; +``` + +### stats_1m_processor (per-processor-type) + +```sql +CREATE TABLE stats_1m_processor ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, 
UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, processor_type, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_processor_mv TO stats_1m_processor AS +SELECT + tenant_id, + application_name, + processor_type, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, processor_type, bucket; +``` + +### stats_1m_processor_detail (per-processor-id) + +```sql +CREATE TABLE stats_1m_processor_detail ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + processor_id String, + bucket DateTime, + total_count AggregateFunction(count), + failed_count AggregateFunction(countIf, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, processor_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_processor_detail_mv TO stats_1m_processor_detail AS +SELECT + tenant_id, + application_name, + route_id, + processor_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS 
p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, route_id, processor_id, bucket; +``` + +## Ingestion Pipeline + +### Current Flow (replaced) + +``` +Agent POST -> IngestionService -> PostgresExecutionStore.upsert() -> PG + -> SearchIndexer (debounced 2s) -> reads from PG -> OpenSearch +``` + +### New Flow + +``` +Agent POST -> IngestionService -> ExecutionAccumulator + |-- RUNNING: ConcurrentHashMap (no DB write) + |-- COMPLETED/FAILED: merge with pending -> WriteBuffer + '-- Timeout sweep (60s): flush stale -> WriteBuffer + | + ClickHouseExecutionStore.insertBatch() + ClickHouseProcessorStore.insertBatch() +``` + +### ExecutionAccumulator + +New component replacing `SearchIndexer`. Core responsibilities: + +1. **On RUNNING POST**: Store `PendingExecution` in `ConcurrentHashMap` keyed by `execution_id`. Return 200 OK immediately. No database write. + +2. **On COMPLETED/FAILED POST**: Look up pending RUNNING by `execution_id`. If found, merge fields using the same COALESCE logic currently in `PostgresExecutionStore.upsert()`. Produce a complete `MergedExecution` and push to `WriteBuffer`. If not found (race condition or RUNNING already flushed by timeout), write COMPLETED directly with `_version=2`. + +3. **Timeout sweep** (scheduled every 60s): Scan for RUNNING entries older than 5 minutes. Flush them to ClickHouse as-is with status=RUNNING, making them visible in the UI. When COMPLETED eventually arrives, it writes with `_version=2` (ReplacingMergeTree deduplicates). + +4. **Late corrections**: If a correction arrives for an already-written execution, insert with `_version` incremented. ReplacingMergeTree handles deduplication. 
+ +### WriteBuffer + +Reuse the existing `WriteBuffer` pattern (bounded queue, configurable batch size, scheduled drain): + +- Buffer capacity: 50,000 items +- Batch size: 5,000 per flush +- Flush interval: 1 second +- Separate buffers for executions and processor_executions (independent batch inserts) +- Drain calls `ClickHouseExecutionStore.insertBatch()` using JDBC batch update + +### Logs Ingestion + +Direct batch INSERT, bypasses accumulator (logs are single-phase): + +``` +Agent POST /api/v1/data/logs -> LogIngestionController -> ClickHouseLogStore.insertBatch() +``` + +### Metrics Ingestion + +Existing `MetricsWriteBuffer` targets ClickHouse instead of PG: + +``` +Agent POST /api/v1/data/metrics -> MetricsController -> WriteBuffer -> ClickHouseMetricsStore.insertBatch() +``` + +### JDBC Batch Insert Pattern + +```java +jdbcTemplate.batchUpdate( + "INSERT INTO executions (tenant_id, execution_id, start_time, ...) VALUES (?, ?, ?, ...)", + batchArgs +); +``` + +JDBC URL includes `async_insert=1&wait_for_async_insert=0` for server-side buffering, preventing "too many parts" errors under high load. 
+ +## Search Implementation + +### Query Translation + +Current OpenSearch bool queries map to ClickHouse SQL: + +```sql +-- Full-text wildcard search with time range, status filter, and pagination +-- {search_pattern} is bound as a parameter, pre-built server-side as '%<escaped term>%' +-- (never concatenate user input into the SQL string) +SELECT * +FROM executions FINAL +WHERE tenant_id = {tenant_id:String} + AND start_time >= {time_from:DateTime64(3)} + AND start_time < {time_to:DateTime64(3)} + AND status IN ({statuses:Array(String)}) + AND ( + _search_text LIKE {search_pattern:String} + OR execution_id IN ( + SELECT DISTINCT execution_id + FROM processor_executions + WHERE tenant_id = {tenant_id:String} + AND start_time >= {time_from:DateTime64(3)} + AND start_time < {time_to:DateTime64(3)} + AND _search_text LIKE {search_pattern:String} + ) + ) +ORDER BY start_time DESC +LIMIT {limit:UInt32} OFFSET {offset:UInt32} +``` + +### Scoped Searches + +| Scope | ClickHouse WHERE clause | +|-------|------------------------| +| textInBody | `input_body LIKE '%term%' OR output_body LIKE '%term%'` | +| textInHeaders | `input_headers LIKE '%term%' OR output_headers LIKE '%term%'` | +| textInErrors | `error_message LIKE '%term%' OR error_stacktrace LIKE '%term%'` | +| global text | `_search_text LIKE '%term%'` (covers all fields) | + +All accelerated by `ngrambf_v1` indexes which prune 95%+ of data granules before scanning. + +### Application-Side Highlighting + +```java +public String extractHighlight(String text, String searchTerm, int contextChars) { + int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase()); + if (idx < 0) return null; + int start = Math.max(0, idx - contextChars / 2); + int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2); + return (start > 0 ? "..." : "") + + text.substring(start, end) + + (end < text.length() ? "..." : ""); +} +``` + +Returns the same `highlight` map structure the UI currently expects. 
+ +### Nested Processor Search + +OpenSearch nested queries become a subquery on the `processor_executions` table: + +```sql +execution_id IN ( + SELECT DISTINCT execution_id + FROM processor_executions + WHERE tenant_id = ? AND start_time >= ? AND start_time < ? + AND _search_text LIKE '%term%' +) +``` + +This is evaluated once with ngram index acceleration, then joined via IN. + +## Stats Query Translation + +### TimescaleDB -> ClickHouse Query Patterns + +| TimescaleDB | ClickHouse | +|-------------|------------| +| `time_bucket('1 minute', bucket)` | `toStartOfInterval(bucket, INTERVAL 1 MINUTE)` | +| `SUM(total_count)` | `countMerge(total_count)` | +| `SUM(failed_count)` | `countIfMerge(failed_count)` | +| `approx_percentile(0.99, rollup(p99_duration))` | `quantileMerge(0.99)(p99_duration)` | +| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` | +| `MAX(duration_max)` | `maxMerge(duration_max)` | + +### Example: Timeseries Query + +```sql +SELECT + toStartOfInterval(bucket, INTERVAL {interval:UInt32} SECOND) AS period, + countMerge(total_count) AS total_count, + countIfMerge(failed_count) AS failed_count, + sumMerge(duration_sum) / countMerge(total_count) AS avg_duration, + quantileMerge(0.99)(p99_duration) AS p99_duration +FROM stats_1m_app +WHERE tenant_id = {tenant_id:String} + AND application_name = {app:String} + AND bucket >= {from:DateTime} + AND bucket < {to:DateTime} +GROUP BY period +ORDER BY period +``` + +### SLA and Top Errors + +SLA queries hit the raw `executions` table (need per-row duration filtering): + +```sql +SELECT + countIf(duration_ms <= {threshold:Int64} AND status != 'RUNNING') * 100.0 / count() AS sla_pct +FROM executions FINAL +WHERE tenant_id = ? AND application_name = ? AND start_time >= ? AND start_time < ? +``` + +Top errors query: + +```sql +SELECT + error_message, + count() AS error_count, + max(start_time) AS last_seen +FROM executions FINAL +WHERE tenant_id = ? 
AND status = 'FAILED' + AND start_time >= now() - INTERVAL 1 HOUR +GROUP BY error_message +ORDER BY error_count DESC +LIMIT 10 +``` + +## Multitenancy + +### Data Isolation + +**Primary**: Application-layer WHERE clause injection. Every ClickHouse query gets `WHERE tenant_id = ?` from the authenticated user's JWT claims. + +**Defense-in-depth**: ClickHouse row policies: + +```sql +-- Create a ClickHouse user per tenant +CREATE USER tenant_acme IDENTIFIED BY '...'; + +-- Row policy ensures tenant can only see their data +-- (the TO clause is required; a policy without TO is not applied to any user) +CREATE ROW POLICY tenant_acme_executions ON executions + FOR SELECT USING tenant_id = 'acme' + TO tenant_acme; + +-- Repeat for all tables +``` + +### Tenant ID in Schema + +`tenant_id` is the first column in every table's ORDER BY and PARTITION BY. This ensures: +- Data for the same tenant is physically co-located on disk +- Queries filtering by tenant_id use the sparse index efficiently +- Partition drops for retention are scoped to individual tenants + +### Resource Quotas + +```sql +CREATE SETTINGS PROFILE tenant_limits + SETTINGS max_execution_time = 30, + max_rows_to_read = 100000000, + max_memory_usage = '4G'; + +ALTER USER tenant_acme SETTINGS PROFILE tenant_limits; +``` + +Prevents noisy neighbor problems where one tenant's expensive query affects others. + +## Retention + +### Strategy: Application-Driven Scheduler + +Per-tenant, per-document-type retention is too dynamic for static ClickHouse TTL rules. Instead: + +1. **Config table** in PostgreSQL: + +```sql +CREATE TABLE tenant_retention_config ( + tenant_id VARCHAR(255) NOT NULL, + document_type VARCHAR(50) NOT NULL, -- executions, logs, metrics, etc. + retention_days INT NOT NULL, + PRIMARY KEY (tenant_id, document_type) +); +``` + +2. 
**RetentionScheduler** (Spring `@Scheduled`, runs daily at 03:00 UTC): + +```java +@Scheduled(cron = "0 0 3 * * *") +public void enforceRetention() { + List<TenantRetention> configs = retentionConfigRepo.findAll(); + for (TenantRetention config : configs) { + String table = config.documentType(); // executions, logs, metrics, etc. + // table is concatenated into the statement: validate against a fixed allowlist first + if (!ALLOWED_TABLES.contains(table)) { continue; } + clickHouseJdbc.update( + "ALTER TABLE " + table + " DELETE WHERE tenant_id = ? AND start_time < now() - INTERVAL ? DAY", + config.tenantId(), config.retentionDays() + ); + } +} +``` + +3. **Safety-net TTL**: Each table has a generous default TTL (365 days) as a backstop in case the scheduler fails. The scheduler handles the per-tenant granularity. + +4. **Partition-aligned drops**: Since `PARTITION BY (tenant_id, toYYYYMM(start_time))`, when all rows in a partition match the DELETE condition, ClickHouse drops the entire partition (fast, no rewrite). Enable `ttl_only_drop_parts=1` on tables. + +## Java/Spring Integration + +### Dependencies + +```xml + + com.clickhouse + clickhouse-jdbc + 0.9.7 + all + +``` + +### Configuration + +```yaml +clickhouse: + url: jdbc:clickhouse://clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0 + username: cameleer_app + password: ${CLICKHOUSE_PASSWORD} +``` + +### DataSource Bean + +```java +@Configuration +public class ClickHouseConfig { + @Bean + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + return ds; + } + + @Bean + public JdbcTemplate clickHouseJdbcTemplate( + @Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} +``` + +### Interface Implementations + +Existing interfaces remain unchanged. 
New implementations: + +| Interface | Current Impl | New Impl | +|-----------|-------------|----------| +| `ExecutionStore` | `PostgresExecutionStore` | `ClickHouseExecutionStore` | +| `SearchIndex` | `OpenSearchIndex` | `ClickHouseSearchIndex` | +| `StatsStore` | `PostgresStatsStore` | `ClickHouseStatsStore` | +| `DiagramStore` | `PostgresDiagramStore` | `ClickHouseDiagramStore` | +| `MetricsStore` | `PostgresMetricsStore` | `ClickHouseMetricsStore` | +| (log search) | `OpenSearchLogIndex` | `ClickHouseLogStore` | +| (new) | `SearchIndexer` | `ExecutionAccumulator` | + +## Kubernetes Deployment + +### ClickHouse StatefulSet + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse +spec: + serviceName: clickhouse + replicas: 1 # single node initially + template: + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:26.2 + ports: + - containerPort: 8123 # HTTP + - containerPort: 9000 # Native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + - name: config + mountPath: /etc/clickhouse-server/config.d + resources: + requests: + memory: "4Gi" + cpu: "2" + limits: + memory: "8Gi" + cpu: "4" + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 100Gi # NVMe/SSD +``` + +### Health Check + +```yaml +livenessProbe: + httpGet: + path: /ping + port: 8123 +readinessProbe: + httpGet: + path: /ping + port: 8123 +``` + +## Migration Path + +### Phase 1: Foundation + +- Add `clickhouse-jdbc` dependency +- Create `ClickHouseConfig` (DataSource, JdbcTemplate) +- Schema initialization (idempotent DDL scripts, not Flyway -- ClickHouse DDL is different enough) +- Implement `ClickHouseMetricsStore` (simplest table, validates pipeline) +- Deploy ClickHouse to k8s alongside existing PG+OpenSearch + +### Phase 2: Executions + Search + +- Build `ExecutionAccumulator` (replaces SearchIndexer) +- Implement `ClickHouseExecutionStore` and 
`ClickHouseProcessorStore` +- Implement `ClickHouseSearchIndex` (ngram-based SQL queries) +- Feature flag: dual-write to both PG and CH, read from PG + +### Phase 3: Stats & Analytics + +- Create MV definitions (all 5 stats views) +- Implement `ClickHouseStatsStore` +- Validate stats accuracy: compare CH vs PG continuous aggregates + +### Phase 4: Remaining Tables + +- `ClickHouseDiagramStore` (ReplacingMergeTree) +- `ClickHouseAgentEventStore` +- `ClickHouseLogStore` (replaces OpenSearchLogIndex) +- Application-side highlighting + +### Phase 5: Multitenancy + +- Tables already include `tenant_id` from Phase 1 (schema is forward-looking). This phase activates multitenancy. +- Wire `tenant_id` from JWT claims into all ClickHouse queries (application-layer WHERE injection) +- Add `tenant_id` to PostgreSQL RBAC/config tables +- Create ClickHouse row policies per tenant (defense-in-depth) +- Create `tenant_retention_config` table in PG and `RetentionScheduler` component +- Tenant user management and resource quotas in ClickHouse + +### Phase 6: Cutover + +- Backfill historical data from PG/OpenSearch to ClickHouse +- Switch read path to ClickHouse (feature flag) +- Validate end-to-end +- Remove OpenSearch dependency (POM, config, k8s manifests) +- Remove TimescaleDB extensions and hypertable-specific code +- Keep PostgreSQL for RBAC/config/audit only + +## Verification + +### Functional Verification + +1. **Ingestion**: Send executions via agent, verify they appear in ClickHouse with correct fields +2. **Two-phase lifecycle**: Send RUNNING, then COMPLETED. Verify single merged row in CH +3. **Search**: Wildcard search across bodies, headers, errors. Verify sub-second response +4. **Stats**: Dashboard statistics match expected values. Compare with PG aggregates during dual-write +5. **Logs**: Ingest log batches, query by app/level/time/text. Verify correctness +6. **Retention**: Configure per-tenant retention, run scheduler, verify expired data is deleted +7. 
**Multitenancy**: Two tenants, verify data isolation (one tenant cannot see another's data) + +### Performance Verification + +1. **Insert throughput**: 5K executions/batch at 1 flush/sec sustained +2. **Search latency**: Sub-second for `LIKE '%term%'` across 1M+ rows +3. **Stats query latency**: Dashboard stats in <100ms from materialized views +4. **Log search**: <1s for text search across 7 days of logs + +### Data Integrity + +1. During dual-write phase: compare row counts between PG and CH +2. After cutover: spot-check execution details, processor trees, search results