cameleer-server/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md
# ClickHouse Migration Phase 1: Foundation
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add ClickHouse as a second data store alongside PostgreSQL, validated with the metrics pipeline end-to-end (write + read). Feature-flagged so either PG or CH serves metrics.

**Architecture:** Add the `clickhouse-jdbc` dependency, a second `DataSource`/`JdbcTemplate` bean pair (qualified `clickHouseDataSource`/`clickHouseJdbcTemplate`), an idempotent schema initializer that creates ClickHouse tables on startup, and a feature flag (`cameleer.storage.metrics=clickhouse|postgres`) that selects the active `MetricsStore` implementation. The metrics read path (`AgentMetricsController`) is refactored behind a new `MetricsQueryStore` interface so it too can be swapped.

**Tech Stack:** ClickHouse JDBC 0.7.x, Spring Boot 3.4.3, JdbcTemplate, Testcontainers (clickhouse module)

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md`
---
## File Map
| Action | File | Responsibility |
|--------|------|----------------|
| Modify | `cameleer-server-app/pom.xml` | Add clickhouse-jdbc + testcontainers-clickhouse dependencies |
| Modify | `cameleer-server-app/src/main/resources/application.yml` | Add `clickhouse.*` and `cameleer.storage.metrics` config keys |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseConfig.java` | ClickHouse DataSource + JdbcTemplate beans |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseProperties.java` | Config properties class for `clickhouse.*` prefix |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseSchemaInitializer.java` | Runs idempotent DDL on startup |
| Create | `cameleer-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` | DDL for `agent_metrics` table |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsStore.java` | `MetricsStore` impl writing to ClickHouse |
| Create | `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsQueryStore.java` | Interface: query metrics time-series |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsQueryStore.java` | PG impl of MetricsQueryStore (extracted from controller) |
| Create | `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStore.java` | CH impl of MetricsQueryStore |
| Modify | `cameleer-server-app/src/main/java/com/cameleer/server/app/controller/AgentMetricsController.java` | Use MetricsQueryStore instead of raw JdbcTemplate |
| Modify | `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java` | Conditional beans for PG vs CH metrics stores |
| Create | `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsStoreTest.java` | Unit test |
| Create | `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStoreTest.java` | Unit test |
| Create | `deploy/clickhouse.yaml` | K8s StatefulSet + Service for ClickHouse |
| Modify | `deploy/base/server.yaml` | Add CLICKHOUSE_URL env var |
---
### Task 1: Add ClickHouse Dependencies
**Files:**
- Modify: `cameleer-server-app/pom.xml`
- [ ] **Step 1: Add clickhouse-jdbc and testcontainers-clickhouse to app POM**
In `cameleer-server-app/pom.xml`, add these two dependencies. Place the runtime dependency after the opensearch dependencies (around line 59), and the test dependency after the opensearch-testcontainers dependency (around line 128):
```xml
<!-- After opensearch-rest-client dependency, ~line 59 -->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.7.1-patch5</version>
    <classifier>all</classifier>
</dependency>

<!-- After opensearch-testcontainers test dependency, ~line 128 -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>clickhouse</artifactId>
    <scope>test</scope>
</dependency>
```
Note: The `all` classifier bundles the HTTP client, avoiding transitive dependency conflicts. The testcontainers `clickhouse` module version is managed by the `testcontainers.version` property in the parent POM.
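For reference, if the parent POM's version management ever needs to be set up from scratch, one common way is importing the Testcontainers BOM. This is a hedged sketch only — the plan states the version is already managed by a `testcontainers.version` property, so adjust to however the parent actually does it:

```xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers-bom</artifactId>
            <version>${testcontainers.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```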
- [ ] **Step 2: Verify compilation**
Run: `mvn clean compile -pl cameleer-server-app -am`
Expected: BUILD SUCCESS
- [ ] **Step 3: Commit**
```bash
git add cameleer-server-app/pom.xml
git commit -m "build: add clickhouse-jdbc and testcontainers-clickhouse dependencies"
```
---
### Task 2: ClickHouse Configuration
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseProperties.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseConfig.java`
- Modify: `cameleer-server-app/src/main/resources/application.yml`
- [ ] **Step 1: Create ClickHouseProperties**
```java
package com.cameleer.server.app.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "clickhouse")
public class ClickHouseProperties {

    private String url = "jdbc:clickhouse://localhost:8123/cameleer";
    private String username = "default";
    private String password = "";

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}
```
- [ ] **Step 2: Create ClickHouseConfig**
```java
package com.cameleer.server.app.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

@Configuration
@EnableConfigurationProperties(ClickHouseProperties.class)
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseConfig {

    @Bean(name = "clickHouseDataSource")
    public DataSource clickHouseDataSource(ClickHouseProperties props) {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(props.getUrl());
        ds.setUsername(props.getUsername());
        ds.setPassword(props.getPassword());
        ds.setMaximumPoolSize(10);
        ds.setPoolName("clickhouse-pool");
        return ds;
    }

    @Bean(name = "clickHouseJdbcTemplate")
    public JdbcTemplate clickHouseJdbcTemplate(
            @Qualifier("clickHouseDataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }
}
```
- [ ] **Step 3: Add configuration to application.yml**
Add at the end of `application.yml`:
```yaml
clickhouse:
  enabled: ${CLICKHOUSE_ENABLED:false}
  url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0}
  username: ${CLICKHOUSE_USERNAME:default}
  password: ${CLICKHOUSE_PASSWORD:}

cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
```
Note: The existing `cameleer:` block already has `body-size-limit` and `retention-days`. Add the `storage.metrics` key under it.
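For clarity, the merged `cameleer:` block would then look roughly like this — the two existing keys are shown with illustrative placeholder values, since their actual values are not reproduced in this plan:

```yaml
cameleer:
  body-size-limit: 1MB   # existing key, value illustrative
  retention-days: 30     # existing key, value illustrative
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
```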
- [ ] **Step 4: Verify compilation**
Run: `mvn clean compile -pl cameleer-server-app -am`
Expected: BUILD SUCCESS
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseProperties.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseConfig.java
git add cameleer-server-app/src/main/resources/application.yml
git commit -m "feat: add ClickHouse DataSource and JdbcTemplate configuration"
```
---
### Task 3: Schema Initializer + DDL Script
**Files:**
- Create: `cameleer-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseSchemaInitializer.java`
- [ ] **Step 1: Create DDL script**
File: `cameleer-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql`
```sql
CREATE TABLE IF NOT EXISTS agent_metrics (
    tenant_id          LowCardinality(String) DEFAULT 'default',
    collected_at       DateTime64(3),
    agent_id           LowCardinality(String),
    metric_name        LowCardinality(String),
    metric_value       Float64,
    tags               Map(String, String) DEFAULT map(),
    server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
TTL collected_at + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;
```
- [ ] **Step 2: Create ClickHouseSchemaInitializer**
```java
package com.cameleer.server.app.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

@Component
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseSchemaInitializer {

    private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class);

    private final JdbcTemplate clickHouseJdbc;

    public ClickHouseSchemaInitializer(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        this.clickHouseJdbc = clickHouseJdbc;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void initializeSchema() throws IOException {
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql");
        Arrays.sort(scripts, Comparator.comparing(Resource::getFilename));
        for (Resource script : scripts) {
            String sql = script.getContentAsString(StandardCharsets.UTF_8);
            log.info("Executing ClickHouse schema script: {}", script.getFilename());
            for (String statement : sql.split(";")) {
                String trimmed = statement.trim();
                if (!trimmed.isEmpty()) {
                    clickHouseJdbc.execute(trimmed);
                }
            }
        }
        log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length);
    }
}
```
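One caveat worth noting: the initializer splits scripts on `;`, which is adequate for the bundled DDL but would mis-split a statement whose string literal contains a semicolon. A minimal illustration (hypothetical SQL, not from this plan):

```java
public class SqlSplitCaveat {
    public static void main(String[] args) {
        // A statement whose string literal contains a semicolon
        String sql = "CREATE TABLE t (x String DEFAULT 'a;b'); DROP TABLE t;";
        String[] parts = sql.split(";");
        // split(";") cuts through the literal: 3 fragments instead of 2 statements
        System.out.println(parts.length); // 3
    }
}
```

If future migration scripts ever need such literals, the split logic would have to become semicolon-aware.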
- [ ] **Step 3: Verify compilation**
Run: `mvn clean compile -pl cameleer-server-app -am`
Expected: BUILD SUCCESS
- [ ] **Step 4: Commit**
```bash
git add cameleer-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseSchemaInitializer.java
git commit -m "feat: add ClickHouse schema initializer with agent_metrics DDL"
```
---
### Task 4: ClickHouseMetricsStore (TDD)
**Files:**
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsStoreTest.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsStore.java`
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.model.MetricsSnapshot;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseMetricsStoreTest {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseMetricsStore store;

    @BeforeEach
    void setUp() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);
        jdbc.execute("""
                CREATE TABLE IF NOT EXISTS agent_metrics (
                    tenant_id LowCardinality(String) DEFAULT 'default',
                    collected_at DateTime64(3),
                    agent_id LowCardinality(String),
                    metric_name LowCardinality(String),
                    metric_value Float64,
                    tags Map(String, String) DEFAULT map(),
                    server_received_at DateTime64(3) DEFAULT now64(3)
                )
                ENGINE = MergeTree()
                ORDER BY (tenant_id, agent_id, metric_name, collected_at)
                """);
        jdbc.execute("TRUNCATE TABLE agent_metrics");
        store = new ClickHouseMetricsStore(jdbc);
    }

    @Test
    void insertBatch_writesMetricsToClickHouse() {
        List<MetricsSnapshot> batch = List.of(
                new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"),
                        "cpu.usage", 75.5, Map.of("host", "server-1")),
                new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"),
                        "memory.free", 1024.0, null));
        store.insertBatch(batch);
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'",
                Integer.class);
        assertThat(count).isEqualTo(2);
    }

    @Test
    void insertBatch_storesTags() {
        store.insertBatch(List.of(
                new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"),
                        "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4"))));
        Map<String, String> tags = jdbc.queryForObject(
                "SELECT tags FROM agent_metrics WHERE agent_id = 'agent-2'",
                (rs, n) -> {
                    // ClickHouse returns Map as a map-like structure via JDBC
                    @SuppressWarnings("unchecked")
                    Map<String, String> m = (Map<String, String>) rs.getObject("tags");
                    return m;
                });
        assertThat(tags).containsEntry("mount", "/data").containsEntry("fs", "ext4");
    }

    @Test
    void insertBatch_emptyList_doesNothing() {
        store.insertBatch(List.of());
        Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    @Test
    void insertBatch_nullTags_defaultsToEmptyMap() {
        store.insertBatch(List.of(
                new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"),
                        "cpu.usage", 50.0, null)));
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }
}
```
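The tests construct `MetricsSnapshot` values positionally; the plan assumes an existing record in `cameleer-server-core` shaped roughly like the sketch below. This is inferred from the call sites (`agentId`, `collectedAt`, `metricName`, `metricValue`, `tags`), not copied from the actual source:

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical sketch of the core model record, inferred from its usage above
record MetricsSnapshot(
        String agentId,
        Instant collectedAt,
        String metricName,
        double metricValue,
        Map<String, String> tags) {}

public class MetricsSnapshotSketch {
    public static void main(String[] args) {
        MetricsSnapshot s = new MetricsSnapshot(
                "agent-1", Instant.parse("2026-03-31T10:00:00Z"),
                "cpu.usage", 75.5, Map.of("host", "server-1"));
        System.out.println(s.metricName() + "=" + s.metricValue()); // cpu.usage=75.5
    }
}
```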
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer-server-app -Dtest=ClickHouseMetricsStoreTest -DfailIfNoTests=false`
Expected: FAIL — `ClickHouseMetricsStore` class does not exist
- [ ] **Step 3: Write ClickHouseMetricsStore**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.MetricsStore;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClickHouseMetricsStore implements MetricsStore {

    private final JdbcTemplate jdbc;

    public ClickHouseMetricsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void insertBatch(List<MetricsSnapshot> snapshots) {
        if (snapshots.isEmpty()) return;
        jdbc.batchUpdate("""
                INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                snapshots.stream().map(s -> new Object[]{
                        s.agentId(),
                        s.metricName(),
                        s.metricValue(),
                        tagsToClickHouseMap(s.tags()),
                        Timestamp.from(s.collectedAt())
                }).toList());
    }

    private Map<String, String> tagsToClickHouseMap(Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) return new HashMap<>();
        return tags;
    }
}
```
Note: ClickHouse JDBC driver handles `Map<String, String>` natively for `Map(String, String)` columns. No JSON serialization needed (unlike the PG implementation that converts to JSONB string).
- [ ] **Step 4: Run test to verify it passes**
Run: `mvn test -pl cameleer-server-app -Dtest=ClickHouseMetricsStoreTest`
Expected: PASS — all 4 tests green
If the tags test fails due to JDBC Map handling, adjust `tagsToClickHouseMap` to return a `java.util.HashMap` (ClickHouse JDBC requires a mutable map). If needed, wrap: `return new HashMap<>(tags)`.
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsStoreTest.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsStore.java
git commit -m "feat: add ClickHouseMetricsStore with batch insert"
```
---
### Task 5: MetricsQueryStore Interface + PostgreSQL Implementation (TDD)
Extract the query logic from `AgentMetricsController` into an interface.
**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsQueryStore.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsQueryStore.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/controller/AgentMetricsController.java`
- [ ] **Step 1: Create MetricsQueryStore interface**
```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.storage.model.MetricTimeSeries;

import java.time.Instant;
import java.util.List;
import java.util.Map;

public interface MetricsQueryStore {

    /**
     * Query time-bucketed metrics for an agent.
     *
     * @param agentId     the agent identifier
     * @param metricNames list of metric names to query
     * @param from        start of time range (inclusive)
     * @param to          end of time range (exclusive)
     * @param buckets     number of time buckets to divide the range into
     * @return map of metric name to list of (time, value) pairs
     */
    Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets);
}
```
- [ ] **Step 2: Create MetricTimeSeries.Bucket record**
```java
package com.cameleer.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record MetricTimeSeries(String metricName, List<Bucket> buckets) {
    public record Bucket(Instant time, double value) {}
}
```
- [ ] **Step 3: Create PostgresMetricsQueryStore (extract from controller)**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.MetricsQueryStore;
import com.cameleer.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;

public class PostgresMetricsQueryStore implements MetricsQueryStore {

    private final JdbcTemplate jdbc;

    public PostgresMetricsQueryStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets) {
        long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
        String intervalStr = intervalMs + " milliseconds";
        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
        for (String name : metricNames) {
            result.put(name.trim(), new ArrayList<>());
        }
        String sql = """
                SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
                       metric_name,
                       AVG(metric_value) AS avg_value
                FROM agent_metrics
                WHERE agent_id = ?
                  AND collected_at >= ? AND collected_at < ?
                  AND metric_name = ANY(?)
                GROUP BY bucket, metric_name
                ORDER BY bucket
                """;
        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
        jdbc.query(sql, rs -> {
            String metricName = rs.getString("metric_name");
            Instant bucket = rs.getTimestamp("bucket").toInstant();
            double value = rs.getDouble("avg_value");
            result.computeIfAbsent(metricName, k -> new ArrayList<>())
                    .add(new MetricTimeSeries.Bucket(bucket, value));
        }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);
        return result;
    }
}
```
- [ ] **Step 4: Refactor AgentMetricsController to use MetricsQueryStore**
Replace the entire `AgentMetricsController.java` content:
```java
package com.cameleer.server.app.controller;

import com.cameleer.server.app.dto.AgentMetricsResponse;
import com.cameleer.server.app.dto.MetricBucket;
import com.cameleer.server.core.storage.MetricsQueryStore;
import com.cameleer.server.core.storage.model.MetricTimeSeries;
import org.springframework.web.bind.annotation.*;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/api/v1/agents/{agentId}/metrics")
public class AgentMetricsController {

    private final MetricsQueryStore metricsQueryStore;

    public AgentMetricsController(MetricsQueryStore metricsQueryStore) {
        this.metricsQueryStore = metricsQueryStore;
    }

    @GetMapping
    public AgentMetricsResponse getMetrics(
            @PathVariable String agentId,
            @RequestParam String names,
            @RequestParam(required = false) Instant from,
            @RequestParam(required = false) Instant to,
            @RequestParam(defaultValue = "60") int buckets) {
        if (from == null) from = Instant.now().minus(1, ChronoUnit.HOURS);
        if (to == null) to = Instant.now();
        List<String> metricNames = Arrays.asList(names.split(","));
        Map<String, List<MetricTimeSeries.Bucket>> raw =
                metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets);
        // Convert to existing DTO format
        Map<String, List<MetricBucket>> result = raw.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().stream()
                                .map(b -> new MetricBucket(b.time(), b.value()))
                                .toList(),
                        (a, b) -> a,
                        LinkedHashMap::new));
        return new AgentMetricsResponse(result);
    }
}
```
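Note that `names.split(",")` preserves surrounding whitespace, which is why the query-store implementations `trim()` each name before use. A quick check of the behavior:

```java
import java.util.Arrays;
import java.util.List;

public class NamesSplitDemo {
    public static void main(String[] args) {
        // The names query param as a client might send it, with a space after the comma
        List<String> metricNames = Arrays.asList("cpu.usage, memory.free".split(","));
        // The second entry keeps its leading space until a store trims it
        System.out.println("[" + metricNames.get(1) + "]"); // [ memory.free]
    }
}
```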
- [ ] **Step 5: Add PostgresMetricsQueryStore bean to StorageBeanConfig**
The refactored `AgentMetricsController` needs a `MetricsQueryStore` bean. Add this to `StorageBeanConfig.java`:
```java
import com.cameleer.server.app.storage.PostgresMetricsQueryStore;
import com.cameleer.server.core.storage.MetricsQueryStore;

// Add this bean method:
@Bean
public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) {
    return new PostgresMetricsQueryStore(jdbc);
}
```
This creates the PG implementation as the default. Task 7 will replace this with conditional beans for PG vs CH.
- [ ] **Step 6: Verify compilation**
Run: `mvn clean compile -pl cameleer-server-app -am`
Expected: BUILD SUCCESS
- [ ] **Step 7: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsQueryStore.java
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/MetricTimeSeries.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsQueryStore.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/controller/AgentMetricsController.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
git commit -m "refactor: extract MetricsQueryStore interface from AgentMetricsController"
```
---
### Task 6: ClickHouseMetricsQueryStore (TDD)
**Files:**
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStoreTest.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStore.java`
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.model.MetricTimeSeries;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class ClickHouseMetricsQueryStoreTest {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseMetricsQueryStore queryStore;

    @BeforeEach
    void setUp() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);
        jdbc.execute("""
                CREATE TABLE IF NOT EXISTS agent_metrics (
                    tenant_id LowCardinality(String) DEFAULT 'default',
                    collected_at DateTime64(3),
                    agent_id LowCardinality(String),
                    metric_name LowCardinality(String),
                    metric_value Float64,
                    tags Map(String, String) DEFAULT map(),
                    server_received_at DateTime64(3) DEFAULT now64(3)
                )
                ENGINE = MergeTree()
                ORDER BY (tenant_id, agent_id, metric_name, collected_at)
                """);
        jdbc.execute("TRUNCATE TABLE agent_metrics");
        // Seed test data: 6 data points across 1 hour for two metrics
        Instant base = Instant.parse("2026-03-31T10:00:00Z");
        for (int i = 0; i < 6; i++) {
            Instant ts = base.plusSeconds(i * 600); // every 10 minutes
            jdbc.update(
                    "INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
                    "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts));
            jdbc.update(
                    "INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
                    "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
        }
        queryStore = new ClickHouseMetricsQueryStore(jdbc);
    }

    @Test
    void queryTimeSeries_returnsDataGroupedByMetric() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");
        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6);
        assertThat(result).containsKeys("cpu.usage", "memory.free");
        assertThat(result.get("cpu.usage")).isNotEmpty();
        assertThat(result.get("memory.free")).isNotEmpty();
    }

    @Test
    void queryTimeSeries_bucketsAverageCorrectly() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");
        // 1 bucket for the entire hour = average of all 6 values
        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1);
        assertThat(result.get("cpu.usage")).hasSize(1);
        // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5
        assertThat(result.get("cpu.usage").get(0).value())
                .isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1));
    }

    @Test
    void queryTimeSeries_noData_returnsEmptyLists() {
        Instant from = Instant.parse("2025-01-01T00:00:00Z");
        Instant to = Instant.parse("2025-01-01T01:00:00Z");
        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6);
        assertThat(result.get("cpu.usage")).isEmpty();
    }

    @Test
    void queryTimeSeries_unknownAgent_returnsEmpty() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");
        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6);
        assertThat(result.get("cpu.usage")).isEmpty();
    }
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer-server-app -Dtest=ClickHouseMetricsQueryStoreTest -DfailIfNoTests=false`
Expected: FAIL — `ClickHouseMetricsQueryStore` class does not exist
- [ ] **Step 3: Write ClickHouseMetricsQueryStore**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.MetricsQueryStore;
import com.cameleer.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;

public class ClickHouseMetricsQueryStore implements MetricsQueryStore {

    private final JdbcTemplate jdbc;

    public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets) {
        long intervalSeconds = Math.max(60,
                (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));
        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
        for (String name : metricNames) {
            result.put(name.trim(), new ArrayList<>());
        }
        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
        // ClickHouse JDBC doesn't support array params with IN (?).
        // Build the IN clause manually with safe values (metric names are validated by controller).
        StringBuilder inClause = new StringBuilder();
        for (int i = 0; i < namesArray.length; i++) {
            if (i > 0) inClause.append(", ");
            inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'");
        }
        String sql = """
                SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
                       metric_name,
                       avg(metric_value) AS avg_value
                FROM agent_metrics
                WHERE agent_id = ?
                  AND collected_at >= ?
                  AND collected_at < ?
                  AND metric_name IN (%s)
                GROUP BY bucket, metric_name
                ORDER BY bucket
                """.formatted(intervalSeconds, inClause);
        jdbc.query(sql, rs -> {
            String metricName = rs.getString("metric_name");
            Instant bucket = rs.getTimestamp("bucket").toInstant();
            double value = rs.getDouble("avg_value");
            result.computeIfAbsent(metricName, k -> new ArrayList<>())
                    .add(new MetricTimeSeries.Bucket(bucket, value));
        }, agentId, Timestamp.from(from), Timestamp.from(to));
        return result;
    }
}
```
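Note the 60-second floor on the bucket interval: for ranges shorter than `60 * buckets` seconds, the effective bucket count is smaller than requested. Checking the arithmetic with a 5-minute range and 60 requested buckets:

```java
public class BucketIntervalClamp {
    public static void main(String[] args) {
        long fromEpoch = 0, toEpoch = 300; // a 5-minute range, in seconds
        int buckets = 60;
        long intervalSeconds = Math.max(60, (toEpoch - fromEpoch) / Math.max(buckets, 1));
        // 300 / 60 = 5 seconds, clamped up to the 60-second floor
        System.out.println(intervalSeconds); // 60
        // → at most 5 buckets span the range, not the 60 requested
    }
}
```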
- [ ] **Step 4: Run test to verify it passes**
Run: `mvn test -pl cameleer-server-app -Dtest=ClickHouseMetricsQueryStoreTest`
Expected: PASS — all 4 tests green
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStoreTest.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsQueryStore.java
git commit -m "feat: add ClickHouseMetricsQueryStore with time-bucketed queries"
```
---
### Task 7: Wire Feature Flag in StorageBeanConfig
**Files:**
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java`
- [ ] **Step 1: Replace simple beans with conditional MetricsStore and MetricsQueryStore beans**
In `StorageBeanConfig.java`, **remove** the unconditional `metricsQueryStore` bean added in Task 5. Replace with these conditional bean definitions:
```java
import com.cameleer.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer.server.app.storage.ClickHouseMetricsStore;
import com.cameleer.server.app.storage.PostgresMetricsQueryStore;
import com.cameleer.server.core.storage.MetricsQueryStore;

// ... existing beans ...

@Bean
@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
public MetricsStore clickHouseMetricsStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseMetricsStore(clickHouseJdbc);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) {
    return new PostgresMetricsStore(jdbc);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
public MetricsQueryStore clickHouseMetricsQueryStore(
        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
    return new ClickHouseMetricsQueryStore(clickHouseJdbc);
}

@Bean
@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
    return new PostgresMetricsQueryStore(jdbc);
}
```
Also remove the `@Repository` annotation from `PostgresMetricsStore.java`, since the bean is now created explicitly in config (Step 2 covers this).
Add the necessary imports to `StorageBeanConfig.java`:
```java
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.beans.factory.annotation.Qualifier;
import com.cameleer.server.core.storage.MetricsStore;
```
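The selection semantics above hinge on `matchIfMissing = true`: a missing or unset property behaves exactly like `postgres`. A minimal plain-Java stand-in for what `@ConditionalOnProperty` decides at context startup (interface and names here are illustrative, not the real classes):

```java
// Stand-in for @ConditionalOnProperty selection: pick an implementation by
// property value, defaulting to "postgres" when the key is absent
// (matchIfMissing = true). MetricsStore here is a placeholder interface.
interface MetricsStore { String name(); }

public class FlagSelectionSketch {
    static MetricsStore select(java.util.Map<String, String> props) {
        String value = props.getOrDefault("cameleer.storage.metrics", "postgres");
        return "clickhouse".equals(value)
                ? new MetricsStore() { public String name() { return "clickhouse"; } }
                : new MetricsStore() { public String name() { return "postgres"; } };
    }

    public static void main(String[] args) {
        System.out.println(select(java.util.Map.of()).name());
        System.out.println(select(java.util.Map.of("cameleer.storage.metrics", "clickhouse")).name());
    }
}
```

Any value other than `clickhouse` (including a typo) silently falls through to Postgres, which is the safe default for this migration.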
- [ ] **Step 2: Remove @Repository from PostgresMetricsStore**
In `PostgresMetricsStore.java`, remove the `@Repository` annotation from the class declaration. The bean is now created in `StorageBeanConfig`.
- [ ] **Step 3: Verify compilation**
Run: `mvn clean compile -pl cameleer-server-app -am`
Expected: BUILD SUCCESS
- [ ] **Step 4: Run existing tests to verify no regression**
Run: `mvn test -pl cameleer-server-app`
Expected: All existing tests pass. The default config (`cameleer.storage.metrics=postgres`) means the PG beans are created, ClickHouse beans are not (ClickHouse DataSource not needed).
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsStore.java
git commit -m "feat: wire MetricsStore and MetricsQueryStore with feature flag"
```
---
### Task 8: ClickHouse Kubernetes Deployment
**Files:**
- Create: `deploy/clickhouse.yaml`
- Modify: `deploy/base/server.yaml`
- [ ] **Step 1: Create ClickHouse StatefulSet + Service manifest**
File: `deploy/clickhouse.yaml`
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: clickhouse
namespace: cameleer
spec:
serviceName: clickhouse
replicas: 1
selector:
matchLabels:
app: clickhouse
template:
metadata:
labels:
app: clickhouse
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:24.12
ports:
- containerPort: 8123
name: http
- containerPort: 9000
name: native
volumeMounts:
- name: data
mountPath: /var/lib/clickhouse
resources:
requests:
memory: "2Gi"
cpu: "500m"
limits:
memory: "4Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
readinessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 50Gi
---
apiVersion: v1
kind: Service
metadata:
name: clickhouse
namespace: cameleer
spec:
clusterIP: None
selector:
app: clickhouse
ports:
- port: 8123
targetPort: 8123
name: http
- port: 9000
targetPort: 9000
name: native
```
- [ ] **Step 2: Add ClickHouse env vars to server.yaml**
Add these env vars to the `cameleer-server` container in `deploy/base/server.yaml`, after the existing env vars (before `resources:`):
```yaml
- name: CLICKHOUSE_ENABLED
value: "true"
- name: CLICKHOUSE_URL
value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0"
- name: CAMELEER_STORAGE_METRICS
value: "postgres"
```
Note: `CAMELEER_STORAGE_METRICS` defaults to `postgres`. Set to `clickhouse` when ready to switch.
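Spring's relaxed binding maps `CAMELEER_STORAGE_METRICS` to `cameleer.storage.metrics`. The corresponding keys in `application.yml` look roughly like this (a sketch only; the real defaults belong in the file from this plan's file map, and the placeholder defaults shown here are illustrative):

```yaml
# Illustrative sketch of the config keys these env vars feed
# (actual file: cameleer-server-app/src/main/resources/application.yml)
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}   # postgres | clickhouse
clickhouse:
  enabled: ${CLICKHOUSE_ENABLED:false}
  url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer}
```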
- [ ] **Step 3: Commit**
```bash
git add deploy/clickhouse.yaml
git add deploy/base/server.yaml
git commit -m "deploy: add ClickHouse StatefulSet and server env vars"
```
---
### Task 9: Integration Smoke Test
**Files:**
- Modify: `cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java` (add ClickHouse container)
- [ ] **Step 1: Update AbstractPostgresIT to optionally start ClickHouse**
This is a non-breaking addition. The existing integration tests continue to use PG. A future ClickHouse-specific IT can extend this base.
Add the ClickHouse container to `AbstractPostgresIT`:
```java
import org.testcontainers.clickhouse.ClickHouseContainer;
// Add after opensearch static field:
static final ClickHouseContainer clickhouse;
// In the static initializer, add after opensearch.start():
clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
clickhouse.start();
// In configureProperties, add:
registry.add("clickhouse.enabled", () -> "true");
registry.add("clickhouse.url", clickhouse::getJdbcUrl);
registry.add("clickhouse.username", clickhouse::getUsername);
registry.add("clickhouse.password", clickhouse::getPassword);
```
- [ ] **Step 2: Run all integration tests**
Run: `mvn verify -pl cameleer-server-app`
Expected: All ITs pass. ClickHouse schema initializer runs and creates the `agent_metrics` table. The `MetricsControllerIT` still uses PG (default storage flag is `postgres`).
- [ ] **Step 3: Commit**
```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java
git commit -m "test: add ClickHouse testcontainer to integration test base"
```
---
### Task 10: Final Verification
- [ ] **Step 1: Run full build**
Run: `mvn clean verify`
Expected: BUILD SUCCESS (unit tests + integration tests all pass)
- [ ] **Step 2: Verify ClickHouse switch works locally (manual)**
Start ClickHouse locally:
```bash
docker run -d --name clickhouse-test -p 8123:8123 clickhouse/clickhouse-server:24.12
```
Create the database:
```bash
curl "http://localhost:8123/" --data "CREATE DATABASE IF NOT EXISTS cameleer"
```
Start the server with ClickHouse enabled:
```bash
CLICKHOUSE_ENABLED=true CAMELEER_STORAGE_METRICS=clickhouse java -jar cameleer-server-app/target/cameleer-server-app-1.0-SNAPSHOT.jar
```
Verify schema initialization in the logs: `Executing ClickHouse schema script: V1__agent_metrics.sql`
POST a metric:
```bash
curl -X POST http://localhost:8081/api/v1/data/metrics \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <agent-token>" \
-d '[{"agentId":"test","collectedAt":"2026-03-31T10:00:00Z","metricName":"cpu","metricValue":75.0,"tags":{}}]'
```
Verify data in ClickHouse:
```bash
curl "http://localhost:8123/" --data "SELECT * FROM cameleer.agent_metrics FORMAT Pretty"
```
Stop and clean up:
```bash
docker stop clickhouse-test && docker rm clickhouse-test
```
- [ ] **Step 3: Final commit (if any fixes were needed)**
```bash
git add -A
git commit -m "fix: address issues found during manual verification"
```