feat(clickhouse): add ClickHouseAgentEventRepository with integration tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,73 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRecord;
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRepository;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClickHouse implementation of {@link AgentEventRepository}.
|
||||||
|
* <p>
|
||||||
|
* The ClickHouse table has no {@code id} column (no BIGSERIAL equivalent),
|
||||||
|
* so all returned {@link AgentEventRecord} instances have {@code id = 0}.
|
||||||
|
*/
|
||||||
|
public class ClickHouseAgentEventRepository implements AgentEventRepository {
|
||||||
|
|
||||||
|
private static final String TENANT = "default";
|
||||||
|
|
||||||
|
private static final String INSERT_SQL =
|
||||||
|
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
|
private static final String SELECT_BASE =
|
||||||
|
"SELECT 0 AS id, agent_id, app_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbc;
|
||||||
|
|
||||||
|
public ClickHouseAgentEventRepository(JdbcTemplate jdbc) {
|
||||||
|
this.jdbc = jdbc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void insert(String agentId, String appId, String eventType, String detail) {
|
||||||
|
jdbc.update(INSERT_SQL, TENANT, agentId, appId, eventType, detail);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AgentEventRecord> query(String appId, String agentId, Instant from, Instant to, int limit) {
|
||||||
|
var sql = new StringBuilder(SELECT_BASE);
|
||||||
|
var params = new ArrayList<Object>();
|
||||||
|
params.add(TENANT);
|
||||||
|
|
||||||
|
if (appId != null) {
|
||||||
|
sql.append(" AND app_id = ?");
|
||||||
|
params.add(appId);
|
||||||
|
}
|
||||||
|
if (agentId != null) {
|
||||||
|
sql.append(" AND agent_id = ?");
|
||||||
|
params.add(agentId);
|
||||||
|
}
|
||||||
|
if (from != null) {
|
||||||
|
sql.append(" AND timestamp >= ?");
|
||||||
|
params.add(Timestamp.from(from));
|
||||||
|
}
|
||||||
|
if (to != null) {
|
||||||
|
sql.append(" AND timestamp < ?");
|
||||||
|
params.add(Timestamp.from(to));
|
||||||
|
}
|
||||||
|
sql.append(" ORDER BY timestamp DESC LIMIT ?");
|
||||||
|
params.add(limit);
|
||||||
|
|
||||||
|
return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord(
|
||||||
|
rs.getLong("id"),
|
||||||
|
rs.getString("agent_id"),
|
||||||
|
rs.getString("app_id"),
|
||||||
|
rs.getString("event_type"),
|
||||||
|
rs.getString("detail"),
|
||||||
|
rs.getTimestamp("timestamp").toInstant()
|
||||||
|
), params.toArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,158 @@
|
|||||||
|
package com.cameleer3.server.app.storage;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.agent.AgentEventRecord;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class ClickHouseAgentEventRepositoryIT {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static final ClickHouseContainer clickhouse =
|
||||||
|
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||||
|
|
||||||
|
private JdbcTemplate jdbc;
|
||||||
|
private ClickHouseAgentEventRepository repo;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
HikariDataSource ds = new HikariDataSource();
|
||||||
|
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||||
|
ds.setUsername(clickhouse.getUsername());
|
||||||
|
ds.setPassword(clickhouse.getPassword());
|
||||||
|
|
||||||
|
jdbc = new JdbcTemplate(ds);
|
||||||
|
|
||||||
|
String ddl = new ClassPathResource("clickhouse/V7__agent_events.sql")
|
||||||
|
.getContentAsString(StandardCharsets.UTF_8);
|
||||||
|
jdbc.execute(ddl);
|
||||||
|
jdbc.execute("TRUNCATE TABLE agent_events");
|
||||||
|
|
||||||
|
repo = new ClickHouseAgentEventRepository(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Helpers ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert a row with an explicit timestamp so tests can control ordering and ranges.
|
||||||
|
*/
|
||||||
|
private void insertAt(String agentId, String appId, String eventType, String detail, Instant ts) {
|
||||||
|
jdbc.update(
|
||||||
|
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
"default", agentId, appId, eventType, detail, Timestamp.from(ts));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void insert_writesEvent() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "agent came online");
|
||||||
|
|
||||||
|
Long count = jdbc.queryForObject(
|
||||||
|
"SELECT count() FROM agent_events WHERE agent_id = 'agent-1'",
|
||||||
|
Long.class);
|
||||||
|
assertThat(count).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byAppId_filtersCorrectly() {
|
||||||
|
repo.insert("agent-1", "app-x", "CONNECTED", "");
|
||||||
|
repo.insert("agent-2", "app-y", "DISCONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query("app-x", null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).appId()).isEqualTo("app-x");
|
||||||
|
assertThat(results.get(0).agentId()).isEqualTo("agent-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byAgentId_filtersCorrectly() {
|
||||||
|
repo.insert("agent-alpha", "app-shared", "CONNECTED", "");
|
||||||
|
repo.insert("agent-beta", "app-shared", "CONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, "agent-alpha", null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).agentId()).isEqualTo("agent-alpha");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_byTimeRange_filtersCorrectly() {
|
||||||
|
Instant t1 = Instant.parse("2026-01-01T10:00:00Z");
|
||||||
|
Instant t2 = Instant.parse("2026-01-01T11:00:00Z");
|
||||||
|
Instant t3 = Instant.parse("2026-01-01T12:00:00Z");
|
||||||
|
|
||||||
|
insertAt("agent-1", "app-a", "CONNECTED", "early", t1);
|
||||||
|
insertAt("agent-1", "app-a", "HEARTBEAT", "mid", t2);
|
||||||
|
insertAt("agent-1", "app-a", "DISCONNECTED", "late", t3);
|
||||||
|
|
||||||
|
// Query [t2, t3) — should return only the middle event
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, t2, t3, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).eventType()).isEqualTo("HEARTBEAT");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_respectsLimit() {
|
||||||
|
Instant base = Instant.parse("2026-02-01T00:00:00Z");
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
insertAt("agent-1", "app-a", "HEARTBEAT", "beat-" + i, base.plusSeconds(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 3);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_returnsZeroId() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 10);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0).id()).isEqualTo(0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_noFilters_returnsAllEvents() {
|
||||||
|
repo.insert("agent-1", "app-a", "CONNECTED", "");
|
||||||
|
repo.insert("agent-2", "app-b", "DISCONNECTED", "");
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void query_resultsOrderedByTimestampDesc() {
|
||||||
|
Instant t1 = Instant.parse("2026-03-01T08:00:00Z");
|
||||||
|
Instant t2 = Instant.parse("2026-03-01T09:00:00Z");
|
||||||
|
Instant t3 = Instant.parse("2026-03-01T10:00:00Z");
|
||||||
|
|
||||||
|
insertAt("agent-1", "app-a", "FIRST", "", t1);
|
||||||
|
insertAt("agent-1", "app-a", "SECOND", "", t2);
|
||||||
|
insertAt("agent-1", "app-a", "THIRD", "", t3);
|
||||||
|
|
||||||
|
List<AgentEventRecord> results = repo.query(null, null, null, null, 100);
|
||||||
|
|
||||||
|
assertThat(results.get(0).eventType()).isEqualTo("THIRD");
|
||||||
|
assertThat(results.get(1).eventType()).isEqualTo("SECOND");
|
||||||
|
assertThat(results.get(2).eventType()).isEqualTo("FIRST");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user