diff --git a/.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md b/.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md new file mode 100644 index 00000000..1ae2ee1f --- /dev/null +++ b/.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md @@ -0,0 +1,514 @@ +# Phase 3: Agent Registry + SSE Push - Research + +**Researched:** 2026-03-11 +**Domain:** Agent lifecycle management, Server-Sent Events (SSE), in-memory registry +**Confidence:** HIGH + +## Summary + +This phase adds agent registration, heartbeat-based lifecycle management (LIVE/STALE/DEAD), and real-time command push via SSE to the Cameleer3 server. The technology stack is straightforward: Spring MVC's `SseEmitter` for server-push, `ConcurrentHashMap` for the in-memory agent registry, and `@Scheduled` for periodic lifecycle checks (same pattern already used by `ClickHouseFlushScheduler`). + +The main architectural challenge is managing per-agent SSE connections reliably -- handling disconnections, timeouts, and cleanup without leaking threads or emitters. The command delivery model (PENDING with 60s expiry, acknowledgement) adds a second concurrent data structure to manage alongside the registry itself. + +**Primary recommendation:** Use Spring MVC `SseEmitter` (already on classpath via `spring-boot-starter-web`). No new dependencies required. Follow the established core-module-plain-class / app-module-Spring-bean pattern. Agent registry service in core, SSE connection manager and controllers in app. 
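The command delivery model mentioned in the summary (PENDING until acknowledged, 60s expiry) can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not the project's implementation: `PendingCommandTracker`, `Tracked`, and the method names are hypothetical, and the status set is reduced to three values for brevity.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not part of the codebase): tracks commands until acknowledged or expired. */
final class PendingCommandTracker {
    enum Status { PENDING, ACKNOWLEDGED }

    /** Immutable snapshot of one command's delivery state. */
    record Tracked(String agentId, Instant createdAt, Status status) {}

    private final Map<String, Tracked> commands = new ConcurrentHashMap<>();
    private final Duration expiry;

    PendingCommandTracker(Duration expiry) {
        this.expiry = expiry;
    }

    /** Records a new command as PENDING. */
    void track(String commandId, String agentId, Instant now) {
        commands.put(commandId, new Tracked(agentId, now, Status.PENDING));
    }

    /** Acknowledges a PENDING command; false if unknown, expired, already acknowledged, or owned by another agent. */
    boolean acknowledge(String commandId, String agentId, Instant now) {
        boolean[] changed = {false};
        // computeIfPresent runs atomically per key, so two concurrent acks cannot both succeed
        commands.computeIfPresent(commandId, (id, t) -> {
            boolean expired = now.isAfter(t.createdAt().plus(expiry));
            if (t.status() == Status.PENDING && t.agentId().equals(agentId) && !expired) {
                changed[0] = true;
                return new Tracked(t.agentId(), t.createdAt(), Status.ACKNOWLEDGED);
            }
            return t;
        });
        return changed[0];
    }

    /** Removes PENDING commands older than the expiry window; returns how many were removed. */
    int sweepExpired(Instant now) {
        int removed = 0;
        for (Map.Entry<String, Tracked> e : commands.entrySet()) {
            Tracked t = e.getValue();
            boolean expired = now.isAfter(t.createdAt().plus(expiry));
            // remove(key, value) only succeeds if the entry was not changed concurrently
            if (t.status() == Status.PENDING && expired && commands.remove(e.getKey(), t)) {
                removed++;
            }
        }
        return removed;
    }
}
```

Passing `now` in explicitly keeps the sketch testable without sleeping; a scheduled sweep would call `sweepExpired(Instant.now())` on each cycle.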
+ + + +## User Constraints (from CONTEXT.md) + +### Locked Decisions +- Heartbeat interval: 30 seconds +- STALE threshold: 90 seconds (3 missed heartbeats) +- DEAD threshold: 5 minutes after going STALE +- DEAD agents kept indefinitely (no auto-purge) +- Agent list endpoint returns all agents (LIVE, STALE, DEAD) with `?status=` filter parameter +- Generic command endpoint: `POST /api/v1/agents/{id}/commands` with `{"type": "config-update|deep-trace|replay", "payload": {...}}` +- Three targeting levels: single agent, group, all live agents +- Agent self-declares group name at registration +- Command delivery tracking: PENDING until acknowledged, 60s expiry +- Agent provides its own persistent ID at registration +- Rich registration payload: agent ID, name, group, version, list of route IDs, capabilities +- Re-registration with same ID resumes existing identity +- Heartbeat is just a ping -- no metadata update +- Registration response includes: SSE endpoint URL, current server config, server public key placeholder +- Last-Event-ID supported but does NOT replay missed events +- Pending commands NOT auto-pushed on reconnect +- SSE ping/keepalive interval: 15 seconds + +### Claude's Discretion +- In-memory vs persistent storage for agent registry (in-memory is fine for v1) +- Command acknowledgement mechanism details (heartbeat piggyback vs dedicated endpoint) +- SSE implementation approach (Spring SseEmitter, WebFlux, or other) +- Thread scheduling for lifecycle state transitions + +### Deferred Ideas (OUT OF SCOPE) +- Server-side agent tags/labels for more flexible grouping +- Auto-push pending commands on reconnect +- Last-Event-ID replay of missed events +- Agent capability negotiation + + + + + +## Phase Requirements + +| ID | Description | Research Support | +|----|-------------|-----------------| +| AGNT-01 (#13) | Agent registers via POST /api/v1/agents/register with bootstrap token, receives JWT + server public key | Registration controller + service; 
JWT/security enforcement deferred to Phase 4 but flow must work end-to-end | +| AGNT-02 (#14) | Server maintains agent registry with LIVE/STALE/DEAD lifecycle based on heartbeat timing | In-memory ConcurrentHashMap registry + @Scheduled lifecycle monitor | +| AGNT-03 (#15) | Agent sends heartbeat via POST /api/v1/agents/{id}/heartbeat every 30s | Heartbeat endpoint updates lastHeartbeat timestamp, transitions STALE back to LIVE | +| AGNT-04 (#16) | Server pushes config-update events to agents via SSE (Ed25519 signature deferred to Phase 4) | SseEmitter per-agent connection + command push infrastructure | +| AGNT-05 (#17) | Server pushes deep-trace commands to agents via SSE for specific correlationIds | Same SSE command push mechanism with deep-trace type | +| AGNT-06 (#18) | Server pushes replay commands to agents via SSE (signed replay tokens deferred to Phase 4) | Same SSE command push mechanism with replay type | +| AGNT-07 (#19) | SSE connection includes ping keepalive and supports Last-Event-ID reconnection | 15s ping via @Scheduled, Last-Event-ID header read on connect | + + + +## Standard Stack + +### Core +| Library | Version | Purpose | Why Standard | +|---------|---------|---------|--------------| +| Spring MVC SseEmitter | 6.2.x (via Boot 3.4.3) | Server-Sent Events | Already on classpath, servlet-based (matches existing stack), no WebFlux needed | +| ConcurrentHashMap | JDK 17 | Agent registry storage | Thread-safe, O(1) lookup by agent ID, no external dependency | +| Spring @Scheduled | 6.2.x (via Boot 3.4.3) | Lifecycle monitor + SSE keepalive | Already enabled in application, proven pattern in ClickHouseFlushScheduler | + +### Supporting +| Library | Version | Purpose | When to Use | +|---------|---------|---------|-------------| +| Jackson ObjectMapper | 2.17.3 (managed) | Command serialization/deserialization | Already configured with JavaTimeModule, used throughout codebase | + +### Alternatives Considered +| Instead of | Could Use | Tradeoff | 
+|------------|-----------|----------| +| SseEmitter (MVC) | WebFlux `Flux<ServerSentEvent>` | Would require adding spring-boot-starter-webflux and mixing reactive/servlet stacks -- unnecessary complexity for this use case | +| ConcurrentHashMap | Redis/ClickHouse persistence | Over-engineering for v1; in-memory is sufficient since agent state is ephemeral and rebuilt on reconnect | +| @Scheduled | ScheduledExecutorService | @Scheduled already works, already enabled; raw executor only needed for complex scheduling | + +**Installation:** +No new dependencies required. Everything is already on the classpath. + +## Architecture Patterns + +### Recommended Project Structure +``` +cameleer3-server-core/src/main/java/com/cameleer3/server/core/ +├── agent/ +│ ├── AgentInfo.java # Record: id, name, group, version, routeIds, capabilities, state, timestamps +│ ├── AgentState.java # Enum: LIVE, STALE, DEAD +│ ├── AgentRegistryService.java # Plain class: register, heartbeat, findById, findAll, lifecycle transitions +│ ├── AgentCommand.java # Record: id, type, payload, targetAgentId, createdAt, status +│ └── CommandStatus.java # Enum: PENDING, DELIVERED, ACKNOWLEDGED, EXPIRED + +cameleer3-server-app/src/main/java/com/cameleer3/server/app/ +├── config/ +│ ├── AgentRegistryConfig.java # @ConfigurationProperties(prefix = "agent-registry") +│ └── AgentRegistryBeanConfig.java # @Configuration: wires AgentRegistryService as bean +├── controller/ +│ ├── AgentRegistrationController.java # POST /register, POST /{id}/heartbeat, GET /agents +│ ├── AgentCommandController.java # POST /{id}/commands, POST /groups/{group}/commands, POST /commands +│ └── AgentSseController.java # GET /{id}/events (SSE stream) +├── agent/ +│ ├── SseConnectionManager.java # @Component: ConcurrentHashMap<agentId, SseEmitter>, ping scheduler +│ └── AgentLifecycleMonitor.java # @Component: @Scheduled lifecycle check (like ClickHouseFlushScheduler) +``` + +### Pattern 1: Core Module Plain Class + App Module Bean Config +**What:** Domain logic in core 
module as plain Java classes; Spring wiring in app module via @Configuration +**When to use:** Always -- this is the established codebase pattern +**Example:** +```java +// Core module: plain class, no Spring annotations +public class AgentRegistryService { + private final ConcurrentHashMap<String, AgentInfo> agents = new ConcurrentHashMap<>(); + + public AgentInfo register(String id, String name, String group, String version, + List<String> routeIds, Map<String, Object> capabilities) { + AgentInfo existing = agents.get(id); + if (existing != null) { + // Re-registration: resume the existing identity and transition back to LIVE + // (refreshing name/group/version/routeIds from the new payload is omitted here) + AgentInfo updated = existing.withState(AgentState.LIVE) + .withLastHeartbeat(Instant.now()); + agents.put(id, updated); + return updated; + } + AgentInfo agent = new AgentInfo(id, name, group, version, routeIds, + capabilities, AgentState.LIVE, Instant.now(), Instant.now()); + agents.put(id, agent); + return agent; + } + + public boolean heartbeat(String id) { + return agents.computeIfPresent(id, (k, v) -> + v.withState(AgentState.LIVE).withLastHeartbeat(Instant.now())) != null; + } +} + +// App module: bean config +@Configuration +public class AgentRegistryBeanConfig { + @Bean + public AgentRegistryService agentRegistryService() { + return new AgentRegistryService(); + } +} +``` + +### Pattern 2: SseEmitter Per-Agent Connection +**What:** Each agent has one SseEmitter stored in ConcurrentHashMap, managed by a dedicated component +**When to use:** For all SSE connections to agents +**Example:** +```java +@Component +public class SseConnectionManager { + private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>(); + + public SseEmitter connect(String agentId) { + // Use Long.MAX_VALUE timeout -- we manage keepalive ourselves + SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); + + // remove(key, value) only removes if the map still holds THIS emitter (see Pitfall 3) + emitter.onCompletion(() -> emitters.remove(agentId, emitter)); + emitter.onTimeout(() -> emitters.remove(agentId, emitter)); + emitter.onError(e -> emitters.remove(agentId, emitter)); + + // Replace any existing emitter (agent reconnect) 
+ SseEmitter old = emitters.put(agentId, emitter); + if (old != null) { + old.complete(); // Close stale connection + } + + return emitter; + } + + public boolean sendEvent(String agentId, String eventId, String eventType, Object data) { + SseEmitter emitter = emitters.get(agentId); + if (emitter == null) return false; + try { + emitter.send(SseEmitter.event() + .id(eventId) + .name(eventType) + .data(data, MediaType.APPLICATION_JSON)); + return true; + } catch (IOException e) { + // Client disconnected: drop this emitter, but never a newer one stored meanwhile + emitters.remove(agentId, emitter); + return false; + } + } + + public void sendPingToAll() { + emitters.forEach((id, emitter) -> { + try { + emitter.send(SseEmitter.event().comment("ping")); + } catch (IOException e) { + emitters.remove(id, emitter); + } + }); + } +} +``` + +### Pattern 3: Lifecycle Monitor via @Scheduled +**What:** Periodic task checks all agents' lastHeartbeat timestamps and transitions states +**When to use:** For LIVE->STALE and STALE->DEAD transitions +**Example:** +```java +@Component +public class AgentLifecycleMonitor { + private final AgentRegistryService registry; + private final AgentRegistryConfig config; + + public AgentLifecycleMonitor(AgentRegistryService registry, AgentRegistryConfig config) { + this.registry = registry; + this.config = config; + } + + @Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}") + public void checkLifecycle() { + Instant now = Instant.now(); + for (AgentInfo agent : registry.findAll()) { + Duration sinceHeartbeat = Duration.between(agent.lastHeartbeat(), now); + if (agent.state() == AgentState.LIVE + && sinceHeartbeat.toMillis() > config.getStaleThresholdMs()) { + registry.transitionState(agent.id(), AgentState.STALE); + } else if (agent.state() == AgentState.STALE + // stale + dead threshold since last heartbeat approximates "5 minutes after going STALE" + && sinceHeartbeat.toMillis() > config.getStaleThresholdMs() + config.getDeadThresholdMs()) { + registry.transitionState(agent.id(), AgentState.DEAD); + } + } + } +} +``` + +### Anti-Patterns to Avoid +- **Mixing WebFlux and MVC:** Do not add spring-boot-starter-webflux. The project uses servlet-based MVC. Adding WebFlux creates classpath conflicts and ambiguity. 
+- **Sharing SseEmitter across threads without protection:** Always use ConcurrentHashMap and handle IOException on every send. A failed send means the client disconnected. +- **Storing SseEmitter in the core module:** SseEmitter is a Spring MVC class. Keep it in the app module only. The core module should define interfaces for "push event to agent" that the app module implements. +- **Not setting SseEmitter timeout:** Default timeout is server-dependent (often 30s). Use `Long.MAX_VALUE` and manage keepalive yourself. + +## Don't Hand-Roll + +| Problem | Don't Build | Use Instead | Why | +|---------|-------------|-------------|-----| +| SSE protocol | Custom HTTP streaming | Spring SseEmitter | Handles text/event-stream format, event IDs, retry fields automatically | +| Thread-safe map | Synchronized HashMap | ConcurrentHashMap | Lock-free reads, segmented writes, battle-tested | +| Periodic scheduling | Manual Thread/Timer | @Scheduled + @EnableScheduling | Already configured, integrates with Spring lifecycle | +| JSON serialization | Manual string building | ObjectMapper (already configured) | Handles Instant, unknown fields, all edge cases | +| Async request timeout | Manual thread management | spring.mvc.async.request-timeout config | Spring handles Tomcat async timeout correctly | + +**Key insight:** SSE in Spring MVC is a well-supported, first-class feature. The SseEmitter API handles the wire protocol; your job is managing the lifecycle of emitters (create, store, cleanup, send). + +## Common Pitfalls + +### Pitfall 1: SseEmitter Default Timeout Kills Long-Lived Connections +**What goes wrong:** Emitter times out after 30s (Tomcat default), client gets disconnected +**Why it happens:** Not setting explicit timeout on SseEmitter constructor +**How to avoid:** Always use `new SseEmitter(Long.MAX_VALUE)`. 
Also set `spring.mvc.async.request-timeout=-1` in application.yml to disable the MVC-level async timeout +**Warning signs:** Clients disconnecting every 30 seconds, reconnection storms + +### Pitfall 2: IOException on Send Not Handled +**What goes wrong:** Client disconnects but server keeps trying to send, gets IOException, does not clean up +**Why it happens:** Not wrapping every `emitter.send()` in try-catch +**How to avoid:** Every send must catch IOException, remove the emitter from the map, and log at debug level (not error -- disconnects are normal) +**Warning signs:** Growing emitter map, increasing IOExceptions in logs + +### Pitfall 3: Race Condition on Agent Reconnect +**What goes wrong:** Agent disconnects and reconnects rapidly; old emitter and new emitter both exist briefly +**Why it happens:** `onCompletion` callback of old emitter fires after new emitter is stored, removing the new one +**How to avoid:** Use `ConcurrentHashMap.put()` which returns the old value. Only remove in callbacks if the emitter in the map is still the same instance, e.g. with the two-argument `emitters.remove(agentId, emitter)` (SseEmitter does not override `equals`, so this is a reference equality check) +**Warning signs:** Agent SSE stream stops working after reconnect + +### Pitfall 4: Tomcat Thread Exhaustion with SSE +**What goes wrong:** Tomcat worker threads run out when many agents connect (or reconnect) at the same time +**Why it happens:** MVC SseEmitter relies on Servlet 3.0+ async support: the worker thread is released once the controller returns the emitter, but each connect still occupies a thread from the pool during the initial request +**How to avoid:** Spring Boot's default Tomcat thread pool (200 threads) is sufficient for dozens to low hundreds of agents. If scaling beyond that, configure `server.tomcat.threads.max`. 
For thousands of agents, consider WebFlux (but that is a v2 concern) +**Warning signs:** Thread pool exhaustion, connection refused errors + +### Pitfall 5: Command Expiry Not Cleaned Up +**What goes wrong:** Expired PENDING commands accumulate in memory +**Why it happens:** No scheduled task to clean them up +**How to avoid:** The lifecycle monitor (or a separate @Scheduled task) should also sweep expired commands every check cycle +**Warning signs:** Memory growth over time, stale commands in API responses + +### Pitfall 6: SSE Endpoint Blocked by ProtocolVersionInterceptor +**What goes wrong:** SSE GET request rejected because it lacks `X-Cameleer-Protocol-Version` header +**Why it happens:** WebConfig already registers the interceptor for `/api/v1/agents/**` which includes the SSE endpoint +**How to avoid:** Either add the protocol header requirement to agents (recommended -- agents already send it for POST requests) or exclude the SSE endpoint path from the interceptor +**Warning signs:** 400 errors on SSE connect attempts + +## Code Examples + +### Registration Controller +```java +@RestController +@RequestMapping("/api/v1/agents") +@Tag(name = "Agent Management", description = "Agent registration and lifecycle endpoints") +public class AgentRegistrationController { + + private final AgentRegistryService registryService; + private final ObjectMapper objectMapper; + + public AgentRegistrationController(AgentRegistryService registryService, ObjectMapper objectMapper) { + this.registryService = registryService; + this.objectMapper = objectMapper; + } + + @PostMapping("/register") + @Operation(summary = "Register an agent") + public ResponseEntity<String> register(@RequestBody String body) throws JsonProcessingException { + // Parse registration payload + JsonNode node = objectMapper.readTree(body); + String agentId = node.get("agentId").asText(); + String name = node.get("name").asText(); + String group = node.has("group") ? node.get("group").asText() : "default"; + // ... 
extract other fields + + AgentInfo agent = registryService.register(agentId, name, group, version, routeIds, capabilities); + + // Build registration response + Map<String, Object> response = new LinkedHashMap<>(); + response.put("agentId", agent.id()); + response.put("sseEndpoint", "/api/v1/agents/" + agentId + "/events"); + response.put("heartbeatIntervalMs", 30000); + response.put("serverPublicKey", null); // Phase 4 + // JWT token placeholder -- Phase 4 will add real JWT + response.put("token", "placeholder-" + agentId); + + return ResponseEntity.ok(objectMapper.writeValueAsString(response)); + } + + @PostMapping("/{id}/heartbeat") + @Operation(summary = "Agent heartbeat ping") + public ResponseEntity<Void> heartbeat(@PathVariable String id) { + boolean found = registryService.heartbeat(id); + if (!found) { + return ResponseEntity.notFound().build(); + } + return ResponseEntity.ok().build(); + } + + @GetMapping + @Operation(summary = "List all agents") + public ResponseEntity<String> listAgents( + @RequestParam(required = false) String status) throws JsonProcessingException { + List<AgentInfo> agents; + if (status != null) { + AgentState stateFilter; + try { + stateFilter = AgentState.valueOf(status.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + // Unknown ?status= value: reject with 400 instead of failing with a 500 + return ResponseEntity.badRequest().build(); + } + agents = registryService.findByState(stateFilter); + } else { + agents = registryService.findAll(); + } + return ResponseEntity.ok(objectMapper.writeValueAsString(agents)); + } +} +``` + +### SSE Controller +```java +@RestController +@RequestMapping("/api/v1/agents") +@Tag(name = "Agent SSE", description = "Server-Sent Events for agent communication") +public class AgentSseController { + + private static final Logger log = LoggerFactory.getLogger(AgentSseController.class); + + private final SseConnectionManager connectionManager; + private final AgentRegistryService registryService; + + public AgentSseController(SseConnectionManager connectionManager, AgentRegistryService registryService) { + this.connectionManager = connectionManager; + this.registryService = registryService; + } + + @GetMapping(value = "/{id}/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @Operation(summary = "SSE event stream for an agent") + public SseEmitter subscribe( + @PathVariable String id, + @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) { + + AgentInfo agent = 
registryService.findById(id); + if (agent == null) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not registered"); + } + + // Last-Event-ID acknowledged but no replay (per decision) + if (lastEventId != null) { + log.debug("Agent {} reconnected with Last-Event-ID: {} (no replay)", id, lastEventId); + } + + return connectionManager.connect(id); + } +} +``` + +### Command Acknowledgement Endpoint (Recommended: Dedicated Endpoint) +```java +@PostMapping("/{id}/commands/{commandId}/ack") +@Operation(summary = "Acknowledge command receipt") +public ResponseEntity<Void> acknowledgeCommand( + @PathVariable String id, + @PathVariable String commandId) { + boolean acknowledged = registryService.acknowledgeCommand(id, commandId); + if (!acknowledged) { + return ResponseEntity.notFound().build(); + } + return ResponseEntity.ok().build(); +} +``` + +### Application Configuration Addition +```yaml +# application.yml additions +agent-registry: + heartbeat-interval-ms: 30000 + stale-threshold-ms: 90000 + dead-threshold-ms: 300000 # 5 minutes after going STALE (added on top of stale-threshold-ms) + ping-interval-ms: 15000 + command-expiry-ms: 60000 + lifecycle-check-interval-ms: 10000 + +spring: + mvc: + async: + request-timeout: -1 # Disable async timeout for SSE +``` + +## State of the Art + +| Old Approach | Current Approach | When Changed | Impact | +|--------------|------------------|--------------|--------| +| Polling for agent status | SSE push for commands | Always SSE for server-push | Immediate delivery, lower latency | +| WebFlux for SSE | MVC SseEmitter | Spring 4.2+ | MVC SseEmitter is sufficient for moderate scale; no need for reactive stack | +| Custom HTTP streaming | SseEmitter.event() builder | Spring 4.2+ | Wire protocol handled automatically | + +**Deprecated/outdated:** +- `ResponseBodyEmitter` directly for SSE: Use `SseEmitter` which extends it with SSE-specific features +- `DeferredResult` for server push: Only for single-value responses, not streams + 
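To make the threshold values from the configuration above concrete, the time-based transitions can be read as a small pure function. This is a sketch under assumptions: `LifecycleRules` and its parameter names are hypothetical, and it assumes the registry records the moment an agent became STALE so that the DEAD check follows the locked decision "5 minutes after going STALE".

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of the codebase): time-based lifecycle transitions only. */
final class LifecycleRules {
    enum State { LIVE, STALE, DEAD }

    private final Duration staleAfter;      // e.g. stale-threshold-ms = 90000
    private final Duration deadAfterStale;  // e.g. dead-threshold-ms = 300000

    LifecycleRules(Duration staleAfter, Duration deadAfterStale) {
        this.staleAfter = staleAfter;
        this.deadAfterStale = deadAfterStale;
    }

    /**
     * Returns the state the monitor should assign at time {@code now}.
     * {@code staleSince} is the instant the agent became STALE; it is ignored while LIVE.
     */
    State next(State current, Instant lastHeartbeat, Instant staleSince, Instant now) {
        switch (current) {
            case LIVE:
                // LIVE -> STALE once the silence exceeds the stale threshold
                return Duration.between(lastHeartbeat, now).compareTo(staleAfter) > 0
                        ? State.STALE : State.LIVE;
            case STALE:
                // STALE -> DEAD once the agent has been stale for longer than the dead threshold
                return Duration.between(staleSince, now).compareTo(deadAfterStale) > 0
                        ? State.DEAD : State.STALE;
            default:
                // DEAD is terminal for the monitor; only a heartbeat or re-registration revives it
                return State.DEAD;
        }
    }
}
```

Because the monitor only runs every `lifecycle-check-interval-ms`, the STALE transition is observed up to one interval late; measuring DEAD from the recorded transition time rather than from the last heartbeat keeps the five-minute window intact regardless of that delay.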
+## Open Questions + +1. **Command acknowledgement: dedicated endpoint vs heartbeat piggyback** + - What we know: Dedicated endpoint is simpler, more explicit, and decoupled from heartbeat + - What's unclear: Whether agent-side implementation prefers one approach + - Recommendation: Use dedicated `POST /{id}/commands/{commandId}/ack` endpoint. Cleaner separation of concerns, easier to test, and does not complicate the heartbeat path + +2. **Dead threshold calculation: from last heartbeat or from STALE transition?** + - What we know: CONTEXT.md says "5 minutes after going STALE" + - What's unclear: Whether to track staleTransitionTime separately or compute from lastHeartbeat + - Recommendation: Track `staleTransitionTime` in AgentInfo. Dead threshold = 5 minutes after `staleTransitionTime`. This matches the stated requirement precisely + +3. **Async timeout vs SseEmitter timeout** + - What we know: Both `spring.mvc.async.request-timeout` and `new SseEmitter(timeout)` affect SSE lifetime + - What's unclear: Interaction between the two + - Recommendation: Set `SseEmitter(Long.MAX_VALUE)` AND `spring.mvc.async.request-timeout=-1`. Belt and suspenders -- both disabled ensures no premature timeout + +## Validation Architecture + +### Test Framework +| Property | Value | +|----------|-------| +| Framework | JUnit 5 + Spring Boot Test (via spring-boot-starter-test) | +| Config file | pom.xml (Surefire + Failsafe configured) | +| Quick run command | `mvn test -pl cameleer3-server-core -Dtest=AgentRegistryServiceTest` | +| Full suite command | `mvn clean verify` | + +### Phase Requirements to Test Map +| Req ID | Behavior | Test Type | Automated Command | File Exists? 
| +|--------|----------|-----------|-------------------|-------------| +| AGNT-01 | Agent registers and gets response | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentRegistrationControllerIT#registerAgent*` | No - Wave 0 | +| AGNT-02 | Lifecycle transitions LIVE/STALE/DEAD | unit | `mvn test -pl cameleer3-server-core -Dtest=AgentRegistryServiceTest#lifecycle*` | No - Wave 0 | +| AGNT-03 | Heartbeat updates timestamp, returns 200/404 | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentRegistrationControllerIT#heartbeat*` | No - Wave 0 | +| AGNT-04 | Config-update pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#configUpdate*` | No - Wave 0 | +| AGNT-05 | Deep-trace command pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#deepTrace*` | No - Wave 0 | +| AGNT-06 | Replay command pushed via SSE | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#replay*` | No - Wave 0 | +| AGNT-07 | SSE ping keepalive + Last-Event-ID | integration | `mvn test -pl cameleer3-server-app -Dtest=AgentSseControllerIT#pingKeepalive*` | No - Wave 0 | + +### Sampling Rate +- **Per task commit:** `mvn test -pl cameleer3-server-core,cameleer3-server-app -Dtest="Agent*"` (agent-related tests only) +- **Per wave merge:** `mvn clean verify` +- **Phase gate:** Full suite green before /gsd:verify-work + +### Wave 0 Gaps +- [ ] `cameleer3-server-core/.../agent/AgentRegistryServiceTest.java` -- covers AGNT-02, AGNT-03 (unit tests for registry logic) +- [ ] `cameleer3-server-app/.../controller/AgentRegistrationControllerIT.java` -- covers AGNT-01, AGNT-03 +- [ ] `cameleer3-server-app/.../controller/AgentSseControllerIT.java` -- covers AGNT-04, AGNT-05, AGNT-06, AGNT-07 +- [ ] `cameleer3-server-app/.../controller/AgentCommandControllerIT.java` -- covers command targeting (single, group, all) +- [ ] No new framework install needed -- JUnit 5 + Spring Boot Test + 
Awaitility already in place + +### SSE Test Strategy +Testing SSE with `TestRestTemplate` requires special handling. Use Spring's `WebClient` from WebFlux test support or raw `HttpURLConnection` to read the SSE stream. Alternatively, test at the service layer (SseConnectionManager) with direct emitter interaction. The integration test should: +1. Register agent via POST +2. Open SSE connection (separate thread) +3. Send command via POST +4. Assert SSE stream received the event +5. Verify with Awaitility for async assertions + +## Sources + +### Primary (HIGH confidence) +- [SseEmitter Javadoc (Spring Framework 7.0.5)](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.html) - Full API reference +- [Asynchronous Requests :: Spring Framework](https://docs.spring.io/spring-framework/reference/web/webmvc/mvc-ann-async.html) - Official async request handling docs +- [Task Execution and Scheduling :: Spring Boot](https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html) - Official scheduling docs +- Existing codebase: ClickHouseFlushScheduler, IngestionService, IngestionBeanConfig, WebConfig patterns + +### Secondary (MEDIUM confidence) +- [Spring Boot SSE SseEmitter tutorial](https://nitinkc.github.io/microservices/sse-springboot/) - Complete guide with patterns +- [SseEmitter timeout issue #4021](https://github.com/spring-projects/spring-boot/issues/4021) - Timeout handling gotchas +- [SseEmitter response closed #19652](https://github.com/spring-projects/spring-framework/issues/19652) - Thread safety discussion + +### Tertiary (LOW confidence) +- Various Medium articles on SSE patterns - used for cross-referencing community patterns only + +## Metadata + +**Confidence breakdown:** +- Standard stack: HIGH - SseEmitter is built into Spring MVC, already on classpath, well-documented API +- Architecture: HIGH - follows established codebase patterns (core plain 
class, app bean config, @Scheduled) +- Pitfalls: HIGH - well-known issues documented in Spring GitHub issues and multiple sources +- SSE test strategy: MEDIUM - SSE testing with TestRestTemplate is non-trivial, may need adaptation + +**Research date:** 2026-03-11 +**Valid until:** 2026-04-11 (stable stack, no fast-moving dependencies)