diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java index b892da26..09e50110 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java @@ -13,7 +13,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandType; -import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.DiagramStore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.swagger.v3.oas.annotations.Operation; @@ -49,18 +49,18 @@ public class ApplicationConfigController { private final AgentRegistryService registryService; private final ObjectMapper objectMapper; private final AuditService auditService; - private final ExecutionStore executionStore; + private final DiagramStore diagramStore; public ApplicationConfigController(PostgresApplicationConfigRepository configRepository, AgentRegistryService registryService, ObjectMapper objectMapper, AuditService auditService, - ExecutionStore executionStore) { + DiagramStore diagramStore) { this.configRepository = configRepository; this.registryService = registryService; this.objectMapper = objectMapper; this.auditService = auditService; - this.executionStore = executionStore; + this.diagramStore = diagramStore; } @GetMapping @@ -112,7 +112,7 @@ public class ApplicationConfigController { description = "Returns a map of processorId → routeId for all processors seen in this application") @ApiResponse(responseCode = "200", description = "Mapping returned") public ResponseEntity> getProcessorRouteMapping(@PathVariable String application) { - return ResponseEntity.ok(executionStore.findProcessorRouteMapping(application)); + return ResponseEntity.ok(diagramStore.findProcessorRouteMapping(application)); } @PostMapping("/{application}/test-expression") diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java index 5cdaf176..2524ea01 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java @@ -1,6 +1,8 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.graph.RouteGraph; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.TaggedDiagram; import com.fasterxml.jackson.core.JsonProcessingException; @@ -35,10 +37,14 @@ public class DiagramController { private static final Logger log = LoggerFactory.getLogger(DiagramController.class); private final IngestionService ingestionService; + private final AgentRegistryService registryService; private final ObjectMapper objectMapper; - public DiagramController(IngestionService ingestionService, ObjectMapper objectMapper) { + public DiagramController(IngestionService ingestionService, + AgentRegistryService registryService, + ObjectMapper objectMapper) { this.ingestionService = ingestionService; + this.registryService = registryService; this.objectMapper = objectMapper; } @@ -48,10 +54,11 @@ public class DiagramController { @ApiResponse(responseCode = "202", description = "Data accepted for processing") public ResponseEntity ingestDiagrams(@RequestBody String body) throws JsonProcessingException { String agentId = extractAgentId(); + String applicationName = resolveApplicationName(agentId); List graphs = parsePayload(body); for (RouteGraph graph : graphs) { - ingestionService.ingestDiagram(new TaggedDiagram(agentId, graph)); + ingestionService.ingestDiagram(new TaggedDiagram(agentId, applicationName, graph)); } return ResponseEntity.accepted().build(); @@ -62,6 +69,11 @@ public class DiagramController { return auth != null ? auth.getName() : ""; } + private String resolveApplicationName(String agentId) { + AgentInfo agent = registryService.findById(agentId); + return agent != null ? agent.application() : ""; + } + private List parsePayload(String body) throws JsonProcessingException { String trimmed = body.strip(); if (trimmed.startsWith("[")) { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java index 0c7dbbf8..c4219482 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java @@ -16,6 +16,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HexFormat; import java.util.List; import java.util.Map; @@ -33,8 +34,8 @@ public class PostgresDiagramStore implements DiagramStore { private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class); private static final String INSERT_SQL = """ - INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition) - VALUES (?, ?, ?, ?::jsonb) + INSERT INTO route_diagrams (content_hash, route_id, agent_id, application_name, definition) + VALUES (?, ?, ?, ?, ?::jsonb) ON CONFLICT (content_hash) DO NOTHING """; @@ -62,11 +63,12 @@ public class PostgresDiagramStore implements DiagramStore { try { RouteGraph graph = diagram.graph(); String agentId = diagram.agentId() != null ? diagram.agentId() : ""; + String applicationName = diagram.applicationName() != null ? diagram.applicationName() : ""; String json = objectMapper.writeValueAsString(graph); String contentHash = sha256Hex(json); String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; - jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json); + jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationName, json); log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); @@ -116,6 +118,21 @@ public class PostgresDiagramStore implements DiagramStore { return Optional.of((String) rows.get(0).get("content_hash")); } + @Override + public Map findProcessorRouteMapping(String applicationName) { + Map mapping = new HashMap<>(); + jdbcTemplate.query(""" + SELECT DISTINCT rd.route_id, node_elem->>'id' AS processor_id + FROM route_diagrams rd, + jsonb_array_elements(rd.definition::jsonb->'nodes') AS node_elem + WHERE rd.application_name = ? + AND node_elem->>'id' IS NOT NULL + """, + rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); }, + applicationName); + return mapping; + } + static String sha256Hex(String input) { try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 082b88df..760cb5e2 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -9,9 +9,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; @Repository @@ -115,18 +113,6 @@ public class PostgresExecutionStore implements ExecutionStore { PROCESSOR_MAPPER, executionId); } - @Override - public Map findProcessorRouteMapping(String applicationName) { - Map mapping = new HashMap<>(); - jdbc.query(""" - SELECT DISTINCT ON (processor_id) processor_id, route_id - FROM processor_executions WHERE application_name = ? ORDER BY processor_id - """, - rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); }, - applicationName); - return mapping; - } - private static final RowMapper EXECUTION_MAPPER = (rs, rowNum) -> new ExecutionRecord( rs.getString("execution_id"), rs.getString("route_id"), diff --git a/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql b/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql new file mode 100644 index 00000000..708c1960 --- /dev/null +++ b/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql @@ -0,0 +1,2 @@ +ALTER TABLE route_diagrams ADD COLUMN IF NOT EXISTS application_name TEXT NOT NULL DEFAULT ''; +CREATE INDEX IF NOT EXISTS idx_diagrams_application ON route_diagrams (application_name); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/TaggedDiagram.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/TaggedDiagram.java index 2f7b9ec5..736867b6 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/TaggedDiagram.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/TaggedDiagram.java @@ -8,4 +8,4 @@ import com.cameleer3.common.graph.RouteGraph; * The agent ID is extracted from the SecurityContext in the controller layer * and carried through the write buffer so the flush scheduler can persist it. */ -public record TaggedDiagram(String agentId, RouteGraph graph) {} +public record TaggedDiagram(String agentId, String applicationName, RouteGraph graph) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java index 12ff6d7d..5d259763 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java @@ -4,6 +4,7 @@ import com.cameleer3.common.graph.RouteGraph; import com.cameleer3.server.core.ingestion.TaggedDiagram; import java.util.List; +import java.util.Map; import java.util.Optional; public interface DiagramStore { @@ -15,4 +16,6 @@ public interface DiagramStore { Optional findContentHashForRoute(String routeId, String agentId); Optional findContentHashForRouteByAgents(String routeId, List agentIds); + + Map findProcessorRouteMapping(String applicationName); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index 2edcc0cc..ecc076dd 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -2,7 +2,6 @@ package com.cameleer3.server.core.storage; import java.time.Instant; import java.util.List; -import java.util.Map; import java.util.Optional; public interface ExecutionStore { @@ -17,8 +16,6 @@ public interface ExecutionStore { List findProcessors(String executionId); - Map findProcessorRouteMapping(String applicationName); - record ExecutionRecord( String executionId, String routeId, String agentId, String applicationName, String status, String correlationId, String exchangeId,