refactor: derive processor-route mapping from diagrams instead of executions
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 37s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

Store application_name in route_diagrams at ingestion time (V7 migration),
resolve from agent registry same as ExecutionController. Move
findProcessorRouteMapping from ExecutionStore to DiagramStore using a
JSONB query that extracts node IDs directly from stored RouteGraph
definitions. This makes the mapping available as soon as diagrams are
sent, before any executions are recorded.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-26 23:00:10 +01:00
parent 100b780b47
commit d6c1f2c25b
8 changed files with 45 additions and 28 deletions

View File

@@ -13,7 +13,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState; import com.cameleer3.server.core.agent.AgentState;
import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandReply;
import com.cameleer3.server.core.agent.CommandType; import com.cameleer3.server.core.agent.CommandType;
import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.DiagramStore;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@@ -49,18 +49,18 @@ public class ApplicationConfigController {
private final AgentRegistryService registryService; private final AgentRegistryService registryService;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final AuditService auditService; private final AuditService auditService;
private final ExecutionStore executionStore; private final DiagramStore diagramStore;
public ApplicationConfigController(PostgresApplicationConfigRepository configRepository, public ApplicationConfigController(PostgresApplicationConfigRepository configRepository,
AgentRegistryService registryService, AgentRegistryService registryService,
ObjectMapper objectMapper, ObjectMapper objectMapper,
AuditService auditService, AuditService auditService,
ExecutionStore executionStore) { DiagramStore diagramStore) {
this.configRepository = configRepository; this.configRepository = configRepository;
this.registryService = registryService; this.registryService = registryService;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.auditService = auditService; this.auditService = auditService;
this.executionStore = executionStore; this.diagramStore = diagramStore;
} }
@GetMapping @GetMapping
@@ -112,7 +112,7 @@ public class ApplicationConfigController {
description = "Returns a map of processorId → routeId for all processors seen in this application") description = "Returns a map of processorId → routeId for all processors seen in this application")
@ApiResponse(responseCode = "200", description = "Mapping returned") @ApiResponse(responseCode = "200", description = "Mapping returned")
public ResponseEntity<Map<String, String>> getProcessorRouteMapping(@PathVariable String application) { public ResponseEntity<Map<String, String>> getProcessorRouteMapping(@PathVariable String application) {
return ResponseEntity.ok(executionStore.findProcessorRouteMapping(application)); return ResponseEntity.ok(diagramStore.findProcessorRouteMapping(application));
} }
@PostMapping("/{application}/test-expression") @PostMapping("/{application}/test-expression")

View File

@@ -1,6 +1,8 @@
package com.cameleer3.server.app.controller; package com.cameleer3.server.app.controller;
import com.cameleer3.common.graph.RouteGraph; import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.TaggedDiagram; import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,10 +37,14 @@ public class DiagramController {
private static final Logger log = LoggerFactory.getLogger(DiagramController.class); private static final Logger log = LoggerFactory.getLogger(DiagramController.class);
private final IngestionService ingestionService; private final IngestionService ingestionService;
private final AgentRegistryService registryService;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public DiagramController(IngestionService ingestionService, ObjectMapper objectMapper) { public DiagramController(IngestionService ingestionService,
AgentRegistryService registryService,
ObjectMapper objectMapper) {
this.ingestionService = ingestionService; this.ingestionService = ingestionService;
this.registryService = registryService;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@@ -48,10 +54,11 @@ public class DiagramController {
@ApiResponse(responseCode = "202", description = "Data accepted for processing") @ApiResponse(responseCode = "202", description = "Data accepted for processing")
public ResponseEntity<Void> ingestDiagrams(@RequestBody String body) throws JsonProcessingException { public ResponseEntity<Void> ingestDiagrams(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId(); String agentId = extractAgentId();
String applicationName = resolveApplicationName(agentId);
List<RouteGraph> graphs = parsePayload(body); List<RouteGraph> graphs = parsePayload(body);
for (RouteGraph graph : graphs) { for (RouteGraph graph : graphs) {
ingestionService.ingestDiagram(new TaggedDiagram(agentId, graph)); ingestionService.ingestDiagram(new TaggedDiagram(agentId, applicationName, graph));
} }
return ResponseEntity.accepted().build(); return ResponseEntity.accepted().build();
@@ -62,6 +69,11 @@ public class DiagramController {
return auth != null ? auth.getName() : ""; return auth != null ? auth.getName() : "";
} }
private String resolveApplicationName(String agentId) {
AgentInfo agent = registryService.findById(agentId);
return agent != null ? agent.application() : "";
}
private List<RouteGraph> parsePayload(String body) throws JsonProcessingException { private List<RouteGraph> parsePayload(String body) throws JsonProcessingException {
String trimmed = body.strip(); String trimmed = body.strip();
if (trimmed.startsWith("[")) { if (trimmed.startsWith("[")) {

View File

@@ -16,6 +16,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HexFormat; import java.util.HexFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -33,8 +34,8 @@ public class PostgresDiagramStore implements DiagramStore {
private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class); private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);
private static final String INSERT_SQL = """ private static final String INSERT_SQL = """
INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition) INSERT INTO route_diagrams (content_hash, route_id, agent_id, application_name, definition)
VALUES (?, ?, ?, ?::jsonb) VALUES (?, ?, ?, ?, ?::jsonb)
ON CONFLICT (content_hash) DO NOTHING ON CONFLICT (content_hash) DO NOTHING
"""; """;
@@ -62,11 +63,12 @@ public class PostgresDiagramStore implements DiagramStore {
try { try {
RouteGraph graph = diagram.graph(); RouteGraph graph = diagram.graph();
String agentId = diagram.agentId() != null ? diagram.agentId() : ""; String agentId = diagram.agentId() != null ? diagram.agentId() : "";
String applicationName = diagram.applicationName() != null ? diagram.applicationName() : "";
String json = objectMapper.writeValueAsString(graph); String json = objectMapper.writeValueAsString(graph);
String contentHash = sha256Hex(json); String contentHash = sha256Hex(json);
String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json); jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationName, json);
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
@@ -116,6 +118,21 @@ public class PostgresDiagramStore implements DiagramStore {
return Optional.of((String) rows.get(0).get("content_hash")); return Optional.of((String) rows.get(0).get("content_hash"));
} }
@Override
public Map<String, String> findProcessorRouteMapping(String applicationName) {
Map<String, String> mapping = new HashMap<>();
jdbcTemplate.query("""
SELECT DISTINCT rd.route_id, node_elem->>'id' AS processor_id
FROM route_diagrams rd,
jsonb_array_elements(rd.definition::jsonb->'nodes') AS node_elem
WHERE rd.application_name = ?
AND node_elem->>'id' IS NOT NULL
""",
rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); },
applicationName);
return mapping;
}
static String sha256Hex(String input) { static String sha256Hex(String input) {
try { try {
MessageDigest digest = MessageDigest.getInstance("SHA-256"); MessageDigest digest = MessageDigest.getInstance("SHA-256");

View File

@@ -9,9 +9,7 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
@Repository @Repository
@@ -115,18 +113,6 @@ public class PostgresExecutionStore implements ExecutionStore {
PROCESSOR_MAPPER, executionId); PROCESSOR_MAPPER, executionId);
} }
@Override
public Map<String, String> findProcessorRouteMapping(String applicationName) {
Map<String, String> mapping = new HashMap<>();
jdbc.query("""
SELECT DISTINCT ON (processor_id) processor_id, route_id
FROM processor_executions WHERE application_name = ? ORDER BY processor_id
""",
rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); },
applicationName);
return mapping;
}
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) -> private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
new ExecutionRecord( new ExecutionRecord(
rs.getString("execution_id"), rs.getString("route_id"), rs.getString("execution_id"), rs.getString("route_id"),

View File

@@ -0,0 +1,2 @@
ALTER TABLE route_diagrams ADD COLUMN IF NOT EXISTS application_name TEXT NOT NULL DEFAULT '';
CREATE INDEX IF NOT EXISTS idx_diagrams_application ON route_diagrams (application_name);

View File

@@ -8,4 +8,4 @@ import com.cameleer3.common.graph.RouteGraph;
* The agent ID is extracted from the SecurityContext in the controller layer * The agent ID is extracted from the SecurityContext in the controller layer
* and carried through the write buffer so the flush scheduler can persist it. * and carried through the write buffer so the flush scheduler can persist it.
*/ */
public record TaggedDiagram(String agentId, RouteGraph graph) {} public record TaggedDiagram(String agentId, String applicationName, RouteGraph graph) {}

View File

@@ -4,6 +4,7 @@ import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram; import com.cameleer3.server.core.ingestion.TaggedDiagram;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
public interface DiagramStore { public interface DiagramStore {
@@ -15,4 +16,6 @@ public interface DiagramStore {
Optional<String> findContentHashForRoute(String routeId, String agentId); Optional<String> findContentHashForRoute(String routeId, String agentId);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds); Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
Map<String, String> findProcessorRouteMapping(String applicationName);
} }

View File

@@ -2,7 +2,6 @@ package com.cameleer3.server.core.storage;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
public interface ExecutionStore { public interface ExecutionStore {
@@ -17,8 +16,6 @@ public interface ExecutionStore {
List<ProcessorRecord> findProcessors(String executionId); List<ProcessorRecord> findProcessors(String executionId);
Map<String, String> findProcessorRouteMapping(String applicationName);
record ExecutionRecord( record ExecutionRecord(
String executionId, String routeId, String agentId, String applicationName, String executionId, String routeId, String agentId, String applicationName,
String status, String correlationId, String exchangeId, String status, String correlationId, String exchangeId,