feat(diagrams): add findLatestContentHashForAppRoute with app-route cache
Agent-scoped lookups miss diagrams from routes whose publishing agents have been redeployed or removed. The new method resolves by (applicationId, environment, routeId) + created_at DESC, independent of the agent registry. An in-memory cache mirrors the existing hashCache pattern, warm-loaded at startup via argMax. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -57,6 +57,12 @@ public class ClickHouseDiagramStore implements DiagramStore {
|
|||||||
ORDER BY created_at DESC LIMIT 1
|
ORDER BY created_at DESC LIMIT 1
|
||||||
""";
|
""";
|
||||||
|
|
||||||
|
private static final String SELECT_HASH_FOR_APP_ROUTE = """
|
||||||
|
SELECT content_hash FROM route_diagrams
|
||||||
|
WHERE tenant_id = ? AND application_id = ? AND environment = ? AND route_id = ?
|
||||||
|
ORDER BY created_at DESC LIMIT 1
|
||||||
|
""";
|
||||||
|
|
||||||
private static final String SELECT_DEFINITIONS_FOR_APP = """
|
private static final String SELECT_DEFINITIONS_FOR_APP = """
|
||||||
SELECT DISTINCT route_id, definition FROM route_diagrams
|
SELECT DISTINCT route_id, definition FROM route_diagrams
|
||||||
WHERE tenant_id = ? AND application_id = ? AND environment = ?
|
WHERE tenant_id = ? AND application_id = ? AND environment = ?
|
||||||
@@ -68,6 +74,8 @@ public class ClickHouseDiagramStore implements DiagramStore {
|
|||||||
|
|
||||||
// (routeId + "\0" + instanceId) → contentHash
|
// (routeId + "\0" + instanceId) → contentHash
|
||||||
private final ConcurrentHashMap<String, String> hashCache = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, String> hashCache = new ConcurrentHashMap<>();
|
||||||
|
// (applicationId + "\0" + environment + "\0" + routeId) → most recent contentHash
|
||||||
|
private final ConcurrentHashMap<String, String> appRouteHashCache = new ConcurrentHashMap<>();
|
||||||
// contentHash → deserialized RouteGraph
|
// contentHash → deserialized RouteGraph
|
||||||
private final ConcurrentHashMap<String, RouteGraph> graphCache = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, RouteGraph> graphCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@@ -92,12 +100,37 @@ public class ClickHouseDiagramStore implements DiagramStore {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage());
|
log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
jdbc.query(
|
||||||
|
"SELECT application_id, environment, route_id, " +
|
||||||
|
"argMax(content_hash, created_at) AS content_hash " +
|
||||||
|
"FROM route_diagrams WHERE tenant_id = ? " +
|
||||||
|
"GROUP BY application_id, environment, route_id",
|
||||||
|
rs -> {
|
||||||
|
String key = appRouteCacheKey(
|
||||||
|
rs.getString("application_id"),
|
||||||
|
rs.getString("environment"),
|
||||||
|
rs.getString("route_id"));
|
||||||
|
appRouteHashCache.put(key, rs.getString("content_hash"));
|
||||||
|
},
|
||||||
|
tenantId);
|
||||||
|
log.info("Diagram app-route cache warmed: {} entries", appRouteHashCache.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to warm diagram app-route cache — lookups will fall back to ClickHouse: {}", e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String cacheKey(String routeId, String instanceId) {
|
private static String cacheKey(String routeId, String instanceId) {
|
||||||
return routeId + "\0" + instanceId;
|
return routeId + "\0" + instanceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String appRouteCacheKey(String applicationId, String environment, String routeId) {
|
||||||
|
return (applicationId != null ? applicationId : "") + "\0"
|
||||||
|
+ (environment != null ? environment : "") + "\0"
|
||||||
|
+ (routeId != null ? routeId : "");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void store(TaggedDiagram diagram) {
|
public void store(TaggedDiagram diagram) {
|
||||||
try {
|
try {
|
||||||
@@ -122,6 +155,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
|
|||||||
|
|
||||||
// Update caches
|
// Update caches
|
||||||
hashCache.put(cacheKey(routeId, agentId), contentHash);
|
hashCache.put(cacheKey(routeId, agentId), contentHash);
|
||||||
|
appRouteHashCache.put(appRouteCacheKey(applicationId, environment, routeId), contentHash);
|
||||||
graphCache.put(contentHash, graph);
|
graphCache.put(contentHash, graph);
|
||||||
|
|
||||||
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
|
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
|
||||||
@@ -199,6 +233,32 @@ public class ClickHouseDiagramStore implements DiagramStore {
|
|||||||
return Optional.of((String) rows.get(0).get("content_hash"));
|
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<String> findLatestContentHashForAppRoute(String applicationId,
|
||||||
|
String routeId,
|
||||||
|
String environment) {
|
||||||
|
if (applicationId == null || applicationId.isBlank()
|
||||||
|
|| routeId == null || routeId.isBlank()
|
||||||
|
|| environment == null || environment.isBlank()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
String key = appRouteCacheKey(applicationId, environment, routeId);
|
||||||
|
String cached = appRouteHashCache.get(key);
|
||||||
|
if (cached != null) {
|
||||||
|
return Optional.of(cached);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Map<String, Object>> rows = jdbc.queryForList(
|
||||||
|
SELECT_HASH_FOR_APP_ROUTE, tenantId, applicationId, environment, routeId);
|
||||||
|
if (rows.isEmpty()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
String hash = (String) rows.get(0).get("content_hash");
|
||||||
|
appRouteHashCache.put(key, hash);
|
||||||
|
return Optional.of(hash);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, String> findProcessorRouteMapping(String applicationId, String environment) {
|
public Map<String, String> findProcessorRouteMapping(String applicationId, String environment) {
|
||||||
Map<String, String> mapping = new HashMap<>();
|
Map<String, String> mapping = new HashMap<>();
|
||||||
|
|||||||
@@ -17,5 +17,19 @@ public interface DiagramStore {
|
|||||||
|
|
||||||
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> instanceIds);
|
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> instanceIds);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the most recently stored {@code content_hash} for the given
|
||||||
|
* {@code (applicationId, environment, routeId)} triple, regardless of the
|
||||||
|
* agent instance that produced it.
|
||||||
|
*
|
||||||
|
* <p>Unlike {@link #findContentHashForRoute(String, String)} and
|
||||||
|
* {@link #findContentHashForRouteByAgents(String, List)}, this lookup is
|
||||||
|
* independent of the agent registry — so it keeps working for routes
|
||||||
|
* whose publishing agents have since been redeployed or removed.
|
||||||
|
*/
|
||||||
|
Optional<String> findLatestContentHashForAppRoute(String applicationId,
|
||||||
|
String routeId,
|
||||||
|
String environment);
|
||||||
|
|
||||||
Map<String, String> findProcessorRouteMapping(String applicationId, String environment);
|
Map<String, String> findProcessorRouteMapping(String applicationId, String environment);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ class ChunkAccumulatorTest {
|
|||||||
public Optional<com.cameleer.common.graph.RouteGraph> findByContentHash(String h) { return Optional.empty(); }
|
public Optional<com.cameleer.common.graph.RouteGraph> findByContentHash(String h) { return Optional.empty(); }
|
||||||
public Optional<String> findContentHashForRoute(String r, String a) { return Optional.empty(); }
|
public Optional<String> findContentHashForRoute(String r, String a) { return Optional.empty(); }
|
||||||
public Optional<String> findContentHashForRouteByAgents(String r, List<String> a) { return Optional.empty(); }
|
public Optional<String> findContentHashForRouteByAgents(String r, List<String> a) { return Optional.empty(); }
|
||||||
|
public Optional<String> findLatestContentHashForAppRoute(String app, String r, String env) { return Optional.empty(); }
|
||||||
public Map<String, String> findProcessorRouteMapping(String app, String env) { return Map.of(); }
|
public Map<String, String> findProcessorRouteMapping(String app, String env) { return Map.of(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user