diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java new file mode 100644 index 00000000..0ee53b42 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java @@ -0,0 +1,66 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.*; + +public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalSeconds = Math.max(60, + (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1)); + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + + // ClickHouse JDBC doesn't support array params with IN (?). + // Build the IN clause with properly escaped values. + StringBuilder inClause = new StringBuilder(); + for (int i = 0; i < namesArray.length; i++) { + if (i > 0) inClause.append(", "); + inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'"); + } + + String finalSql = """ + SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket, + metric_name, + avg(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? + AND collected_at < ? + AND metric_name IN (%s) + GROUP BY bucket, metric_name + ORDER BY bucket + """.formatted(intervalSeconds, inClause); + + jdbc.query(finalSql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, agentId, + java.sql.Timestamp.from(from), + java.sql.Timestamp.from(to)); + + return result; + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java new file mode 100644 index 00000000..08287fd6 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java @@ -0,0 +1,114 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsQueryStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsQueryStore queryStore; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + // Seed test data: 6 data points across 1 hour for two metrics + Instant base = Instant.parse("2026-03-31T10:00:00Z"); + for (int i = 0; i < 6; i++) { + Instant ts = base.plusSeconds(i * 600); // every 10 minutes + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts)); + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); + } + + queryStore = new ClickHouseMetricsQueryStore(jdbc); + } + + @Test + void queryTimeSeries_returnsDataGroupedByMetric() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6); + + assertThat(result).containsKeys("cpu.usage", "memory.free"); + assertThat(result.get("cpu.usage")).isNotEmpty(); + assertThat(result.get("memory.free")).isNotEmpty(); + } + + @Test + void queryTimeSeries_bucketsAverageCorrectly() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + // 1 bucket for the entire hour = average of all 6 values + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1); + + assertThat(result.get("cpu.usage")).hasSize(1); + // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5 + assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1)); + } + + @Test + void queryTimeSeries_noData_returnsEmptyLists() { + Instant from = Instant.parse("2025-01-01T00:00:00Z"); + Instant to = Instant.parse("2025-01-01T01:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } + + @Test + void queryTimeSeries_unknownAgent_returnsEmpty() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } +}