docs(03): research agent registry and SSE push domain
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
514
.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md
Normal file
@@ -0,0 +1,514 @@
# Phase 3: Agent Registry + SSE Push - Research

**Researched:** 2026-03-11
**Domain:** Agent lifecycle management, Server-Sent Events (SSE), in-memory registry
**Confidence:** HIGH

## Summary

This phase adds agent registration, heartbeat-based lifecycle management (LIVE/STALE/DEAD), and real-time command push via SSE to the Cameleer3 server. The technology stack is straightforward: Spring MVC's `SseEmitter` for server-push, `ConcurrentHashMap` for the in-memory agent registry, and `@Scheduled` for periodic lifecycle checks (same pattern already used by `ClickHouseFlushScheduler`).

The main architectural challenge is managing per-agent SSE connections reliably -- handling disconnections, timeouts, and cleanup without leaking threads or emitters. The command delivery model (PENDING with 60s expiry, acknowledgement) adds a second concurrent data structure to manage alongside the registry itself.

**Primary recommendation:** Use Spring MVC `SseEmitter` (already on classpath via `spring-boot-starter-web`). No new dependencies required. Follow the established core-module-plain-class / app-module-Spring-bean pattern. Agent registry service in core, SSE connection manager and controllers in app.

<user_constraints>

## User Constraints (from CONTEXT.md)

### Locked Decisions

- Heartbeat interval: 30 seconds
- STALE threshold: 90 seconds (3 missed heartbeats)
- DEAD threshold: 5 minutes after going STALE
- DEAD agents kept indefinitely (no auto-purge)
- Agent list endpoint returns all agents (LIVE, STALE, DEAD) with `?status=` filter parameter
- Generic command endpoint: `POST /api/v1/agents/{id}/commands` with `{"type": "config-update|deep-trace|replay", "payload": {...}}`
- Three targeting levels: single agent, group, all live agents
- Agent self-declares group name at registration
- Command delivery tracking: PENDING until acknowledged, 60s expiry
- Agent provides its own persistent ID at registration
- Rich registration payload: agent ID, name, group, version, list of route IDs, capabilities
- Re-registration with same ID resumes existing identity
- Heartbeat is just a ping -- no metadata update
- Registration response includes: SSE endpoint URL, current server config, server public key placeholder
- Last-Event-ID supported but does NOT replay missed events
- Pending commands NOT auto-pushed on reconnect
- SSE ping/keepalive interval: 15 seconds

### Claude's Discretion

- In-memory vs persistent storage for agent registry (in-memory is fine for v1)
- Command acknowledgement mechanism details (heartbeat piggyback vs dedicated endpoint)
- SSE implementation approach (Spring SseEmitter, WebFlux, or other)
- Thread scheduling for lifecycle state transitions

### Deferred Ideas (OUT OF SCOPE)

- Server-side agent tags/labels for more flexible grouping
- Auto-push pending commands on reconnect
- Last-Event-ID replay of missed events
- Agent capability negotiation

</user_constraints>

<phase_requirements>

## Phase Requirements

| ID | Description | Research Support |
|----|-------------|-----------------|
| AGNT-01 (#13) | Agent registers via POST /api/v1/agents/register with bootstrap token, receives JWT + server public key | Registration controller + service; JWT/security enforcement deferred to Phase 4 but flow must work end-to-end |
| AGNT-02 (#14) | Server maintains agent registry with LIVE/STALE/DEAD lifecycle based on heartbeat timing | In-memory ConcurrentHashMap registry + @Scheduled lifecycle monitor |
| AGNT-03 (#15) | Agent sends heartbeat via POST /api/v1/agents/{id}/heartbeat every 30s | Heartbeat endpoint updates lastHeartbeat timestamp, transitions STALE back to LIVE |
| AGNT-04 (#16) | Server pushes config-update events to agents via SSE (Ed25519 signature deferred to Phase 4) | SseEmitter per-agent connection + command push infrastructure |
| AGNT-05 (#17) | Server pushes deep-trace commands to agents via SSE for specific correlationIds | Same SSE command push mechanism with deep-trace type |
| AGNT-06 (#18) | Server pushes replay commands to agents via SSE (signed replay tokens deferred to Phase 4) | Same SSE command push mechanism with replay type |
| AGNT-07 (#19) | SSE connection includes ping keepalive and supports Last-Event-ID reconnection | 15s ping via @Scheduled, Last-Event-ID header read on connect |

</phase_requirements>

## Standard Stack

### Core

| Library | Version | Purpose | Why Standard |
|---------|---------|---------|--------------|
| Spring MVC SseEmitter | 6.2.x (via Boot 3.4.3) | Server-Sent Events | Already on classpath, servlet-based (matches existing stack), no WebFlux needed |
| ConcurrentHashMap | JDK 17 | Agent registry storage | Thread-safe, O(1) lookup by agent ID, no external dependency |
| Spring @Scheduled | 6.2.x (via Boot 3.4.3) | Lifecycle monitor + SSE keepalive | Already enabled in application, proven pattern in ClickHouseFlushScheduler |

### Supporting

| Library | Version | Purpose | When to Use |
|---------|---------|---------|-------------|
| Jackson ObjectMapper | 2.17.3 (managed) | Command serialization/deserialization | Already configured with JavaTimeModule, used throughout codebase |

### Alternatives Considered

| Instead of | Could Use | Tradeoff |
|------------|-----------|----------|
| SseEmitter (MVC) | WebFlux `Flux<ServerSentEvent>` | Would require adding spring-boot-starter-webflux and mixing reactive/servlet stacks -- unnecessary complexity for this use case |
| ConcurrentHashMap | Redis/ClickHouse persistence | Over-engineering for v1; in-memory is sufficient since agent state is ephemeral and rebuilt on reconnect |
| @Scheduled | ScheduledExecutorService | @Scheduled already works, already enabled; raw executor only needed for complex scheduling |

**Installation:**
No new dependencies required. Everything is already on the classpath.

## Architecture Patterns

### Recommended Project Structure

```
cameleer3-server-core/src/main/java/com/cameleer3/server/core/
├── agent/
│   ├── AgentInfo.java              # Record: id, name, group, version, routeIds, capabilities, state, timestamps
│   ├── AgentState.java             # Enum: LIVE, STALE, DEAD
│   ├── AgentRegistryService.java   # Plain class: register, heartbeat, findById, findAll, lifecycle transitions
│   ├── AgentCommand.java           # Record: id, type, payload, targetAgentId, createdAt, status
│   └── CommandStatus.java          # Enum: PENDING, DELIVERED, ACKNOWLEDGED, EXPIRED

cameleer3-server-app/src/main/java/com/cameleer3/server/app/
├── config/
│   ├── AgentRegistryConfig.java        # @ConfigurationProperties(prefix = "agent-registry")
│   └── AgentRegistryBeanConfig.java    # @Configuration: wires AgentRegistryService as bean
├── controller/
│   ├── AgentRegistrationController.java  # POST /register, POST /{id}/heartbeat, GET /agents
│   ├── AgentCommandController.java       # POST /{id}/commands, POST /groups/{group}/commands, POST /commands
│   └── AgentSseController.java           # GET /{id}/events (SSE stream)
├── agent/
│   ├── SseConnectionManager.java     # @Component: ConcurrentHashMap<agentId, SseEmitter>, ping scheduler
│   └── AgentLifecycleMonitor.java    # @Component: @Scheduled lifecycle check (like ClickHouseFlushScheduler)
```
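
The three `AgentCommandController` endpoints map to the three targeting levels (single agent, group, all live agents). A minimal sketch of how target resolution could look in the core module follows. `Agent`, `State`, and `CommandTargets` are hypothetical stand-ins for illustration, not the project's `AgentInfo`/`AgentState`; restricting group targeting to LIVE agents is an assumption, since the locked decisions only say "all live agents" for the broadcast level.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the AgentState enum and AgentInfo record
enum State { LIVE, STALE, DEAD }

record Agent(String id, String group, State state) {}

final class CommandTargets {
    private CommandTargets() {}

    /** Single agent: the ID itself if it is registered, in any state. */
    static List<String> single(Map<String, Agent> registry, String agentId) {
        return registry.containsKey(agentId) ? List.of(agentId) : List.of();
    }

    /** Group: agents that declared this group (assumption: LIVE only). */
    static List<String> group(Map<String, Agent> registry, String group) {
        return registry.values().stream()
                .filter(a -> a.state() == State.LIVE && a.group().equals(group))
                .map(Agent::id)
                .sorted()
                .collect(Collectors.toList());
    }

    /** Broadcast: every agent currently LIVE. */
    static List<String> allLive(Map<String, Agent> registry) {
        return registry.values().stream()
                .filter(a -> a.state() == State.LIVE)
                .map(Agent::id)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Each controller endpoint would then create one PENDING command per resolved agent ID and hand it to the SSE layer.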

### Pattern 1: Core Module Plain Class + App Module Bean Config

**What:** Domain logic in core module as plain Java classes; Spring wiring in app module via @Configuration
**When to use:** Always -- this is the established codebase pattern
**Example:**

```java
// Core module: plain class, no Spring annotations
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AgentRegistryService {
    private final ConcurrentHashMap<String, AgentInfo> agents = new ConcurrentHashMap<>();

    public AgentInfo register(String id, String name, String group, String version,
                              List<String> routeIds, Map<String, Object> capabilities) {
        Instant now = Instant.now();
        // compute() makes the check-then-update atomic per agent ID
        return agents.compute(id, (k, existing) -> {
            if (existing != null) {
                // Re-registration: resume the existing identity, transition back to LIVE.
                // Refreshing name/group/version/routeIds/capabilities is not shown here.
                return existing.withState(AgentState.LIVE).withLastHeartbeat(now);
            }
            return new AgentInfo(id, name, group, version, routeIds,
                    capabilities, AgentState.LIVE, now, now);
        });
    }

    public boolean heartbeat(String id) {
        return agents.computeIfPresent(id, (k, v) ->
                v.withState(AgentState.LIVE).withLastHeartbeat(Instant.now())) != null;
    }
}

// App module: bean config
// (imports: org.springframework.context.annotation.Bean, Configuration)
@Configuration
public class AgentRegistryBeanConfig {
    @Bean
    public AgentRegistryService agentRegistryService() {
        return new AgentRegistryService();
    }
}
```

### Pattern 2: SseEmitter Per-Agent Connection

**What:** Each agent has one SseEmitter stored in ConcurrentHashMap, managed by a dedicated component
**When to use:** For all SSE connections to agents
**Example:**

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Component
public class SseConnectionManager {
    private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    public SseEmitter connect(String agentId) {
        // Use Long.MAX_VALUE timeout -- we manage keepalive ourselves
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        // remove(key, value) only removes this exact emitter, so a late callback
        // from an old connection cannot evict its replacement (see Pitfall 3)
        emitter.onCompletion(() -> emitters.remove(agentId, emitter));
        emitter.onTimeout(() -> emitters.remove(agentId, emitter));
        emitter.onError(e -> emitters.remove(agentId, emitter));

        // Replace any existing emitter (agent reconnect)
        SseEmitter old = emitters.put(agentId, emitter);
        if (old != null) {
            old.complete(); // Close stale connection
        }

        return emitter;
    }

    public boolean sendEvent(String agentId, String eventId, String eventType, Object data) {
        SseEmitter emitter = emitters.get(agentId);
        if (emitter == null) return false;
        try {
            emitter.send(SseEmitter.event()
                    .id(eventId)
                    .name(eventType)
                    .data(data, MediaType.APPLICATION_JSON));
            return true;
        } catch (IOException | IllegalStateException e) {
            // Client gone, or emitter already completed: drop this emitter only
            emitters.remove(agentId, emitter);
            return false;
        }
    }

    public void sendPingToAll() {
        emitters.forEach((id, emitter) -> {
            try {
                emitter.send(SseEmitter.event().comment("ping"));
            } catch (IOException | IllegalStateException e) {
                emitters.remove(id, emitter);
            }
        });
    }
}
```

### Pattern 3: Lifecycle Monitor via @Scheduled

**What:** Periodic task checks all agents' lastHeartbeat timestamps and transitions states
**When to use:** For LIVE->STALE and STALE->DEAD transitions
**Example:**

```java
import java.time.Duration;
import java.time.Instant;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class AgentLifecycleMonitor {
    private final AgentRegistryService registry;
    private final AgentRegistryConfig config;

    public AgentLifecycleMonitor(AgentRegistryService registry, AgentRegistryConfig config) {
        this.registry = registry;
        this.config = config;
    }

    @Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}")
    public void checkLifecycle() {
        Instant now = Instant.now();
        for (AgentInfo agent : registry.findAll()) {
            Duration sinceHeartbeat = Duration.between(agent.lastHeartbeat(), now);
            if (agent.state() == AgentState.LIVE
                    && sinceHeartbeat.toMillis() > config.getStaleThresholdMs()) {
                registry.transitionState(agent.id(), AgentState.STALE);
            } else if (agent.state() == AgentState.STALE
                    // Approximates "5 minutes after going STALE" from lastHeartbeat;
                    // Open Question 2 recommends tracking staleTransitionTime instead
                    && sinceHeartbeat.toMillis() > config.getStaleThresholdMs() + config.getDeadThresholdMs()) {
                registry.transitionState(agent.id(), AgentState.DEAD);
            }
        }
    }
}
```

### Anti-Patterns to Avoid

- **Mixing WebFlux and MVC:** Do not add spring-boot-starter-webflux. The project uses servlet-based MVC. Adding WebFlux creates classpath conflicts and ambiguity.
- **Sharing SseEmitter across threads without protection:** Always use ConcurrentHashMap and handle IOException on every send. A failed send means the client disconnected.
- **Storing SseEmitter in the core module:** SseEmitter is a Spring MVC class. Keep it in the app module only. The core module should define interfaces for "push event to agent" that the app module implements.
- **Not setting SseEmitter timeout:** Default timeout is server-dependent (often 30s). Use `Long.MAX_VALUE` and manage keepalive yourself.
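
To keep `SseEmitter` out of the core module, the core can depend on a small port interface that the app module implements. The sketch below is one possible shape; `AgentEventPublisher` and `CommandDispatcher` are hypothetical names that do not appear in the codebase.

```java
/** Core-module port (hypothetical name): no Spring types in the signature. */
interface AgentEventPublisher {
    /** @return true if the event was handed to an open connection */
    boolean publish(String agentId, String eventId, String eventType, Object payload);
}

/** Core-module caller (hypothetical) that depends only on the port. */
final class CommandDispatcher {
    private final AgentEventPublisher publisher;

    CommandDispatcher(AgentEventPublisher publisher) {
        this.publisher = publisher;
    }

    /** Uses the command ID as the SSE event ID and the command type as the event name. */
    boolean dispatch(String agentId, String commandId, String type, Object payload) {
        return publisher.publish(agentId, commandId, type, payload);
    }
}
```

In the app module, `SseConnectionManager` could implement the port by delegating to its `sendEvent` method, and the bean config would pass it to the core class.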

## Don't Hand-Roll

| Problem | Don't Build | Use Instead | Why |
|---------|-------------|-------------|-----|
| SSE protocol | Custom HTTP streaming | Spring SseEmitter | Handles text/event-stream format, event IDs, retry fields automatically |
| Thread-safe map | Synchronized HashMap | ConcurrentHashMap | Lock-free reads, fine-grained (per-bin) locking on writes, battle-tested |
| Periodic scheduling | Manual Thread/Timer | @Scheduled + @EnableScheduling | Already configured, integrates with Spring lifecycle |
| JSON serialization | Manual string building | ObjectMapper (already configured) | Handles Instant, unknown fields, all edge cases |
| Async request timeout | Manual thread management | spring.mvc.async.request-timeout config | Spring handles Tomcat async timeout correctly |

**Key insight:** SSE in Spring MVC is a well-supported, first-class feature. The SseEmitter API handles the wire protocol; your job is managing the lifecycle of emitters (create, store, cleanup, send).

## Common Pitfalls

### Pitfall 1: SseEmitter Default Timeout Kills Long-Lived Connections

**What goes wrong:** Emitter times out after 30s (Tomcat default), client gets disconnected
**Why it happens:** Not setting explicit timeout on SseEmitter constructor
**How to avoid:** Always use `new SseEmitter(Long.MAX_VALUE)`. Also set `spring.mvc.async.request-timeout=-1` in application.yml to disable the MVC-level async timeout
**Warning signs:** Clients disconnecting every 30 seconds, reconnection storms

### Pitfall 2: IOException on Send Not Handled

**What goes wrong:** Client disconnects but server keeps trying to send, gets IOException, does not clean up
**Why it happens:** Not wrapping every `emitter.send()` in try-catch
**How to avoid:** Every send must catch IOException, remove the emitter from the map, and log at debug level (not error -- disconnects are normal)
**Warning signs:** Growing emitter map, increasing IOExceptions in logs

### Pitfall 3: Race Condition on Agent Reconnect

**What goes wrong:** Agent disconnects and reconnects rapidly; old emitter and new emitter both exist briefly
**Why it happens:** `onCompletion` callback of old emitter fires after new emitter is stored, removing the new one
**How to avoid:** Use `ConcurrentHashMap.put()`, which returns the old value, to swap in the new emitter. In callbacks, remove only if the emitter in the map is still the same instance, for example with the two-argument `emitters.remove(agentId, emitter)` (SseEmitter does not override `equals`, so this is a reference equality check)
**Warning signs:** Agent SSE stream stops working after reconnect
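
The conditional removal can be checked in isolation without Spring. This is a minimal sketch using plain `Object` instances in place of emitters; `ConnectionSlots` is a hypothetical helper, not a class from the codebase.

```java
import java.util.concurrent.ConcurrentHashMap;

final class ConnectionSlots<C> {
    private final ConcurrentHashMap<String, C> slots = new ConcurrentHashMap<>();

    /** Stores the new connection and returns the one it replaced, or null. */
    C replace(String agentId, C connection) {
        return slots.put(agentId, connection);
    }

    /** Cleanup callback body: removes only if this connection is still the current one. */
    boolean release(String agentId, C connection) {
        return slots.remove(agentId, connection);
    }

    C current(String agentId) {
        return slots.get(agentId);
    }
}
```

When the old connection's callback calls `release` after a reconnect, the call returns `false` and the new connection stays in place.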

### Pitfall 4: Tomcat Thread Exhaustion with SSE

**What goes wrong:** Many long-lived SSE connections, plus the initial requests and sends they generate, compete for Tomcat connections and worker threads
**Why it happens:** MVC SseEmitter uses Servlet async support, so the request thread is released once the controller returns the emitter. An open stream does not pin a worker thread, but it does hold a connection, and each initial request still occupies a pool thread briefly
**How to avoid:** Spring Boot's default Tomcat limits (200 worker threads via `server.tomcat.threads.max`, 8192 connections via `server.tomcat.max-connections`) are sufficient for dozens to low hundreds of agents. If scaling beyond that, tune those settings. For thousands of agents, consider WebFlux (but that is a v2 concern)
**Warning signs:** Thread pool exhaustion, connection refused errors

### Pitfall 5: Command Expiry Not Cleaned Up

**What goes wrong:** Expired PENDING commands accumulate in memory
**Why it happens:** No scheduled task to clean them up
**How to avoid:** The lifecycle monitor (or a separate @Scheduled task) should also sweep expired commands every check cycle
**Warning signs:** Memory growth over time, stale commands in API responses
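
A sketch of the sweep, assuming commands live in their own `ConcurrentHashMap` keyed by command ID. `PendingCommands` and `Entry` are hypothetical names; removing acknowledged entries immediately (rather than keeping an ACKNOWLEDGED status) is a simplification for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

final class PendingCommands {
    record Entry(String agentId, Instant createdAt) {}

    private final ConcurrentHashMap<String, Entry> commands = new ConcurrentHashMap<>();
    private final Duration expiry;

    PendingCommands(Duration expiry) {
        this.expiry = expiry;
    }

    void add(String commandId, String agentId, Instant createdAt) {
        commands.put(commandId, new Entry(agentId, createdAt));
    }

    /** Acknowledge: succeeds only for a still-pending command owned by this agent. */
    boolean acknowledge(String agentId, String commandId) {
        Entry e = commands.get(commandId);
        return e != null && e.agentId().equals(agentId) && commands.remove(commandId, e);
    }

    /** Removes entries older than the expiry; returns how many were dropped. */
    int sweepExpired(Instant now) {
        int[] removed = {0};
        // ConcurrentHashMap iteration tolerates concurrent removal
        commands.forEach((id, e) -> {
            boolean expired = Duration.between(e.createdAt(), now).compareTo(expiry) > 0;
            if (expired && commands.remove(id, e)) {
                removed[0]++;
            }
        });
        return removed[0];
    }

    int size() {
        return commands.size();
    }
}
```

Passing `now` in as a parameter keeps the sweep deterministic in unit tests; the scheduled task would call `sweepExpired(Instant.now())` with the expiry set from `command-expiry-ms`.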

### Pitfall 6: SSE Endpoint Blocked by ProtocolVersionInterceptor

**What goes wrong:** SSE GET request rejected because it lacks `X-Cameleer-Protocol-Version` header
**Why it happens:** WebConfig already registers the interceptor for `/api/v1/agents/**` which includes the SSE endpoint
**How to avoid:** Either add the protocol header requirement to agents (recommended -- agents already send it for POST requests) or exclude the SSE endpoint path from the interceptor
**Warning signs:** 400 errors on SSE connect attempts

## Code Examples

### Registration Controller

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.*;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
// plus AgentInfo, AgentRegistryService, AgentState from com.cameleer3.server.core.agent

@RestController
@RequestMapping(value = "/api/v1/agents", produces = MediaType.APPLICATION_JSON_VALUE)
@Tag(name = "Agent Management", description = "Agent registration and lifecycle endpoints")
public class AgentRegistrationController {

    private final AgentRegistryService registryService;
    private final ObjectMapper objectMapper;

    public AgentRegistrationController(AgentRegistryService registryService, ObjectMapper objectMapper) {
        this.registryService = registryService;
        this.objectMapper = objectMapper;
    }

    @PostMapping("/register")
    @Operation(summary = "Register an agent")
    public ResponseEntity<String> register(@RequestBody String body) throws JsonProcessingException {
        // Parse registration payload (JSON field names are assumed here)
        JsonNode node = objectMapper.readTree(body);
        String agentId = node.path("agentId").asText(null);
        String name = node.path("name").asText(null);
        if (agentId == null || name == null) {
            return ResponseEntity.badRequest().build(); // required fields missing
        }
        String group = node.has("group") ? node.get("group").asText() : "default";
        String version = node.path("version").asText(null);
        List<String> routeIds = new ArrayList<>();
        node.path("routeIds").forEach(r -> routeIds.add(r.asText()));
        Map<String, Object> capabilities = node.hasNonNull("capabilities")
                ? objectMapper.convertValue(node.get("capabilities"), new TypeReference<Map<String, Object>>() {})
                : Map.of();

        AgentInfo agent = registryService.register(agentId, name, group, version, routeIds, capabilities);

        // Build registration response
        Map<String, Object> response = new LinkedHashMap<>();
        response.put("agentId", agent.id());
        response.put("sseEndpoint", "/api/v1/agents/" + agentId + "/events");
        response.put("heartbeatIntervalMs", 30000);
        response.put("serverPublicKey", null); // Phase 4
        // JWT token placeholder -- Phase 4 will add real JWT
        response.put("token", "placeholder-" + agentId);

        return ResponseEntity.ok(objectMapper.writeValueAsString(response));
    }

    @PostMapping("/{id}/heartbeat")
    @Operation(summary = "Agent heartbeat ping")
    public ResponseEntity<Void> heartbeat(@PathVariable String id) {
        boolean found = registryService.heartbeat(id);
        if (!found) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok().build();
    }

    @GetMapping
    @Operation(summary = "List all agents")
    public ResponseEntity<String> listAgents(
            @RequestParam(required = false) String status) throws JsonProcessingException {
        List<AgentInfo> agents;
        if (status != null) {
            AgentState stateFilter;
            try {
                stateFilter = AgentState.valueOf(status.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                return ResponseEntity.badRequest().build(); // unknown status value
            }
            agents = registryService.findByState(stateFilter);
        } else {
            agents = registryService.findAll();
        }
        return ResponseEntity.ok(objectMapper.writeValueAsString(agents));
    }
}
```

### SSE Controller

```java
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/api/v1/agents")
@Tag(name = "Agent SSE", description = "Server-Sent Events for agent communication")
public class AgentSseController {

    private static final Logger log = LoggerFactory.getLogger(AgentSseController.class);

    private final SseConnectionManager connectionManager;
    private final AgentRegistryService registryService;

    public AgentSseController(SseConnectionManager connectionManager, AgentRegistryService registryService) {
        this.connectionManager = connectionManager;
        this.registryService = registryService;
    }

    @GetMapping(value = "/{id}/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @Operation(summary = "SSE event stream for an agent")
    public SseEmitter subscribe(
            @PathVariable String id,
            @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {

        AgentInfo agent = registryService.findById(id);
        if (agent == null) {
            throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not registered");
        }

        // Last-Event-ID acknowledged but no replay (per decision)
        if (lastEventId != null) {
            log.debug("Agent {} reconnected with Last-Event-ID: {} (no replay)", id, lastEventId);
        }

        return connectionManager.connect(id);
    }
}
```

### Command Acknowledgement Endpoint (Recommended: Dedicated Endpoint)

```java
@PostMapping("/{id}/commands/{commandId}/ack")
@Operation(summary = "Acknowledge command receipt")
public ResponseEntity<Void> acknowledgeCommand(
        @PathVariable String id,
        @PathVariable String commandId) {
    boolean acknowledged = registryService.acknowledgeCommand(id, commandId);
    if (!acknowledged) {
        return ResponseEntity.notFound().build();
    }
    return ResponseEntity.ok().build();
}
```

### Application Configuration Addition

```yaml
# application.yml additions
agent-registry:
  heartbeat-interval-ms: 30000
  stale-threshold-ms: 90000
  dead-threshold-ms: 300000  # 5 minutes after going STALE (locked decision in CONTEXT.md)
  ping-interval-ms: 15000
  command-expiry-ms: 60000
  lifecycle-check-interval-ms: 10000

spring:
  mvc:
    async:
      request-timeout: -1  # Disable async timeout for SSE
```

## State of the Art

| Old Approach | Current Approach | When Changed | Impact |
|--------------|------------------|--------------|--------|
| Polling for agent status | SSE push for commands | Always SSE for server-push | Immediate delivery, lower latency |
| WebFlux for SSE | MVC SseEmitter | Spring 4.2+ | MVC SseEmitter is sufficient for moderate scale; no need for reactive stack |
| Custom HTTP streaming | SseEmitter.event() builder | Spring 4.2+ | Wire protocol handled automatically |

**Deprecated/outdated:**
- `ResponseBodyEmitter` directly for SSE: Use `SseEmitter` which extends it with SSE-specific features
- `DeferredResult` for server push: Only for single-value responses, not streams

## Open Questions

1. **Command acknowledgement: dedicated endpoint vs heartbeat piggyback**
   - What we know: Dedicated endpoint is simpler, more explicit, and decoupled from heartbeat
   - What's unclear: Whether agent-side implementation prefers one approach
   - Recommendation: Use dedicated `POST /{id}/commands/{commandId}/ack` endpoint. Cleaner separation of concerns, easier to test, and does not complicate the heartbeat path

2. **Dead threshold calculation: from last heartbeat or from STALE transition?**
   - What we know: CONTEXT.md says "5 minutes after going STALE"
   - What's unclear: Whether to track staleTransitionTime separately or compute from lastHeartbeat
   - Recommendation: Track `staleTransitionTime` in AgentInfo. Dead threshold = 5 minutes after `staleTransitionTime`. This matches the stated requirement precisely

3. **Async timeout vs SseEmitter timeout**
   - What we know: Both `spring.mvc.async.request-timeout` and `new SseEmitter(timeout)` affect SSE lifetime
   - What's unclear: Interaction between the two
   - Recommendation: Set `SseEmitter(Long.MAX_VALUE)` AND `spring.mvc.async.request-timeout=-1`. Belt and suspenders -- both disabled ensures no premature timeout
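
For Open Question 2, the recommended rule (DEAD is measured from the STALE transition, not from the last heartbeat) can be expressed as a pure function. This is a sketch under the locked thresholds; `LifecycleRules` and the `staleSince` parameter are hypothetical names standing in for whatever `AgentInfo` ends up exposing.

```java
import java.time.Duration;
import java.time.Instant;

final class LifecycleRules {
    enum State { LIVE, STALE, DEAD }

    // Locked decisions: STALE after 90s without heartbeat, DEAD 5 minutes after going STALE
    static final Duration STALE_AFTER = Duration.ofSeconds(90);
    static final Duration DEAD_AFTER_STALE = Duration.ofMinutes(5);

    private LifecycleRules() {}

    /**
     * @param staleSince when the agent was moved to STALE; only read when current is STALE
     * @return the state the agent should be in at {@code now}
     */
    static State next(State current, Instant lastHeartbeat, Instant staleSince, Instant now) {
        switch (current) {
            case LIVE:
                return Duration.between(lastHeartbeat, now).compareTo(STALE_AFTER) > 0
                        ? State.STALE : State.LIVE;
            case STALE:
                return Duration.between(staleSince, now).compareTo(DEAD_AFTER_STALE) > 0
                        ? State.DEAD : State.STALE;
            default:
                return current; // DEAD only leaves via heartbeat or re-registration
        }
    }
}
```

Because the monitor runs every 10 seconds, the STALE transition can land up to one check interval after the 90-second mark, so the DEAD transition is measured from that recorded instant rather than from a fixed offset off the last heartbeat.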

## Validation Architecture

### Test Framework

| Property | Value |
|----------|-------|
| Framework | JUnit 5 + Spring Boot Test (via spring-boot-starter-test) |
| Config file | pom.xml (Surefire + Failsafe configured) |
| Quick run command | `mvn test -pl cameleer3-server-core -Dtest=AgentRegistryServiceTest` |
| Full suite command | `mvn clean verify` |

### Phase Requirements to Test Map

| Req ID | Behavior | Test Type | Automated Command | File Exists? |
|--------|----------|-----------|-------------------|-------------|
| AGNT-01 | Agent registers and gets response | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentRegistrationControllerIT#registerAgent*` | No - Wave 0 |
| AGNT-02 | Lifecycle transitions LIVE/STALE/DEAD | unit | `mvn test -pl cameleer3-server-core -Dtest=AgentRegistryServiceTest#lifecycle*` | No - Wave 0 |
| AGNT-03 | Heartbeat updates timestamp, returns 200/404 | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentRegistrationControllerIT#heartbeat*` | No - Wave 0 |
| AGNT-04 | Config-update pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#configUpdate*` | No - Wave 0 |
| AGNT-05 | Deep-trace command pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#deepTrace*` | No - Wave 0 |
| AGNT-06 | Replay command pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#replay*` | No - Wave 0 |
| AGNT-07 | SSE ping keepalive + Last-Event-ID | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#pingKeepalive*` | No - Wave 0 |

### Sampling Rate

- **Per task commit:** `mvn test -pl cameleer3-server-core,cameleer3-server-app -Dtest="Agent*"` (agent-related tests only)
- **Per wave merge:** `mvn clean verify`
- **Phase gate:** Full suite green before /gsd:verify-work

### Wave 0 Gaps

- [ ] `cameleer3-server-core/.../agent/AgentRegistryServiceTest.java` -- covers AGNT-02, AGNT-03 (unit tests for registry logic)
- [ ] `cameleer3-server-app/.../controller/AgentRegistrationControllerIT.java` -- covers AGNT-01, AGNT-03
- [ ] `cameleer3-server-app/.../controller/AgentSseControllerIT.java` -- covers AGNT-04, AGNT-05, AGNT-06, AGNT-07
- [ ] `cameleer3-server-app/.../controller/AgentCommandControllerIT.java` -- covers command targeting (single, group, all)
- [ ] No new framework install needed -- JUnit 5 + Spring Boot Test + Awaitility already in place

### SSE Test Strategy

Testing SSE with `TestRestTemplate` requires special handling, because a plain blocking call waits for a response body that an open SSE stream never finishes. Read the stream incrementally instead: raw `HttpURLConnection` or the JDK's `java.net.http.HttpClient` need no extra dependency, while Spring's `WebClient` would require WebFlux on the test classpath. Alternatively, test at the service layer (SseConnectionManager) with direct emitter interaction. The integration test should:

1. Register agent via POST
2. Open SSE connection (separate thread)
3. Send command via POST
4. Assert SSE stream received the event
5. Verify with Awaitility for async assertions
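
For step 4, the test needs to turn the raw `text/event-stream` bytes into events. Below is a simplified reader that could wrap the stream from `HttpURLConnection`; `SseTestReader` is a hypothetical test helper, and it covers only the `id`, `event`, and `data` fields plus comment lines rather than the full SSE specification.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class SseTestReader {
    record Event(String id, String name, String data) {}

    private SseTestReader() {}

    /** Reads until maxEvents events were dispatched or the stream ends. */
    static List<Event> read(Reader source, int maxEvents) {
        List<Event> events = new ArrayList<>();
        BufferedReader reader = new BufferedReader(source);
        String id = null;
        String name = null;
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        try {
            String line;
            while (events.size() < maxEvents && (line = reader.readLine()) != null) {
                if (line.isEmpty()) {                 // blank line dispatches the event
                    if (hasData) events.add(new Event(id, name, data.toString()));
                    id = null; name = null; data.setLength(0); hasData = false;
                    continue;
                }
                if (line.startsWith(":")) continue;   // comment, e.g. the keepalive ping
                int colon = line.indexOf(':');
                String field = colon < 0 ? line : line.substring(0, colon);
                String value = colon < 0 ? "" : line.substring(colon + 1);
                if (value.startsWith(" ")) value = value.substring(1);
                if (field.equals("id")) {
                    id = value;
                } else if (field.equals("event")) {
                    name = value;
                } else if (field.equals("data")) {
                    if (hasData) data.append('\n');   // multi-line data joins with newlines
                    data.append(value);
                    hasData = true;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }
}
```

In an integration test this would run on the separate thread from step 2, for example `SseTestReader.read(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8), 1)`, with Awaitility polling the shared result. The `maxEvents` bound matters because the server never closes the stream on its own.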

## Sources

### Primary (HIGH confidence)

- [SseEmitter Javadoc (Spring Framework 7.0.5)](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.html) - Full API reference
- [Asynchronous Requests :: Spring Framework](https://docs.spring.io/spring-framework/reference/web/webmvc/mvc-ann-async.html) - Official async request handling docs
- [Task Execution and Scheduling :: Spring Boot](https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html) - Official scheduling docs
- Existing codebase: ClickHouseFlushScheduler, IngestionService, IngestionBeanConfig, WebConfig patterns

### Secondary (MEDIUM confidence)

- [Spring Boot SSE SseEmitter tutorial](https://nitinkc.github.io/microservices/sse-springboot/) - Complete guide with patterns
- [SseEmitter timeout issue #4021](https://github.com/spring-projects/spring-boot/issues/4021) - Timeout handling gotchas
- [SseEmitter response closed #19652](https://github.com/spring-projects/spring-framework/issues/19652) - Thread safety discussion

### Tertiary (LOW confidence)

- Various Medium articles on SSE patterns - used for cross-referencing community patterns only

## Metadata

**Confidence breakdown:**
- Standard stack: HIGH - SseEmitter is built into Spring MVC, already on classpath, well-documented API
- Architecture: HIGH - follows established codebase patterns (core plain class, app bean config, @Scheduled)
- Pitfalls: HIGH - well-known issues documented in Spring GitHub issues and multiple sources
- SSE test strategy: MEDIUM - SSE testing with TestRestTemplate is non-trivial, may need adaptation

**Research date:** 2026-03-11
**Valid until:** 2026-04-11 (stable stack, no fast-moving dependencies)