fix(logs): use parseDateTime64BestEffort for all timestamp binds

JDBC Timestamp binding shifted timestamps by the JVM local timezone
offset on both insert and query, producing asymmetric UTC offsets that
broke time-range filtering and cursor pagination. Switching inserts
(indexBatch, insertBufferedBatch) and all WHERE predicates to ISO-8601
strings via parseDateTime64BestEffort, and reading timestamps back as
epoch-millis via toUnixTimestamp64Milli, pins everything to UTC and
fixes the time-range filter test plus cursor pagination.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-17 13:13:34 +02:00
parent 5d9f6735cc
commit f1c5a95f12

View File

@@ -10,13 +10,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +25,6 @@ import java.util.Map;
public class ClickHouseLogStore implements LogIndex {
private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class);
private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT;
private final String tenantId;
private final JdbcTemplate jdbc;
@@ -47,12 +42,12 @@ public class ClickHouseLogStore implements LogIndex {
String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
"VALUES (?, parseDateTime64BestEffort(?, 3), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
ps.setString(1, tenantId);
ps.setTimestamp(2, Timestamp.from(ts));
ps.setString(2, ts.toString());
ps.setString(3, applicationId);
ps.setString(4, instanceId);
ps.setString(5, entry.getLevel() != null ? entry.getLevel() : "");
@@ -76,14 +71,14 @@ public class ClickHouseLogStore implements LogIndex {
String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc, source) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
"VALUES (?, ?, parseDateTime64BestEffort(?, 3), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> {
LogEntry entry = ble.entry();
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
ps.setString(1, ble.tenantId() != null ? ble.tenantId() : tenantId);
ps.setString(2, ble.environment() != null ? ble.environment() : "default");
ps.setTimestamp(3, Timestamp.from(ts));
ps.setString(3, ts.toString());
ps.setString(4, ble.applicationId());
ps.setString(5, ble.instanceId());
ps.setString(6, entry.getLevel() != null ? entry.getLevel() : "");
@@ -155,13 +150,13 @@ public class ClickHouseLogStore implements LogIndex {
}
if (request.from() != null) {
baseConditions.add("timestamp >= ?");
baseParams.add(Timestamp.from(request.from()));
baseConditions.add("timestamp >= parseDateTime64BestEffort(?, 3)");
baseParams.add(request.from().toString());
}
if (request.to() != null) {
baseConditions.add("timestamp <= ?");
baseParams.add(Timestamp.from(request.to()));
baseConditions.add("timestamp <= parseDateTime64BestEffort(?, 3)");
baseParams.add(request.to().toString());
}
// Level counts query: uses base conditions WITHOUT level filter and cursor
@@ -183,28 +178,28 @@ public class ClickHouseLogStore implements LogIndex {
if (request.cursor() != null && !request.cursor().isEmpty()) {
Instant cursorTs = Instant.parse(request.cursor());
if ("asc".equalsIgnoreCase(request.sort())) {
dataConditions.add("timestamp > ?");
dataConditions.add("timestamp > parseDateTime64BestEffort(?, 3)");
} else {
dataConditions.add("timestamp < ?");
dataConditions.add("timestamp < parseDateTime64BestEffort(?, 3)");
}
dataParams.add(Timestamp.from(cursorTs));
dataParams.add(cursorTs.toString());
}
String dataWhere = String.join(" AND ", dataConditions);
String orderDir = "asc".equalsIgnoreCase(request.sort()) ? "ASC" : "DESC";
int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore
String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " +
String dataSql = "SELECT formatDateTime(timestamp, '%Y-%m-%dT%H:%i:%S', 'UTC') AS ts_utc," +
" toUnixTimestamp64Milli(timestamp) AS ts_millis," +
" level, logger_name, message, thread_name, stack_trace, " +
"exchange_id, instance_id, application, mdc, source " +
"FROM logs WHERE " + dataWhere +
" ORDER BY timestamp " + orderDir + " LIMIT ?";
dataParams.add(fetchLimit);
List<LogEntryResult> results = jdbc.query(dataSql, dataParams.toArray(), (rs, rowNum) -> {
Timestamp ts = rs.getTimestamp("timestamp");
String timestampStr = ts != null
? ts.toInstant().atOffset(ZoneOffset.UTC).format(ISO_FMT)
: null;
long tsMillis = rs.getLong("ts_millis");
String timestampStr = Instant.ofEpochMilli(tsMillis).toString();
@SuppressWarnings("unchecked")
Map<String, String> mdc = (Map<String, String>) rs.getObject("mdc");