From 0acceaf1a9780c1d134ee7d1b0f6d964b09b6d4b Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Thu, 2 Apr 2026 19:15:35 +0200 Subject: [PATCH] feat(#119): add RouteStateRegistry for tracking route operational state In-memory registry that infers route state (started/stopped/suspended) from successful route-control command ACKs. Updates state only when all agents in a group confirm success. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/config/AgentRegistryBeanConfig.java | 9 ++++- .../controller/AgentCommandController.java | 29 ++++++++++++++- .../server/core/agent/RouteStateRegistry.java | 36 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java index f2732907..1b46e6a1 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java @@ -3,11 +3,13 @@ package com.cameleer3.server.app.config; import com.cameleer3.server.core.agent.AgentEventRepository; import com.cameleer3.server.core.agent.AgentEventService; import com.cameleer3.server.core.agent.AgentRegistryService; +import com.cameleer3.server.core.agent.RouteStateRegistry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * Creates the {@link AgentRegistryService} and {@link AgentEventService} beans. + * Creates the {@link AgentRegistryService}, {@link AgentEventService}, + * and {@link RouteStateRegistry} beans. *

* Follows the established pattern: core module plain class, app module bean config. */ @@ -27,4 +29,9 @@ public class AgentRegistryBeanConfig { public AgentEventService agentEventService(AgentEventRepository repository) { return new AgentEventService(repository); } + + @Bean + public RouteStateRegistry routeStateRegistry() { + return new RouteStateRegistry(); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index 46a2ea7b..0d827143 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -18,6 +18,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandType; +import com.cameleer3.server.core.agent.RouteStateRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.servlet.http.HttpServletRequest; @@ -67,17 +68,20 @@ public class AgentCommandController { private final ObjectMapper objectMapper; private final AgentEventService agentEventService; private final AuditService auditService; + private final RouteStateRegistry routeStateRegistry; public AgentCommandController(AgentRegistryService registryService, SseConnectionManager connectionManager, ObjectMapper objectMapper, AgentEventService agentEventService, - AuditService auditService) { + AuditService auditService, + RouteStateRegistry routeStateRegistry) { this.registryService = registryService; this.connectionManager = connectionManager; this.objectMapper = objectMapper; this.agentEventService = agentEventService; this.auditService = auditService; + this.routeStateRegistry = routeStateRegistry; } @PostMapping("/{id}/commands") @@ -157,6 +161,29 @@ public class AgentCommandController { boolean allSuccess = timedOut.isEmpty() && responses.stream().allMatch(r -> "SUCCESS".equals(r.status())); + // Update route state when all agents successfully ACK a route-control command + if (allSuccess && type == CommandType.ROUTE_CONTROL) { + try { + @SuppressWarnings("unchecked") + var payloadMap = objectMapper.readValue(payloadJson, java.util.Map.class); + String routeId = (String) payloadMap.get("routeId"); + String action = (String) payloadMap.get("action"); + if (routeId != null && action != null) { + RouteStateRegistry.RouteState newState = switch (action) { + case "start", "resume" -> RouteStateRegistry.RouteState.STARTED; + case "stop" -> RouteStateRegistry.RouteState.STOPPED; + case "suspend" -> RouteStateRegistry.RouteState.SUSPENDED; + default -> null; + }; + if (newState != null) { + routeStateRegistry.setState(group, routeId, newState); + } + } + } catch (Exception e) { + log.warn("Failed to parse route-control payload for state tracking", e); + } + } + auditService.log("broadcast_group_command", AuditCategory.AGENT, group, java.util.Map.of("type", request.type(), "agentCount", futures.size(), "responded", responses.size(), "timedOut", timedOut.size()), diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java new file mode 100644 index 00000000..76577708 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java @@ -0,0 +1,36 @@ +package com.cameleer3.server.core.agent; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * In-memory registry tracking the operational state of routes. + * State is inferred from successful route-control command ACKs. + * On server restart, all states reset to STARTED (default Camel behavior). + */ +public class RouteStateRegistry { + + public enum RouteState { STARTED, STOPPED, SUSPENDED } + + // Key: "applicationId:routeId" + private final ConcurrentHashMap states = new ConcurrentHashMap<>(); + + public void setState(String applicationId, String routeId, RouteState state) { + states.put(applicationId + ":" + routeId, state); + } + + public RouteState getState(String applicationId, String routeId) { + return states.getOrDefault(applicationId + ":" + routeId, RouteState.STARTED); + } + + public Map getStatesForApplication(String applicationId) { + Map result = new LinkedHashMap<>(); + states.forEach((key, state) -> { + if (key.startsWith(applicationId + ":")) { + result.put(key.substring(applicationId.length() + 1), state); + } + }); + return result; + } +}