fix(pagination): add insert_id UUID tiebreak to cursor keyset
Same-millisecond rows were silently skipped between pages because the log cursor had no tiebreak and the events cursor tied by instance_id (which also collides when one instance emits multiple events within a millisecond). Add an insert_id UUID (DEFAULT generateUUIDv4()) column to both logs and agent_events, order by (timestamp, insert_id) consistently, and encode the cursor as 'timestamp|insert_id'. Existing data is materialized via ALTER TABLE MATERIALIZE COLUMN (one-time background mutation). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -10,8 +10,10 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Base64;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -176,13 +178,26 @@ public class ClickHouseLogStore implements LogIndex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (request.cursor() != null && !request.cursor().isEmpty()) {
|
if (request.cursor() != null && !request.cursor().isEmpty()) {
|
||||||
Instant cursorTs = Instant.parse(request.cursor());
|
String decoded = new String(Base64.getUrlDecoder().decode(request.cursor()),
|
||||||
if ("asc".equalsIgnoreCase(request.sort())) {
|
StandardCharsets.UTF_8);
|
||||||
dataConditions.add("timestamp > parseDateTime64BestEffort(?, 3)");
|
int bar = decoded.indexOf('|');
|
||||||
} else {
|
if (bar <= 0 || bar == decoded.length() - 1) {
|
||||||
dataConditions.add("timestamp < parseDateTime64BestEffort(?, 3)");
|
throw new IllegalArgumentException("Malformed cursor");
|
||||||
}
|
}
|
||||||
|
Instant cursorTs;
|
||||||
|
try {
|
||||||
|
cursorTs = Instant.parse(decoded.substring(0, bar));
|
||||||
|
} catch (java.time.format.DateTimeParseException e) {
|
||||||
|
throw new IllegalArgumentException("Malformed cursor", e);
|
||||||
|
}
|
||||||
|
String cursorId = decoded.substring(bar + 1);
|
||||||
|
String cmp = "asc".equalsIgnoreCase(request.sort()) ? ">" : "<";
|
||||||
|
dataConditions.add(
|
||||||
|
"(timestamp " + cmp + " parseDateTime64BestEffort(?, 3)" +
|
||||||
|
" OR (timestamp = parseDateTime64BestEffort(?, 3) AND insert_id " + cmp + " toUUID(?)))");
|
||||||
dataParams.add(cursorTs.toString());
|
dataParams.add(cursorTs.toString());
|
||||||
|
dataParams.add(cursorTs.toString());
|
||||||
|
dataParams.add(cursorId);
|
||||||
}
|
}
|
||||||
|
|
||||||
String dataWhere = String.join(" AND ", dataConditions);
|
String dataWhere = String.join(" AND ", dataConditions);
|
||||||
@@ -192,11 +207,12 @@ public class ClickHouseLogStore implements LogIndex {
|
|||||||
String dataSql = "SELECT formatDateTime(timestamp, '%Y-%m-%dT%H:%i:%S', 'UTC') AS ts_utc," +
|
String dataSql = "SELECT formatDateTime(timestamp, '%Y-%m-%dT%H:%i:%S', 'UTC') AS ts_utc," +
|
||||||
" toUnixTimestamp64Milli(timestamp) AS ts_millis," +
|
" toUnixTimestamp64Milli(timestamp) AS ts_millis," +
|
||||||
" level, logger_name, message, thread_name, stack_trace, " +
|
" level, logger_name, message, thread_name, stack_trace, " +
|
||||||
"exchange_id, instance_id, application, mdc, source " +
|
"exchange_id, instance_id, application, mdc, source, toString(insert_id) AS insert_id_str " +
|
||||||
"FROM logs WHERE " + dataWhere +
|
"FROM logs WHERE " + dataWhere +
|
||||||
" ORDER BY timestamp " + orderDir + " LIMIT ?";
|
" ORDER BY timestamp " + orderDir + ", insert_id " + orderDir + " LIMIT ?";
|
||||||
dataParams.add(fetchLimit);
|
dataParams.add(fetchLimit);
|
||||||
|
|
||||||
|
List<String> insertIds = new ArrayList<>();
|
||||||
List<LogEntryResult> results = jdbc.query(dataSql, dataParams.toArray(), (rs, rowNum) -> {
|
List<LogEntryResult> results = jdbc.query(dataSql, dataParams.toArray(), (rs, rowNum) -> {
|
||||||
long tsMillis = rs.getLong("ts_millis");
|
long tsMillis = rs.getLong("ts_millis");
|
||||||
String timestampStr = Instant.ofEpochMilli(tsMillis).toString();
|
String timestampStr = Instant.ofEpochMilli(tsMillis).toString();
|
||||||
@@ -207,6 +223,8 @@ public class ClickHouseLogStore implements LogIndex {
|
|||||||
|
|
||||||
String source = rs.getString("source");
|
String source = rs.getString("source");
|
||||||
|
|
||||||
|
insertIds.add(rs.getString("insert_id_str"));
|
||||||
|
|
||||||
return new LogEntryResult(
|
return new LogEntryResult(
|
||||||
timestampStr,
|
timestampStr,
|
||||||
rs.getString("level"),
|
rs.getString("level"),
|
||||||
@@ -229,7 +247,10 @@ public class ClickHouseLogStore implements LogIndex {
|
|||||||
|
|
||||||
String nextCursor = null;
|
String nextCursor = null;
|
||||||
if (hasMore && !results.isEmpty()) {
|
if (hasMore && !results.isEmpty()) {
|
||||||
nextCursor = results.get(results.size() - 1).timestamp();
|
int lastIdx = results.size() - 1;
|
||||||
|
String raw = results.get(lastIdx).timestamp() + "|" + insertIds.get(lastIdx);
|
||||||
|
nextCursor = Base64.getUrlEncoder().withoutPadding()
|
||||||
|
.encodeToString(raw.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new LogSearchResponse(results, nextCursor, hasMore, levelCounts);
|
return new LogSearchResponse(results, nextCursor, hasMore, levelCounts);
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
|
|||||||
"INSERT INTO agent_events (tenant_id, instance_id, application_id, environment, event_type, detail) VALUES (?, ?, ?, ?, ?, ?)";
|
"INSERT INTO agent_events (tenant_id, instance_id, application_id, environment, event_type, detail) VALUES (?, ?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
private static final String SELECT_BASE =
|
private static final String SELECT_BASE =
|
||||||
"SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
|
"SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp, toString(insert_id) AS insert_id_str FROM agent_events WHERE tenant_id = ?";
|
||||||
|
|
||||||
private final String tenantId;
|
private final String tenantId;
|
||||||
private final JdbcTemplate jdbc;
|
private final JdbcTemplate jdbc;
|
||||||
@@ -65,26 +65,30 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
|
|||||||
} catch (java.time.format.DateTimeParseException e) {
|
} catch (java.time.format.DateTimeParseException e) {
|
||||||
throw new IllegalArgumentException("Malformed cursor", e);
|
throw new IllegalArgumentException("Malformed cursor", e);
|
||||||
}
|
}
|
||||||
String cursorInstance = decoded.substring(bar + 1);
|
String cursorInsertId = decoded.substring(bar + 1);
|
||||||
sql.append(" AND (timestamp < ? OR (timestamp = ? AND instance_id > ?))");
|
sql.append(" AND (timestamp < ? OR (timestamp = ? AND insert_id < toUUID(?)))");
|
||||||
params.add(Timestamp.from(cursorTs));
|
params.add(Timestamp.from(cursorTs));
|
||||||
params.add(Timestamp.from(cursorTs));
|
params.add(Timestamp.from(cursorTs));
|
||||||
params.add(cursorInstance);
|
params.add(cursorInsertId);
|
||||||
}
|
}
|
||||||
|
|
||||||
sql.append(" ORDER BY timestamp DESC, instance_id ASC LIMIT ?");
|
sql.append(" ORDER BY timestamp DESC, insert_id DESC LIMIT ?");
|
||||||
int fetchLimit = limit + 1;
|
int fetchLimit = limit + 1;
|
||||||
params.add(fetchLimit);
|
params.add(fetchLimit);
|
||||||
|
|
||||||
|
List<String> insertIds = new ArrayList<>();
|
||||||
List<AgentEventRecord> results = new ArrayList<>(jdbc.query(sql.toString(),
|
List<AgentEventRecord> results = new ArrayList<>(jdbc.query(sql.toString(),
|
||||||
(rs, rowNum) -> new AgentEventRecord(
|
(rs, rowNum) -> {
|
||||||
rs.getLong("id"),
|
insertIds.add(rs.getString("insert_id_str"));
|
||||||
rs.getString("instance_id"),
|
return new AgentEventRecord(
|
||||||
rs.getString("application_id"),
|
rs.getLong("id"),
|
||||||
rs.getString("event_type"),
|
rs.getString("instance_id"),
|
||||||
rs.getString("detail"),
|
rs.getString("application_id"),
|
||||||
rs.getTimestamp("timestamp").toInstant()
|
rs.getString("event_type"),
|
||||||
), params.toArray()));
|
rs.getString("detail"),
|
||||||
|
rs.getTimestamp("timestamp").toInstant()
|
||||||
|
);
|
||||||
|
}, params.toArray()));
|
||||||
|
|
||||||
boolean hasMore = results.size() > limit;
|
boolean hasMore = results.size() > limit;
|
||||||
if (hasMore) {
|
if (hasMore) {
|
||||||
@@ -93,8 +97,9 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
|
|||||||
|
|
||||||
String nextCursor = null;
|
String nextCursor = null;
|
||||||
if (hasMore && !results.isEmpty()) {
|
if (hasMore && !results.isEmpty()) {
|
||||||
AgentEventRecord last = results.get(results.size() - 1);
|
int lastIdx = results.size() - 1;
|
||||||
String raw = last.timestamp().toString() + "|" + last.instanceId();
|
AgentEventRecord last = results.get(lastIdx);
|
||||||
|
String raw = last.timestamp().toString() + "|" + insertIds.get(lastIdx);
|
||||||
nextCursor = Base64.getUrlEncoder().withoutPadding()
|
nextCursor = Base64.getUrlEncoder().withoutPadding()
|
||||||
.encodeToString(raw.getBytes(StandardCharsets.UTF_8));
|
.encodeToString(raw.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -327,7 +327,8 @@ CREATE TABLE IF NOT EXISTS agent_events (
|
|||||||
instance_id LowCardinality(String),
|
instance_id LowCardinality(String),
|
||||||
application_id LowCardinality(String),
|
application_id LowCardinality(String),
|
||||||
event_type LowCardinality(String),
|
event_type LowCardinality(String),
|
||||||
detail String DEFAULT ''
|
detail String DEFAULT '',
|
||||||
|
insert_id UUID DEFAULT generateUUIDv4()
|
||||||
)
|
)
|
||||||
ENGINE = MergeTree()
|
ENGINE = MergeTree()
|
||||||
PARTITION BY (tenant_id, toYYYYMM(timestamp))
|
PARTITION BY (tenant_id, toYYYYMM(timestamp))
|
||||||
@@ -349,6 +350,7 @@ CREATE TABLE IF NOT EXISTS logs (
|
|||||||
stack_trace String DEFAULT '',
|
stack_trace String DEFAULT '',
|
||||||
exchange_id String DEFAULT '',
|
exchange_id String DEFAULT '',
|
||||||
mdc Map(String, String) DEFAULT map(),
|
mdc Map(String, String) DEFAULT map(),
|
||||||
|
insert_id UUID DEFAULT generateUUIDv4(),
|
||||||
|
|
||||||
INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
||||||
INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
|
||||||
@@ -398,3 +400,12 @@ CREATE TABLE IF NOT EXISTS route_catalog (
|
|||||||
)
|
)
|
||||||
ENGINE = ReplacingMergeTree(last_seen)
|
ENGINE = ReplacingMergeTree(last_seen)
|
||||||
ORDER BY (tenant_id, environment, application_id, route_id);
|
ORDER BY (tenant_id, environment, application_id, route_id);
|
||||||
|
|
||||||
|
-- insert_id tiebreak for keyset pagination (fixes same-millisecond cursor collision).
|
||||||
|
-- IF NOT EXISTS on ADD COLUMN is idempotent. MATERIALIZE COLUMN is a background mutation,
|
||||||
|
-- effectively a no-op once all parts are already materialized.
|
||||||
|
ALTER TABLE logs ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4();
|
||||||
|
ALTER TABLE logs MATERIALIZE COLUMN insert_id;
|
||||||
|
|
||||||
|
ALTER TABLE agent_events ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4();
|
||||||
|
ALTER TABLE agent_events MATERIALIZE COLUMN insert_id;
|
||||||
|
|||||||
@@ -371,4 +371,30 @@ class ClickHouseLogStoreIT {
|
|||||||
assertThat(result.data()).extracting(LogEntryResult::message)
|
assertThat(result.data()).extracting(LogEntryResult::message)
|
||||||
.containsExactlyInAnyOrder("app msg", "container msg");
|
.containsExactlyInAnyOrder("app msg", "container msg");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void search_cursorPagination_sameMillisecond_doesNotSkip() {
|
||||||
|
Instant ts = Instant.parse("2026-04-17T10:00:00Z");
|
||||||
|
// Insert 5 rows at the exact same timestamp
|
||||||
|
java.util.List<LogEntry> batch = new java.util.ArrayList<>();
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
batch.add(entry(ts, "INFO", "logger", "msg-" + i, "t1", null, null));
|
||||||
|
}
|
||||||
|
store.indexBatch("agent-1", "my-app", batch);
|
||||||
|
|
||||||
|
// Page through with limit 2; across 3 pages we must see all 5 distinct messages, no duplicates
|
||||||
|
java.util.Set<String> seen = new java.util.HashSet<>();
|
||||||
|
String cursor = null;
|
||||||
|
for (int page = 0; page < 10; page++) {
|
||||||
|
LogSearchResponse resp = store.search(new LogSearchRequest(
|
||||||
|
null, null, "my-app", null, null, null, null, null,
|
||||||
|
null, null, cursor, 2, "desc"));
|
||||||
|
for (LogEntryResult r : resp.data()) {
|
||||||
|
assertThat(seen.add(r.message())).as("duplicate row returned: " + r.message()).isTrue();
|
||||||
|
}
|
||||||
|
cursor = resp.nextCursor();
|
||||||
|
if (!resp.hasMore()) break;
|
||||||
|
}
|
||||||
|
assertThat(seen).containsExactlyInAnyOrder("msg-0", "msg-1", "msg-2", "msg-3", "msg-4");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,25 +115,24 @@ class ClickHouseAgentEventRepositoryIT {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void queryPage_tiebreakByInstanceIdAsc_whenTimestampsEqual() {
|
void queryPage_tiebreak_sameMillisecond_returnsAllRowsNoDuplicates() {
|
||||||
Instant ts = Instant.parse("2026-04-01T10:00:00Z");
|
Instant ts = Instant.parse("2026-04-01T10:00:00Z");
|
||||||
insertAt("agent-z", "app-a", "TICK", "z", ts);
|
|
||||||
insertAt("agent-a", "app-a", "TICK", "a", ts);
|
insertAt("agent-a", "app-a", "TICK", "a", ts);
|
||||||
insertAt("agent-m", "app-a", "TICK", "m", ts);
|
insertAt("agent-b", "app-a", "TICK", "b", ts);
|
||||||
|
insertAt("agent-c", "app-a", "TICK", "c", ts);
|
||||||
|
|
||||||
com.cameleer.server.core.agent.AgentEventPage p1 =
|
java.util.Set<String> seen = new java.util.HashSet<>();
|
||||||
repo.queryPage(null, null, null, null, null, null, 2);
|
String cursor = null;
|
||||||
assertThat(p1.data()).hasSize(2);
|
for (int page = 0; page < 10; page++) {
|
||||||
// (timestamp DESC, instance_id ASC): ties resolve to a, m, z
|
com.cameleer.server.core.agent.AgentEventPage p =
|
||||||
assertThat(p1.data().get(0).instanceId()).isEqualTo("agent-a");
|
repo.queryPage(null, null, null, null, null, cursor, 1);
|
||||||
assertThat(p1.data().get(1).instanceId()).isEqualTo("agent-m");
|
for (com.cameleer.server.core.agent.AgentEventRecord r : p.data()) {
|
||||||
assertThat(p1.hasMore()).isTrue();
|
assertThat(seen.add(r.instanceId())).as("duplicate row returned: " + r.instanceId()).isTrue();
|
||||||
|
}
|
||||||
com.cameleer.server.core.agent.AgentEventPage p2 =
|
cursor = p.nextCursor();
|
||||||
repo.queryPage(null, null, null, null, null, p1.nextCursor(), 2);
|
if (!p.hasMore()) break;
|
||||||
assertThat(p2.data()).hasSize(1);
|
}
|
||||||
assertThat(p2.data().get(0).instanceId()).isEqualTo("agent-z");
|
assertThat(seen).containsExactlyInAnyOrder("agent-a", "agent-b", "agent-c");
|
||||||
assertThat(p2.hasMore()).isFalse();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -148,7 +147,7 @@ class ClickHouseAgentEventRepositoryIT {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void queryPage_malformedCursor_emptyInstanceId_throws() {
|
void queryPage_malformedCursor_emptyInsertId_throws() {
|
||||||
String raw = "2026-04-01T10:00:00Z|";
|
String raw = "2026-04-01T10:00:00Z|";
|
||||||
String cursor = java.util.Base64.getUrlEncoder().withoutPadding()
|
String cursor = java.util.Base64.getUrlEncoder().withoutPadding()
|
||||||
.encodeToString(raw.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
.encodeToString(raw.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
||||||
|
|||||||
Reference in New Issue
Block a user