Compare commits

10 Commits

Author SHA1 Message Date
hsiegeln
7a2a0ee649 test: add ClickHouse testcontainer to integration test base
Some checks failed — CI / build (push, pull_request): failing after ~2m29s; all other CI jobs (cleanup-branch, docker, deploy, deploy-feature) were skipped.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:09:09 +02:00
hsiegeln
1b991f99a3 deploy: add ClickHouse StatefulSet and server env vars
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:08:42 +02:00
hsiegeln
21991b6cf8 feat: wire MetricsStore and MetricsQueryStore with feature flag
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:07:35 +02:00
hsiegeln
53766aeb56 feat: add ClickHouseMetricsQueryStore with time-bucketed queries
Implements MetricsQueryStore using ClickHouse toStartOfInterval() for
time-bucketed aggregation queries; verified with 4 Testcontainers tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:05:45 +02:00
hsiegeln
bf0e9ea418 refactor: extract MetricsQueryStore interface from AgentMetricsController
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:00:57 +02:00
hsiegeln
6e30b7ec65 feat: add ClickHouseMetricsStore with batch insert
TDD implementation of MetricsStore backed by ClickHouse. Uses native
Map(String,String) column type (no JSON cast), relies on ClickHouse
DEFAULT for server_received_at, and handles null tags by substituting
an empty HashMap. All 4 Testcontainers tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 16:58:20 +02:00
hsiegeln
08934376df feat: add ClickHouse schema initializer with agent_metrics DDL
Adds ClickHouseSchemaInitializer that runs on ApplicationReadyEvent,
scanning classpath:clickhouse/*.sql in filename order and executing each
statement. Adds V1__agent_metrics.sql with MergeTree table, tenant/agent
partitioning, and 365-day TTL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 16:51:21 +02:00
hsiegeln
23f901279a feat: add ClickHouse DataSource and JdbcTemplate configuration
Adds ClickHouseProperties (bound to clickhouse.*), ClickHouseConfig
(conditional HikariDataSource + JdbcTemplate beans), and extends
application.yml with clickhouse.enabled/url/username/password and
cameleer.storage.metrics properties.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 16:51:14 +02:00
hsiegeln
6171827243 build: add clickhouse-jdbc and testcontainers-clickhouse dependencies
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 16:49:04 +02:00
hsiegeln
c77d8a7af0 docs: add Phase 1 implementation plan for ClickHouse migration
10-task TDD plan covering: CH dependency, config, schema init,
ClickHouseMetricsStore, MetricsQueryStore interface extraction,
ClickHouseMetricsQueryStore, feature flag wiring, k8s deployment,
integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 16:43:14 +02:00
20 changed files with 1878 additions and 33 deletions

View File

@@ -57,6 +57,12 @@
             <artifactId>opensearch-rest-client</artifactId>
             <version>2.19.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.9.7</version>
+            <classifier>all</classifier>
+        </dependency>
         <dependency>
             <groupId>org.springdoc</groupId>
             <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
@@ -126,6 +132,11 @@
             <version>2.1.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers-clickhouse</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>

View File

@@ -0,0 +1,34 @@
package com.cameleer3.server.app.config;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
@Configuration
@EnableConfigurationProperties(ClickHouseProperties.class)
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseConfig {
@Bean(name = "clickHouseDataSource")
public DataSource clickHouseDataSource(ClickHouseProperties props) {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(props.getUrl());
ds.setUsername(props.getUsername());
ds.setPassword(props.getPassword());
ds.setMaximumPoolSize(10);
ds.setPoolName("clickhouse-pool");
return ds;
}
@Bean(name = "clickHouseJdbcTemplate")
public JdbcTemplate clickHouseJdbcTemplate(
@Qualifier("clickHouseDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
}

View File

@@ -0,0 +1,20 @@
package com.cameleer3.server.app.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "clickhouse")
public class ClickHouseProperties {
private String url = "jdbc:clickhouse://localhost:8123/cameleer";
private String username = "default";
private String password = "";
public String getUrl() { return url; }
public void setUrl(String url) { this.url = url; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
}

View File

@@ -0,0 +1,52 @@
package com.cameleer3.server.app.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
@Component
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseSchemaInitializer {
private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class);
private final JdbcTemplate clickHouseJdbc;
public ClickHouseSchemaInitializer(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
this.clickHouseJdbc = clickHouseJdbc;
}
@EventListener(ApplicationReadyEvent.class)
public void initializeSchema() throws IOException {
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql");
Arrays.sort(scripts, Comparator.comparing(Resource::getFilename));
for (Resource script : scripts) {
String sql = script.getContentAsString(StandardCharsets.UTF_8);
log.info("Executing ClickHouse schema script: {}", script.getFilename());
for (String statement : sql.split(";")) {
String trimmed = statement.trim();
if (!trimmed.isEmpty()) {
clickHouseJdbc.execute(trimmed);
}
}
}
log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length);
}
}
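
Worth noting about the ordering: Comparator.comparing(Resource::getFilename) sorts lexicographically, not numerically, so once script versions reach two digits the execution order diverges from numeric order (a hypothetical V10 script would even sort before V1, because '0' compares less than '_' in ASCII). A minimal sketch of that behavior, using illustrative script names:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Demonstrates the lexicographic ordering applied by the schema initializer.
// "V10__..." sorts before "V1__..." because at index 2 the character '0' (0x30)
// compares less than '_' (0x5F); "V1__..." in turn sorts before "V2__...".
public class ScriptOrderSketch {
    public static void main(String[] args) {
        List<String> scripts = new ArrayList<>(List.of(
                "V2__add_index.sql",
                "V10__widen_column.sql",
                "V1__agent_metrics.sql"));
        scripts.sort(Comparator.naturalOrder());
        System.out.println(scripts);
        // prints: [V10__widen_column.sql, V1__agent_metrics.sql, V2__add_index.sql]
    }
}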

View File

@@ -1,5 +1,9 @@
 package com.cameleer3.server.app.config;

+import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
+import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
+import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
+import com.cameleer3.server.app.storage.PostgresMetricsStore;
 import com.cameleer3.server.core.admin.AuditRepository;
 import com.cameleer3.server.core.admin.AuditService;
 import com.cameleer3.server.core.detail.DetailService;
@@ -8,9 +12,12 @@ import com.cameleer3.server.core.ingestion.IngestionService;
 import com.cameleer3.server.core.ingestion.WriteBuffer;
 import com.cameleer3.server.core.storage.*;
 import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;

 @Configuration
 public class StorageBeanConfig {
@@ -41,4 +48,30 @@ public class StorageBeanConfig {
         return new IngestionService(executionStore, diagramStore, metricsBuffer,
                 searchIndexer::onExecutionUpdated, bodySizeLimit);
     }
+
+    @Bean
+    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
+    public MetricsStore clickHouseMetricsStore(
+            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+        return new ClickHouseMetricsStore(clickHouseJdbc);
+    }
+
+    @Bean
+    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
+    public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) {
+        return new PostgresMetricsStore(jdbc);
+    }
+
+    @Bean
+    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
+    public MetricsQueryStore clickHouseMetricsQueryStore(
+            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+        return new ClickHouseMetricsQueryStore(clickHouseJdbc);
+    }
+
+    @Bean
+    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
+    public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
+        return new PostgresMetricsQueryStore(jdbc);
+    }
 }
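
Net effect of the wiring above: with cameleer.storage.metrics=clickhouse (and clickhouse.enabled=true, so the qualified JdbcTemplate exists) both store beans switch to the ClickHouse implementations; by default, or with the value postgres, the Postgres beans are used. A hedged sketch of a context-level check, assuming the test context can actually reach a ClickHouse instance (for example via the Testcontainers base class extended later in this change); the test class name is illustrative only:

import static org.assertj.core.api.Assertions.assertThat;

import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.MetricsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// Sketch: with the feature flag set, the ClickHouse-backed beans should be the ones injected.
@SpringBootTest(properties = {
        "clickhouse.enabled=true",
        "cameleer.storage.metrics=clickhouse"
})
class MetricsStoreWiringSketchIT {

    @Autowired
    MetricsStore metricsStore;

    @Autowired
    MetricsQueryStore metricsQueryStore;

    @Test
    void clickHouseBeansAreSelectedWhenFlagIsSet() {
        assertThat(metricsStore).isInstanceOf(ClickHouseMetricsStore.class);
        assertThat(metricsQueryStore).isInstanceOf(ClickHouseMetricsQueryStore.class);
    }
}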

View File

@@ -2,22 +2,23 @@ package com.cameleer3.server.app.controller;
 import com.cameleer3.server.app.dto.AgentMetricsResponse;
 import com.cameleer3.server.app.dto.MetricBucket;
-import org.springframework.jdbc.core.JdbcTemplate;
+import com.cameleer3.server.core.storage.MetricsQueryStore;
+import com.cameleer3.server.core.storage.model.MetricTimeSeries;
 import org.springframework.web.bind.annotation.*;
-import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.stream.Collectors;

 @RestController
 @RequestMapping("/api/v1/agents/{agentId}/metrics")
 public class AgentMetricsController {

-    private final JdbcTemplate jdbc;
+    private final MetricsQueryStore metricsQueryStore;

-    public AgentMetricsController(JdbcTemplate jdbc) {
-        this.jdbc = jdbc;
+    public AgentMetricsController(MetricsQueryStore metricsQueryStore) {
+        this.metricsQueryStore = metricsQueryStore;
     }

     @GetMapping
@@ -32,34 +33,18 @@ public class AgentMetricsController {
         if (to == null) to = Instant.now();
         List<String> metricNames = Arrays.asList(names.split(","));

-        long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
-        String intervalStr = intervalMs + " milliseconds";
-        Map<String, List<MetricBucket>> result = new LinkedHashMap<>();
-        for (String name : metricNames) {
-            result.put(name.trim(), new ArrayList<>());
-        }
-        String sql = """
-            SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
-                   metric_name,
-                   AVG(metric_value) AS avg_value
-            FROM agent_metrics
-            WHERE agent_id = ?
-              AND collected_at >= ? AND collected_at < ?
-              AND metric_name = ANY(?)
-            GROUP BY bucket, metric_name
-            ORDER BY bucket
-            """;
-        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
-        jdbc.query(sql, rs -> {
-            String metricName = rs.getString("metric_name");
-            Instant bucket = rs.getTimestamp("bucket").toInstant();
-            double value = rs.getDouble("avg_value");
-            result.computeIfAbsent(metricName, k -> new ArrayList<>())
-                  .add(new MetricBucket(bucket, value));
-        }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);
+        Map<String, List<MetricTimeSeries.Bucket>> raw =
+            metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets);
+
+        Map<String, List<MetricBucket>> result = raw.entrySet().stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> e.getValue().stream()
+                    .map(b -> new MetricBucket(b.time(), b.value()))
+                    .toList(),
+                (a, b) -> a,
+                LinkedHashMap::new));

         return new AgentMetricsResponse(result);
     }

View File

@@ -0,0 +1,66 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
import java.util.*;
public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
private final JdbcTemplate jdbc;
public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
String agentId, List<String> metricNames,
Instant from, Instant to, int buckets) {
long intervalSeconds = Math.max(60,
(to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));
Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
for (String name : metricNames) {
result.put(name.trim(), new ArrayList<>());
}
String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
// ClickHouse JDBC doesn't support array params with IN (?).
// Build the IN clause with properly escaped values.
StringBuilder inClause = new StringBuilder();
for (int i = 0; i < namesArray.length; i++) {
if (i > 0) inClause.append(", ");
inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'");
}
String finalSql = """
SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
metric_name,
avg(metric_value) AS avg_value
FROM agent_metrics
WHERE agent_id = ?
AND collected_at >= ?
AND collected_at < ?
AND metric_name IN (%s)
GROUP BY bucket, metric_name
ORDER BY bucket
""".formatted(intervalSeconds, inClause);
jdbc.query(finalSql, rs -> {
String metricName = rs.getString("metric_name");
Instant bucket = rs.getTimestamp("bucket").toInstant();
double value = rs.getDouble("avg_value");
result.computeIfAbsent(metricName, k -> new ArrayList<>())
.add(new MetricTimeSeries.Bucket(bucket, value));
}, agentId,
java.sql.Timestamp.from(from),
java.sql.Timestamp.from(to));
return result;
}
}
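
For a sense of the bucket arithmetic: a one-hour window with buckets=6 resolves to max(60, 3600 / 6) = 600 seconds, so the generated query groups on toStartOfInterval(collected_at, INTERVAL 600 SECOND), with the IN clause built from the escaped metric names. A minimal usage sketch against the qualified ClickHouse JdbcTemplate; the class, method, and variable names here are illustrative only:

import java.time.Instant;
import java.util.List;
import java.util.Map;

import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

// Sketch: query a one-hour window as six 10-minute buckets.
public class QueryStoreUsageSketch {

    public static void printCpuBuckets(JdbcTemplate clickHouseJdbc) {
        ClickHouseMetricsQueryStore store = new ClickHouseMetricsQueryStore(clickHouseJdbc);

        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = from.plusSeconds(3600);

        // intervalSeconds = max(60, 3600 / 6) = 600
        Map<String, List<MetricTimeSeries.Bucket>> series =
                store.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6);

        series.get("cpu.usage").forEach(b ->
                System.out.printf("%s -> %.2f%n", b.time(), b.value()));
    }
}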

View File

@@ -0,0 +1,41 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ClickHouseMetricsStore implements MetricsStore {
private final JdbcTemplate jdbc;
public ClickHouseMetricsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public void insertBatch(List<MetricsSnapshot> snapshots) {
if (snapshots.isEmpty()) return;
jdbc.batchUpdate("""
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
VALUES (?, ?, ?, ?, ?)
""",
snapshots.stream().map(s -> new Object[]{
s.agentId(),
s.metricName(),
s.metricValue(),
tagsToClickHouseMap(s.tags()),
Timestamp.from(s.collectedAt())
}).toList());
}
private Map<String, String> tagsToClickHouseMap(Map<String, String> tags) {
if (tags == null || tags.isEmpty()) return new HashMap<>();
return new HashMap<>(tags);
}
}

View File

@@ -0,0 +1,55 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;
public class PostgresMetricsQueryStore implements MetricsQueryStore {
private final JdbcTemplate jdbc;
public PostgresMetricsQueryStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
String agentId, List<String> metricNames,
Instant from, Instant to, int buckets) {
long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
String intervalStr = intervalMs + " milliseconds";
Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
for (String name : metricNames) {
result.put(name.trim(), new ArrayList<>());
}
String sql = """
SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
metric_name,
AVG(metric_value) AS avg_value
FROM agent_metrics
WHERE agent_id = ?
AND collected_at >= ? AND collected_at < ?
AND metric_name = ANY(?)
GROUP BY bucket, metric_name
ORDER BY bucket
""";
String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
jdbc.query(sql, rs -> {
String metricName = rs.getString("metric_name");
Instant bucket = rs.getTimestamp("bucket").toInstant();
double value = rs.getDouble("avg_value");
result.computeIfAbsent(metricName, k -> new ArrayList<>())
.add(new MetricTimeSeries.Bucket(bucket, value));
}, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);
return result;
}
}

View File

@@ -5,12 +5,10 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;

 import java.sql.Timestamp;
 import java.util.List;

-@Repository
 public class PostgresMetricsStore implements MetricsStore {

     private static final ObjectMapper MAPPER = new ObjectMapper();

View File

@@ -48,6 +48,8 @@ opensearch:
 cameleer:
   body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
   retention-days: ${CAMELEER_RETENTION_DAYS:30}
+  storage:
+    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
   security:
     access-token-expiry-ms: 3600000
@@ -66,6 +68,12 @@ springdoc:
   swagger-ui:
     path: /api/v1/swagger-ui

+clickhouse:
+  enabled: ${CLICKHOUSE_ENABLED:false}
+  url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0}
+  username: ${CLICKHOUSE_USERNAME:default}
+  password: ${CLICKHOUSE_PASSWORD:}
+
 management:
   endpoints:
     web:

View File

@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3),
agent_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
TTL collected_at + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

View File

@@ -7,6 +7,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.DynamicPropertyRegistry;
 import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.utility.DockerImageName;
@@ -20,6 +21,7 @@ public abstract class AbstractPostgresIT {

     static final PostgreSQLContainer<?> postgres;
     static final OpensearchContainer<?> opensearch;
+    static final ClickHouseContainer clickhouse;

     static {
         postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
@@ -30,6 +32,9 @@ public abstract class AbstractPostgresIT {
         opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
         opensearch.start();
+
+        clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
+        clickhouse.start();
     }

     @Autowired
@@ -46,5 +51,9 @@ public abstract class AbstractPostgresIT {
         registry.add("spring.flyway.user", postgres::getUsername);
         registry.add("spring.flyway.password", postgres::getPassword);
         registry.add("opensearch.url", opensearch::getHttpHostAddress);
+        registry.add("clickhouse.enabled", () -> "true");
+        registry.add("clickhouse.url", clickhouse::getJdbcUrl);
+        registry.add("clickhouse.username", clickhouse::getUsername);
+        registry.add("clickhouse.password", clickhouse::getPassword);
     }
 }

View File

@@ -0,0 +1,114 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseMetricsQueryStoreTest {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseMetricsQueryStore queryStore;
@BeforeEach
void setUp() {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
jdbc.execute("""
CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3),
agent_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
""");
jdbc.execute("TRUNCATE TABLE agent_metrics");
// Seed test data: 6 data points across 1 hour for two metrics
Instant base = Instant.parse("2026-03-31T10:00:00Z");
for (int i = 0; i < 6; i++) {
Instant ts = base.plusSeconds(i * 600); // every 10 minutes
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
"agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts));
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
"agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
}
queryStore = new ClickHouseMetricsQueryStore(jdbc);
}
@Test
void queryTimeSeries_returnsDataGroupedByMetric() {
Instant from = Instant.parse("2026-03-31T10:00:00Z");
Instant to = Instant.parse("2026-03-31T11:00:00Z");
Map<String, List<MetricTimeSeries.Bucket>> result =
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6);
assertThat(result).containsKeys("cpu.usage", "memory.free");
assertThat(result.get("cpu.usage")).isNotEmpty();
assertThat(result.get("memory.free")).isNotEmpty();
}
@Test
void queryTimeSeries_bucketsAverageCorrectly() {
Instant from = Instant.parse("2026-03-31T10:00:00Z");
Instant to = Instant.parse("2026-03-31T11:00:00Z");
// 1 bucket for the entire hour = average of all 6 values
Map<String, List<MetricTimeSeries.Bucket>> result =
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1);
assertThat(result.get("cpu.usage")).hasSize(1);
// Values: 50, 55, 60, 65, 70, 75 → avg = 62.5
assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1));
}
@Test
void queryTimeSeries_noData_returnsEmptyLists() {
Instant from = Instant.parse("2025-01-01T00:00:00Z");
Instant to = Instant.parse("2025-01-01T01:00:00Z");
Map<String, List<MetricTimeSeries.Bucket>> result =
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6);
assertThat(result.get("cpu.usage")).isEmpty();
}
@Test
void queryTimeSeries_unknownAgent_returnsEmpty() {
Instant from = Instant.parse("2026-03-31T10:00:00Z");
Instant to = Instant.parse("2026-03-31T11:00:00Z");
Map<String, List<MetricTimeSeries.Bucket>> result =
queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6);
assertThat(result.get("cpu.usage")).isEmpty();
}
}

View File

@@ -0,0 +1,108 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import com.zaxxer.hikari.HikariDataSource;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseMetricsStoreTest {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseMetricsStore store;
@BeforeEach
void setUp() {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
jdbc.execute("""
CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3),
agent_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
""");
jdbc.execute("TRUNCATE TABLE agent_metrics");
store = new ClickHouseMetricsStore(jdbc);
}
@Test
void insertBatch_writesMetricsToClickHouse() {
List<MetricsSnapshot> batch = List.of(
new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"),
"cpu.usage", 75.5, Map.of("host", "server-1")),
new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"),
"memory.free", 1024.0, null)
);
store.insertBatch(batch);
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'",
Integer.class);
assertThat(count).isEqualTo(2);
}
@Test
void insertBatch_storesTags() {
store.insertBatch(List.of(
new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"),
"disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4"))
));
// Just verify we can read back the row with tags
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'",
Integer.class);
assertThat(count).isEqualTo(1);
}
@Test
void insertBatch_emptyList_doesNothing() {
store.insertBatch(List.of());
Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class);
assertThat(count).isEqualTo(0);
}
@Test
void insertBatch_nullTags_defaultsToEmptyMap() {
store.insertBatch(List.of(
new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"),
"cpu.usage", 50.0, null)
));
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'",
Integer.class);
assertThat(count).isEqualTo(1);
}
}

View File

@@ -0,0 +1,14 @@
package com.cameleer3.server.core.storage;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import java.time.Instant;
import java.util.List;
import java.util.Map;
public interface MetricsQueryStore {
Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
String agentId, List<String> metricNames,
Instant from, Instant to, int buckets);
}

View File

@@ -0,0 +1,9 @@
package com.cameleer3.server.core.storage.model;
import java.time.Instant;
import java.util.List;
public record MetricTimeSeries(String metricName, List<Bucket> buckets) {
public record Bucket(Instant time, double value) {}
}

View File

@@ -75,6 +75,12 @@ spec:
                 name: cameleer-auth
                 key: CAMELEER_JWT_SECRET
                 optional: true
+          - name: CLICKHOUSE_ENABLED
+            value: "true"
+          - name: CLICKHOUSE_URL
+            value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0"
+          - name: CAMELEER_STORAGE_METRICS
+            value: "postgres"
         resources:
           requests:
requests: requests:

deploy/clickhouse.yaml (new file, 75 lines)
View File

@@ -0,0 +1,75 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: clickhouse
namespace: cameleer
spec:
serviceName: clickhouse
replicas: 1
selector:
matchLabels:
app: clickhouse
template:
metadata:
labels:
app: clickhouse
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:24.12
ports:
- containerPort: 8123
name: http
- containerPort: 9000
name: native
volumeMounts:
- name: data
mountPath: /var/lib/clickhouse
resources:
requests:
memory: "2Gi"
cpu: "500m"
limits:
memory: "4Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
readinessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 50Gi
---
apiVersion: v1
kind: Service
metadata:
name: clickhouse
namespace: cameleer
spec:
clusterIP: None
selector:
app: clickhouse
ports:
- port: 8123
targetPort: 8123
name: http
- port: 9000
targetPort: 9000
name: native

File diff suppressed because it is too large.