feat(events): cursor-paginate agent events (ClickHouse impl)

Orders by (timestamp DESC, instance_id ASC). Cursor is
base64url('timestampIso|instanceId') with a tuple keyset predicate
for stable paging across ties.
This commit is contained in:
hsiegeln
2026-04-17 11:57:35 +02:00
parent 67a834153e
commit d293dafb99
2 changed files with 139 additions and 20 deletions

View File

@@ -1,12 +1,15 @@
package com.cameleer.server.app.storage; package com.cameleer.server.app.storage;
import com.cameleer.server.core.agent.AgentEventPage;
import com.cameleer.server.core.agent.AgentEventRecord; import com.cameleer.server.core.agent.AgentEventRecord;
import com.cameleer.server.core.agent.AgentEventRepository; import com.cameleer.server.core.agent.AgentEventRepository;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.List; import java.util.List;
/** /**
@@ -43,26 +46,11 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
var params = new ArrayList<Object>(); var params = new ArrayList<Object>();
params.add(tenantId); params.add(tenantId);
if (applicationId != null) { if (applicationId != null) { sql.append(" AND application_id = ?"); params.add(applicationId); }
sql.append(" AND application_id = ?"); if (instanceId != null) { sql.append(" AND instance_id = ?"); params.add(instanceId); }
params.add(applicationId); if (environment != null) { sql.append(" AND environment = ?"); params.add(environment); }
} if (from != null) { sql.append(" AND timestamp >= ?"); params.add(Timestamp.from(from)); }
if (instanceId != null) { if (to != null) { sql.append(" AND timestamp < ?"); params.add(Timestamp.from(to)); }
sql.append(" AND instance_id = ?");
params.add(instanceId);
}
if (environment != null) {
sql.append(" AND environment = ?");
params.add(environment);
}
if (from != null) {
sql.append(" AND timestamp >= ?");
params.add(Timestamp.from(from));
}
if (to != null) {
sql.append(" AND timestamp < ?");
params.add(Timestamp.from(to));
}
sql.append(" ORDER BY timestamp DESC LIMIT ?"); sql.append(" ORDER BY timestamp DESC LIMIT ?");
params.add(limit); params.add(limit);
@@ -75,4 +63,61 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
rs.getTimestamp("timestamp").toInstant() rs.getTimestamp("timestamp").toInstant()
), params.toArray()); ), params.toArray());
} }
@Override
public AgentEventPage queryPage(String applicationId, String instanceId, String environment,
Instant from, Instant to, String cursor, int limit) {
var sql = new StringBuilder(SELECT_BASE);
var params = new ArrayList<Object>();
params.add(tenantId);
if (applicationId != null) { sql.append(" AND application_id = ?"); params.add(applicationId); }
if (instanceId != null) { sql.append(" AND instance_id = ?"); params.add(instanceId); }
if (environment != null) { sql.append(" AND environment = ?"); params.add(environment); }
if (from != null) { sql.append(" AND timestamp >= ?"); params.add(Timestamp.from(from)); }
if (to != null) { sql.append(" AND timestamp < ?"); params.add(Timestamp.from(to)); }
if (cursor != null && !cursor.isEmpty()) {
String decoded = new String(Base64.getUrlDecoder().decode(cursor), StandardCharsets.UTF_8);
int bar = decoded.indexOf('|');
if (bar <= 0) {
throw new IllegalArgumentException("Malformed cursor");
}
Instant cursorTs = Instant.parse(decoded.substring(0, bar));
String cursorInstance = decoded.substring(bar + 1);
sql.append(" AND (timestamp < ? OR (timestamp = ? AND instance_id > ?))");
params.add(Timestamp.from(cursorTs));
params.add(Timestamp.from(cursorTs));
params.add(cursorInstance);
}
sql.append(" ORDER BY timestamp DESC, instance_id ASC LIMIT ?");
int fetchLimit = limit + 1;
params.add(fetchLimit);
List<AgentEventRecord> results = new ArrayList<>(jdbc.query(sql.toString(),
(rs, rowNum) -> new AgentEventRecord(
rs.getLong("id"),
rs.getString("instance_id"),
rs.getString("application_id"),
rs.getString("event_type"),
rs.getString("detail"),
rs.getTimestamp("timestamp").toInstant()
), params.toArray()));
boolean hasMore = results.size() > limit;
if (hasMore) {
results = new ArrayList<>(results.subList(0, limit));
}
String nextCursor = null;
if (hasMore && !results.isEmpty()) {
AgentEventRecord last = results.get(results.size() - 1);
String raw = last.timestamp().toString() + "|" + last.instanceId();
nextCursor = Base64.getUrlEncoder().withoutPadding()
.encodeToString(raw.getBytes(StandardCharsets.UTF_8));
}
return new AgentEventPage(results, nextCursor, hasMore);
}
} }

View File

@@ -153,4 +153,78 @@ class ClickHouseAgentEventRepositoryIT {
assertThat(results.get(1).eventType()).isEqualTo("SECOND"); assertThat(results.get(1).eventType()).isEqualTo("SECOND");
assertThat(results.get(2).eventType()).isEqualTo("FIRST"); assertThat(results.get(2).eventType()).isEqualTo("FIRST");
} }
@Test
void queryPage_emptyTable_returnsEmptyPage() {
com.cameleer.server.core.agent.AgentEventPage page =
repo.queryPage(null, null, null, null, null, null, 10);
assertThat(page.data()).isEmpty();
assertThat(page.hasMore()).isFalse();
assertThat(page.nextCursor()).isNull();
}
@Test
void queryPage_boundary_noHasMoreWhenLimitEqualsRowCount() {
Instant base = Instant.parse("2026-04-01T10:00:00Z");
for (int i = 0; i < 3; i++) {
insertAt("agent-1", "app-a", "TICK", "t" + i, base.plusSeconds(i));
}
com.cameleer.server.core.agent.AgentEventPage page =
repo.queryPage(null, null, null, null, null, null, 3);
assertThat(page.data()).hasSize(3);
assertThat(page.hasMore()).isFalse();
assertThat(page.nextCursor()).isNull();
}
@Test
void queryPage_paginatesAcrossThreePages() {
Instant base = Instant.parse("2026-04-01T10:00:00Z");
for (int i = 0; i < 5; i++) {
insertAt("agent-1", "app-a", "TICK", "t" + i, base.plusSeconds(i));
}
com.cameleer.server.core.agent.AgentEventPage p1 =
repo.queryPage(null, null, null, null, null, null, 2);
assertThat(p1.data()).hasSize(2);
assertThat(p1.hasMore()).isTrue();
assertThat(p1.nextCursor()).isNotBlank();
assertThat(p1.data().get(0).detail()).isEqualTo("t4");
assertThat(p1.data().get(1).detail()).isEqualTo("t3");
com.cameleer.server.core.agent.AgentEventPage p2 =
repo.queryPage(null, null, null, null, null, p1.nextCursor(), 2);
assertThat(p2.data()).hasSize(2);
assertThat(p2.hasMore()).isTrue();
assertThat(p2.data().get(0).detail()).isEqualTo("t2");
assertThat(p2.data().get(1).detail()).isEqualTo("t1");
com.cameleer.server.core.agent.AgentEventPage p3 =
repo.queryPage(null, null, null, null, null, p2.nextCursor(), 2);
assertThat(p3.data()).hasSize(1);
assertThat(p3.hasMore()).isFalse();
assertThat(p3.nextCursor()).isNull();
assertThat(p3.data().get(0).detail()).isEqualTo("t0");
}
@Test
void queryPage_tiebreakByInstanceIdAsc_whenTimestampsEqual() {
Instant ts = Instant.parse("2026-04-01T10:00:00Z");
insertAt("agent-z", "app-a", "TICK", "z", ts);
insertAt("agent-a", "app-a", "TICK", "a", ts);
insertAt("agent-m", "app-a", "TICK", "m", ts);
com.cameleer.server.core.agent.AgentEventPage p1 =
repo.queryPage(null, null, null, null, null, null, 2);
assertThat(p1.data()).hasSize(2);
// (timestamp DESC, instance_id ASC): ties resolve to a, m, z
assertThat(p1.data().get(0).instanceId()).isEqualTo("agent-a");
assertThat(p1.data().get(1).instanceId()).isEqualTo("agent-m");
assertThat(p1.hasMore()).isTrue();
com.cameleer.server.core.agent.AgentEventPage p2 =
repo.queryPage(null, null, null, null, null, p1.nextCursor(), 2);
assertThat(p2.data()).hasSize(1);
assertThat(p2.data().get(0).instanceId()).isEqualTo("agent-z");
assertThat(p2.hasMore()).isFalse();
}
} }