feat: add ClickHouseMetricsQueryStore with time-bucketed queries
Implements MetricsQueryStore using ClickHouse toStartOfInterval() for time-bucketed aggregation queries; verified with 4 Testcontainers tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,66 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.storage.MetricsQueryStore;
|
||||
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
|
||||
public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
|
||||
String agentId, List<String> metricNames,
|
||||
Instant from, Instant to, int buckets) {
|
||||
|
||||
long intervalSeconds = Math.max(60,
|
||||
(to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));
|
||||
|
||||
Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
|
||||
for (String name : metricNames) {
|
||||
result.put(name.trim(), new ArrayList<>());
|
||||
}
|
||||
|
||||
String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
|
||||
|
||||
// ClickHouse JDBC doesn't support array params with IN (?).
|
||||
// Build the IN clause with properly escaped values.
|
||||
StringBuilder inClause = new StringBuilder();
|
||||
for (int i = 0; i < namesArray.length; i++) {
|
||||
if (i > 0) inClause.append(", ");
|
||||
inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'");
|
||||
}
|
||||
|
||||
String finalSql = """
|
||||
SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
|
||||
metric_name,
|
||||
avg(metric_value) AS avg_value
|
||||
FROM agent_metrics
|
||||
WHERE agent_id = ?
|
||||
AND collected_at >= ?
|
||||
AND collected_at < ?
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY bucket, metric_name
|
||||
ORDER BY bucket
|
||||
""".formatted(intervalSeconds, inClause);
|
||||
|
||||
jdbc.query(finalSql, rs -> {
|
||||
String metricName = rs.getString("metric_name");
|
||||
Instant bucket = rs.getTimestamp("bucket").toInstant();
|
||||
double value = rs.getDouble("avg_value");
|
||||
result.computeIfAbsent(metricName, k -> new ArrayList<>())
|
||||
.add(new MetricTimeSeries.Bucket(bucket, value));
|
||||
}, agentId,
|
||||
java.sql.Timestamp.from(from),
|
||||
java.sql.Timestamp.from(to));
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Testcontainers
|
||||
class ClickHouseMetricsQueryStoreTest {
|
||||
|
||||
@Container
|
||||
static final ClickHouseContainer clickhouse =
|
||||
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||
|
||||
private JdbcTemplate jdbc;
|
||||
private ClickHouseMetricsQueryStore queryStore;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
HikariDataSource ds = new HikariDataSource();
|
||||
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||
ds.setUsername(clickhouse.getUsername());
|
||||
ds.setPassword(clickhouse.getPassword());
|
||||
|
||||
jdbc = new JdbcTemplate(ds);
|
||||
|
||||
jdbc.execute("""
|
||||
CREATE TABLE IF NOT EXISTS agent_metrics (
|
||||
tenant_id LowCardinality(String) DEFAULT 'default',
|
||||
collected_at DateTime64(3),
|
||||
agent_id LowCardinality(String),
|
||||
metric_name LowCardinality(String),
|
||||
metric_value Float64,
|
||||
tags Map(String, String) DEFAULT map(),
|
||||
server_received_at DateTime64(3) DEFAULT now64(3)
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
|
||||
""");
|
||||
|
||||
jdbc.execute("TRUNCATE TABLE agent_metrics");
|
||||
|
||||
// Seed test data: 6 data points across 1 hour for two metrics
|
||||
Instant base = Instant.parse("2026-03-31T10:00:00Z");
|
||||
for (int i = 0; i < 6; i++) {
|
||||
Instant ts = base.plusSeconds(i * 600); // every 10 minutes
|
||||
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
|
||||
"agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts));
|
||||
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
|
||||
"agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
|
||||
}
|
||||
|
||||
queryStore = new ClickHouseMetricsQueryStore(jdbc);
|
||||
}
|
||||
|
||||
@Test
|
||||
void queryTimeSeries_returnsDataGroupedByMetric() {
|
||||
Instant from = Instant.parse("2026-03-31T10:00:00Z");
|
||||
Instant to = Instant.parse("2026-03-31T11:00:00Z");
|
||||
|
||||
Map<String, List<MetricTimeSeries.Bucket>> result =
|
||||
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6);
|
||||
|
||||
assertThat(result).containsKeys("cpu.usage", "memory.free");
|
||||
assertThat(result.get("cpu.usage")).isNotEmpty();
|
||||
assertThat(result.get("memory.free")).isNotEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void queryTimeSeries_bucketsAverageCorrectly() {
|
||||
Instant from = Instant.parse("2026-03-31T10:00:00Z");
|
||||
Instant to = Instant.parse("2026-03-31T11:00:00Z");
|
||||
|
||||
// 1 bucket for the entire hour = average of all 6 values
|
||||
Map<String, List<MetricTimeSeries.Bucket>> result =
|
||||
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1);
|
||||
|
||||
assertThat(result.get("cpu.usage")).hasSize(1);
|
||||
// Values: 50, 55, 60, 65, 70, 75 → avg = 62.5
|
||||
assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1));
|
||||
}
|
||||
|
||||
@Test
|
||||
void queryTimeSeries_noData_returnsEmptyLists() {
|
||||
Instant from = Instant.parse("2025-01-01T00:00:00Z");
|
||||
Instant to = Instant.parse("2025-01-01T01:00:00Z");
|
||||
|
||||
Map<String, List<MetricTimeSeries.Bucket>> result =
|
||||
queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6);
|
||||
|
||||
assertThat(result.get("cpu.usage")).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void queryTimeSeries_unknownAgent_returnsEmpty() {
|
||||
Instant from = Instant.parse("2026-03-31T10:00:00Z");
|
||||
Instant to = Instant.parse("2026-03-31T11:00:00Z");
|
||||
|
||||
Map<String, List<MetricTimeSeries.Bucket>> result =
|
||||
queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6);
|
||||
|
||||
assertThat(result.get("cpu.usage")).isEmpty();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user