feat: implement ClickHouseRouteCatalogStore with first_seen cache

This commit is contained in:
hsiegeln
2026-04-16 18:45:45 +02:00
parent 887a9b6faa
commit 961dadd1c8

View File

@@ -0,0 +1,126 @@
package com.cameleer.server.app.storage;
import com.cameleer.server.core.storage.RouteCatalogEntry;
import com.cameleer.server.core.storage.RouteCatalogStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class ClickHouseRouteCatalogStore implements RouteCatalogStore {
private static final Logger log = LoggerFactory.getLogger(ClickHouseRouteCatalogStore.class);
private final String tenantId;
private final JdbcTemplate jdbc;
// key: environment + "\0" + application_id + "\0" + route_id -> first_seen
private final ConcurrentHashMap<String, Instant> firstSeenCache = new ConcurrentHashMap<>();
public ClickHouseRouteCatalogStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc;
warmLoadFirstSeenCache();
}
private void warmLoadFirstSeenCache() {
try {
jdbc.query(
"SELECT environment, application_id, route_id, first_seen " +
"FROM route_catalog FINAL WHERE tenant_id = ?",
rs -> {
String key = cacheKey(
rs.getString("environment"),
rs.getString("application_id"),
rs.getString("route_id"));
Timestamp ts = rs.getTimestamp("first_seen");
if (ts != null) {
firstSeenCache.put(key, ts.toInstant());
}
},
tenantId);
log.info("Route catalog cache warmed: {} entries", firstSeenCache.size());
} catch (Exception e) {
log.warn("Failed to warm route catalog cache — first_seen values will default to now: {}", e.getMessage());
}
}
private static String cacheKey(String environment, String applicationId, String routeId) {
return environment + "\0" + applicationId + "\0" + routeId;
}
@Override
public void upsert(String applicationId, String environment, Collection<String> routeIds) {
if (routeIds == null || routeIds.isEmpty()) {
return;
}
Instant now = Instant.now();
Timestamp nowTs = Timestamp.from(now);
for (String routeId : routeIds) {
String key = cacheKey(environment, applicationId, routeId);
Instant firstSeen = firstSeenCache.computeIfAbsent(key, k -> now);
Timestamp firstSeenTs = Timestamp.from(firstSeen);
try {
jdbc.update(
"INSERT INTO route_catalog " +
"(tenant_id, environment, application_id, route_id, first_seen, last_seen) " +
"VALUES (?, ?, ?, ?, ?, ?)",
tenantId, environment, applicationId, routeId, firstSeenTs, nowTs);
} catch (Exception e) {
log.warn("Failed to upsert route catalog entry {}/{}: {}",
applicationId, routeId, e.getMessage());
}
}
}
@Override
public List<RouteCatalogEntry> findByEnvironment(String environment, Instant from, Instant to) {
return jdbc.query(
"SELECT application_id, route_id, first_seen, last_seen " +
"FROM route_catalog FINAL " +
"WHERE tenant_id = ? AND environment = ? " +
"AND first_seen <= ? AND last_seen >= ?",
(rs, rowNum) -> new RouteCatalogEntry(
rs.getString("application_id"),
rs.getString("route_id"),
environment,
rs.getTimestamp("first_seen").toInstant(),
rs.getTimestamp("last_seen").toInstant()),
tenantId, environment,
Timestamp.from(to), Timestamp.from(from));
}
@Override
public List<RouteCatalogEntry> findAll(Instant from, Instant to) {
return jdbc.query(
"SELECT application_id, route_id, environment, first_seen, last_seen " +
"FROM route_catalog FINAL " +
"WHERE tenant_id = ? " +
"AND first_seen <= ? AND last_seen >= ?",
(rs, rowNum) -> new RouteCatalogEntry(
rs.getString("application_id"),
rs.getString("route_id"),
rs.getString("environment"),
rs.getTimestamp("first_seen").toInstant(),
rs.getTimestamp("last_seen").toInstant()),
tenantId,
Timestamp.from(to), Timestamp.from(from));
}
@Override
public void deleteByApplication(String applicationId) {
// Remove from cache
firstSeenCache.entrySet().removeIf(e -> {
String[] parts = e.getKey().split("\0", 3);
return parts.length == 3 && parts[1].equals(applicationId);
});
}
}