Compare commits
10 Commits
e7eda7a7b3
...
7a2a0ee649
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7a2a0ee649 | ||
|
|
1b991f99a3 | ||
|
|
21991b6cf8 | ||
|
|
53766aeb56 | ||
|
|
bf0e9ea418 | ||
|
|
6e30b7ec65 | ||
|
|
08934376df | ||
|
|
23f901279a | ||
|
|
6171827243 | ||
|
|
c77d8a7af0 |
@@ -57,6 +57,12 @@
|
||||
<artifactId>opensearch-rest-client</artifactId>
|
||||
<version>2.19.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.clickhouse</groupId>
|
||||
<artifactId>clickhouse-jdbc</artifactId>
|
||||
<version>0.9.7</version>
|
||||
<classifier>all</classifier>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springdoc</groupId>
|
||||
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
|
||||
@@ -126,6 +132,11 @@
|
||||
<version>2.1.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers-clickhouse</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.cameleer3.server.app.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

/**
 * Wires the ClickHouse connection pool and its JdbcTemplate.
 * Only active when {@code clickhouse.enabled=true}.
 */
@Configuration
@EnableConfigurationProperties(ClickHouseProperties.class)
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseConfig {

    /**
     * Hikari-backed pool pointing at the configured ClickHouse endpoint.
     * Explicitly named so it never collides with the primary DataSource.
     */
    @Bean(name = "clickHouseDataSource")
    public DataSource clickHouseDataSource(ClickHouseProperties props) {
        final HikariDataSource pool = new HikariDataSource();
        pool.setPoolName("clickhouse-pool");
        pool.setJdbcUrl(props.getUrl());
        pool.setUsername(props.getUsername());
        pool.setPassword(props.getPassword());
        pool.setMaximumPoolSize(10);
        return pool;
    }

    /** JdbcTemplate bound to the ClickHouse pool rather than the primary DataSource. */
    @Bean(name = "clickHouseJdbcTemplate")
    public JdbcTemplate clickHouseJdbcTemplate(
            @Qualifier("clickHouseDataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.cameleer3.server.app.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
@ConfigurationProperties(prefix = "clickhouse")
|
||||
public class ClickHouseProperties {
|
||||
|
||||
private String url = "jdbc:clickhouse://localhost:8123/cameleer";
|
||||
private String username = "default";
|
||||
private String password = "";
|
||||
|
||||
public String getUrl() { return url; }
|
||||
public void setUrl(String url) { this.url = url; }
|
||||
|
||||
public String getUsername() { return username; }
|
||||
public void setUsername(String username) { this.username = username; }
|
||||
|
||||
public String getPassword() { return password; }
|
||||
public void setPassword(String password) { this.password = password; }
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.cameleer3.server.app.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

/**
 * Applies every SQL script under {@code classpath:clickhouse/} after the
 * application has fully started, in filename order. Active only when
 * {@code clickhouse.enabled=true}.
 */
@Component
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public class ClickHouseSchemaInitializer {

    private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class);

    private final JdbcTemplate clickHouseJdbc;

    public ClickHouseSchemaInitializer(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        this.clickHouseJdbc = clickHouseJdbc;
    }

    /**
     * Locates all {@code *.sql} resources, sorts them by filename (so scripts
     * can be ordered with numeric prefixes), and executes each ';'-separated
     * statement individually.
     *
     * @throws IOException if a script cannot be located or read
     */
    @EventListener(ApplicationReadyEvent.class)
    public void initializeSchema() throws IOException {
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql");

        // Resource.getFilename() may return null for some resource types;
        // the plain Comparator.comparing(Resource::getFilename) would then
        // throw NPE inside the sort. Sort null filenames first instead.
        Arrays.sort(scripts, Comparator.comparing(
                Resource::getFilename,
                Comparator.nullsFirst(Comparator.naturalOrder())));

        for (Resource script : scripts) {
            String sql = script.getContentAsString(StandardCharsets.UTF_8);
            log.info("Executing ClickHouse schema script: {}", script.getFilename());
            // NOTE(review): naive split — a ';' inside a string literal or
            // comment would split a statement apart; acceptable for simple DDL.
            for (String statement : sql.split(";")) {
                String trimmed = statement.trim();
                if (!trimmed.isEmpty()) {
                    clickHouseJdbc.execute(trimmed);
                }
            }
        }

        log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length);
    }
}
|
||||
@@ -1,5 +1,9 @@
|
||||
package com.cameleer3.server.app.config;
|
||||
|
||||
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
|
||||
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
|
||||
import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
|
||||
import com.cameleer3.server.app.storage.PostgresMetricsStore;
|
||||
import com.cameleer3.server.core.admin.AuditRepository;
|
||||
import com.cameleer3.server.core.admin.AuditService;
|
||||
import com.cameleer3.server.core.detail.DetailService;
|
||||
@@ -8,9 +12,12 @@ import com.cameleer3.server.core.ingestion.IngestionService;
|
||||
import com.cameleer3.server.core.ingestion.WriteBuffer;
|
||||
import com.cameleer3.server.core.storage.*;
|
||||
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
@Configuration
|
||||
public class StorageBeanConfig {
|
||||
@@ -41,4 +48,30 @@ public class StorageBeanConfig {
|
||||
return new IngestionService(executionStore, diagramStore, metricsBuffer,
|
||||
searchIndexer::onExecutionUpdated, bodySizeLimit);
|
||||
}
|
||||
|
||||
    /**
     * ClickHouse-backed metrics writer, selected when
     * {@code cameleer.storage.metrics=clickhouse}. Mutually exclusive with
     * the postgres-backed bean below (different havingValue on the same key).
     */
    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
    public MetricsStore clickHouseMetricsStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseMetricsStore(clickHouseJdbc);
    }
|
||||
|
||||
    /**
     * Postgres-backed metrics writer; the default when
     * {@code cameleer.storage.metrics} is unset ({@code matchIfMissing = true}).
     * Uses the primary (unqualified) JdbcTemplate.
     */
    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
    public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) {
        return new PostgresMetricsStore(jdbc);
    }
|
||||
|
||||
    /**
     * ClickHouse-backed metrics read path; paired with clickHouseMetricsStore
     * so reads and writes target the same backend when
     * {@code cameleer.storage.metrics=clickhouse}.
     */
    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
    public MetricsQueryStore clickHouseMetricsQueryStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseMetricsQueryStore(clickHouseJdbc);
    }
|
||||
|
||||
    /**
     * Postgres-backed metrics read path; the default when
     * {@code cameleer.storage.metrics} is unset ({@code matchIfMissing = true}).
     * Uses the primary (unqualified) JdbcTemplate.
     */
    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
    public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
        return new PostgresMetricsQueryStore(jdbc);
    }
|
||||
}
|
||||
|
||||
@@ -2,22 +2,23 @@ package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.dto.AgentMetricsResponse;
|
||||
import com.cameleer3.server.app.dto.MetricBucket;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import com.cameleer3.server.core.storage.MetricsQueryStore;
|
||||
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/agents/{agentId}/metrics")
|
||||
public class AgentMetricsController {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final MetricsQueryStore metricsQueryStore;
|
||||
|
||||
public AgentMetricsController(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
public AgentMetricsController(MetricsQueryStore metricsQueryStore) {
|
||||
this.metricsQueryStore = metricsQueryStore;
|
||||
}
|
||||
|
||||
@GetMapping
|
||||
@@ -32,34 +33,18 @@ public class AgentMetricsController {
|
||||
if (to == null) to = Instant.now();
|
||||
|
||||
List<String> metricNames = Arrays.asList(names.split(","));
|
||||
long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
|
||||
String intervalStr = intervalMs + " milliseconds";
|
||||
|
||||
Map<String, List<MetricBucket>> result = new LinkedHashMap<>();
|
||||
for (String name : metricNames) {
|
||||
result.put(name.trim(), new ArrayList<>());
|
||||
}
|
||||
Map<String, List<MetricTimeSeries.Bucket>> raw =
|
||||
metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets);
|
||||
|
||||
String sql = """
|
||||
SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
|
||||
metric_name,
|
||||
AVG(metric_value) AS avg_value
|
||||
FROM agent_metrics
|
||||
WHERE agent_id = ?
|
||||
AND collected_at >= ? AND collected_at < ?
|
||||
AND metric_name = ANY(?)
|
||||
GROUP BY bucket, metric_name
|
||||
ORDER BY bucket
|
||||
""";
|
||||
|
||||
String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
|
||||
jdbc.query(sql, rs -> {
|
||||
String metricName = rs.getString("metric_name");
|
||||
Instant bucket = rs.getTimestamp("bucket").toInstant();
|
||||
double value = rs.getDouble("avg_value");
|
||||
result.computeIfAbsent(metricName, k -> new ArrayList<>())
|
||||
.add(new MetricBucket(bucket, value));
|
||||
}, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);
|
||||
Map<String, List<MetricBucket>> result = raw.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
e -> e.getValue().stream()
|
||||
.map(b -> new MetricBucket(b.time(), b.value()))
|
||||
.toList(),
|
||||
(a, b) -> a,
|
||||
LinkedHashMap::new));
|
||||
|
||||
return new AgentMetricsResponse(result);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Instant;
import java.util.*;

/**
 * ClickHouse implementation of {@link MetricsQueryStore}, aggregating samples
 * into fixed-width buckets with {@code toStartOfInterval}.
 */
public class ClickHouseMetricsQueryStore implements MetricsQueryStore {

    private final JdbcTemplate jdbc;

    public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /**
     * Averages metric_value per bucket for the requested metrics over
     * [from, to). Every requested name appears in the result, possibly with
     * an empty bucket list.
     */
    @Override
    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets) {

        // Bucket width in seconds, clamped to >= 60s so tiny (or inverted)
        // ranges never produce a zero/negative interval.
        long intervalSeconds = Math.max(60,
                (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));

        // Pre-populate so callers can distinguish "no data" from "not asked for".
        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
        for (String name : metricNames) {
            result.put(name.trim(), new ArrayList<>());
        }

        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
        if (namesArray.length == 0) {
            // Avoid emitting an invalid "IN ()" clause for an empty name list.
            return result;
        }

        // Bind each name through its own '?' placeholder instead of splicing
        // escaped string literals into the SQL. The ClickHouse JDBC driver does
        // not support binding a single array parameter to "IN (?)", but plain
        // per-value placeholders work, and this removes the injection/escaping
        // risk of the hand-built quoted list (which did not escape backslashes).
        String placeholders = String.join(", ", Collections.nCopies(namesArray.length, "?"));

        String finalSql = """
                SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
                       metric_name,
                       avg(metric_value) AS avg_value
                FROM agent_metrics
                WHERE agent_id = ?
                  AND collected_at >= ?
                  AND collected_at < ?
                  AND metric_name IN (%s)
                GROUP BY bucket, metric_name
                ORDER BY bucket
                """.formatted(intervalSeconds, placeholders);

        List<Object> params = new ArrayList<>();
        params.add(agentId);
        params.add(java.sql.Timestamp.from(from));
        params.add(java.sql.Timestamp.from(to));
        params.addAll(Arrays.asList(namesArray));

        jdbc.query(finalSql, rs -> {
            String metricName = rs.getString("metric_name");
            Instant bucket = rs.getTimestamp("bucket").toInstant();
            double value = rs.getDouble("avg_value");
            result.computeIfAbsent(metricName, k -> new ArrayList<>())
                    .add(new MetricTimeSeries.Bucket(bucket, value));
        }, params.toArray());

        return result;
    }
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * ClickHouse implementation of {@link MetricsStore}: writes snapshots into
 * the agent_metrics table via a single JDBC batch.
 */
public class ClickHouseMetricsStore implements MetricsStore {

    private final JdbcTemplate jdbc;

    public ClickHouseMetricsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /** Batch-inserts the given snapshots; a no-op for an empty list. */
    @Override
    public void insertBatch(List<MetricsSnapshot> snapshots) {
        if (snapshots.isEmpty()) {
            return;
        }

        List<Object[]> rows = snapshots.stream().map(this::toRow).toList();

        jdbc.batchUpdate("""
                INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
                VALUES (?, ?, ?, ?, ?)
                """, rows);
    }

    /** Maps one snapshot onto the positional JDBC arguments of the INSERT. */
    private Object[] toRow(MetricsSnapshot snapshot) {
        return new Object[]{
                snapshot.agentId(),
                snapshot.metricName(),
                snapshot.metricValue(),
                tagsToClickHouseMap(snapshot.tags()),
                Timestamp.from(snapshot.collectedAt())
        };
    }

    /**
     * Normalizes possibly-null tags into a fresh map so the tags column is
     * always bound to a concrete Map value.
     */
    private Map<String, String> tagsToClickHouseMap(Map<String, String> tags) {
        return (tags == null || tags.isEmpty()) ? new HashMap<>() : new HashMap<>(tags);
    }
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;

/**
 * Postgres/TimescaleDB implementation of {@link MetricsQueryStore}, using
 * time_bucket() to aggregate samples into fixed-width buckets.
 */
public class PostgresMetricsQueryStore implements MetricsQueryStore {

    private final JdbcTemplate jdbc;

    public PostgresMetricsQueryStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /**
     * Averages metric_value per bucket for the requested metrics over
     * [from, to). Every requested name appears in the result, possibly with
     * an empty bucket list.
     */
    @Override
    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets) {

        // Clamp the bucket width to >= 1 ms: when from == to, or buckets
        // exceeds the range in milliseconds, the division yields 0 and
        // time_bucket('0 milliseconds', ...) fails server-side. (The
        // ClickHouse implementation applies an analogous clamp.)
        long intervalMs = Math.max(1,
                (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1));
        String intervalStr = intervalMs + " milliseconds";

        // Pre-populate so callers can distinguish "no data" from "not asked for".
        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
        for (String name : metricNames) {
            result.put(name.trim(), new ArrayList<>());
        }

        String sql = """
                SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
                       metric_name,
                       AVG(metric_value) AS avg_value
                FROM agent_metrics
                WHERE agent_id = ?
                  AND collected_at >= ? AND collected_at < ?
                  AND metric_name = ANY(?)
                GROUP BY bucket, metric_name
                ORDER BY bucket
                """;

        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
        jdbc.query(sql, rs -> {
            String metricName = rs.getString("metric_name");
            Instant bucket = rs.getTimestamp("bucket").toInstant();
            double value = rs.getDouble("avg_value");
            result.computeIfAbsent(metricName, k -> new ArrayList<>())
                    .add(new MetricTimeSeries.Bucket(bucket, value));
        }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);

        return result;
    }
}
|
||||
@@ -5,12 +5,10 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public class PostgresMetricsStore implements MetricsStore {
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
@@ -48,6 +48,8 @@ opensearch:
|
||||
cameleer:
|
||||
body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
|
||||
retention-days: ${CAMELEER_RETENTION_DAYS:30}
|
||||
storage:
|
||||
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
|
||||
|
||||
security:
|
||||
access-token-expiry-ms: 3600000
|
||||
@@ -66,6 +68,12 @@ springdoc:
|
||||
swagger-ui:
|
||||
path: /api/v1/swagger-ui
|
||||
|
||||
clickhouse:
|
||||
enabled: ${CLICKHOUSE_ENABLED:false}
|
||||
url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0}
|
||||
username: ${CLICKHOUSE_USERNAME:default}
|
||||
password: ${CLICKHOUSE_PASSWORD:}
|
||||
|
||||
management:
|
||||
endpoints:
|
||||
web:
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
-- Raw agent metric samples: one row per (agent, metric, collection time).
CREATE TABLE IF NOT EXISTS agent_metrics (
    tenant_id LowCardinality(String) DEFAULT 'default',  -- tenant discriminator; 'default' for single-tenant installs
    collected_at DateTime64(3),                          -- sample timestamp (ms precision) as supplied by the writer
    agent_id LowCardinality(String),
    metric_name LowCardinality(String),
    metric_value Float64,
    tags Map(String, String) DEFAULT map(),              -- optional free-form labels; defaults to an empty map
    server_received_at DateTime64(3) DEFAULT now64(3)    -- ingest time, stamped by the server clock
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at))         -- monthly partitions per tenant
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
TTL collected_at + INTERVAL 365 DAY DELETE               -- drop samples older than one year
SETTINGS index_granularity = 8192;
|
||||
@@ -7,6 +7,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
||||
import org.springframework.test.context.DynamicPropertySource;
|
||||
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
@@ -20,6 +21,7 @@ public abstract class AbstractPostgresIT {
|
||||
|
||||
static final PostgreSQLContainer<?> postgres;
|
||||
static final OpensearchContainer<?> opensearch;
|
||||
static final ClickHouseContainer clickhouse;
|
||||
|
||||
static {
|
||||
postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
|
||||
@@ -30,6 +32,9 @@ public abstract class AbstractPostgresIT {
|
||||
|
||||
opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
|
||||
opensearch.start();
|
||||
|
||||
clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||
clickhouse.start();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
@@ -46,5 +51,9 @@ public abstract class AbstractPostgresIT {
|
||||
registry.add("spring.flyway.user", postgres::getUsername);
|
||||
registry.add("spring.flyway.password", postgres::getPassword);
|
||||
registry.add("opensearch.url", opensearch::getHttpHostAddress);
|
||||
registry.add("clickhouse.enabled", () -> "true");
|
||||
registry.add("clickhouse.url", clickhouse::getJdbcUrl);
|
||||
registry.add("clickhouse.username", clickhouse::getUsername);
|
||||
registry.add("clickhouse.password", clickhouse::getPassword);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for {@link ClickHouseMetricsQueryStore} against a real
 * ClickHouse container.
 */
@Testcontainers
class ClickHouseMetricsQueryStoreTest {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    // Held as a field so tearDown can close it: the original created a new
    // HikariDataSource in every @BeforeEach and never closed it, leaking one
    // connection pool (and its threads) per test method.
    private HikariDataSource ds;
    private JdbcTemplate jdbc;
    private ClickHouseMetricsQueryStore queryStore;

    @BeforeEach
    void setUp() {
        ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());

        jdbc = new JdbcTemplate(ds);

        jdbc.execute("""
                CREATE TABLE IF NOT EXISTS agent_metrics (
                    tenant_id LowCardinality(String) DEFAULT 'default',
                    collected_at DateTime64(3),
                    agent_id LowCardinality(String),
                    metric_name LowCardinality(String),
                    metric_value Float64,
                    tags Map(String, String) DEFAULT map(),
                    server_received_at DateTime64(3) DEFAULT now64(3)
                )
                ENGINE = MergeTree()
                ORDER BY (tenant_id, agent_id, metric_name, collected_at)
                """);

        // The container (and table) is shared across tests — start clean.
        jdbc.execute("TRUNCATE TABLE agent_metrics");

        // Seed test data: 6 data points across 1 hour for two metrics
        Instant base = Instant.parse("2026-03-31T10:00:00Z");
        for (int i = 0; i < 6; i++) {
            Instant ts = base.plusSeconds(i * 600); // every 10 minutes
            jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
                    "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts));
            jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
                    "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
        }

        queryStore = new ClickHouseMetricsQueryStore(jdbc);
    }

    @AfterEach
    void tearDown() {
        // Release the pool created in setUp so connections/threads don't
        // accumulate across test methods.
        if (ds != null) {
            ds.close();
        }
    }

    @Test
    void queryTimeSeries_returnsDataGroupedByMetric() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");

        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6);

        assertThat(result).containsKeys("cpu.usage", "memory.free");
        assertThat(result.get("cpu.usage")).isNotEmpty();
        assertThat(result.get("memory.free")).isNotEmpty();
    }

    @Test
    void queryTimeSeries_bucketsAverageCorrectly() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");

        // 1 bucket for the entire hour = average of all 6 values
        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1);

        assertThat(result.get("cpu.usage")).hasSize(1);
        // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5
        assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1));
    }

    @Test
    void queryTimeSeries_noData_returnsEmptyLists() {
        Instant from = Instant.parse("2025-01-01T00:00:00Z");
        Instant to = Instant.parse("2025-01-01T01:00:00Z");

        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6);

        assertThat(result.get("cpu.usage")).isEmpty();
    }

    @Test
    void queryTimeSeries_unknownAgent_returnsEmpty() {
        Instant from = Instant.parse("2026-03-31T10:00:00Z");
        Instant to = Instant.parse("2026-03-31T11:00:00Z");

        Map<String, List<MetricTimeSeries.Bucket>> result =
                queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6);

        assertThat(result.get("cpu.usage")).isEmpty();
    }
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import com.zaxxer.hikari.HikariDataSource;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for {@link ClickHouseMetricsStore} against a real
 * ClickHouse container.
 */
@Testcontainers
class ClickHouseMetricsStoreTest {

    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    // Held as a field so tearDown can close it: the original created a new
    // HikariDataSource in every @BeforeEach and never closed it, leaking one
    // connection pool (and its threads) per test method.
    private HikariDataSource ds;
    private JdbcTemplate jdbc;
    private ClickHouseMetricsStore store;

    @BeforeEach
    void setUp() {
        ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());

        jdbc = new JdbcTemplate(ds);

        jdbc.execute("""
                CREATE TABLE IF NOT EXISTS agent_metrics (
                    tenant_id LowCardinality(String) DEFAULT 'default',
                    collected_at DateTime64(3),
                    agent_id LowCardinality(String),
                    metric_name LowCardinality(String),
                    metric_value Float64,
                    tags Map(String, String) DEFAULT map(),
                    server_received_at DateTime64(3) DEFAULT now64(3)
                )
                ENGINE = MergeTree()
                ORDER BY (tenant_id, agent_id, metric_name, collected_at)
                """);

        // The container (and table) is shared across tests — start clean.
        jdbc.execute("TRUNCATE TABLE agent_metrics");

        store = new ClickHouseMetricsStore(jdbc);
    }

    @AfterEach
    void tearDown() {
        // Release the pool created in setUp so connections/threads don't
        // accumulate across test methods.
        if (ds != null) {
            ds.close();
        }
    }

    @Test
    void insertBatch_writesMetricsToClickHouse() {
        List<MetricsSnapshot> batch = List.of(
                new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"),
                        "cpu.usage", 75.5, Map.of("host", "server-1")),
                new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"),
                        "memory.free", 1024.0, null)
        );

        store.insertBatch(batch);

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'",
                Integer.class);
        assertThat(count).isEqualTo(2);
    }

    @Test
    void insertBatch_storesTags() {
        store.insertBatch(List.of(
                new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"),
                        "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4"))
        ));

        // Just verify we can read back the row with tags
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }

    @Test
    void insertBatch_emptyList_doesNothing() {
        store.insertBatch(List.of());

        Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    @Test
    void insertBatch_nullTags_defaultsToEmptyMap() {
        store.insertBatch(List.of(
                new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"),
                        "cpu.usage", 50.0, null)
        ));

        Integer count = jdbc.queryForObject(
                "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.storage.model.MetricTimeSeries;

import java.time.Instant;
import java.util.List;
import java.util.Map;

/**
 * Read-side store for bucketed metric time-series queries. Implementations
 * exist for Postgres/TimescaleDB and ClickHouse.
 */
public interface MetricsQueryStore {

    /**
     * Aggregates samples for the given agent and metric names into
     * approximately {@code buckets} time buckets over [from, to).
     *
     * @param agentId     agent whose samples to aggregate
     * @param metricNames metric names to include; implementations trim each name
     * @param from        inclusive range start
     * @param to          exclusive range end
     * @param buckets     requested number of buckets (implementations may clamp)
     * @return map from metric name to its time-ordered buckets; known
     *         implementations include every requested name, possibly with an
     *         empty list when no samples matched
     */
    Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets);
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.cameleer3.server.core.storage.model;

import java.time.Instant;
import java.util.List;

/**
 * A single metric's bucketed time series.
 *
 * @param metricName name of the metric this series belongs to
 * @param buckets    time-ordered aggregation buckets
 */
public record MetricTimeSeries(String metricName, List<Bucket> buckets) {

    /**
     * One aggregated data point.
     *
     * @param time  bucket timestamp (bucket start, as produced by the query stores)
     * @param value aggregated metric value for the bucket
     */
    public record Bucket(Instant time, double value) {}
}
|
||||
@@ -75,6 +75,12 @@ spec:
|
||||
name: cameleer-auth
|
||||
key: CAMELEER_JWT_SECRET
|
||||
optional: true
|
||||
- name: CLICKHOUSE_ENABLED
|
||||
value: "true"
|
||||
- name: CLICKHOUSE_URL
|
||||
value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0"
|
||||
- name: CAMELEER_STORAGE_METRICS
|
||||
value: "postgres"
|
||||
|
||||
resources:
|
||||
requests:
|
||||
|
||||
75
deploy/clickhouse.yaml
Normal file
75
deploy/clickhouse.yaml
Normal file
@@ -0,0 +1,75 @@
|
||||
# Single-node ClickHouse for Cameleer metrics storage.
# StatefulSet (not Deployment) so the data volume follows the pod identity.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: clickhouse
  namespace: cameleer
spec:
  serviceName: clickhouse
  replicas: 1
  selector:
    matchLabels:
      app: clickhouse
  template:
    metadata:
      labels:
        app: clickhouse
    spec:
      containers:
        - name: clickhouse
          image: clickhouse/clickhouse-server:24.12
          ports:
            - containerPort: 8123  # HTTP interface (JDBC URL targets this port)
              name: http
            - containerPort: 9000  # native protocol
              name: native
          volumeMounts:
            - name: data
              mountPath: /var/lib/clickhouse
          resources:
            requests:
              memory: "2Gi"
              cpu: "500m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          # /ping is ClickHouse's built-in HTTP health endpoint.
          livenessProbe:
            httpGet:
              path: /ping
              port: 8123
            initialDelaySeconds: 10
            periodSeconds: 10
            timeoutSeconds: 3
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ping
              port: 8123
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 50Gi
---
# Headless service (clusterIP: None) giving the StatefulSet pod a stable DNS
# name: clickhouse.cameleer.svc.cluster.local.
apiVersion: v1
kind: Service
metadata:
  name: clickhouse
  namespace: cameleer
spec:
  clusterIP: None
  selector:
    app: clickhouse
  ports:
    - port: 8123
      targetPort: 8123
      name: http
    - port: 9000
      targetPort: 9000
      name: native
||||
1193
docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md
Normal file
1193
docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user