feat: synchronous replay endpoint with agent response status
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m8s
CI / docker (push) Successful in 56s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 38s

Add dedicated POST /agents/{id}/replay endpoint that uses
addCommandWithReply to wait for the agent ACK (30s timeout).
Returns the actual replay result (status, message, data) instead
of just a delivery confirmation.

Frontend toast now reflects the agent's response: "Replay completed"
on success, agent error message on failure, timeout message if the
agent doesn't respond.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-30 22:48:02 +02:00
parent dd398178f0
commit 715cbc1894
6 changed files with 123 additions and 16 deletions

View File

@@ -5,6 +5,8 @@ import com.cameleer3.server.app.dto.CommandAckRequest;
import com.cameleer3.server.app.dto.CommandBroadcastResponse;
import com.cameleer3.server.app.dto.CommandRequest;
import com.cameleer3.server.app.dto.CommandSingleResponse;
import com.cameleer3.server.app.dto.ReplayRequest;
import com.cameleer3.server.app.dto.ReplayResponse;
import com.cameleer3.server.core.admin.AuditCategory;
import com.cameleer3.server.core.admin.AuditResult;
import com.cameleer3.server.core.admin.AuditService;
@@ -13,6 +15,7 @@ import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
import com.cameleer3.server.core.agent.CommandReply;
import com.cameleer3.server.core.agent.CommandType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +35,14 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Command push endpoints for sending commands to agents via SSE.
@@ -184,6 +194,64 @@ public class AgentCommandController {
return ResponseEntity.ok().build();
}
@PostMapping("/{id}/replay")
@Operation(summary = "Replay an exchange on a specific agent (synchronous)",
description = "Sends a replay command and waits for the agent to complete the replay. "
+ "Returns the replay result including status, replayExchangeId, and duration.")
@ApiResponse(responseCode = "200", description = "Replay completed (check status for success/failure)")
@ApiResponse(responseCode = "404", description = "Agent not found or not connected")
@ApiResponse(responseCode = "504", description = "Agent did not respond in time")
public ResponseEntity<ReplayResponse> replayExchange(@PathVariable String id,
@RequestBody ReplayRequest request,
HttpServletRequest httpRequest) {
AgentInfo agent = registryService.findById(id);
if (agent == null) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id);
}
// Build protocol-compliant replay payload
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("routeId", request.routeId());
Map<String, Object> exchange = new LinkedHashMap<>();
exchange.put("body", request.body() != null ? request.body() : "");
exchange.put("headers", request.headers() != null ? request.headers() : Map.of());
payload.put("exchange", exchange);
if (request.originalExchangeId() != null) {
payload.put("originalExchangeId", request.originalExchangeId());
}
payload.put("nonce", UUID.randomUUID().toString());
String payloadJson;
try {
payloadJson = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
log.error("Failed to serialize replay payload", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ReplayResponse("FAILURE", "Failed to serialize request", null));
}
CompletableFuture<CommandReply> future = registryService.addCommandWithReply(
id, CommandType.REPLAY, payloadJson);
auditService.log("replay_exchange", AuditCategory.AGENT, id,
Map.of("routeId", request.routeId(),
"originalExchangeId", request.originalExchangeId() != null ? request.originalExchangeId() : ""),
AuditResult.SUCCESS, httpRequest);
try {
CommandReply reply = future.orTimeout(30, TimeUnit.SECONDS).join();
return ResponseEntity.ok(new ReplayResponse(reply.status(), reply.message(), reply.data()));
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
.body(new ReplayResponse("FAILURE", "Agent did not respond within 30 seconds", null));
}
log.error("Error awaiting replay reply from agent {}", id, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ReplayResponse("FAILURE", "Internal error: " + e.getCause().getMessage(), null));
}
}
private CommandType mapCommandType(String typeStr) {
return switch (typeStr) {
case "config-update" -> CommandType.CONFIG_UPDATE;

View File

@@ -0,0 +1,18 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
@Schema(description = "Request to replay an exchange on an agent")
public record ReplayRequest(
@NotNull @Schema(description = "Camel route ID to replay on")
String routeId,
@Schema(description = "Message body for the replayed exchange")
String body,
@Schema(description = "Message headers for the replayed exchange")
Map<String, String> headers,
@Schema(description = "Exchange ID of the original execution being replayed (for audit trail)")
String originalExchangeId
) {}

View File

@@ -0,0 +1,13 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "Result of a replay command")
public record ReplayResponse(
@Schema(description = "Replay outcome: SUCCESS or FAILURE")
String status,
@Schema(description = "Human-readable result message")
String message,
@Schema(description = "Structured result data from the agent (JSON)")
String data
) {}

View File

@@ -72,6 +72,7 @@ public class SecurityConfig {
.requestMatchers(HttpMethod.POST, "/api/v1/agents/*/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/groups/*/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/*/replay").hasAnyRole("OPERATOR", "ADMIN")
// Search endpoints
.requestMatchers(HttpMethod.GET, "/api/v1/search/**").hasAnyRole("VIEWER", "OPERATOR", "ADMIN", "AGENT")

View File

@@ -175,6 +175,12 @@ export function useSendRouteCommand() {
// ── Replay Exchange ───────────────────────────────────────────────────────
export interface ReplayResult {
status: string
message: string
data?: string
}
export function useReplayExchange() {
return useMutation({
mutationFn: async ({
@@ -189,21 +195,18 @@ export function useReplayExchange() {
headers?: Record<string, string>
body: string
originalExchangeId?: string
}) => {
const { data, error } = await api.POST('/agents/{id}/commands', {
params: { path: { id: agentId } },
body: {
type: 'replay',
payload: {
routeId,
exchange: { body, headers: headers ?? {} },
originalExchangeId,
nonce: crypto.randomUUID(),
},
} as any,
}): Promise<ReplayResult> => {
const res = await authFetch(`/api/v1/agents/${encodeURIComponent(agentId)}/replay`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ routeId, body, headers: headers ?? {}, originalExchangeId }),
})
if (error) throw new Error('Failed to send replay command')
return data!
if (!res.ok) {
if (res.status === 404) throw new Error('Agent not found')
if (res.status === 504) throw new Error('Replay timed out — agent did not respond')
throw new Error('Failed to send replay command')
}
return res.json() as Promise<ReplayResult>
},
})
}

View File

@@ -57,8 +57,12 @@ export function RouteControlBar({ application, routeId, hasRouteControl, hasRepl
replayExchange.mutate(
{ agentId, routeId, headers, body: inputBody ?? '', originalExchangeId: exchangeId },
{
onSuccess: () => {
toast({ title: 'Replay sent', description: `${routeId} on ${agentId}`, variant: 'success' });
onSuccess: (result) => {
if (result.status === 'SUCCESS') {
toast({ title: 'Replay completed', description: result.message ?? `${routeId} on ${agentId}`, variant: 'success' });
} else {
toast({ title: 'Replay failed', description: result.message ?? 'Agent reported failure', variant: 'error' });
}
setSendingAction(null);
},
onError: (err) => {