feat: add ClickHouseMetricsStore with batch insert
TDD implementation of MetricsStore backed by ClickHouse. Uses native Map(String,String) column type (no JSON cast), relies on ClickHouse DEFAULT for server_received_at, and handles null tags by substituting an empty HashMap. All 4 Testcontainers tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,41 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.storage.MetricsStore;
|
||||||
|
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ClickHouseMetricsStore implements MetricsStore {
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbc;
|
||||||
|
|
||||||
|
public ClickHouseMetricsStore(JdbcTemplate jdbc) {
|
||||||
|
this.jdbc = jdbc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void insertBatch(List<MetricsSnapshot> snapshots) {
|
||||||
|
if (snapshots.isEmpty()) return;
|
||||||
|
|
||||||
|
jdbc.batchUpdate("""
|
||||||
|
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
snapshots.stream().map(s -> new Object[]{
|
||||||
|
s.agentId(),
|
||||||
|
s.metricName(),
|
||||||
|
s.metricValue(),
|
||||||
|
tagsToClickHouseMap(s.tags()),
|
||||||
|
Timestamp.from(s.collectedAt())
|
||||||
|
}).toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> tagsToClickHouseMap(Map<String, String> tags) {
|
||||||
|
if (tags == null || tags.isEmpty()) return new HashMap<>();
|
||||||
|
return new HashMap<>(tags);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,108 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseMetricsStoreTest {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseMetricsStore store;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
|
||||||
|
jdbc.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS agent_metrics (
|
||||||
|
tenant_id LowCardinality(String) DEFAULT 'default',
|
||||||
|
collected_at DateTime64(3),
|
||||||
|
agent_id LowCardinality(String),
|
||||||
|
metric_name LowCardinality(String),
|
||||||
|
metric_value Float64,
|
||||||
|
tags Map(String, String) DEFAULT map(),
|
||||||
|
server_received_at DateTime64(3) DEFAULT now64(3)
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
|
||||||
|
""");
|
||||||
|
|
||||||
|
jdbc.execute("TRUNCATE TABLE agent_metrics");
|
||||||
|
|
||||||
|
store = new ClickHouseMetricsStore(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insertBatch_writesMetricsToClickHouse() {
|
||||||
|
List<MetricsSnapshot> batch = List.of(
|
||||||
|
new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"),
|
||||||
|
"cpu.usage", 75.5, Map.of("host", "server-1")),
|
||||||
|
new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"),
|
||||||
|
"memory.free", 1024.0, null)
|
||||||
|
);
|
||||||
|
|
||||||
|
store.insertBatch(batch);
|
||||||
|
|
||||||
|
Integer count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insertBatch_storesTags() {
|
||||||
|
store.insertBatch(List.of(
|
||||||
|
new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"),
|
||||||
|
"disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4"))
|
||||||
|
));
|
||||||
|
|
||||||
|
// Just verify we can read back the row with tags
|
||||||
|
Integer count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insertBatch_emptyList_doesNothing() {
|
||||||
|
store.insertBatch(List.of());
|
||||||
|
|
||||||
|
Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class);
|
||||||
|
assertThat(count).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insertBatch_nullTags_defaultsToEmptyMap() {
|
||||||
|
store.insertBatch(List.of(
|
||||||
|
new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"),
|
||||||
|
"cpu.usage", 50.0, null)
|
||||||
|
));
|
||||||
|
|
||||||
|
Integer count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user