diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java index 631622e5..708e0ef5 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseLogStore.java @@ -10,8 +10,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -176,13 +178,26 @@ public class ClickHouseLogStore implements LogIndex { } if (request.cursor() != null && !request.cursor().isEmpty()) { - Instant cursorTs = Instant.parse(request.cursor()); - if ("asc".equalsIgnoreCase(request.sort())) { - dataConditions.add("timestamp > parseDateTime64BestEffort(?, 3)"); - } else { - dataConditions.add("timestamp < parseDateTime64BestEffort(?, 3)"); + String decoded = new String(Base64.getUrlDecoder().decode(request.cursor()), + StandardCharsets.UTF_8); + int bar = decoded.indexOf('|'); + if (bar <= 0 || bar == decoded.length() - 1) { + throw new IllegalArgumentException("Malformed cursor"); } + Instant cursorTs; + try { + cursorTs = Instant.parse(decoded.substring(0, bar)); + } catch (java.time.format.DateTimeParseException e) { + throw new IllegalArgumentException("Malformed cursor", e); + } + String cursorId = decoded.substring(bar + 1); + String cmp = "asc".equalsIgnoreCase(request.sort()) ? ">" : "<"; + dataConditions.add( + "(timestamp " + cmp + " parseDateTime64BestEffort(?, 3)" + + " OR (timestamp = parseDateTime64BestEffort(?, 3) AND insert_id " + cmp + " toUUID(?)))"); dataParams.add(cursorTs.toString()); + dataParams.add(cursorTs.toString()); + dataParams.add(cursorId); } String dataWhere = String.join(" AND ", dataConditions); @@ -192,11 +207,12 @@ public class ClickHouseLogStore implements LogIndex { String dataSql = "SELECT formatDateTime(timestamp, '%Y-%m-%dT%H:%i:%S', 'UTC') AS ts_utc," + " toUnixTimestamp64Milli(timestamp) AS ts_millis," + " level, logger_name, message, thread_name, stack_trace, " + - "exchange_id, instance_id, application, mdc, source " + + "exchange_id, instance_id, application, mdc, source, toString(insert_id) AS insert_id_str " + "FROM logs WHERE " + dataWhere + - " ORDER BY timestamp " + orderDir + " LIMIT ?"; + " ORDER BY timestamp " + orderDir + ", insert_id " + orderDir + " LIMIT ?"; dataParams.add(fetchLimit); + List insertIds = new ArrayList<>(); List results = jdbc.query(dataSql, dataParams.toArray(), (rs, rowNum) -> { long tsMillis = rs.getLong("ts_millis"); String timestampStr = Instant.ofEpochMilli(tsMillis).toString(); @@ -207,6 +223,8 @@ public class ClickHouseLogStore implements LogIndex { String source = rs.getString("source"); + insertIds.add(rs.getString("insert_id_str")); + return new LogEntryResult( timestampStr, rs.getString("level"), @@ -229,7 +247,10 @@ public class ClickHouseLogStore implements LogIndex { String nextCursor = null; if (hasMore && !results.isEmpty()) { - nextCursor = results.get(results.size() - 1).timestamp(); + int lastIdx = results.size() - 1; + String raw = results.get(lastIdx).timestamp() + "|" + insertIds.get(lastIdx); + nextCursor = Base64.getUrlEncoder().withoutPadding() + .encodeToString(raw.getBytes(StandardCharsets.UTF_8)); } return new LogSearchResponse(results, nextCursor, hasMore, levelCounts); diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java index 2438575e..817cf2a1 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepository.java @@ -24,7 +24,7 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository { "INSERT INTO agent_events (tenant_id, instance_id, application_id, environment, event_type, detail) VALUES (?, ?, ?, ?, ?, ?)"; private static final String SELECT_BASE = - "SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?"; + "SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp, toString(insert_id) AS insert_id_str FROM agent_events WHERE tenant_id = ?"; private final String tenantId; private final JdbcTemplate jdbc; @@ -65,26 +65,30 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository { } catch (java.time.format.DateTimeParseException e) { throw new IllegalArgumentException("Malformed cursor", e); } - String cursorInstance = decoded.substring(bar + 1); - sql.append(" AND (timestamp < ? OR (timestamp = ? AND instance_id > ?))"); + String cursorInsertId = decoded.substring(bar + 1); + sql.append(" AND (timestamp < ? OR (timestamp = ? AND insert_id < toUUID(?)))"); params.add(Timestamp.from(cursorTs)); params.add(Timestamp.from(cursorTs)); - params.add(cursorInstance); + params.add(cursorInsertId); } - sql.append(" ORDER BY timestamp DESC, instance_id ASC LIMIT ?"); + sql.append(" ORDER BY timestamp DESC, insert_id DESC LIMIT ?"); int fetchLimit = limit + 1; params.add(fetchLimit); + List insertIds = new ArrayList<>(); List results = new ArrayList<>(jdbc.query(sql.toString(), - (rs, rowNum) -> new AgentEventRecord( - rs.getLong("id"), - rs.getString("instance_id"), - rs.getString("application_id"), - rs.getString("event_type"), - rs.getString("detail"), - rs.getTimestamp("timestamp").toInstant() - ), params.toArray())); + (rs, rowNum) -> { + insertIds.add(rs.getString("insert_id_str")); + return new AgentEventRecord( + rs.getLong("id"), + rs.getString("instance_id"), + rs.getString("application_id"), + rs.getString("event_type"), + rs.getString("detail"), + rs.getTimestamp("timestamp").toInstant() + ); + }, params.toArray())); boolean hasMore = results.size() > limit; if (hasMore) { @@ -93,8 +97,9 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository { String nextCursor = null; if (hasMore && !results.isEmpty()) { - AgentEventRecord last = results.get(results.size() - 1); - String raw = last.timestamp().toString() + "|" + last.instanceId(); + int lastIdx = results.size() - 1; + AgentEventRecord last = results.get(lastIdx); + String raw = last.timestamp().toString() + "|" + insertIds.get(lastIdx); nextCursor = Base64.getUrlEncoder().withoutPadding() .encodeToString(raw.getBytes(StandardCharsets.UTF_8)); } diff --git a/cameleer-server-app/src/main/resources/clickhouse/init.sql b/cameleer-server-app/src/main/resources/clickhouse/init.sql index beca32d3..5b6dc76b 100644 --- a/cameleer-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer-server-app/src/main/resources/clickhouse/init.sql @@ -327,7 +327,8 @@ CREATE TABLE IF NOT EXISTS agent_events ( instance_id LowCardinality(String), application_id LowCardinality(String), event_type LowCardinality(String), - detail String DEFAULT '' + detail String DEFAULT '', + insert_id UUID DEFAULT generateUUIDv4() ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) @@ -349,6 +350,7 @@ CREATE TABLE IF NOT EXISTS logs ( stack_trace String DEFAULT '', exchange_id String DEFAULT '', mdc Map(String, String) DEFAULT map(), + insert_id UUID DEFAULT generateUUIDv4(), INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, @@ -398,3 +400,12 @@ CREATE TABLE IF NOT EXISTS route_catalog ( ) ENGINE = ReplacingMergeTree(last_seen) ORDER BY (tenant_id, environment, application_id, route_id); + +-- insert_id tiebreak for keyset pagination (fixes same-millisecond cursor collision). +-- IF NOT EXISTS on ADD COLUMN is idempotent. MATERIALIZE COLUMN is a background mutation, +-- effectively a no-op once all parts are already materialized. +ALTER TABLE logs ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4(); +ALTER TABLE logs MATERIALIZE COLUMN insert_id; + +ALTER TABLE agent_events ADD COLUMN IF NOT EXISTS insert_id UUID DEFAULT generateUUIDv4(); +ALTER TABLE agent_events MATERIALIZE COLUMN insert_id; diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java index caaa761b..fb0a213e 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseLogStoreIT.java @@ -371,4 +371,30 @@ class ClickHouseLogStoreIT { assertThat(result.data()).extracting(LogEntryResult::message) .containsExactlyInAnyOrder("app msg", "container msg"); } + + @Test + void search_cursorPagination_sameMillisecond_doesNotSkip() { + Instant ts = Instant.parse("2026-04-17T10:00:00Z"); + // Insert 5 rows at the exact same timestamp + java.util.List batch = new java.util.ArrayList<>(); + for (int i = 0; i < 5; i++) { + batch.add(entry(ts, "INFO", "logger", "msg-" + i, "t1", null, null)); + } + store.indexBatch("agent-1", "my-app", batch); + + // Page through with limit 2; across 3 pages we must see all 5 distinct messages, no duplicates + java.util.Set seen = new java.util.HashSet<>(); + String cursor = null; + for (int page = 0; page < 10; page++) { + LogSearchResponse resp = store.search(new LogSearchRequest( + null, null, "my-app", null, null, null, null, null, + null, null, cursor, 2, "desc")); + for (LogEntryResult r : resp.data()) { + assertThat(seen.add(r.message())).as("duplicate row returned: " + r.message()).isTrue(); + } + cursor = resp.nextCursor(); + if (!resp.hasMore()) break; + } + assertThat(seen).containsExactlyInAnyOrder("msg-0", "msg-1", "msg-2", "msg-3", "msg-4"); + } } diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java index a538b1ff..9140c808 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseAgentEventRepositoryIT.java @@ -115,25 +115,24 @@ class ClickHouseAgentEventRepositoryIT { } @Test - void queryPage_tiebreakByInstanceIdAsc_whenTimestampsEqual() { + void queryPage_tiebreak_sameMillisecond_returnsAllRowsNoDuplicates() { Instant ts = Instant.parse("2026-04-01T10:00:00Z"); - insertAt("agent-z", "app-a", "TICK", "z", ts); insertAt("agent-a", "app-a", "TICK", "a", ts); - insertAt("agent-m", "app-a", "TICK", "m", ts); + insertAt("agent-b", "app-a", "TICK", "b", ts); + insertAt("agent-c", "app-a", "TICK", "c", ts); - com.cameleer.server.core.agent.AgentEventPage p1 = - repo.queryPage(null, null, null, null, null, null, 2); - assertThat(p1.data()).hasSize(2); - // (timestamp DESC, instance_id ASC): ties resolve to a, m, z - assertThat(p1.data().get(0).instanceId()).isEqualTo("agent-a"); - assertThat(p1.data().get(1).instanceId()).isEqualTo("agent-m"); - assertThat(p1.hasMore()).isTrue(); - - com.cameleer.server.core.agent.AgentEventPage p2 = - repo.queryPage(null, null, null, null, null, p1.nextCursor(), 2); - assertThat(p2.data()).hasSize(1); - assertThat(p2.data().get(0).instanceId()).isEqualTo("agent-z"); - assertThat(p2.hasMore()).isFalse(); + java.util.Set seen = new java.util.HashSet<>(); + String cursor = null; + for (int page = 0; page < 10; page++) { + com.cameleer.server.core.agent.AgentEventPage p = + repo.queryPage(null, null, null, null, null, cursor, 1); + for (com.cameleer.server.core.agent.AgentEventRecord r : p.data()) { + assertThat(seen.add(r.instanceId())).as("duplicate row returned: " + r.instanceId()).isTrue(); + } + cursor = p.nextCursor(); + if (!p.hasMore()) break; + } + assertThat(seen).containsExactlyInAnyOrder("agent-a", "agent-b", "agent-c"); } @Test @@ -148,7 +147,7 @@ class ClickHouseAgentEventRepositoryIT { } @Test - void queryPage_malformedCursor_emptyInstanceId_throws() { + void queryPage_malformedCursor_emptyInsertId_throws() { String raw = "2026-04-01T10:00:00Z|"; String cursor = java.util.Base64.getUrlEncoder().withoutPadding() .encodeToString(raw.getBytes(java.nio.charset.StandardCharsets.UTF_8));