feat(03-02): SSE connection manager, SSE endpoint, and command controller
- SseConnectionManager with per-agent SseEmitter, ping keepalive, event delivery
- AgentSseController GET /{id}/events SSE endpoint with Last-Event-ID support
- AgentCommandController with single/group/broadcast command targeting + ack
- WebConfig excludes SSE events path from protocol version interceptor
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,157 @@
|
||||
package com.cameleer3.server.app.agent;
|
||||
|
||||
import com.cameleer3.server.app.config.AgentRegistryConfig;
|
||||
import com.cameleer3.server.core.agent.AgentCommand;
|
||||
import com.cameleer3.server.core.agent.AgentEventListener;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Manages per-agent SSE connections and delivers commands via Server-Sent Events.
|
||||
* <p>
|
||||
* Implements {@link AgentEventListener} so the core {@link AgentRegistryService}
|
||||
* can notify this component when a command is ready for delivery, without depending
|
||||
* on Spring or SSE classes.
|
||||
*/
|
||||
@Component
|
||||
public class SseConnectionManager implements AgentEventListener {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SseConnectionManager.class);
|
||||
|
||||
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
|
||||
private final AgentRegistryService registryService;
|
||||
private final AgentRegistryConfig config;
|
||||
|
||||
public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config) {
|
||||
this.registryService = registryService;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
void init() {
|
||||
registryService.setEventListener(this);
|
||||
log.info("SseConnectionManager registered as AgentEventListener");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an SSE connection for the given agent.
|
||||
* Replaces any existing connection (completing the old emitter).
|
||||
*
|
||||
* @param agentId the agent identifier
|
||||
* @return the new SseEmitter
|
||||
*/
|
||||
public SseEmitter connect(String agentId) {
|
||||
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
|
||||
|
||||
SseEmitter old = emitters.put(agentId, emitter);
|
||||
if (old != null) {
|
||||
log.debug("Replacing existing SSE connection for agent {}", agentId);
|
||||
old.complete();
|
||||
}
|
||||
|
||||
// Remove from map only if the emitter is still the current one (reference equality)
|
||||
emitter.onCompletion(() -> {
|
||||
emitters.remove(agentId, emitter);
|
||||
log.debug("SSE connection completed for agent {}", agentId);
|
||||
});
|
||||
emitter.onTimeout(() -> {
|
||||
emitters.remove(agentId, emitter);
|
||||
log.debug("SSE connection timed out for agent {}", agentId);
|
||||
});
|
||||
emitter.onError(ex -> {
|
||||
emitters.remove(agentId, emitter);
|
||||
log.debug("SSE connection error for agent {}: {}", agentId, ex.getMessage());
|
||||
});
|
||||
|
||||
log.info("SSE connection established for agent {}", agentId);
|
||||
return emitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an event to a specific agent's SSE stream.
|
||||
*
|
||||
* @param agentId the target agent
|
||||
* @param eventId the event ID (for Last-Event-ID reconnection)
|
||||
* @param eventType the SSE event name
|
||||
* @param data the event data (serialized as JSON)
|
||||
* @return true if the event was sent successfully, false if the agent is not connected or send failed
|
||||
*/
|
||||
public boolean sendEvent(String agentId, String eventId, String eventType, Object data) {
|
||||
SseEmitter emitter = emitters.get(agentId);
|
||||
if (emitter == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
emitter.send(SseEmitter.event()
|
||||
.id(eventId)
|
||||
.name(eventType)
|
||||
.data(data, MediaType.APPLICATION_JSON));
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
log.debug("Failed to send SSE event to agent {}: {}", agentId, e.getMessage());
|
||||
emitters.remove(agentId, emitter);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a ping keepalive comment to all connected agents.
|
||||
*/
|
||||
public void sendPingToAll() {
|
||||
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
|
||||
String agentId = entry.getKey();
|
||||
SseEmitter emitter = entry.getValue();
|
||||
try {
|
||||
emitter.send(SseEmitter.event().comment("ping"));
|
||||
} catch (IOException e) {
|
||||
log.debug("Ping failed for agent {}, removing connection", agentId);
|
||||
emitters.remove(agentId, emitter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an agent has an active SSE connection.
|
||||
*/
|
||||
public boolean isConnected(String agentId) {
|
||||
return emitters.containsKey(agentId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the registry when a command is ready for an agent.
|
||||
* Attempts to deliver via SSE; if successful, marks as DELIVERED.
|
||||
* If the agent is not connected, the command stays PENDING.
|
||||
*/
|
||||
@Override
|
||||
public void onCommandReady(String agentId, AgentCommand command) {
|
||||
String eventType = command.type().name().toLowerCase().replace('_', '-');
|
||||
boolean sent = sendEvent(agentId, command.id(), eventType, command.payload());
|
||||
if (sent) {
|
||||
registryService.markDelivered(agentId, command.id());
|
||||
log.debug("Command {} ({}) delivered to agent {} via SSE", command.id(), eventType, agentId);
|
||||
} else {
|
||||
log.debug("Agent {} not connected, command {} stays PENDING", agentId, command.id());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scheduled ping keepalive to all connected agents.
|
||||
*/
|
||||
@Scheduled(fixedDelayString = "${agent-registry.ping-interval-ms:15000}")
|
||||
void pingAll() {
|
||||
if (!emitters.isEmpty()) {
|
||||
sendPingToAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,8 @@ public class WebConfig implements WebMvcConfigurer {
|
||||
"/api/v1/health",
|
||||
"/api/v1/api-docs/**",
|
||||
"/api/v1/swagger-ui/**",
|
||||
"/api/v1/swagger-ui.html"
|
||||
"/api/v1/swagger-ui.html",
|
||||
"/api/v1/agents/*/events"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.agent.SseConnectionManager;
|
||||
import com.cameleer3.server.core.agent.AgentCommand;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
import com.cameleer3.server.core.agent.CommandType;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Command push endpoints for sending commands to agents via SSE.
|
||||
* <p>
|
||||
* Supports three targeting levels:
|
||||
* <ul>
|
||||
* <li>Single agent: POST /api/v1/agents/{id}/commands</li>
|
||||
* <li>Group: POST /api/v1/agents/groups/{group}/commands</li>
|
||||
* <li>Broadcast: POST /api/v1/agents/commands</li>
|
||||
* </ul>
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/agents")
|
||||
@Tag(name = "Agent Commands", description = "Command push endpoints for agent communication")
|
||||
public class AgentCommandController {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AgentCommandController.class);
|
||||
|
||||
private final AgentRegistryService registryService;
|
||||
private final SseConnectionManager connectionManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public AgentCommandController(AgentRegistryService registryService,
|
||||
SseConnectionManager connectionManager,
|
||||
ObjectMapper objectMapper) {
|
||||
this.registryService = registryService;
|
||||
this.connectionManager = connectionManager;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/commands")
|
||||
@Operation(summary = "Send command to a specific agent",
|
||||
description = "Sends a config-update, deep-trace, or replay command to the specified agent")
|
||||
@ApiResponse(responseCode = "202", description = "Command accepted")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||
public ResponseEntity<String> sendCommand(@PathVariable String id,
|
||||
@RequestBody String body) throws JsonProcessingException {
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
if (agent == null) {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id);
|
||||
}
|
||||
|
||||
CommandRequest request = parseCommandRequest(body);
|
||||
AgentCommand command = registryService.addCommand(id, request.type, request.payloadJson);
|
||||
|
||||
String status = connectionManager.isConnected(id) ? "DELIVERED" : "PENDING";
|
||||
|
||||
Map<String, Object> response = new LinkedHashMap<>();
|
||||
response.put("commandId", command.id());
|
||||
response.put("status", status);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.ACCEPTED)
|
||||
.body(objectMapper.writeValueAsString(response));
|
||||
}
|
||||
|
||||
@PostMapping("/groups/{group}/commands")
|
||||
@Operation(summary = "Send command to all agents in a group",
|
||||
description = "Sends a command to all LIVE agents in the specified group")
|
||||
@ApiResponse(responseCode = "202", description = "Commands accepted")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
public ResponseEntity<String> sendGroupCommand(@PathVariable String group,
|
||||
@RequestBody String body) throws JsonProcessingException {
|
||||
CommandRequest request = parseCommandRequest(body);
|
||||
|
||||
List<AgentInfo> agents = registryService.findAll().stream()
|
||||
.filter(a -> a.state() == AgentState.LIVE)
|
||||
.filter(a -> group.equals(a.group()))
|
||||
.toList();
|
||||
|
||||
List<String> commandIds = new ArrayList<>();
|
||||
for (AgentInfo agent : agents) {
|
||||
AgentCommand command = registryService.addCommand(agent.id(), request.type, request.payloadJson);
|
||||
commandIds.add(command.id());
|
||||
}
|
||||
|
||||
Map<String, Object> response = new LinkedHashMap<>();
|
||||
response.put("commandIds", commandIds);
|
||||
response.put("targetCount", agents.size());
|
||||
|
||||
return ResponseEntity.status(HttpStatus.ACCEPTED)
|
||||
.body(objectMapper.writeValueAsString(response));
|
||||
}
|
||||
|
||||
@PostMapping("/commands")
|
||||
@Operation(summary = "Broadcast command to all live agents",
|
||||
description = "Sends a command to all agents currently in LIVE state")
|
||||
@ApiResponse(responseCode = "202", description = "Commands accepted")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
public ResponseEntity<String> broadcastCommand(@RequestBody String body) throws JsonProcessingException {
|
||||
CommandRequest request = parseCommandRequest(body);
|
||||
|
||||
List<AgentInfo> liveAgents = registryService.findByState(AgentState.LIVE);
|
||||
|
||||
List<String> commandIds = new ArrayList<>();
|
||||
for (AgentInfo agent : liveAgents) {
|
||||
AgentCommand command = registryService.addCommand(agent.id(), request.type, request.payloadJson);
|
||||
commandIds.add(command.id());
|
||||
}
|
||||
|
||||
Map<String, Object> response = new LinkedHashMap<>();
|
||||
response.put("commandIds", commandIds);
|
||||
response.put("targetCount", liveAgents.size());
|
||||
|
||||
return ResponseEntity.status(HttpStatus.ACCEPTED)
|
||||
.body(objectMapper.writeValueAsString(response));
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/commands/{commandId}/ack")
|
||||
@Operation(summary = "Acknowledge command receipt",
|
||||
description = "Agent acknowledges that it has received and processed a command")
|
||||
@ApiResponse(responseCode = "200", description = "Command acknowledged")
|
||||
@ApiResponse(responseCode = "404", description = "Command not found")
|
||||
public ResponseEntity<Void> acknowledgeCommand(@PathVariable String id,
|
||||
@PathVariable String commandId) {
|
||||
boolean acknowledged = registryService.acknowledgeCommand(id, commandId);
|
||||
if (!acknowledged) {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId);
|
||||
}
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
private CommandRequest parseCommandRequest(String body) throws JsonProcessingException {
|
||||
JsonNode node = objectMapper.readTree(body);
|
||||
|
||||
if (!node.has("type")) {
|
||||
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing 'type' field");
|
||||
}
|
||||
|
||||
String typeStr = node.get("type").asText();
|
||||
CommandType type = mapCommandType(typeStr);
|
||||
|
||||
String payloadJson = "{}";
|
||||
if (node.has("payload")) {
|
||||
payloadJson = node.get("payload").toString();
|
||||
}
|
||||
|
||||
return new CommandRequest(type, payloadJson);
|
||||
}
|
||||
|
||||
private CommandType mapCommandType(String typeStr) {
|
||||
return switch (typeStr) {
|
||||
case "config-update" -> CommandType.CONFIG_UPDATE;
|
||||
case "deep-trace" -> CommandType.DEEP_TRACE;
|
||||
case "replay" -> CommandType.REPLAY;
|
||||
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
|
||||
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay");
|
||||
};
|
||||
}
|
||||
|
||||
private record CommandRequest(CommandType type, String payloadJson) {}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.agent.SseConnectionManager;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestHeader;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
/**
|
||||
* SSE endpoint for real-time event streaming to agents.
|
||||
* <p>
|
||||
* Agents connect to {@code GET /api/v1/agents/{id}/events} to receive
|
||||
* config-update, deep-trace, and replay commands as Server-Sent Events.
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/agents")
|
||||
@Tag(name = "Agent SSE", description = "Server-Sent Events endpoint for agent communication")
|
||||
public class AgentSseController {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AgentSseController.class);
|
||||
|
||||
private final SseConnectionManager connectionManager;
|
||||
private final AgentRegistryService registryService;
|
||||
|
||||
public AgentSseController(SseConnectionManager connectionManager,
|
||||
AgentRegistryService registryService) {
|
||||
this.connectionManager = connectionManager;
|
||||
this.registryService = registryService;
|
||||
}
|
||||
|
||||
@GetMapping(value = "/{id}/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
@Operation(summary = "Open SSE event stream",
|
||||
description = "Opens a Server-Sent Events stream for the specified agent. "
|
||||
+ "Commands (config-update, deep-trace, replay) are pushed as events. "
|
||||
+ "Ping keepalive comments sent every 15 seconds.")
|
||||
@ApiResponse(responseCode = "200", description = "SSE stream opened")
|
||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||
public SseEmitter events(
|
||||
@PathVariable String id,
|
||||
@Parameter(description = "Last received event ID (no replay, acknowledged only)")
|
||||
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
|
||||
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
if (agent == null) {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id);
|
||||
}
|
||||
|
||||
if (lastEventId != null) {
|
||||
log.debug("Agent {} reconnecting with Last-Event-ID: {} (no replay)", id, lastEventId);
|
||||
}
|
||||
|
||||
return connectionManager.connect(id);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user