feat(clickhouse): add ClickHouseDiagramStore with integration tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,193 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.graph.RouteGraph;
|
||||
import com.cameleer3.common.graph.RouteNode;
|
||||
import com.cameleer3.server.core.ingestion.TaggedDiagram;
|
||||
import com.cameleer3.server.core.storage.DiagramStore;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HexFormat;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* ClickHouse implementation of {@link DiagramStore}.
|
||||
* <p>
|
||||
* Stores route graphs as JSON with SHA-256 content-hash deduplication.
|
||||
* Uses ReplacingMergeTree — duplicate inserts are deduplicated on merge.
|
||||
* <p>
|
||||
* {@code findProcessorRouteMapping} fetches all definitions for the application
|
||||
* and deserializes them in Java because ClickHouse has no equivalent of
|
||||
* PostgreSQL's {@code jsonb_array_elements()}.
|
||||
*/
|
||||
public class ClickHouseDiagramStore implements DiagramStore {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class);
|
||||
|
||||
private static final String TENANT = "default";
|
||||
|
||||
private static final String INSERT_SQL = """
|
||||
INSERT INTO route_diagrams
|
||||
(tenant_id, content_hash, route_id, agent_id, application_name, definition, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""";
|
||||
|
||||
private static final String SELECT_BY_HASH = """
|
||||
SELECT definition FROM route_diagrams
|
||||
WHERE tenant_id = ? AND content_hash = ?
|
||||
LIMIT 1
|
||||
""";
|
||||
|
||||
private static final String SELECT_HASH_FOR_ROUTE = """
|
||||
SELECT content_hash FROM route_diagrams
|
||||
WHERE tenant_id = ? AND route_id = ? AND agent_id = ?
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
""";
|
||||
|
||||
private static final String SELECT_DEFINITIONS_FOR_APP = """
|
||||
SELECT DISTINCT route_id, definition FROM route_diagrams
|
||||
WHERE tenant_id = ? AND application_name = ?
|
||||
""";
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public ClickHouseDiagramStore(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(TaggedDiagram diagram) {
|
||||
try {
|
||||
RouteGraph graph = diagram.graph();
|
||||
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
|
||||
String applicationName = diagram.applicationName() != null ? diagram.applicationName() : "";
|
||||
String json = objectMapper.writeValueAsString(graph);
|
||||
String contentHash = sha256Hex(json);
|
||||
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
|
||||
|
||||
jdbc.update(INSERT_SQL,
|
||||
TENANT,
|
||||
contentHash,
|
||||
routeId,
|
||||
agentId,
|
||||
applicationName,
|
||||
json,
|
||||
Timestamp.from(Instant.now()));
|
||||
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RouteGraph> findByContentHash(String contentHash) {
|
||||
List<Map<String, Object>> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash);
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String json = (String) rows.get(0).get("definition");
|
||||
try {
|
||||
return Optional.of(objectMapper.readValue(json, RouteGraph.class));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to deserialize RouteGraph from ClickHouse", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> findContentHashForRoute(String routeId, String agentId) {
|
||||
List<Map<String, Object>> rows = jdbc.queryForList(
|
||||
SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId);
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds) {
|
||||
if (agentIds == null || agentIds.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
|
||||
String sql = "SELECT content_hash FROM route_diagrams " +
|
||||
"WHERE tenant_id = ? AND route_id = ? AND agent_id IN (" + placeholders + ") " +
|
||||
"ORDER BY created_at DESC LIMIT 1";
|
||||
var params = new ArrayList<Object>();
|
||||
params.add(TENANT);
|
||||
params.add(routeId);
|
||||
params.addAll(agentIds);
|
||||
List<Map<String, Object>> rows = jdbc.queryForList(sql, params.toArray());
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> findProcessorRouteMapping(String applicationName) {
|
||||
Map<String, String> mapping = new HashMap<>();
|
||||
List<Map<String, Object>> rows = jdbc.queryForList(
|
||||
SELECT_DEFINITIONS_FOR_APP, TENANT, applicationName);
|
||||
for (Map<String, Object> row : rows) {
|
||||
String routeId = (String) row.get("route_id");
|
||||
String json = (String) row.get("definition");
|
||||
if (json == null || routeId == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
RouteGraph graph = objectMapper.readValue(json, RouteGraph.class);
|
||||
collectNodeIds(graph.getRoot(), routeId, mapping);
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("Failed to deserialize RouteGraph for route={} app={}", routeId, applicationName, e);
|
||||
}
|
||||
}
|
||||
return mapping;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively walks the RouteNode tree and maps each node ID to the given routeId.
|
||||
*/
|
||||
private void collectNodeIds(RouteNode node, String routeId, Map<String, String> mapping) {
|
||||
if (node == null) {
|
||||
return;
|
||||
}
|
||||
String id = node.getId();
|
||||
if (id != null && !id.isEmpty()) {
|
||||
mapping.put(id, routeId);
|
||||
}
|
||||
List<RouteNode> children = node.getChildren();
|
||||
if (children != null) {
|
||||
for (RouteNode child : children) {
|
||||
collectNodeIds(child, routeId, mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static String sha256Hex(String input) {
|
||||
try {
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
|
||||
return HexFormat.of().formatHex(hash);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException("SHA-256 not available", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user