diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java new file mode 100644 index 00000000..1ddbad44 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java @@ -0,0 +1,73 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.agent.AgentEventRecord; +import com.cameleer3.server.core.agent.AgentEventRepository; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +/** + * ClickHouse implementation of {@link AgentEventRepository}. + *

+ * The ClickHouse table has no {@code id} column (no BIGSERIAL equivalent), + * so all returned {@link AgentEventRecord} instances have {@code id = 0}. + */ +public class ClickHouseAgentEventRepository implements AgentEventRepository { + + private static final String TENANT = "default"; + + private static final String INSERT_SQL = + "INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)"; + + private static final String SELECT_BASE = + "SELECT 0 AS id, agent_id, app_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?"; + + private final JdbcTemplate jdbc; + + public ClickHouseAgentEventRepository(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void insert(String agentId, String appId, String eventType, String detail) { + jdbc.update(INSERT_SQL, TENANT, agentId, appId, eventType, detail); + } + + @Override + public List query(String appId, String agentId, Instant from, Instant to, int limit) { + var sql = new StringBuilder(SELECT_BASE); + var params = new ArrayList(); + params.add(TENANT); + + if (appId != null) { + sql.append(" AND app_id = ?"); + params.add(appId); + } + if (agentId != null) { + sql.append(" AND agent_id = ?"); + params.add(agentId); + } + if (from != null) { + sql.append(" AND timestamp >= ?"); + params.add(Timestamp.from(from)); + } + if (to != null) { + sql.append(" AND timestamp < ?"); + params.add(Timestamp.from(to)); + } + sql.append(" ORDER BY timestamp DESC LIMIT ?"); + params.add(limit); + + return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord( + rs.getLong("id"), + rs.getString("agent_id"), + rs.getString("app_id"), + rs.getString("event_type"), + rs.getString("detail"), + rs.getTimestamp("timestamp").toInstant() + ), params.toArray()); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java new file mode 100644 index 00000000..735e29d3 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java @@ -0,0 +1,158 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.agent.AgentEventRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseAgentEventRepositoryIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseAgentEventRepository repo; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + String ddl = new ClassPathResource("clickhouse/V7__agent_events.sql") + .getContentAsString(StandardCharsets.UTF_8); + jdbc.execute(ddl); + jdbc.execute("TRUNCATE TABLE agent_events"); + + repo = new ClickHouseAgentEventRepository(jdbc); + } + + // ── Helpers ────────────────────────────────────────────────────────────── + + /** + * Insert a row with an explicit timestamp so tests can control ordering and ranges. + */ + private void insertAt(String agentId, String appId, String eventType, String detail, Instant ts) { + jdbc.update( + "INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail, timestamp) VALUES (?, ?, ?, ?, ?, ?)", + "default", agentId, appId, eventType, detail, Timestamp.from(ts)); + } + + // ── Tests ───────────────────────────────────────────────────────────────── + + @Test + void insert_writesEvent() { + repo.insert("agent-1", "app-a", "CONNECTED", "agent came online"); + + Long count = jdbc.queryForObject( + "SELECT count() FROM agent_events WHERE agent_id = 'agent-1'", + Long.class); + assertThat(count).isEqualTo(1); + } + + @Test + void query_byAppId_filtersCorrectly() { + repo.insert("agent-1", "app-x", "CONNECTED", ""); + repo.insert("agent-2", "app-y", "DISCONNECTED", ""); + + List results = repo.query("app-x", null, null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).appId()).isEqualTo("app-x"); + assertThat(results.get(0).agentId()).isEqualTo("agent-1"); + } + + @Test + void query_byAgentId_filtersCorrectly() { + repo.insert("agent-alpha", "app-shared", "CONNECTED", ""); + repo.insert("agent-beta", "app-shared", "CONNECTED", ""); + + List results = repo.query(null, "agent-alpha", null, null, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).agentId()).isEqualTo("agent-alpha"); + } + + @Test + void query_byTimeRange_filtersCorrectly() { + Instant t1 = Instant.parse("2026-01-01T10:00:00Z"); + Instant t2 = Instant.parse("2026-01-01T11:00:00Z"); + Instant t3 = Instant.parse("2026-01-01T12:00:00Z"); + + insertAt("agent-1", "app-a", "CONNECTED", "early", t1); + insertAt("agent-1", "app-a", "HEARTBEAT", "mid", t2); + insertAt("agent-1", "app-a", "DISCONNECTED", "late", t3); + + // Query [t2, t3) — should return only the middle event + List results = repo.query(null, null, t2, t3, 100); + + assertThat(results).hasSize(1); + assertThat(results.get(0).eventType()).isEqualTo("HEARTBEAT"); + } + + @Test + void query_respectsLimit() { + Instant base = Instant.parse("2026-02-01T00:00:00Z"); + for (int i = 0; i < 10; i++) { + insertAt("agent-1", "app-a", "HEARTBEAT", "beat-" + i, base.plusSeconds(i)); + } + + List results = repo.query(null, null, null, null, 3); + + assertThat(results).hasSize(3); + } + + @Test + void query_returnsZeroId() { + repo.insert("agent-1", "app-a", "CONNECTED", ""); + + List results = repo.query(null, null, null, null, 10); + + assertThat(results).hasSize(1); + assertThat(results.get(0).id()).isEqualTo(0L); + } + + @Test + void query_noFilters_returnsAllEvents() { + repo.insert("agent-1", "app-a", "CONNECTED", ""); + repo.insert("agent-2", "app-b", "DISCONNECTED", ""); + + List results = repo.query(null, null, null, null, 100); + + assertThat(results).hasSize(2); + } + + @Test + void query_resultsOrderedByTimestampDesc() { + Instant t1 = Instant.parse("2026-03-01T08:00:00Z"); + Instant t2 = Instant.parse("2026-03-01T09:00:00Z"); + Instant t3 = Instant.parse("2026-03-01T10:00:00Z"); + + insertAt("agent-1", "app-a", "FIRST", "", t1); + insertAt("agent-1", "app-a", "SECOND", "", t2); + insertAt("agent-1", "app-a", "THIRD", "", t3); + + List results = repo.query(null, null, null, null, 100); + + assertThat(results.get(0).eventType()).isEqualTo("THIRD"); + assertThat(results.get(1).eventType()).isEqualTo("SECOND"); + assertThat(results.get(2).eventType()).isEqualTo("FIRST"); + } +}