diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index a1e5e9ea..388a7722 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -5,6 +5,8 @@ import com.cameleer3.server.app.dto.CommandAckRequest; import com.cameleer3.server.app.dto.CommandBroadcastResponse; import com.cameleer3.server.app.dto.CommandRequest; import com.cameleer3.server.app.dto.CommandSingleResponse; +import com.cameleer3.server.app.dto.ReplayRequest; +import com.cameleer3.server.app.dto.ReplayResponse; import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditResult; import com.cameleer3.server.core.admin.AuditService; @@ -13,6 +15,7 @@ import com.cameleer3.server.core.agent.AgentEventService; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; +import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,7 +35,14 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Command push endpoints for sending commands to agents via SSE. @@ -184,6 +194,64 @@ public class AgentCommandController { return ResponseEntity.ok().build(); } + @PostMapping("/{id}/replay") + @Operation(summary = "Replay an exchange on a specific agent (synchronous)", + description = "Sends a replay command and waits for the agent to complete the replay. " + + "Returns the replay result including status, replayExchangeId, and duration.") + @ApiResponse(responseCode = "200", description = "Replay completed (check status for success/failure)") + @ApiResponse(responseCode = "404", description = "Agent not found or not connected") + @ApiResponse(responseCode = "504", description = "Agent did not respond in time") + public ResponseEntity replayExchange(@PathVariable String id, + @RequestBody ReplayRequest request, + HttpServletRequest httpRequest) { + AgentInfo agent = registryService.findById(id); + if (agent == null) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id); + } + + // Build protocol-compliant replay payload + Map payload = new LinkedHashMap<>(); + payload.put("routeId", request.routeId()); + Map exchange = new LinkedHashMap<>(); + exchange.put("body", request.body() != null ? request.body() : ""); + exchange.put("headers", request.headers() != null ? request.headers() : Map.of()); + payload.put("exchange", exchange); + if (request.originalExchangeId() != null) { + payload.put("originalExchangeId", request.originalExchangeId()); + } + payload.put("nonce", UUID.randomUUID().toString()); + + String payloadJson; + try { + payloadJson = objectMapper.writeValueAsString(payload); + } catch (JsonProcessingException e) { + log.error("Failed to serialize replay payload", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(new ReplayResponse("FAILURE", "Failed to serialize request", null)); + } + + CompletableFuture future = registryService.addCommandWithReply( + id, CommandType.REPLAY, payloadJson); + + auditService.log("replay_exchange", AuditCategory.AGENT, id, + Map.of("routeId", request.routeId(), + "originalExchangeId", request.originalExchangeId() != null ? request.originalExchangeId() : ""), + AuditResult.SUCCESS, httpRequest); + + try { + CommandReply reply = future.orTimeout(30, TimeUnit.SECONDS).join(); + return ResponseEntity.ok(new ReplayResponse(reply.status(), reply.message(), reply.data())); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT) + .body(new ReplayResponse("FAILURE", "Agent did not respond within 30 seconds", null)); + } + log.error("Error awaiting replay reply from agent {}", id, e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(new ReplayResponse("FAILURE", "Internal error: " + e.getCause().getMessage(), null)); + } + } + private CommandType mapCommandType(String typeStr) { return switch (typeStr) { case "config-update" -> CommandType.CONFIG_UPDATE; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayRequest.java new file mode 100644 index 00000000..18b96cba --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayRequest.java @@ -0,0 +1,18 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; + +import java.util.Map; + +@Schema(description = "Request to replay an exchange on an agent") +public record ReplayRequest( + @NotNull @Schema(description = "Camel route ID to replay on") + String routeId, + @Schema(description = "Message body for the replayed exchange") + String body, + @Schema(description = "Message headers for the replayed exchange") + Map headers, + @Schema(description = "Exchange ID of the original execution being replayed (for audit trail)") + String originalExchangeId +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayResponse.java new file mode 100644 index 00000000..c65147eb --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ReplayResponse.java @@ -0,0 +1,13 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "Result of a replay command") +public record ReplayResponse( + @Schema(description = "Replay outcome: SUCCESS or FAILURE") + String status, + @Schema(description = "Human-readable result message") + String message, + @Schema(description = "Structured result data from the agent (JSON)") + String data +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityConfig.java index 1b5ec7b1..73ecac06 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityConfig.java @@ -72,6 +72,7 @@ public class SecurityConfig { .requestMatchers(HttpMethod.POST, "/api/v1/agents/*/commands").hasAnyRole("OPERATOR", "ADMIN") .requestMatchers(HttpMethod.POST, "/api/v1/agents/groups/*/commands").hasAnyRole("OPERATOR", "ADMIN") .requestMatchers(HttpMethod.POST, "/api/v1/agents/commands").hasAnyRole("OPERATOR", "ADMIN") + .requestMatchers(HttpMethod.POST, "/api/v1/agents/*/replay").hasAnyRole("OPERATOR", "ADMIN") // Search endpoints .requestMatchers(HttpMethod.GET, "/api/v1/search/**").hasAnyRole("VIEWER", "OPERATOR", "ADMIN", "AGENT") diff --git a/ui/src/api/queries/commands.ts b/ui/src/api/queries/commands.ts index 9ce502ad..d3fb51d6 100644 --- a/ui/src/api/queries/commands.ts +++ b/ui/src/api/queries/commands.ts @@ -175,6 +175,12 @@ export function useSendRouteCommand() { // ── Replay Exchange ─────────────────────────────────────────────────────── +export interface ReplayResult { + status: string + message: string + data?: string +} + export function useReplayExchange() { return useMutation({ mutationFn: async ({ @@ -189,21 +195,18 @@ export function useReplayExchange() { headers?: Record body: string originalExchangeId?: string - }) => { - const { data, error } = await api.POST('/agents/{id}/commands', { - params: { path: { id: agentId } }, - body: { - type: 'replay', - payload: { - routeId, - exchange: { body, headers: headers ?? {} }, - originalExchangeId, - nonce: crypto.randomUUID(), - }, - } as any, + }): Promise => { + const res = await authFetch(`/api/v1/agents/${encodeURIComponent(agentId)}/replay`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ routeId, body, headers: headers ?? {}, originalExchangeId }), }) - if (error) throw new Error('Failed to send replay command') - return data! + if (!res.ok) { + if (res.status === 404) throw new Error('Agent not found') + if (res.status === 504) throw new Error('Replay timed out — agent did not respond') + throw new Error('Failed to send replay command') + } + return res.json() as Promise }, }) } diff --git a/ui/src/pages/Exchanges/RouteControlBar.tsx b/ui/src/pages/Exchanges/RouteControlBar.tsx index c66e2529..23119f8a 100644 --- a/ui/src/pages/Exchanges/RouteControlBar.tsx +++ b/ui/src/pages/Exchanges/RouteControlBar.tsx @@ -57,8 +57,12 @@ export function RouteControlBar({ application, routeId, hasRouteControl, hasRepl replayExchange.mutate( { agentId, routeId, headers, body: inputBody ?? '', originalExchangeId: exchangeId }, { - onSuccess: () => { - toast({ title: 'Replay sent', description: `${routeId} on ${agentId}`, variant: 'success' }); + onSuccess: (result) => { + if (result.status === 'SUCCESS') { + toast({ title: 'Replay completed', description: result.message ?? `${routeId} on ${agentId}`, variant: 'success' }); + } else { + toast({ title: 'Replay failed', description: result.message ?? 'Agent reported failure', variant: 'error' }); + } setSendingAction(null); }, onError: (err) => {