feat(#116): synchronous group command dispatch with multi-agent response collection

Add addGroupCommandWithReplies() to AgentRegistryService that sends commands
to all LIVE agents in a group and returns CompletableFuture per agent for
collecting replies. Update sendGroupCommand() and pushConfigToAgents() to
wait with a shared 10-second deadline, returning CommandGroupResponse with
per-agent status, timeouts, and overall success. Config update endpoint now
returns ConfigUpdateResponse wrapping both the saved config and push result.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-02 19:00:56 +02:00
parent f39f07e7bf
commit 027e45aadf
5 changed files with 137 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.agent.SseConnectionManager; import com.cameleer3.server.app.agent.SseConnectionManager;
import com.cameleer3.server.app.dto.CommandAckRequest; import com.cameleer3.server.app.dto.CommandAckRequest;
import com.cameleer3.server.app.dto.CommandBroadcastResponse; import com.cameleer3.server.app.dto.CommandBroadcastResponse;
import com.cameleer3.server.app.dto.CommandGroupResponse;
import com.cameleer3.server.app.dto.CommandRequest; import com.cameleer3.server.app.dto.CommandRequest;
import com.cameleer3.server.app.dto.CommandSingleResponse; import com.cameleer3.server.app.dto.CommandSingleResponse;
import com.cameleer3.server.app.dto.ReplayRequest; import com.cameleer3.server.app.dto.ReplayRequest;
@@ -109,32 +110,60 @@ public class AgentCommandController {
@PostMapping("/groups/{group}/commands") @PostMapping("/groups/{group}/commands")
@Operation(summary = "Send command to all agents in a group", @Operation(summary = "Send command to all agents in a group",
description = "Sends a command to all LIVE agents in the specified group") description = "Sends a command to all LIVE agents in the specified group and waits for responses")
@ApiResponse(responseCode = "202", description = "Commands accepted") @ApiResponse(responseCode = "200", description = "Commands dispatched and responses collected")
@ApiResponse(responseCode = "400", description = "Invalid command payload") @ApiResponse(responseCode = "400", description = "Invalid command payload")
public ResponseEntity<CommandBroadcastResponse> sendGroupCommand(@PathVariable String group, public ResponseEntity<CommandGroupResponse> sendGroupCommand(@PathVariable String group,
@RequestBody CommandRequest request, @RequestBody CommandRequest request,
HttpServletRequest httpRequest) throws JsonProcessingException { HttpServletRequest httpRequest) throws JsonProcessingException {
CommandType type = mapCommandType(request.type()); CommandType type = mapCommandType(request.type());
String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}"; String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}";
List<AgentInfo> agents = registryService.findAll().stream() Map<String, CompletableFuture<CommandReply>> futures =
.filter(a -> a.state() == AgentState.LIVE) registryService.addGroupCommandWithReplies(group, type, payloadJson);
.filter(a -> group.equals(a.applicationId()))
.toList();
List<String> commandIds = new ArrayList<>(); if (futures.isEmpty()) {
for (AgentInfo agent : agents) { auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
AgentCommand command = registryService.addCommand(agent.instanceId(), type, payloadJson); java.util.Map.of("type", request.type(), "agentCount", 0),
commandIds.add(command.id()); AuditResult.SUCCESS, httpRequest);
return ResponseEntity.ok(new CommandGroupResponse(true, 0, 0, List.of(), List.of()));
} }
// Wait with shared 10-second deadline
long deadline = System.currentTimeMillis() + 10_000;
List<CommandGroupResponse.AgentResponse> responses = new ArrayList<>();
List<String> timedOut = new ArrayList<>();
for (var entry : futures.entrySet()) {
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
timedOut.add(entry.getKey());
entry.getValue().cancel(false);
continue;
}
try {
CommandReply reply = entry.getValue().get(remaining, TimeUnit.MILLISECONDS);
responses.add(new CommandGroupResponse.AgentResponse(
entry.getKey(), reply.status(), reply.message()));
} catch (TimeoutException e) {
timedOut.add(entry.getKey());
entry.getValue().cancel(false);
} catch (Exception e) {
responses.add(new CommandGroupResponse.AgentResponse(
entry.getKey(), "ERROR", e.getMessage()));
}
}
boolean allSuccess = timedOut.isEmpty() &&
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
auditService.log("broadcast_group_command", AuditCategory.AGENT, group, auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
java.util.Map.of("type", request.type(), "agentCount", agents.size()), java.util.Map.of("type", request.type(), "agentCount", futures.size(),
"responded", responses.size(), "timedOut", timedOut.size()),
AuditResult.SUCCESS, httpRequest); AuditResult.SUCCESS, httpRequest);
return ResponseEntity.status(HttpStatus.ACCEPTED) return ResponseEntity.ok(new CommandGroupResponse(
.body(new CommandBroadcastResponse(commandIds, agents.size())); allSuccess, futures.size(), responses.size(), responses, timedOut));
} }
@PostMapping("/commands") @PostMapping("/commands")

View File

@@ -1,13 +1,14 @@
package com.cameleer3.server.app.controller; package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.ApplicationConfig; import com.cameleer3.common.model.ApplicationConfig;
import com.cameleer3.server.app.dto.CommandGroupResponse;
import com.cameleer3.server.app.dto.ConfigUpdateResponse;
import com.cameleer3.server.app.dto.TestExpressionRequest; import com.cameleer3.server.app.dto.TestExpressionRequest;
import com.cameleer3.server.app.dto.TestExpressionResponse; import com.cameleer3.server.app.dto.TestExpressionResponse;
import com.cameleer3.server.app.storage.PostgresApplicationConfigRepository; import com.cameleer3.server.app.storage.PostgresApplicationConfigRepository;
import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditCategory;
import com.cameleer3.server.core.admin.AuditResult; import com.cameleer3.server.core.admin.AuditResult;
import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.agent.AgentCommand;
import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState; import com.cameleer3.server.core.agent.AgentState;
@@ -27,6 +28,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication; import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@@ -88,7 +90,7 @@ public class ApplicationConfigController {
@Operation(summary = "Update application config", @Operation(summary = "Update application config",
description = "Saves config and pushes CONFIG_UPDATE to all LIVE agents of this application") description = "Saves config and pushes CONFIG_UPDATE to all LIVE agents of this application")
@ApiResponse(responseCode = "200", description = "Config saved and pushed") @ApiResponse(responseCode = "200", description = "Config saved and pushed")
public ResponseEntity<ApplicationConfig> updateConfig(@PathVariable String application, public ResponseEntity<ConfigUpdateResponse> updateConfig(@PathVariable String application,
@RequestBody ApplicationConfig config, @RequestBody ApplicationConfig config,
Authentication auth, Authentication auth,
HttpServletRequest httpRequest) { HttpServletRequest httpRequest) {
@@ -97,14 +99,16 @@ public class ApplicationConfigController {
config.setApplication(application); config.setApplication(application);
ApplicationConfig saved = configRepository.save(application, config, updatedBy); ApplicationConfig saved = configRepository.save(application, config, updatedBy);
int pushed = pushConfigToAgents(application, saved); CommandGroupResponse pushResult = pushConfigToAgents(application, saved);
log.info("Config v{} saved for '{}', pushed to {} agent(s)", saved.getVersion(), application, pushed); log.info("Config v{} saved for '{}', pushed to {} agent(s), {} responded",
saved.getVersion(), application, pushResult.total(), pushResult.responded());
auditService.log("update_app_config", AuditCategory.CONFIG, application, auditService.log("update_app_config", AuditCategory.CONFIG, application,
Map.of("version", saved.getVersion(), "agentsPushed", pushed), Map.of("version", saved.getVersion(), "agentsPushed", pushResult.total(),
"responded", pushResult.responded(), "timedOut", pushResult.timedOut().size()),
AuditResult.SUCCESS, httpRequest); AuditResult.SUCCESS, httpRequest);
return ResponseEntity.ok(saved); return ResponseEntity.ok(new ConfigUpdateResponse(saved, pushResult));
} }
@GetMapping("/{application}/processor-routes") @GetMapping("/{application}/processor-routes")
@@ -172,24 +176,50 @@ public class ApplicationConfigController {
} }
} }
private int pushConfigToAgents(String application, ApplicationConfig config) { private CommandGroupResponse pushConfigToAgents(String application, ApplicationConfig config) {
String payloadJson; String payloadJson;
try { try {
payloadJson = objectMapper.writeValueAsString(config); payloadJson = objectMapper.writeValueAsString(config);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("Failed to serialize config for push", e); log.error("Failed to serialize config for push", e);
return 0; return new CommandGroupResponse(false, 0, 0, List.of(), List.of());
} }
List<AgentInfo> agents = registryService.findAll().stream() Map<String, CompletableFuture<CommandReply>> futures =
.filter(a -> a.state() == AgentState.LIVE) registryService.addGroupCommandWithReplies(application, CommandType.CONFIG_UPDATE, payloadJson);
.filter(a -> application.equals(a.applicationId()))
.toList();
for (AgentInfo agent : agents) { if (futures.isEmpty()) {
registryService.addCommand(agent.instanceId(), CommandType.CONFIG_UPDATE, payloadJson); return new CommandGroupResponse(true, 0, 0, List.of(), List.of());
} }
return agents.size();
// Wait with shared 10-second deadline
long deadline = System.currentTimeMillis() + 10_000;
List<CommandGroupResponse.AgentResponse> responses = new ArrayList<>();
List<String> timedOut = new ArrayList<>();
for (var entry : futures.entrySet()) {
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
timedOut.add(entry.getKey());
entry.getValue().cancel(false);
continue;
}
try {
CommandReply reply = entry.getValue().get(remaining, TimeUnit.MILLISECONDS);
responses.add(new CommandGroupResponse.AgentResponse(
entry.getKey(), reply.status(), reply.message()));
} catch (TimeoutException e) {
timedOut.add(entry.getKey());
entry.getValue().cancel(false);
} catch (Exception e) {
responses.add(new CommandGroupResponse.AgentResponse(
entry.getKey(), "ERROR", e.getMessage()));
}
}
boolean allSuccess = timedOut.isEmpty() &&
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
return new CommandGroupResponse(allSuccess, futures.size(), responses.size(), responses, timedOut);
} }
private static ApplicationConfig defaultConfig(String application) { private static ApplicationConfig defaultConfig(String application) {

View File

@@ -0,0 +1,13 @@
package com.cameleer3.server.app.dto;
import java.util.List;
public record CommandGroupResponse(
boolean success,
int total,
int responded,
List<AgentResponse> responses,
List<String> timedOut
) {
public record AgentResponse(String agentId, String status, String message) {}
}

View File

@@ -0,0 +1,8 @@
package com.cameleer3.server.app.dto;
import com.cameleer3.common.model.ApplicationConfig;
public record ConfigUpdateResponse(
ApplicationConfig config,
CommandGroupResponse pushResult
) {}

View File

@@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@@ -323,6 +324,28 @@ public class AgentRegistryService {
return future; return future;
} }
/**
* Send a command to all LIVE agents in a group and return futures for collecting replies.
* Returns a map of agentId -> CompletableFuture&lt;CommandReply&gt;.
*/
public Map<String, CompletableFuture<CommandReply>> addGroupCommandWithReplies(
String group, CommandType type, String payload) {
Map<String, CompletableFuture<CommandReply>> results = new LinkedHashMap<>();
List<AgentInfo> liveAgents = findByApplication(group).stream()
.filter(a -> a.state() == AgentState.LIVE)
.toList();
for (AgentInfo agent : liveAgents) {
AgentCommand cmd = addCommand(agent.instanceId(), type, payload);
CompletableFuture<CommandReply> future = new CompletableFuture<>();
pendingReplies.put(cmd.id(), future);
future.whenComplete((r, ex) -> pendingReplies.remove(cmd.id()));
results.put(agent.instanceId(), future);
}
return results;
}
/** /**
* Complete a pending reply future for a command. * Complete a pending reply future for a command.
* Called when an agent ACKs a command that was registered via {@link #addCommandWithReply}. * Called when an agent ACKs a command that was registered via {@link #addCommandWithReply}.