diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index 0d827143..46a2ea7b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -18,7 +18,6 @@ import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandType; -import com.cameleer3.server.core.agent.RouteStateRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.servlet.http.HttpServletRequest; @@ -68,20 +67,17 @@ public class AgentCommandController { private final ObjectMapper objectMapper; private final AgentEventService agentEventService; private final AuditService auditService; - private final RouteStateRegistry routeStateRegistry; public AgentCommandController(AgentRegistryService registryService, SseConnectionManager connectionManager, ObjectMapper objectMapper, AgentEventService agentEventService, - AuditService auditService, - RouteStateRegistry routeStateRegistry) { + AuditService auditService) { this.registryService = registryService; this.connectionManager = connectionManager; this.objectMapper = objectMapper; this.agentEventService = agentEventService; this.auditService = auditService; - this.routeStateRegistry = routeStateRegistry; } @PostMapping("/{id}/commands") @@ -161,29 +157,6 @@ public class AgentCommandController { boolean allSuccess = timedOut.isEmpty() && responses.stream().allMatch(r -> "SUCCESS".equals(r.status())); - // Update route state when all agents successfully ACK a route-control command - if (allSuccess && type == CommandType.ROUTE_CONTROL) { - try { - @SuppressWarnings("unchecked") - var payloadMap = objectMapper.readValue(payloadJson, java.util.Map.class); - String routeId = (String) payloadMap.get("routeId"); - String action = (String) payloadMap.get("action"); - if (routeId != null && action != null) { - RouteStateRegistry.RouteState newState = switch (action) { - case "start", "resume" -> RouteStateRegistry.RouteState.STARTED; - case "stop" -> RouteStateRegistry.RouteState.STOPPED; - case "suspend" -> RouteStateRegistry.RouteState.SUSPENDED; - default -> null; - }; - if (newState != null) { - routeStateRegistry.setState(group, routeId, newState); - } - } - } catch (Exception e) { - log.warn("Failed to parse route-control payload for state tracking", e); - } - } - auditService.log("broadcast_group_command", AuditCategory.AGENT, group, java.util.Map.of("type", request.type(), "agentCount", futures.size(), "responded", responses.size(), "timedOut", timedOut.size()), diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java index 32282dad..01191393 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java @@ -7,6 +7,7 @@ import com.cameleer3.server.app.dto.AgentRefreshResponse; import com.cameleer3.server.app.dto.AgentRegistrationRequest; import com.cameleer3.server.app.dto.AgentRegistrationResponse; import com.cameleer3.server.app.dto.ErrorResponse; +import com.cameleer3.server.app.dto.HeartbeatRequest; import com.cameleer3.server.app.security.BootstrapTokenValidator; import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditResult; @@ -15,6 +16,7 @@ import com.cameleer3.server.core.agent.AgentEventService; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; +import com.cameleer3.server.core.agent.RouteStateRegistry; import com.cameleer3.server.core.security.Ed25519SigningService; import com.cameleer3.server.core.security.InvalidTokenException; import com.cameleer3.server.core.security.JwtService; @@ -63,6 +65,7 @@ public class AgentRegistrationController { private final AgentEventService agentEventService; private final AuditService auditService; private final JdbcTemplate jdbc; + private final RouteStateRegistry routeStateRegistry; public AgentRegistrationController(AgentRegistryService registryService, AgentRegistryConfig config, @@ -71,7 +74,8 @@ public class AgentRegistrationController { Ed25519SigningService ed25519SigningService, AgentEventService agentEventService, AuditService auditService, - @org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc) { + @org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc, + RouteStateRegistry routeStateRegistry) { this.registryService = registryService; this.config = config; this.bootstrapTokenValidator = bootstrapTokenValidator; @@ -80,6 +84,7 @@ public class AgentRegistrationController { this.agentEventService = agentEventService; this.auditService = auditService; this.jdbc = jdbc; + this.routeStateRegistry = routeStateRegistry; } @PostMapping("/register") @@ -191,14 +196,38 @@ public class AgentRegistrationController { description = "Updates the agent's last heartbeat timestamp") @ApiResponse(responseCode = "200", description = "Heartbeat accepted") @ApiResponse(responseCode = "404", description = "Agent not registered") - public ResponseEntity heartbeat(@PathVariable String id) { + public ResponseEntity heartbeat(@PathVariable String id, + @RequestBody(required = false) HeartbeatRequest request) { boolean found = registryService.heartbeat(id); if (!found) { return ResponseEntity.notFound().build(); } + + if (request != null && request.routeStates() != null && !request.routeStates().isEmpty()) { + AgentInfo agent = registryService.findById(id); + if (agent != null) { + for (var entry : request.routeStates().entrySet()) { + RouteStateRegistry.RouteState state = parseRouteState(entry.getValue()); + if (state != null) { + routeStateRegistry.setState(agent.applicationId(), entry.getKey(), state); + } + } + } + } + return ResponseEntity.ok().build(); } + private RouteStateRegistry.RouteState parseRouteState(String state) { + if (state == null) return null; + return switch (state) { + case "Started" -> RouteStateRegistry.RouteState.STARTED; + case "Stopped" -> RouteStateRegistry.RouteState.STOPPED; + case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED; + default -> null; + }; + } + @PostMapping("/{id}/deregister") @Operation(summary = "Deregister agent", description = "Removes the agent from the registry. Called by agents during graceful shutdown.") diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/EventIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/EventIngestionController.java index 3e4dcf7a..9a620ade 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/EventIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/EventIngestionController.java @@ -4,6 +4,7 @@ import com.cameleer3.common.model.AgentEvent; import com.cameleer3.server.core.agent.AgentEventService; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; +import com.cameleer3.server.core.agent.RouteStateRegistry; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.swagger.v3.oas.annotations.Operation; @@ -19,6 +20,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; +import java.util.Map; /** * Ingestion endpoint for agent lifecycle events. @@ -37,13 +39,16 @@ public class EventIngestionController { private final AgentEventService agentEventService; private final AgentRegistryService registryService; private final ObjectMapper objectMapper; + private final RouteStateRegistry routeStateRegistry; public EventIngestionController(AgentEventService agentEventService, AgentRegistryService registryService, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + RouteStateRegistry routeStateRegistry) { this.agentEventService = agentEventService; this.registryService = registryService; this.objectMapper = objectMapper; + this.routeStateRegistry = routeStateRegistry; } @PostMapping("/events") @@ -76,11 +81,37 @@ public class EventIngestionController { log.info("Agent {} reported graceful shutdown", instanceId); registryService.shutdown(instanceId); } + + if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) { + Map details = event.getDetails(); + if (details != null) { + String routeId = details.get("routeId"); + String newState = details.get("newState"); + if (routeId != null && newState != null) { + RouteStateRegistry.RouteState state = parseRouteState(newState); + if (state != null) { + routeStateRegistry.setState(applicationId, routeId, state); + log.debug("Route state changed: {}/{} -> {} (reason: {})", + applicationId, routeId, newState, details.get("reason")); + } + } + } + } } return ResponseEntity.accepted().build(); } + private RouteStateRegistry.RouteState parseRouteState(String state) { + if (state == null) return null; + return switch (state) { + case "Started" -> RouteStateRegistry.RouteState.STARTED; + case "Stopped" -> RouteStateRegistry.RouteState.STOPPED; + case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED; + default -> null; + }; + } + private String extractInstanceId() { Authentication auth = SecurityContextHolder.getContext().getAuthentication(); return auth != null ? auth.getName() : ""; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/HeartbeatRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/HeartbeatRequest.java new file mode 100644 index 00000000..ddf0b93e --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/HeartbeatRequest.java @@ -0,0 +1,7 @@ +package com.cameleer3.server.app.dto; + +import java.util.Map; + +public record HeartbeatRequest( + Map routeStates +) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java index 76577708..d2794bdc 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/RouteStateRegistry.java @@ -6,7 +6,8 @@ import java.util.concurrent.ConcurrentHashMap; /** * In-memory registry tracking the operational state of routes. - * State is inferred from successful route-control command ACKs. + * State is updated from agent heartbeats (routeStates map) and + * ROUTE_STATE_CHANGED lifecycle events. * On server restart, all states reset to STARTED (default Camel behavior). */ public class RouteStateRegistry {