diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseRouteCatalogStore.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseRouteCatalogStore.java new file mode 100644 index 00000000..cbdbaaf9 --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseRouteCatalogStore.java @@ -0,0 +1,126 @@ +package com.cameleer.server.app.storage; + +import com.cameleer.server.core.storage.RouteCatalogEntry; +import com.cameleer.server.core.storage.RouteCatalogStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class ClickHouseRouteCatalogStore implements RouteCatalogStore { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseRouteCatalogStore.class); + + private final String tenantId; + private final JdbcTemplate jdbc; + + // key: environment + "\0" + application_id + "\0" + route_id -> first_seen + private final ConcurrentHashMap firstSeenCache = new ConcurrentHashMap<>(); + + public ClickHouseRouteCatalogStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; + this.jdbc = jdbc; + warmLoadFirstSeenCache(); + } + + private void warmLoadFirstSeenCache() { + try { + jdbc.query( + "SELECT environment, application_id, route_id, first_seen " + + "FROM route_catalog FINAL WHERE tenant_id = ?", + rs -> { + String key = cacheKey( + rs.getString("environment"), + rs.getString("application_id"), + rs.getString("route_id")); + Timestamp ts = rs.getTimestamp("first_seen"); + if (ts != null) { + firstSeenCache.put(key, ts.toInstant()); + } + }, + tenantId); + log.info("Route catalog cache warmed: {} entries", firstSeenCache.size()); + } catch (Exception e) { + log.warn("Failed to warm route catalog cache — first_seen values will default to now: {}", e.getMessage()); + } + } + + private static String cacheKey(String environment, String applicationId, String routeId) { + return environment + "\0" + applicationId + "\0" + routeId; + } + + @Override + public void upsert(String applicationId, String environment, Collection routeIds) { + if (routeIds == null || routeIds.isEmpty()) { + return; + } + + Instant now = Instant.now(); + Timestamp nowTs = Timestamp.from(now); + + for (String routeId : routeIds) { + String key = cacheKey(environment, applicationId, routeId); + Instant firstSeen = firstSeenCache.computeIfAbsent(key, k -> now); + Timestamp firstSeenTs = Timestamp.from(firstSeen); + + try { + jdbc.update( + "INSERT INTO route_catalog " + + "(tenant_id, environment, application_id, route_id, first_seen, last_seen) " + + "VALUES (?, ?, ?, ?, ?, ?)", + tenantId, environment, applicationId, routeId, firstSeenTs, nowTs); + } catch (Exception e) { + log.warn("Failed to upsert route catalog entry {}/{}: {}", + applicationId, routeId, e.getMessage()); + } + } + } + + @Override + public List findByEnvironment(String environment, Instant from, Instant to) { + return jdbc.query( + "SELECT application_id, route_id, first_seen, last_seen " + + "FROM route_catalog FINAL " + + "WHERE tenant_id = ? AND environment = ? " + + "AND first_seen <= ? AND last_seen >= ?", + (rs, rowNum) -> new RouteCatalogEntry( + rs.getString("application_id"), + rs.getString("route_id"), + environment, + rs.getTimestamp("first_seen").toInstant(), + rs.getTimestamp("last_seen").toInstant()), + tenantId, environment, + Timestamp.from(to), Timestamp.from(from)); + } + + @Override + public List findAll(Instant from, Instant to) { + return jdbc.query( + "SELECT application_id, route_id, environment, first_seen, last_seen " + + "FROM route_catalog FINAL " + + "WHERE tenant_id = ? " + + "AND first_seen <= ? AND last_seen >= ?", + (rs, rowNum) -> new RouteCatalogEntry( + rs.getString("application_id"), + rs.getString("route_id"), + rs.getString("environment"), + rs.getTimestamp("first_seen").toInstant(), + rs.getTimestamp("last_seen").toInstant()), + tenantId, + Timestamp.from(to), Timestamp.from(from)); + } + + @Override + public void deleteByApplication(String applicationId) { + // Remove from cache + firstSeenCache.entrySet().removeIf(e -> { + String[] parts = e.getKey().split("\0", 3); + return parts.length == 3 && parts[1].equals(applicationId); + }); + } +}