diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index 24db1795..46a2ea7b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.agent.SseConnectionManager; import com.cameleer3.server.app.dto.CommandAckRequest; import com.cameleer3.server.app.dto.CommandBroadcastResponse; +import com.cameleer3.server.app.dto.CommandGroupResponse; import com.cameleer3.server.app.dto.CommandRequest; import com.cameleer3.server.app.dto.CommandSingleResponse; import com.cameleer3.server.app.dto.ReplayRequest; @@ -109,32 +110,60 @@ public class AgentCommandController { @PostMapping("/groups/{group}/commands") @Operation(summary = "Send command to all agents in a group", - description = "Sends a command to all LIVE agents in the specified group") - @ApiResponse(responseCode = "202", description = "Commands accepted") + description = "Sends a command to all LIVE agents in the specified group and waits for responses") + @ApiResponse(responseCode = "200", description = "Commands dispatched and responses collected") @ApiResponse(responseCode = "400", description = "Invalid command payload") - public ResponseEntity sendGroupCommand(@PathVariable String group, - @RequestBody CommandRequest request, - HttpServletRequest httpRequest) throws JsonProcessingException { + public ResponseEntity sendGroupCommand(@PathVariable String group, + @RequestBody CommandRequest request, + HttpServletRequest httpRequest) throws JsonProcessingException { CommandType type = mapCommandType(request.type()); String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}"; - List agents = registryService.findAll().stream() - .filter(a -> a.state() == AgentState.LIVE) - .filter(a -> group.equals(a.applicationId())) - .toList(); + Map> futures = + registryService.addGroupCommandWithReplies(group, type, payloadJson); - List commandIds = new ArrayList<>(); - for (AgentInfo agent : agents) { - AgentCommand command = registryService.addCommand(agent.instanceId(), type, payloadJson); - commandIds.add(command.id()); + if (futures.isEmpty()) { + auditService.log("broadcast_group_command", AuditCategory.AGENT, group, + java.util.Map.of("type", request.type(), "agentCount", 0), + AuditResult.SUCCESS, httpRequest); + return ResponseEntity.ok(new CommandGroupResponse(true, 0, 0, List.of(), List.of())); } + // Wait with shared 10-second deadline + long deadline = System.currentTimeMillis() + 10_000; + List responses = new ArrayList<>(); + List timedOut = new ArrayList<>(); + + for (var entry : futures.entrySet()) { + long remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + timedOut.add(entry.getKey()); + entry.getValue().cancel(false); + continue; + } + try { + CommandReply reply = entry.getValue().get(remaining, TimeUnit.MILLISECONDS); + responses.add(new CommandGroupResponse.AgentResponse( + entry.getKey(), reply.status(), reply.message())); + } catch (TimeoutException e) { + timedOut.add(entry.getKey()); + entry.getValue().cancel(false); + } catch (Exception e) { + responses.add(new CommandGroupResponse.AgentResponse( + entry.getKey(), "ERROR", e.getMessage())); + } + } + + boolean allSuccess = timedOut.isEmpty() && + responses.stream().allMatch(r -> "SUCCESS".equals(r.status())); + auditService.log("broadcast_group_command", AuditCategory.AGENT, group, - java.util.Map.of("type", request.type(), "agentCount", agents.size()), + java.util.Map.of("type", request.type(), "agentCount", futures.size(), + "responded", responses.size(), "timedOut", timedOut.size()), AuditResult.SUCCESS, httpRequest); - return ResponseEntity.status(HttpStatus.ACCEPTED) - .body(new CommandBroadcastResponse(commandIds, agents.size())); + return ResponseEntity.ok(new CommandGroupResponse( + allSuccess, futures.size(), responses.size(), responses, timedOut)); } @PostMapping("/commands") diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java index 7ac0181a..63dffd3c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java @@ -1,13 +1,14 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.ApplicationConfig; +import com.cameleer3.server.app.dto.CommandGroupResponse; +import com.cameleer3.server.app.dto.ConfigUpdateResponse; import com.cameleer3.server.app.dto.TestExpressionRequest; import com.cameleer3.server.app.dto.TestExpressionResponse; import com.cameleer3.server.app.storage.PostgresApplicationConfigRepository; import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditResult; import com.cameleer3.server.core.admin.AuditService; -import com.cameleer3.server.core.agent.AgentCommand; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; @@ -27,6 +28,7 @@ import org.springframework.http.ResponseEntity; import org.springframework.security.core.Authentication; import org.springframework.web.bind.annotation.*; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -88,23 +90,25 @@ public class ApplicationConfigController { @Operation(summary = "Update application config", description = "Saves config and pushes CONFIG_UPDATE to all LIVE agents of this application") @ApiResponse(responseCode = "200", description = "Config saved and pushed") - public ResponseEntity updateConfig(@PathVariable String application, - @RequestBody ApplicationConfig config, - Authentication auth, - HttpServletRequest httpRequest) { + public ResponseEntity updateConfig(@PathVariable String application, + @RequestBody ApplicationConfig config, + Authentication auth, + HttpServletRequest httpRequest) { String updatedBy = auth != null ? auth.getName() : "system"; config.setApplication(application); ApplicationConfig saved = configRepository.save(application, config, updatedBy); - int pushed = pushConfigToAgents(application, saved); - log.info("Config v{} saved for '{}', pushed to {} agent(s)", saved.getVersion(), application, pushed); + CommandGroupResponse pushResult = pushConfigToAgents(application, saved); + log.info("Config v{} saved for '{}', pushed to {} agent(s), {} responded", + saved.getVersion(), application, pushResult.total(), pushResult.responded()); auditService.log("update_app_config", AuditCategory.CONFIG, application, - Map.of("version", saved.getVersion(), "agentsPushed", pushed), + Map.of("version", saved.getVersion(), "agentsPushed", pushResult.total(), + "responded", pushResult.responded(), "timedOut", pushResult.timedOut().size()), AuditResult.SUCCESS, httpRequest); - return ResponseEntity.ok(saved); + return ResponseEntity.ok(new ConfigUpdateResponse(saved, pushResult)); } @GetMapping("/{application}/processor-routes") @@ -172,24 +176,50 @@ public class ApplicationConfigController { } } - private int pushConfigToAgents(String application, ApplicationConfig config) { + private CommandGroupResponse pushConfigToAgents(String application, ApplicationConfig config) { String payloadJson; try { payloadJson = objectMapper.writeValueAsString(config); } catch (JsonProcessingException e) { log.error("Failed to serialize config for push", e); - return 0; + return new CommandGroupResponse(false, 0, 0, List.of(), List.of()); } - List agents = registryService.findAll().stream() - .filter(a -> a.state() == AgentState.LIVE) - .filter(a -> application.equals(a.applicationId())) - .toList(); + Map> futures = + registryService.addGroupCommandWithReplies(application, CommandType.CONFIG_UPDATE, payloadJson); - for (AgentInfo agent : agents) { - registryService.addCommand(agent.instanceId(), CommandType.CONFIG_UPDATE, payloadJson); + if (futures.isEmpty()) { + return new CommandGroupResponse(true, 0, 0, List.of(), List.of()); } - return agents.size(); + + // Wait with shared 10-second deadline + long deadline = System.currentTimeMillis() + 10_000; + List responses = new ArrayList<>(); + List timedOut = new ArrayList<>(); + + for (var entry : futures.entrySet()) { + long remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + timedOut.add(entry.getKey()); + entry.getValue().cancel(false); + continue; + } + try { + CommandReply reply = entry.getValue().get(remaining, TimeUnit.MILLISECONDS); + responses.add(new CommandGroupResponse.AgentResponse( + entry.getKey(), reply.status(), reply.message())); + } catch (TimeoutException e) { + timedOut.add(entry.getKey()); + entry.getValue().cancel(false); + } catch (Exception e) { + responses.add(new CommandGroupResponse.AgentResponse( + entry.getKey(), "ERROR", e.getMessage())); + } + } + + boolean allSuccess = timedOut.isEmpty() && + responses.stream().allMatch(r -> "SUCCESS".equals(r.status())); + return new CommandGroupResponse(allSuccess, futures.size(), responses.size(), responses, timedOut); } private static ApplicationConfig defaultConfig(String application) { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandGroupResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandGroupResponse.java new file mode 100644 index 00000000..82a524e3 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandGroupResponse.java @@ -0,0 +1,13 @@ +package com.cameleer3.server.app.dto; + +import java.util.List; + +public record CommandGroupResponse( + boolean success, + int total, + int responded, + List responses, + List timedOut +) { + public record AgentResponse(String agentId, String status, String message) {} +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ConfigUpdateResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ConfigUpdateResponse.java new file mode 100644 index 00000000..c55f516f --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ConfigUpdateResponse.java @@ -0,0 +1,8 @@ +package com.cameleer3.server.app.dto; + +import com.cameleer3.common.model.ApplicationConfig; + +public record ConfigUpdateResponse( + ApplicationConfig config, + CommandGroupResponse pushResult +) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java index ce76a7e8..72bf5512 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java @@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -323,6 +324,28 @@ public class AgentRegistryService { return future; } + /** + * Send a command to all LIVE agents in a group and return futures for collecting replies. + * Returns a map of agentId -> CompletableFuture<CommandReply>. + */ + public Map> addGroupCommandWithReplies( + String group, CommandType type, String payload) { + Map> results = new LinkedHashMap<>(); + List liveAgents = findByApplication(group).stream() + .filter(a -> a.state() == AgentState.LIVE) + .toList(); + + for (AgentInfo agent : liveAgents) { + AgentCommand cmd = addCommand(agent.instanceId(), type, payload); + CompletableFuture future = new CompletableFuture<>(); + pendingReplies.put(cmd.id(), future); + future.whenComplete((r, ex) -> pendingReplies.remove(cmd.id())); + results.put(agent.instanceId(), future); + } + + return results; + } + /** * Complete a pending reply future for a command. * Called when an agent ACKs a command that was registered via {@link #addCommandWithReply}.