diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java new file mode 100644 index 00000000..88b6f3ed --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java @@ -0,0 +1,157 @@ +package com.cameleer3.server.app.agent; + +import com.cameleer3.server.app.config.AgentRegistryConfig; +import com.cameleer3.server.core.agent.AgentCommand; +import com.cameleer3.server.core.agent.AgentEventListener; +import com.cameleer3.server.core.agent.AgentRegistryService; +import jakarta.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Manages per-agent SSE connections and delivers commands via Server-Sent Events. + *

+ * Implements {@link AgentEventListener} so the core {@link AgentRegistryService} + * can notify this component when a command is ready for delivery, without depending + * on Spring or SSE classes. + */ +@Component +public class SseConnectionManager implements AgentEventListener { + + private static final Logger log = LoggerFactory.getLogger(SseConnectionManager.class); + + private final ConcurrentHashMap emitters = new ConcurrentHashMap<>(); + private final AgentRegistryService registryService; + private final AgentRegistryConfig config; + + public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config) { + this.registryService = registryService; + this.config = config; + } + + @PostConstruct + void init() { + registryService.setEventListener(this); + log.info("SseConnectionManager registered as AgentEventListener"); + } + + /** + * Create an SSE connection for the given agent. + * Replaces any existing connection (completing the old emitter). + * + * @param agentId the agent identifier + * @return the new SseEmitter + */ + public SseEmitter connect(String agentId) { + SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); + + SseEmitter old = emitters.put(agentId, emitter); + if (old != null) { + log.debug("Replacing existing SSE connection for agent {}", agentId); + old.complete(); + } + + // Remove from map only if the emitter is still the current one (reference equality) + emitter.onCompletion(() -> { + emitters.remove(agentId, emitter); + log.debug("SSE connection completed for agent {}", agentId); + }); + emitter.onTimeout(() -> { + emitters.remove(agentId, emitter); + log.debug("SSE connection timed out for agent {}", agentId); + }); + emitter.onError(ex -> { + emitters.remove(agentId, emitter); + log.debug("SSE connection error for agent {}: {}", agentId, ex.getMessage()); + }); + + log.info("SSE connection established for agent {}", agentId); + return emitter; + } + + /** + * Send an event to a specific agent's SSE stream. + * + * @param agentId the target agent + * @param eventId the event ID (for Last-Event-ID reconnection) + * @param eventType the SSE event name + * @param data the event data (serialized as JSON) + * @return true if the event was sent successfully, false if the agent is not connected or send failed + */ + public boolean sendEvent(String agentId, String eventId, String eventType, Object data) { + SseEmitter emitter = emitters.get(agentId); + if (emitter == null) { + return false; + } + + try { + emitter.send(SseEmitter.event() + .id(eventId) + .name(eventType) + .data(data, MediaType.APPLICATION_JSON)); + return true; + } catch (IOException e) { + log.debug("Failed to send SSE event to agent {}: {}", agentId, e.getMessage()); + emitters.remove(agentId, emitter); + return false; + } + } + + /** + * Send a ping keepalive comment to all connected agents. + */ + public void sendPingToAll() { + for (Map.Entry entry : emitters.entrySet()) { + String agentId = entry.getKey(); + SseEmitter emitter = entry.getValue(); + try { + emitter.send(SseEmitter.event().comment("ping")); + } catch (IOException e) { + log.debug("Ping failed for agent {}, removing connection", agentId); + emitters.remove(agentId, emitter); + } + } + } + + /** + * Check if an agent has an active SSE connection. + */ + public boolean isConnected(String agentId) { + return emitters.containsKey(agentId); + } + + /** + * Called by the registry when a command is ready for an agent. + * Attempts to deliver via SSE; if successful, marks as DELIVERED. + * If the agent is not connected, the command stays PENDING. + */ + @Override + public void onCommandReady(String agentId, AgentCommand command) { + String eventType = command.type().name().toLowerCase().replace('_', '-'); + boolean sent = sendEvent(agentId, command.id(), eventType, command.payload()); + if (sent) { + registryService.markDelivered(agentId, command.id()); + log.debug("Command {} ({}) delivered to agent {} via SSE", command.id(), eventType, agentId); + } else { + log.debug("Agent {} not connected, command {} stays PENDING", agentId, command.id()); + } + } + + /** + * Scheduled ping keepalive to all connected agents. + */ + @Scheduled(fixedDelayString = "${agent-registry.ping-interval-ms:15000}") + void pingAll() { + if (!emitters.isEmpty()) { + sendPingToAll(); + } + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java index 3d252c2c..a9209980 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java @@ -28,7 +28,8 @@ public class WebConfig implements WebMvcConfigurer { "/api/v1/health", "/api/v1/api-docs/**", "/api/v1/swagger-ui/**", - "/api/v1/swagger-ui.html" + "/api/v1/swagger-ui.html", + "/api/v1/agents/*/events" ); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java new file mode 100644 index 00000000..7ce07a62 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -0,0 +1,181 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.agent.SseConnectionManager; +import com.cameleer3.server.core.agent.AgentCommand; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; +import com.cameleer3.server.core.agent.AgentState; +import com.cameleer3.server.core.agent.CommandType; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Command push endpoints for sending commands to agents via SSE. + *

+ * Supports three targeting levels: + *

+ */ +@RestController +@RequestMapping("/api/v1/agents") +@Tag(name = "Agent Commands", description = "Command push endpoints for agent communication") +public class AgentCommandController { + + private static final Logger log = LoggerFactory.getLogger(AgentCommandController.class); + + private final AgentRegistryService registryService; + private final SseConnectionManager connectionManager; + private final ObjectMapper objectMapper; + + public AgentCommandController(AgentRegistryService registryService, + SseConnectionManager connectionManager, + ObjectMapper objectMapper) { + this.registryService = registryService; + this.connectionManager = connectionManager; + this.objectMapper = objectMapper; + } + + @PostMapping("/{id}/commands") + @Operation(summary = "Send command to a specific agent", + description = "Sends a config-update, deep-trace, or replay command to the specified agent") + @ApiResponse(responseCode = "202", description = "Command accepted") + @ApiResponse(responseCode = "400", description = "Invalid command payload") + @ApiResponse(responseCode = "404", description = "Agent not registered") + public ResponseEntity sendCommand(@PathVariable String id, + @RequestBody String body) throws JsonProcessingException { + AgentInfo agent = registryService.findById(id); + if (agent == null) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id); + } + + CommandRequest request = parseCommandRequest(body); + AgentCommand command = registryService.addCommand(id, request.type, request.payloadJson); + + String status = connectionManager.isConnected(id) ? "DELIVERED" : "PENDING"; + + Map response = new LinkedHashMap<>(); + response.put("commandId", command.id()); + response.put("status", status); + + return ResponseEntity.status(HttpStatus.ACCEPTED) + .body(objectMapper.writeValueAsString(response)); + } + + @PostMapping("/groups/{group}/commands") + @Operation(summary = "Send command to all agents in a group", + description = "Sends a command to all LIVE agents in the specified group") + @ApiResponse(responseCode = "202", description = "Commands accepted") + @ApiResponse(responseCode = "400", description = "Invalid command payload") + public ResponseEntity sendGroupCommand(@PathVariable String group, + @RequestBody String body) throws JsonProcessingException { + CommandRequest request = parseCommandRequest(body); + + List agents = registryService.findAll().stream() + .filter(a -> a.state() == AgentState.LIVE) + .filter(a -> group.equals(a.group())) + .toList(); + + List commandIds = new ArrayList<>(); + for (AgentInfo agent : agents) { + AgentCommand command = registryService.addCommand(agent.id(), request.type, request.payloadJson); + commandIds.add(command.id()); + } + + Map response = new LinkedHashMap<>(); + response.put("commandIds", commandIds); + response.put("targetCount", agents.size()); + + return ResponseEntity.status(HttpStatus.ACCEPTED) + .body(objectMapper.writeValueAsString(response)); + } + + @PostMapping("/commands") + @Operation(summary = "Broadcast command to all live agents", + description = "Sends a command to all agents currently in LIVE state") + @ApiResponse(responseCode = "202", description = "Commands accepted") + @ApiResponse(responseCode = "400", description = "Invalid command payload") + public ResponseEntity broadcastCommand(@RequestBody String body) throws JsonProcessingException { + CommandRequest request = parseCommandRequest(body); + + List liveAgents = registryService.findByState(AgentState.LIVE); + + List commandIds = new ArrayList<>(); + for (AgentInfo agent : liveAgents) { + AgentCommand command = registryService.addCommand(agent.id(), request.type, request.payloadJson); + commandIds.add(command.id()); + } + + Map response = new LinkedHashMap<>(); + response.put("commandIds", commandIds); + response.put("targetCount", liveAgents.size()); + + return ResponseEntity.status(HttpStatus.ACCEPTED) + .body(objectMapper.writeValueAsString(response)); + } + + @PostMapping("/{id}/commands/{commandId}/ack") + @Operation(summary = "Acknowledge command receipt", + description = "Agent acknowledges that it has received and processed a command") + @ApiResponse(responseCode = "200", description = "Command acknowledged") + @ApiResponse(responseCode = "404", description = "Command not found") + public ResponseEntity acknowledgeCommand(@PathVariable String id, + @PathVariable String commandId) { + boolean acknowledged = registryService.acknowledgeCommand(id, commandId); + if (!acknowledged) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId); + } + return ResponseEntity.ok().build(); + } + + private CommandRequest parseCommandRequest(String body) throws JsonProcessingException { + JsonNode node = objectMapper.readTree(body); + + if (!node.has("type")) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing 'type' field"); + } + + String typeStr = node.get("type").asText(); + CommandType type = mapCommandType(typeStr); + + String payloadJson = "{}"; + if (node.has("payload")) { + payloadJson = node.get("payload").toString(); + } + + return new CommandRequest(type, payloadJson); + } + + private CommandType mapCommandType(String typeStr) { + return switch (typeStr) { + case "config-update" -> CommandType.CONFIG_UPDATE; + case "deep-trace" -> CommandType.DEEP_TRACE; + case "replay" -> CommandType.REPLAY; + default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, + "Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay"); + }; + } + + private record CommandRequest(CommandType type, String payloadJson) {} +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java new file mode 100644 index 00000000..f6598e66 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java @@ -0,0 +1,67 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.agent.SseConnectionManager; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + * SSE endpoint for real-time event streaming to agents. + *

+ * Agents connect to {@code GET /api/v1/agents/{id}/events} to receive + * config-update, deep-trace, and replay commands as Server-Sent Events. + */ +@RestController +@RequestMapping("/api/v1/agents") +@Tag(name = "Agent SSE", description = "Server-Sent Events endpoint for agent communication") +public class AgentSseController { + + private static final Logger log = LoggerFactory.getLogger(AgentSseController.class); + + private final SseConnectionManager connectionManager; + private final AgentRegistryService registryService; + + public AgentSseController(SseConnectionManager connectionManager, + AgentRegistryService registryService) { + this.connectionManager = connectionManager; + this.registryService = registryService; + } + + @GetMapping(value = "/{id}/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @Operation(summary = "Open SSE event stream", + description = "Opens a Server-Sent Events stream for the specified agent. " + + "Commands (config-update, deep-trace, replay) are pushed as events. " + + "Ping keepalive comments sent every 15 seconds.") + @ApiResponse(responseCode = "200", description = "SSE stream opened") + @ApiResponse(responseCode = "404", description = "Agent not registered") + public SseEmitter events( + @PathVariable String id, + @Parameter(description = "Last received event ID (no replay, acknowledged only)") + @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) { + + AgentInfo agent = registryService.findById(id); + if (agent == null) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id); + } + + if (lastEventId != null) { + log.debug("Agent {} reconnecting with Last-Event-ID: {} (no replay)", id, lastEventId); + } + + return connectionManager.connect(id); + } +}