refactor: remove diagramNodeId indirection, use processorId directly
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 37s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

Agent now uses Camel processorId as RouteNode.id, eliminating the
nodeId mapping layer. Drop diagram_node_id column (V6 migration),
remove from ProcessorRecord/ProcessorNode/IngestionService/DetailService,
add /processor-routes endpoint for processorId→routeId lookup,
simplify frontend diagram-mapping and ExchangeDetail overlays,
replace N diagram fetches in AppConfigPage with single hook.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-26 22:44:07 +01:00
parent bd63a8ce95
commit 100b780b47
16 changed files with 73 additions and 92 deletions

View File

@@ -13,6 +13,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
import com.cameleer3.server.core.agent.CommandReply;
import com.cameleer3.server.core.agent.CommandType;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
@@ -48,15 +49,18 @@ public class ApplicationConfigController {
private final AgentRegistryService registryService;
private final ObjectMapper objectMapper;
private final AuditService auditService;
private final ExecutionStore executionStore;
public ApplicationConfigController(PostgresApplicationConfigRepository configRepository,
AgentRegistryService registryService,
ObjectMapper objectMapper,
AuditService auditService) {
AuditService auditService,
ExecutionStore executionStore) {
this.configRepository = configRepository;
this.registryService = registryService;
this.objectMapper = objectMapper;
this.auditService = auditService;
this.executionStore = executionStore;
}
@GetMapping
@@ -103,6 +107,14 @@ public class ApplicationConfigController {
return ResponseEntity.ok(saved);
}
@GetMapping("/{application}/processor-routes")
@Operation(summary = "Get processor to route mapping",
description = "Returns a map of processorId → routeId for all processors seen in this application")
@ApiResponse(responseCode = "200", description = "Mapping returned")
public ResponseEntity<Map<String, String>> getProcessorRouteMapping(@PathVariable String application) {
return ResponseEntity.ok(executionStore.findProcessorRouteMapping(application));
}
@PostMapping("/{application}/test-expression")
@Operation(summary = "Test a tap expression against sample data via a live agent")
@ApiResponse(responseCode = "200", description = "Expression evaluated successfully")

View File

@@ -9,7 +9,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Repository
@@ -70,10 +72,10 @@ public class PostgresExecutionStore implements ExecutionStore {
List<ProcessorRecord> processors) {
jdbc.batchUpdate("""
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
diagram_node_id, application_name, route_id, depth, parent_processor_id,
application_name, route_id, depth, parent_processor_id,
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers, attributes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
status = EXCLUDED.status,
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
@@ -88,7 +90,7 @@ public class PostgresExecutionStore implements ExecutionStore {
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
p.diagramNodeId(), p.applicationName(), p.routeId(),
p.applicationName(), p.routeId(),
p.depth(), p.parentProcessorId(), p.status(),
Timestamp.from(p.startTime()),
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
@@ -113,6 +115,18 @@ public class PostgresExecutionStore implements ExecutionStore {
PROCESSOR_MAPPER, executionId);
}
@Override
public Map<String, String> findProcessorRouteMapping(String applicationName) {
Map<String, String> mapping = new HashMap<>();
jdbc.query("""
SELECT DISTINCT ON (processor_id) processor_id, route_id
FROM processor_executions WHERE application_name = ? ORDER BY processor_id
""",
rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); },
applicationName);
return mapping;
}
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
new ExecutionRecord(
rs.getString("execution_id"), rs.getString("route_id"),
@@ -131,7 +145,7 @@ public class PostgresExecutionStore implements ExecutionStore {
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(
rs.getString("execution_id"), rs.getString("processor_id"),
rs.getString("processor_type"), rs.getString("diagram_node_id"),
rs.getString("processor_type"),
rs.getString("application_name"), rs.getString("route_id"),
rs.getInt("depth"), rs.getString("parent_processor_id"),
rs.getString("status"),

View File

@@ -0,0 +1 @@
ALTER TABLE processor_executions DROP COLUMN IF EXISTS diagram_node_id;

View File

@@ -65,7 +65,6 @@ class DetailControllerIT extends AbstractPostgresIT {
"startTime": "2026-03-10T10:00:00Z",
"endTime": "2026-03-10T10:00:01Z",
"durationMs": 1000,
"diagramNodeId": "node-root",
"inputBody": "root-input-body",
"outputBody": "root-output-body",
"inputHeaders": {"Content-Type": "application/json"},
@@ -78,7 +77,6 @@ class DetailControllerIT extends AbstractPostgresIT {
"startTime": "2026-03-10T10:00:00.100Z",
"endTime": "2026-03-10T10:00:00.200Z",
"durationMs": 100,
"diagramNodeId": "node-child1",
"inputBody": "child1-input",
"outputBody": "child1-output",
"inputHeaders": {},
@@ -91,7 +89,6 @@ class DetailControllerIT extends AbstractPostgresIT {
"startTime": "2026-03-10T10:00:00.200Z",
"endTime": "2026-03-10T10:00:00.800Z",
"durationMs": 600,
"diagramNodeId": "node-child2",
"inputBody": "child2-input",
"outputBody": "child2-output",
"inputHeaders": {},
@@ -104,7 +101,6 @@ class DetailControllerIT extends AbstractPostgresIT {
"startTime": "2026-03-10T10:00:00.300Z",
"endTime": "2026-03-10T10:00:00.700Z",
"durationMs": 400,
"diagramNodeId": "node-gc",
"inputBody": "gc-input",
"outputBody": "gc-output",
"inputHeaders": {"X-GC": "true"},

View File

@@ -39,8 +39,7 @@ class DiagramControllerIT extends AbstractPostgresIT {
"description": "Test route",
"version": 1,
"nodes": [],
"edges": [],
"processorNodeMapping": {}
"edges": []
}
""";
@@ -60,8 +59,7 @@ class DiagramControllerIT extends AbstractPostgresIT {
"description": "Flush test",
"version": 1,
"nodes": [],
"edges": [],
"processorNodeMapping": {}
"edges": []
}
""";

View File

@@ -53,8 +53,7 @@ class DiagramRenderControllerIT extends AbstractPostgresIT {
"edges": [
{"source": "n1", "target": "n2", "edgeType": "FLOW"},
{"source": "n2", "target": "n3", "edgeType": "FLOW"}
],
"processorNodeMapping": {}
]
}
""";

View File

@@ -46,8 +46,7 @@ class DiagramLinkingIT extends AbstractPostgresIT {
],
"edges": [
{"source": "n1", "target": "n2", "edgeType": "FLOW"}
],
"processorNodeMapping": {}
]
}
""";

View File

@@ -55,8 +55,7 @@ class IngestionSchemaIT extends AbstractPostgresIT {
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"diagramNodeId": "node-root",
"inputBody": "root-input",
"inputBody": "root-input",
"outputBody": "root-output",
"inputHeaders": {"Content-Type": "application/json"},
"outputHeaders": {"X-Result": "ok"},
@@ -68,8 +67,7 @@ class IngestionSchemaIT extends AbstractPostgresIT {
"startTime": "2026-03-11T10:00:00.100Z",
"endTime": "2026-03-11T10:00:00.400Z",
"durationMs": 300,
"diagramNodeId": "node-child",
"inputBody": "child-input",
"inputBody": "child-input",
"outputBody": "child-output",
"children": [
{
@@ -79,8 +77,7 @@ class IngestionSchemaIT extends AbstractPostgresIT {
"startTime": "2026-03-11T10:00:00.200Z",
"endTime": "2026-03-11T10:00:00.300Z",
"durationMs": 100,
"diagramNodeId": "node-grandchild",
"children": []
"children": []
}
]
}
@@ -101,7 +98,7 @@ class IngestionSchemaIT extends AbstractPostgresIT {
// Verify processors were flattened into processor_executions
List<Map<String, Object>> processors = jdbcTemplate.queryForList(
"SELECT processor_id, processor_type, depth, parent_processor_id, " +
"diagram_node_id, input_body, output_body, input_headers " +
"input_body, output_body, input_headers " +
"FROM processor_executions WHERE execution_id = 'ex-tree-1' " +
"ORDER BY depth, processor_id");
assertThat(processors).hasSize(3);
@@ -110,7 +107,6 @@ class IngestionSchemaIT extends AbstractPostgresIT {
assertThat(processors.get(0).get("processor_id")).isEqualTo("root-proc");
assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0);
assertThat(processors.get(0).get("parent_processor_id")).isNull();
assertThat(processors.get(0).get("diagram_node_id")).isEqualTo("node-root");
assertThat(processors.get(0).get("input_body")).isEqualTo("root-input");
assertThat(processors.get(0).get("output_body")).isEqualTo("root-output");
assertThat(processors.get(0).get("input_headers").toString()).contains("Content-Type");
@@ -119,7 +115,6 @@ class IngestionSchemaIT extends AbstractPostgresIT {
assertThat(processors.get(1).get("processor_id")).isEqualTo("child-proc");
assertThat(((Number) processors.get(1).get("depth")).intValue()).isEqualTo(1);
assertThat(processors.get(1).get("parent_processor_id")).isEqualTo("root-proc");
assertThat(processors.get(1).get("diagram_node_id")).isEqualTo("node-child");
assertThat(processors.get(1).get("input_body")).isEqualTo("child-input");
assertThat(processors.get(1).get("output_body")).isEqualTo("child-output");
@@ -127,7 +122,6 @@ class IngestionSchemaIT extends AbstractPostgresIT {
assertThat(processors.get(2).get("processor_id")).isEqualTo("grandchild-proc");
assertThat(((Number) processors.get(2).get("depth")).intValue()).isEqualTo(2);
assertThat(processors.get(2).get("parent_processor_id")).isEqualTo("child-proc");
assertThat(processors.get(2).get("diagram_node_id")).isEqualTo("node-grandchild");
}
@Test

View File

@@ -47,7 +47,7 @@ public class DetailService {
p.processorId(), p.processorType(), p.status(),
p.startTime(), p.endTime(),
p.durationMs() != null ? p.durationMs() : 0L,
p.diagramNodeId(), p.errorMessage(), p.errorStacktrace(),
p.errorMessage(), p.errorStacktrace(),
parseAttributes(p.attributes())
));
}

View File

@@ -19,7 +19,6 @@ public final class ProcessorNode {
private final Instant startTime;
private final Instant endTime;
private final long durationMs;
private final String diagramNodeId;
private final String errorMessage;
private final String errorStackTrace;
private final Map<String, String> attributes;
@@ -27,7 +26,7 @@ public final class ProcessorNode {
public ProcessorNode(String processorId, String processorType, String status,
Instant startTime, Instant endTime, long durationMs,
String diagramNodeId, String errorMessage, String errorStackTrace,
String errorMessage, String errorStackTrace,
Map<String, String> attributes) {
this.processorId = processorId;
this.processorType = processorType;
@@ -35,7 +34,6 @@ public final class ProcessorNode {
this.startTime = startTime;
this.endTime = endTime;
this.durationMs = durationMs;
this.diagramNodeId = diagramNodeId;
this.errorMessage = errorMessage;
this.errorStackTrace = errorStackTrace;
this.attributes = attributes;
@@ -52,7 +50,6 @@ public final class ProcessorNode {
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public long getDurationMs() { return durationMs; }
public String getDiagramNodeId() { return diagramNodeId; }
public String getErrorMessage() { return errorMessage; }
public String getErrorStackTrace() { return errorStackTrace; }
public Map<String, String> getAttributes() { return attributes; }

View File

@@ -119,7 +119,7 @@ public class IngestionService {
for (ProcessorExecution p : processors) {
flat.add(new ProcessorRecord(
executionId, p.getProcessorId(), p.getProcessorType(),
p.getDiagramNodeId(), applicationName, routeId,
applicationName, routeId,
depth, parentProcessorId,
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
p.getStartTime() != null ? p.getStartTime() : execStartTime,

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.core.storage;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface ExecutionStore {
@@ -16,6 +17,8 @@ public interface ExecutionStore {
List<ProcessorRecord> findProcessors(String executionId);
Map<String, String> findProcessorRouteMapping(String applicationName);
record ExecutionRecord(
String executionId, String routeId, String agentId, String applicationName,
String status, String correlationId, String exchangeId,
@@ -28,7 +31,7 @@ public interface ExecutionStore {
record ProcessorRecord(
String executionId, String processorId, String processorType,
String diagramNodeId, String applicationName, String routeId,
String applicationName, String routeId,
int depth, String parentProcessorId, String status,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,

View File

@@ -83,6 +83,20 @@ export function useUpdateApplicationConfig() {
})
}
// ── Processor → Route Mapping ─────────────────────────────────────────────
export function useProcessorRouteMapping(application?: string) {
return useQuery({
queryKey: ['config', application, 'processor-routes'],
queryFn: async () => {
const res = await authFetch(`/api/v1/config/${application}/processor-routes`)
if (!res.ok) throw new Error('Failed to fetch processor-route mapping')
return res.json() as Promise<Record<string, string>>
},
enabled: !!application,
})
}
// ── Generic Group Command (kept for non-config commands) ──────────────────
interface SendGroupCommandParams {

View File

@@ -1,14 +1,12 @@
import { useState, useMemo, useEffect } from 'react';
import { useNavigate } from 'react-router';
import { useQueries } from '@tanstack/react-query';
import {
DataTable, Badge, MonoText, DetailPanel, SectionHeader, Button, Toggle, Spinner, useToast,
} from '@cameleer/design-system';
import type { Column } from '@cameleer/design-system';
import { useAllApplicationConfigs, useApplicationConfig, useUpdateApplicationConfig } from '../../api/queries/commands';
import { useAllApplicationConfigs, useApplicationConfig, useUpdateApplicationConfig, useProcessorRouteMapping } from '../../api/queries/commands';
import type { ApplicationConfig, TapDefinition } from '../../api/queries/commands';
import { useRouteCatalog } from '../../api/queries/catalog';
import { api } from '../../api/client';
import type { AppCatalogEntry, RouteSummary } from '../../api/types';
import styles from './AppConfigPage.module.css';
@@ -87,32 +85,8 @@ function AppConfigDetail({ appId, onClose }: { appId: string; onClose: () => voi
return entry?.routes ?? [];
}, [catalog, appId]);
// Fetch diagrams for all routes to build processorId → routeId mapping
const diagramQueries = useQueries({
queries: appRoutes.map((r) => ({
queryKey: ['diagrams', 'byRoute', appId, r.routeId],
queryFn: async () => {
const { data, error } = await api.GET('/diagrams', {
params: { query: { application: appId!, routeId: r.routeId } },
});
if (error) return { routeId: r.routeId, nodes: [] as Array<{ id?: string }> };
return { routeId: r.routeId, nodes: (data as any)?.nodes ?? [] };
},
enabled: !!appId,
staleTime: 60_000,
})),
});
const processorToRoute = useMemo(() => {
const map: Record<string, string> = {};
for (const q of diagramQueries) {
if (!q.data) continue;
for (const node of q.data.nodes) {
if (node.id) map[node.id] = q.data.routeId;
}
}
return map;
}, [diagramQueries.map(q => q.data)]);
// processorId → routeId mapping from backend
const { data: processorToRoute = {} } = useProcessorRouteMapping(appId);
useEffect(() => {
if (config) {

View File

@@ -172,19 +172,10 @@ export default function ExchangeDetail() {
const { routeFlows, flowIndexMap } = useMemo(() => {
let nodes: RouteNode[]
if (diagram?.nodes) {
// Flatten processors to build diagramNodeId → processorId lookup
const flatProcs: Array<{ diagramNodeId?: string; processorId?: string }> = []
function flattenProcs(list: any[]) {
for (const n of list) { flatProcs.push(n); if (n.children) flattenProcs(n.children) }
}
flattenProcs(procList)
const pidLookup = new Map(flatProcs
.filter(p => p.diagramNodeId && p.processorId)
.map(p => [p.diagramNodeId!, p.processorId!]))
// node.id is the processorId (Camel nodeId), so lookup is direct
nodes = mapDiagramToRouteNodes(diagram.nodes, procList).map((node, i) => ({
...node,
badges: badgesFor(pidLookup.get(diagram.nodes[i]?.id ?? '') ?? diagram.nodes[i]?.id ?? ''),
badges: badgesFor(diagram.nodes[i]?.id ?? ''),
}))
} else {
// Fallback: build from processor list
@@ -211,22 +202,11 @@ export default function ExchangeDetail() {
return ids
}, [procList])
// ProcessorId lookup: diagram node index → processorId
// ProcessorId lookup: diagram node index → processorId (node.id IS processorId)
const flowProcessorIds: string[] = useMemo(() => {
if (!diagram?.nodes) return processorIds
const flatProcs: Array<{ diagramNodeId?: string; processorId?: string }> = []
function flatten(nodes: any[]) {
for (const n of nodes) {
flatProcs.push(n)
if (n.children) flatten(n.children)
}
}
flatten(procList)
const lookup = new Map(flatProcs
.filter(p => p.diagramNodeId && p.processorId)
.map(p => [p.diagramNodeId!, p.processorId!]))
return diagram.nodes.map(node => lookup.get(node.id ?? '') ?? node.id ?? '')
}, [diagram, procList, processorIds])
return diagram.nodes.map(node => node.id ?? '')
}, [diagram, processorIds])
// Map flow display index → processor tree index (for snapshot API)
const flowToTreeIndex = useMemo(() =>

View File

@@ -20,11 +20,11 @@ function mapStatus(status: string | undefined): RouteNode['status'] {
/**
* Maps diagram PositionedNodes + execution ProcessorNodes to RouteFlow RouteNode[] format.
* Joins on diagramNodeId → node.id.
* Joins on processorId → node.id (node IDs are Camel processor IDs).
*/
export function mapDiagramToRouteNodes(
diagramNodes: Array<{ id?: string; label?: string; type?: string }>,
processors: Array<{ diagramNodeId?: string; processorId?: string; status?: string; durationMs?: number; children?: any[] }>
processors: Array<{ processorId?: string; status?: string; durationMs?: number; children?: any[] }>
): RouteNode[] {
// Flatten processor tree
const flatProcessors: typeof processors = [];
@@ -36,10 +36,10 @@ export function mapDiagramToRouteNodes(
}
flatten(processors || []);
// Build lookup: diagramNodeId → processor
// Build lookup: processorId → processor
const procMap = new Map<string, (typeof flatProcessors)[0]>();
for (const p of flatProcessors) {
if (p.diagramNodeId) procMap.set(p.diagramNodeId, p);
if (p.processorId) procMap.set(p.processorId, p);
}
return diagramNodes.map(node => {