feat(#119): accept route states in heartbeat and state-change events
Replace ACK-based route state inference with agent-reported state. Heartbeats now carry optional routeStates map, and ROUTE_STATE_CHANGED events update the registry immediately. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,7 +18,6 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
import com.cameleer3.server.core.agent.CommandReply;
|
||||
import com.cameleer3.server.core.agent.CommandType;
|
||||
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
@@ -68,20 +67,17 @@ public class AgentCommandController {
|
||||
private final ObjectMapper objectMapper;
|
||||
private final AgentEventService agentEventService;
|
||||
private final AuditService auditService;
|
||||
private final RouteStateRegistry routeStateRegistry;
|
||||
|
||||
public AgentCommandController(AgentRegistryService registryService,
|
||||
SseConnectionManager connectionManager,
|
||||
ObjectMapper objectMapper,
|
||||
AgentEventService agentEventService,
|
||||
AuditService auditService,
|
||||
RouteStateRegistry routeStateRegistry) {
|
||||
AuditService auditService) {
|
||||
this.registryService = registryService;
|
||||
this.connectionManager = connectionManager;
|
||||
this.objectMapper = objectMapper;
|
||||
this.agentEventService = agentEventService;
|
||||
this.auditService = auditService;
|
||||
this.routeStateRegistry = routeStateRegistry;
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/commands")
|
||||
@@ -161,29 +157,6 @@ public class AgentCommandController {
|
||||
boolean allSuccess = timedOut.isEmpty() &&
|
||||
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
|
||||
|
||||
// Update route state when all agents successfully ACK a route-control command
|
||||
if (allSuccess && type == CommandType.ROUTE_CONTROL) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
var payloadMap = objectMapper.readValue(payloadJson, java.util.Map.class);
|
||||
String routeId = (String) payloadMap.get("routeId");
|
||||
String action = (String) payloadMap.get("action");
|
||||
if (routeId != null && action != null) {
|
||||
RouteStateRegistry.RouteState newState = switch (action) {
|
||||
case "start", "resume" -> RouteStateRegistry.RouteState.STARTED;
|
||||
case "stop" -> RouteStateRegistry.RouteState.STOPPED;
|
||||
case "suspend" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||
default -> null;
|
||||
};
|
||||
if (newState != null) {
|
||||
routeStateRegistry.setState(group, routeId, newState);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to parse route-control payload for state tracking", e);
|
||||
}
|
||||
}
|
||||
|
||||
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
||||
java.util.Map.of("type", request.type(), "agentCount", futures.size(),
|
||||
"responded", responses.size(), "timedOut", timedOut.size()),
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.cameleer3.server.app.dto.AgentRefreshResponse;
|
||||
import com.cameleer3.server.app.dto.AgentRegistrationRequest;
|
||||
import com.cameleer3.server.app.dto.AgentRegistrationResponse;
|
||||
import com.cameleer3.server.app.dto.ErrorResponse;
|
||||
import com.cameleer3.server.app.dto.HeartbeatRequest;
|
||||
import com.cameleer3.server.app.security.BootstrapTokenValidator;
|
||||
import com.cameleer3.server.core.admin.AuditCategory;
|
||||
import com.cameleer3.server.core.admin.AuditResult;
|
||||
@@ -15,6 +16,7 @@ import com.cameleer3.server.core.agent.AgentEventService;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||
import com.cameleer3.server.core.security.Ed25519SigningService;
|
||||
import com.cameleer3.server.core.security.InvalidTokenException;
|
||||
import com.cameleer3.server.core.security.JwtService;
|
||||
@@ -63,6 +65,7 @@ public class AgentRegistrationController {
|
||||
private final AgentEventService agentEventService;
|
||||
private final AuditService auditService;
|
||||
private final JdbcTemplate jdbc;
|
||||
private final RouteStateRegistry routeStateRegistry;
|
||||
|
||||
public AgentRegistrationController(AgentRegistryService registryService,
|
||||
AgentRegistryConfig config,
|
||||
@@ -71,7 +74,8 @@ public class AgentRegistrationController {
|
||||
Ed25519SigningService ed25519SigningService,
|
||||
AgentEventService agentEventService,
|
||||
AuditService auditService,
|
||||
@org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc) {
|
||||
@org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc,
|
||||
RouteStateRegistry routeStateRegistry) {
|
||||
this.registryService = registryService;
|
||||
this.config = config;
|
||||
this.bootstrapTokenValidator = bootstrapTokenValidator;
|
||||
@@ -80,6 +84,7 @@ public class AgentRegistrationController {
|
||||
this.agentEventService = agentEventService;
|
||||
this.auditService = auditService;
|
||||
this.jdbc = jdbc;
|
||||
this.routeStateRegistry = routeStateRegistry;
|
||||
}
|
||||
|
||||
@PostMapping("/register")
|
||||
@@ -191,14 +196,38 @@ public class AgentRegistrationController {
|
||||
description = "Updates the agent's last heartbeat timestamp")
|
||||
@ApiResponse(responseCode = "200", description = "Heartbeat accepted")
|
||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||
public ResponseEntity<Void> heartbeat(@PathVariable String id) {
|
||||
public ResponseEntity<Void> heartbeat(@PathVariable String id,
|
||||
@RequestBody(required = false) HeartbeatRequest request) {
|
||||
boolean found = registryService.heartbeat(id);
|
||||
if (!found) {
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
|
||||
if (request != null && request.routeStates() != null && !request.routeStates().isEmpty()) {
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
if (agent != null) {
|
||||
for (var entry : request.routeStates().entrySet()) {
|
||||
RouteStateRegistry.RouteState state = parseRouteState(entry.getValue());
|
||||
if (state != null) {
|
||||
routeStateRegistry.setState(agent.applicationId(), entry.getKey(), state);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
private RouteStateRegistry.RouteState parseRouteState(String state) {
|
||||
if (state == null) return null;
|
||||
return switch (state) {
|
||||
case "Started" -> RouteStateRegistry.RouteState.STARTED;
|
||||
case "Stopped" -> RouteStateRegistry.RouteState.STOPPED;
|
||||
case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/deregister")
|
||||
@Operation(summary = "Deregister agent",
|
||||
description = "Removes the agent from the registry. Called by agents during graceful shutdown.")
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.cameleer3.common.model.AgentEvent;
|
||||
import com.cameleer3.server.core.agent.AgentEventService;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
@@ -19,6 +20,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Ingestion endpoint for agent lifecycle events.
|
||||
@@ -37,13 +39,16 @@ public class EventIngestionController {
|
||||
private final AgentEventService agentEventService;
|
||||
private final AgentRegistryService registryService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final RouteStateRegistry routeStateRegistry;
|
||||
|
||||
public EventIngestionController(AgentEventService agentEventService,
|
||||
AgentRegistryService registryService,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper,
|
||||
RouteStateRegistry routeStateRegistry) {
|
||||
this.agentEventService = agentEventService;
|
||||
this.registryService = registryService;
|
||||
this.objectMapper = objectMapper;
|
||||
this.routeStateRegistry = routeStateRegistry;
|
||||
}
|
||||
|
||||
@PostMapping("/events")
|
||||
@@ -76,11 +81,37 @@ public class EventIngestionController {
|
||||
log.info("Agent {} reported graceful shutdown", instanceId);
|
||||
registryService.shutdown(instanceId);
|
||||
}
|
||||
|
||||
if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
|
||||
Map<String, String> details = event.getDetails();
|
||||
if (details != null) {
|
||||
String routeId = details.get("routeId");
|
||||
String newState = details.get("newState");
|
||||
if (routeId != null && newState != null) {
|
||||
RouteStateRegistry.RouteState state = parseRouteState(newState);
|
||||
if (state != null) {
|
||||
routeStateRegistry.setState(applicationId, routeId, state);
|
||||
log.debug("Route state changed: {}/{} -> {} (reason: {})",
|
||||
applicationId, routeId, newState, details.get("reason"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ResponseEntity.accepted().build();
|
||||
}
|
||||
|
||||
private RouteStateRegistry.RouteState parseRouteState(String state) {
|
||||
if (state == null) return null;
|
||||
return switch (state) {
|
||||
case "Started" -> RouteStateRegistry.RouteState.STARTED;
|
||||
case "Stopped" -> RouteStateRegistry.RouteState.STOPPED;
|
||||
case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
private String extractInstanceId() {
|
||||
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
|
||||
return auth != null ? auth.getName() : "";
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.cameleer3.server.app.dto;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public record HeartbeatRequest(
|
||||
Map<String, String> routeStates
|
||||
) {}
|
||||
@@ -6,7 +6,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* In-memory registry tracking the operational state of routes.
|
||||
* State is inferred from successful route-control command ACKs.
|
||||
* State is updated from agent heartbeats (routeStates map) and
|
||||
* ROUTE_STATE_CHANGED lifecycle events.
|
||||
* On server restart, all states reset to STARTED (default Camel behavior).
|
||||
*/
|
||||
public class RouteStateRegistry {
|
||||
|
||||
Reference in New Issue
Block a user