feat(#119): add RouteStateRegistry for tracking route operational state
In-memory registry that infers route state (started/stopped/suspended) from successful route-control command ACKs. Updates state only when all agents in a group confirm success. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,11 +3,13 @@ package com.cameleer3.server.app.config;
|
|||||||
import com.cameleer3.server.core.agent.AgentEventRepository;
|
import com.cameleer3.server.core.agent.AgentEventRepository;
|
||||||
import com.cameleer3.server.core.agent.AgentEventService;
|
import com.cameleer3.server.core.agent.AgentEventService;
|
||||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||||
|
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates the {@link AgentRegistryService} and {@link AgentEventService} beans.
|
* Creates the {@link AgentRegistryService}, {@link AgentEventService},
|
||||||
|
* and {@link RouteStateRegistry} beans.
|
||||||
* <p>
|
* <p>
|
||||||
* Follows the established pattern: core module plain class, app module bean config.
|
* Follows the established pattern: core module plain class, app module bean config.
|
||||||
*/
|
*/
|
||||||
@@ -27,4 +29,9 @@ public class AgentRegistryBeanConfig {
|
|||||||
public AgentEventService agentEventService(AgentEventRepository repository) {
|
public AgentEventService agentEventService(AgentEventRepository repository) {
|
||||||
return new AgentEventService(repository);
|
return new AgentEventService(repository);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RouteStateRegistry routeStateRegistry() {
|
||||||
|
return new RouteStateRegistry();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
|
|||||||
import com.cameleer3.server.core.agent.AgentState;
|
import com.cameleer3.server.core.agent.AgentState;
|
||||||
import com.cameleer3.server.core.agent.CommandReply;
|
import com.cameleer3.server.core.agent.CommandReply;
|
||||||
import com.cameleer3.server.core.agent.CommandType;
|
import com.cameleer3.server.core.agent.CommandType;
|
||||||
|
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
@@ -67,17 +68,20 @@ public class AgentCommandController {
|
|||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final AgentEventService agentEventService;
|
private final AgentEventService agentEventService;
|
||||||
private final AuditService auditService;
|
private final AuditService auditService;
|
||||||
|
private final RouteStateRegistry routeStateRegistry;
|
||||||
|
|
||||||
public AgentCommandController(AgentRegistryService registryService,
|
public AgentCommandController(AgentRegistryService registryService,
|
||||||
SseConnectionManager connectionManager,
|
SseConnectionManager connectionManager,
|
||||||
ObjectMapper objectMapper,
|
ObjectMapper objectMapper,
|
||||||
AgentEventService agentEventService,
|
AgentEventService agentEventService,
|
||||||
AuditService auditService) {
|
AuditService auditService,
|
||||||
|
RouteStateRegistry routeStateRegistry) {
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
this.connectionManager = connectionManager;
|
this.connectionManager = connectionManager;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.agentEventService = agentEventService;
|
this.agentEventService = agentEventService;
|
||||||
this.auditService = auditService;
|
this.auditService = auditService;
|
||||||
|
this.routeStateRegistry = routeStateRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/{id}/commands")
|
@PostMapping("/{id}/commands")
|
||||||
@@ -157,6 +161,29 @@ public class AgentCommandController {
|
|||||||
boolean allSuccess = timedOut.isEmpty() &&
|
boolean allSuccess = timedOut.isEmpty() &&
|
||||||
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
|
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
|
||||||
|
|
||||||
|
// Update route state when all agents successfully ACK a route-control command
|
||||||
|
if (allSuccess && type == CommandType.ROUTE_CONTROL) {
|
||||||
|
try {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
var payloadMap = objectMapper.readValue(payloadJson, java.util.Map.class);
|
||||||
|
String routeId = (String) payloadMap.get("routeId");
|
||||||
|
String action = (String) payloadMap.get("action");
|
||||||
|
if (routeId != null && action != null) {
|
||||||
|
RouteStateRegistry.RouteState newState = switch (action) {
|
||||||
|
case "start", "resume" -> RouteStateRegistry.RouteState.STARTED;
|
||||||
|
case "stop" -> RouteStateRegistry.RouteState.STOPPED;
|
||||||
|
case "suspend" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||||
|
default -> null;
|
||||||
|
};
|
||||||
|
if (newState != null) {
|
||||||
|
routeStateRegistry.setState(group, routeId, newState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to parse route-control payload for state tracking", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
||||||
java.util.Map.of("type", request.type(), "agentCount", futures.size(),
|
java.util.Map.of("type", request.type(), "agentCount", futures.size(),
|
||||||
"responded", responses.size(), "timedOut", timedOut.size()),
|
"responded", responses.size(), "timedOut", timedOut.size()),
|
||||||
|
|||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.cameleer3.server.core.agent;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In-memory registry tracking the operational state of routes.
|
||||||
|
* State is inferred from successful route-control command ACKs.
|
||||||
|
* On server restart, all states reset to STARTED (default Camel behavior).
|
||||||
|
*/
|
||||||
|
public class RouteStateRegistry {
|
||||||
|
|
||||||
|
public enum RouteState { STARTED, STOPPED, SUSPENDED }
|
||||||
|
|
||||||
|
// Key: "applicationId:routeId"
|
||||||
|
private final ConcurrentHashMap<String, RouteState> states = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public void setState(String applicationId, String routeId, RouteState state) {
|
||||||
|
states.put(applicationId + ":" + routeId, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RouteState getState(String applicationId, String routeId) {
|
||||||
|
return states.getOrDefault(applicationId + ":" + routeId, RouteState.STARTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, RouteState> getStatesForApplication(String applicationId) {
|
||||||
|
Map<String, RouteState> result = new LinkedHashMap<>();
|
||||||
|
states.forEach((key, state) -> {
|
||||||
|
if (key.startsWith(applicationId + ":")) {
|
||||||
|
result.put(key.substring(applicationId.length() + 1), state);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user