diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java new file mode 100644 index 00000000..922b3a2b --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java @@ -0,0 +1,193 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.common.graph.RouteGraph; +import com.cameleer3.common.graph.RouteNode; +import com.cameleer3.server.core.ingestion.TaggedDiagram; +import com.cameleer3.server.core.storage.DiagramStore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HexFormat; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * ClickHouse implementation of {@link DiagramStore}. + *

+ * Stores route graphs as JSON with SHA-256 content-hash deduplication. + * Uses ReplacingMergeTree — duplicate inserts are deduplicated on merge. + *

+ * {@code findProcessorRouteMapping} fetches all definitions for the application + * and deserializes them in Java because ClickHouse has no equivalent of + * PostgreSQL's {@code jsonb_array_elements()}. + */ +public class ClickHouseDiagramStore implements DiagramStore { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class); + + private static final String TENANT = "default"; + + private static final String INSERT_SQL = """ + INSERT INTO route_diagrams + (tenant_id, content_hash, route_id, agent_id, application_name, definition, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """; + + private static final String SELECT_BY_HASH = """ + SELECT definition FROM route_diagrams + WHERE tenant_id = ? AND content_hash = ? + LIMIT 1 + """; + + private static final String SELECT_HASH_FOR_ROUTE = """ + SELECT content_hash FROM route_diagrams + WHERE tenant_id = ? AND route_id = ? AND agent_id = ? + ORDER BY created_at DESC LIMIT 1 + """; + + private static final String SELECT_DEFINITIONS_FOR_APP = """ + SELECT DISTINCT route_id, definition FROM route_diagrams + WHERE tenant_id = ? AND application_name = ? + """; + + private final JdbcTemplate jdbc; + private final ObjectMapper objectMapper; + + public ClickHouseDiagramStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + } + + @Override + public void store(TaggedDiagram diagram) { + try { + RouteGraph graph = diagram.graph(); + String agentId = diagram.agentId() != null ? diagram.agentId() : ""; + String applicationName = diagram.applicationName() != null ? diagram.applicationName() : ""; + String json = objectMapper.writeValueAsString(graph); + String contentHash = sha256Hex(json); + String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; + + jdbc.update(INSERT_SQL, + TENANT, + contentHash, + routeId, + agentId, + applicationName, + json, + Timestamp.from(Instant.now())); + log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); + } + } + + @Override + public Optional findByContentHash(String contentHash) { + List> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash); + if (rows.isEmpty()) { + return Optional.empty(); + } + String json = (String) rows.get(0).get("definition"); + try { + return Optional.of(objectMapper.readValue(json, RouteGraph.class)); + } catch (JsonProcessingException e) { + log.error("Failed to deserialize RouteGraph from ClickHouse", e); + return Optional.empty(); + } + } + + @Override + public Optional findContentHashForRoute(String routeId, String agentId) { + List> rows = jdbc.queryForList( + SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId); + if (rows.isEmpty()) { + return Optional.empty(); + } + return Optional.of((String) rows.get(0).get("content_hash")); + } + + @Override + public Optional findContentHashForRouteByAgents(String routeId, List agentIds) { + if (agentIds == null || agentIds.isEmpty()) { + return Optional.empty(); + } + String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); + String sql = "SELECT content_hash FROM route_diagrams " + + "WHERE tenant_id = ? AND route_id = ? AND agent_id IN (" + placeholders + ") " + + "ORDER BY created_at DESC LIMIT 1"; + var params = new ArrayList(); + params.add(TENANT); + params.add(routeId); + params.addAll(agentIds); + List> rows = jdbc.queryForList(sql, params.toArray()); + if (rows.isEmpty()) { + return Optional.empty(); + } + return Optional.of((String) rows.get(0).get("content_hash")); + } + + @Override + public Map findProcessorRouteMapping(String applicationName) { + Map mapping = new HashMap<>(); + List> rows = jdbc.queryForList( + SELECT_DEFINITIONS_FOR_APP, TENANT, applicationName); + for (Map row : rows) { + String routeId = (String) row.get("route_id"); + String json = (String) row.get("definition"); + if (json == null || routeId == null) { + continue; + } + try { + RouteGraph graph = objectMapper.readValue(json, RouteGraph.class); + collectNodeIds(graph.getRoot(), routeId, mapping); + } catch (JsonProcessingException e) { + log.warn("Failed to deserialize RouteGraph for route={} app={}", routeId, applicationName, e); + } + } + return mapping; + } + + /** + * Recursively walks the RouteNode tree and maps each node ID to the given routeId. + */ + private void collectNodeIds(RouteNode node, String routeId, Map mapping) { + if (node == null) { + return; + } + String id = node.getId(); + if (id != null && !id.isEmpty()) { + mapping.put(id, routeId); + } + List children = node.getChildren(); + if (children != null) { + for (RouteNode child : children) { + collectNodeIds(child, routeId, mapping); + } + } + } + + static String sha256Hex(String input) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); + return HexFormat.of().formatHex(hash); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 not available", e); + } + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java new file mode 100644 index 00000000..ea608a3f --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java @@ -0,0 +1,213 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.common.graph.NodeType; +import com.cameleer3.common.graph.RouteGraph; +import com.cameleer3.common.graph.RouteNode; +import com.cameleer3.server.core.ingestion.TaggedDiagram; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseDiagramStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseDiagramStore store; + + @BeforeEach + void setUp() throws Exception { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + String ddl = new ClassPathResource("clickhouse/V6__route_diagrams.sql") + .getContentAsString(StandardCharsets.UTF_8); + jdbc.execute(ddl); + jdbc.execute("TRUNCATE TABLE route_diagrams"); + + store = new ClickHouseDiagramStore(jdbc); + } + + // ── Helpers ────────────────────────────────────────────────────────── + + private RouteGraph buildGraph(String routeId, String... nodeIds) { + RouteGraph graph = new RouteGraph(routeId); + if (nodeIds.length > 0) { + RouteNode root = new RouteNode(nodeIds[0], NodeType.ENDPOINT, "from:" + nodeIds[0]); + for (int i = 1; i < nodeIds.length; i++) { + root.addChild(new RouteNode(nodeIds[i], NodeType.PROCESSOR, "proc:" + nodeIds[i])); + } + graph.setRoot(root); + } + return graph; + } + + private TaggedDiagram tagged(String agentId, String appName, RouteGraph graph) { + return new TaggedDiagram(agentId, appName, graph); + } + + // ── Tests ───────────────────────────────────────────────────────────── + + @Test + void store_insertsNewDiagram() { + RouteGraph graph = buildGraph("route-1", "node-a", "node-b"); + store.store(tagged("agent-1", "my-app", graph)); + + // Allow ReplacingMergeTree to settle + jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL"); + + long count = jdbc.queryForObject( + "SELECT count() FROM route_diagrams WHERE route_id = 'route-1'", + Long.class); + assertThat(count).isEqualTo(1); + } + + @Test + void store_duplicateHashIgnored() { + RouteGraph graph = buildGraph("route-1", "node-a"); + TaggedDiagram diagram = tagged("agent-1", "my-app", graph); + + store.store(diagram); + store.store(diagram); // same graph → same hash + + jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL"); + + long count = jdbc.queryForObject( + "SELECT count() FROM route_diagrams FINAL WHERE route_id = 'route-1'", + Long.class); + assertThat(count).isEqualTo(1); + } + + @Test + void findByContentHash_returnsGraph() { + RouteGraph graph = buildGraph("route-2", "node-x"); + graph.setDescription("Test route"); + TaggedDiagram diagram = tagged("agent-2", "app-a", graph); + store.store(diagram); + + // Compute the expected hash + String hash = store.findContentHashForRoute("route-2", "agent-2") + .orElseThrow(() -> new AssertionError("No hash found for route-2/agent-2")); + + Optional result = store.findByContentHash(hash); + + assertThat(result).isPresent(); + assertThat(result.get().getRouteId()).isEqualTo("route-2"); + assertThat(result.get().getDescription()).isEqualTo("Test route"); + } + + @Test + void findByContentHash_returnsEmptyForUnknownHash() { + Optional result = store.findByContentHash("nonexistent-hash-000"); + assertThat(result).isEmpty(); + } + + @Test + void findContentHashForRoute_returnsMostRecent() throws InterruptedException { + RouteGraph graphV1 = buildGraph("route-3", "node-1"); + graphV1.setDescription("v1"); + RouteGraph graphV2 = buildGraph("route-3", "node-1", "node-2"); + graphV2.setDescription("v2"); + + store.store(tagged("agent-1", "my-app", graphV1)); + // Small delay to ensure different created_at timestamps + Thread.sleep(10); + store.store(tagged("agent-1", "my-app", graphV2)); + + Optional hashOpt = store.findContentHashForRoute("route-3", "agent-1"); + assertThat(hashOpt).isPresent(); + + // The hash should correspond to graphV2 (the most recent) + String expectedHash = ClickHouseDiagramStore.sha256Hex( + store.findByContentHash(hashOpt.get()) + .map(g -> { + try { + return new com.fasterxml.jackson.databind.ObjectMapper() + .registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule()) + .writeValueAsString(g); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .orElseThrow()); + + assertThat(hashOpt.get()).isEqualTo(expectedHash); + + // Verify retrieved graph has v2's content + RouteGraph retrieved = store.findByContentHash(hashOpt.get()).orElseThrow(); + assertThat(retrieved.getDescription()).isEqualTo("v2"); + } + + @Test + void findContentHashForRouteByAgents_returnsHash() { + RouteGraph graph = buildGraph("route-4", "node-z"); + store.store(tagged("agent-10", "app-b", graph)); + store.store(tagged("agent-20", "app-b", graph)); + + Optional result = store.findContentHashForRouteByAgents( + "route-4", java.util.List.of("agent-10", "agent-20")); + + assertThat(result).isPresent(); + } + + @Test + void findContentHashForRouteByAgents_emptyListReturnsEmpty() { + Optional result = store.findContentHashForRouteByAgents("route-x", java.util.List.of()); + assertThat(result).isEmpty(); + } + + @Test + void findProcessorRouteMapping_extractsMapping() { + // Build a graph with 3 nodes: root + 2 children + RouteGraph graph = buildGraph("route-5", "proc-from-1", "proc-to-2", "proc-log-3"); + store.store(tagged("agent-1", "app-mapping", graph)); + + jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL"); + + Map mapping = store.findProcessorRouteMapping("app-mapping"); + + assertThat(mapping).containsEntry("proc-from-1", "route-5"); + assertThat(mapping).containsEntry("proc-to-2", "route-5"); + assertThat(mapping).containsEntry("proc-log-3", "route-5"); + } + + @Test + void findProcessorRouteMapping_multipleRoutes() { + RouteGraph graphA = buildGraph("route-a", "proc-a1", "proc-a2"); + RouteGraph graphB = buildGraph("route-b", "proc-b1"); + store.store(tagged("agent-1", "multi-app", graphA)); + store.store(tagged("agent-1", "multi-app", graphB)); + + jdbc.execute("OPTIMIZE TABLE route_diagrams FINAL"); + + Map mapping = store.findProcessorRouteMapping("multi-app"); + + assertThat(mapping).containsEntry("proc-a1", "route-a"); + assertThat(mapping).containsEntry("proc-a2", "route-a"); + assertThat(mapping).containsEntry("proc-b1", "route-b"); + } + + @Test + void findProcessorRouteMapping_unknownAppReturnsEmpty() { + Map mapping = store.findProcessorRouteMapping("nonexistent-app"); + assertThat(mapping).isEmpty(); + } +}