docs(03): create phase plan for agent registry + SSE push
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -61,11 +61,11 @@ Plans:
|
||||
1. An agent can register via POST with a bootstrap token and receive a JWT (security enforcement deferred to Phase 4, but the registration flow and token issuance work end-to-end)
|
||||
2. Server correctly transitions agents through LIVE/STALE/DEAD states based on heartbeat timing, and the agent list endpoint reflects current states
|
||||
3. Server pushes config-update, deep-trace, and replay events to a specific agent's SSE stream, with ping keepalive and Last-Event-ID reconnection support
|
||||
**Plans**: TBD
|
||||
**Plans:** 2 plans
|
||||
|
||||
Plans:
|
||||
- [ ] 03-01: Agent registration, heartbeat lifecycle, and registry endpoints
|
||||
- [ ] 03-02: SSE connection management and command push (config-update, deep-trace, replay, ping, reconnection)
|
||||
- [ ] 03-01-PLAN.md -- Agent domain types, registry service, registration/heartbeat/list endpoints, lifecycle monitor
|
||||
- [ ] 03-02-PLAN.md -- SSE connection management, command push (config-update, deep-trace, replay), ping keepalive, acknowledgement, integration tests
|
||||
|
||||
### Phase 4: Security
|
||||
**Goal**: All server communication is authenticated and integrity-protected, with JWT for API access and Ed25519 signatures for pushed configuration
|
||||
|
||||
267
.planning/phases/03-agent-registry-sse-push/03-01-PLAN.md
Normal file
267
.planning/phases/03-agent-registry-sse-push/03-01-PLAN.md
Normal file
@@ -0,0 +1,267 @@
|
||||
---
|
||||
phase: 03-agent-registry-sse-push
|
||||
plan: 01
|
||||
type: execute
|
||||
wave: 1
|
||||
depends_on: []
|
||||
files_modified:
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java
|
||||
- cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryConfig.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java
|
||||
- cameleer3-server-app/src/main/resources/application.yml
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
|
||||
autonomous: true
|
||||
requirements:
|
||||
- AGNT-01
|
||||
- AGNT-02
|
||||
- AGNT-03
|
||||
|
||||
must_haves:
|
||||
truths:
|
||||
- "Agent can register via POST /api/v1/agents/register with agentId, name, group, version, routeIds, capabilities and receive a response containing SSE endpoint URL and server config"
|
||||
- "Re-registration with the same agentId resumes existing identity (transitions back to LIVE, updates metadata)"
|
||||
- "Agent can send heartbeat via POST /api/v1/agents/{id}/heartbeat and receive 200 (or 404 if unknown)"
|
||||
- "Server transitions agents LIVE->STALE after 90s without heartbeat, STALE->DEAD 5 minutes after staleTransitionTime"
|
||||
- "Agent list endpoint GET /api/v1/agents returns all agents, filterable by ?status=LIVE|STALE|DEAD"
|
||||
artifacts:
|
||||
- path: "cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java"
|
||||
provides: "Agent registration, heartbeat, lifecycle transitions, find/filter"
|
||||
- path: "cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java"
|
||||
provides: "Agent record with id, name, group, version, routeIds, capabilities, state, timestamps"
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java"
|
||||
provides: "POST /register, POST /{id}/heartbeat, GET /agents endpoints"
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java"
|
||||
provides: "@Scheduled lifecycle transitions LIVE->STALE->DEAD"
|
||||
key_links:
|
||||
- from: "AgentRegistrationController"
|
||||
to: "AgentRegistryService"
|
||||
via: "constructor injection"
|
||||
pattern: "registryService\\.register|registryService\\.heartbeat"
|
||||
- from: "AgentLifecycleMonitor"
|
||||
to: "AgentRegistryService"
|
||||
via: "@Scheduled lifecycle check"
|
||||
pattern: "registry\\.transitionState"
|
||||
- from: "AgentRegistryBeanConfig"
|
||||
to: "AgentRegistryService"
|
||||
via: "@Bean factory method"
|
||||
pattern: "new AgentRegistryService"
|
||||
---
|
||||
|
||||
<objective>
|
||||
Build the agent registry domain model, registration/heartbeat REST endpoints, and lifecycle monitoring.
|
||||
|
||||
Purpose: Agents need to register with the server, send periodic heartbeats, and the server must track their LIVE/STALE/DEAD states. This is the foundation that the SSE push layer (Plan 02) builds on.
|
||||
Output: Core domain types (AgentInfo, AgentState, AgentCommand, CommandStatus, CommandType), AgentRegistryService in core module, registration/heartbeat/list controllers in app module, lifecycle monitor, unit + integration tests.
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@.planning/PROJECT.md
|
||||
@.planning/ROADMAP.md
|
||||
@.planning/STATE.md
|
||||
@.planning/phases/03-agent-registry-sse-push/03-CONTEXT.md
|
||||
@.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md
|
||||
|
||||
@cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionConfig.java
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java
|
||||
@cameleer3-server-app/src/main/resources/application.yml
|
||||
@cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
|
||||
|
||||
<interfaces>
|
||||
<!-- Established codebase patterns the executor must follow -->
|
||||
|
||||
Pattern: Core module plain class, app module bean config:
|
||||
- IngestionService is a plain Java class (no Spring annotations) in core module
|
||||
- IngestionBeanConfig is @Configuration in app module that creates the bean
|
||||
- IngestionConfig is @ConfigurationProperties in app module for YAML binding
|
||||
|
||||
Pattern: Controller accepts raw String body:
|
||||
- Controllers use @RequestBody String body, parse with ObjectMapper
|
||||
- Return ResponseEntity with serialized JSON string
|
||||
|
||||
Pattern: @Scheduled for periodic tasks:
|
||||
- ClickHouseFlushScheduler uses @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
|
||||
- @EnableScheduling already on Cameleer3ServerApplication
|
||||
|
||||
Pattern: @EnableConfigurationProperties registration:
|
||||
- Cameleer3ServerApplication has @EnableConfigurationProperties(IngestionConfig.class)
|
||||
- New config classes must be added to this annotation
|
||||
|
||||
Pattern: ProtocolVersionInterceptor:
|
||||
- WebConfig registers interceptor for "/api/v1/data/**", "/api/v1/agents/**"
|
||||
- Agent endpoints already covered -- agents must send X-Cameleer-Protocol-Version:1 header
|
||||
</interfaces>
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto" tdd="true">
|
||||
<name>Task 1: Core domain types and AgentRegistryService with unit tests</name>
|
||||
<files>
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java,
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java,
|
||||
cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java
|
||||
</files>
|
||||
<behavior>
|
||||
- register: new agent ID creates AgentInfo with state LIVE, returns AgentInfo
|
||||
- register: same agent ID re-registers (updates metadata, transitions to LIVE, updates lastHeartbeat and registeredAt)
|
||||
- heartbeat: known agent updates lastHeartbeat and transitions STALE back to LIVE, returns true
|
||||
- heartbeat: unknown agent returns false
|
||||
- lifecycle: LIVE agent with lastHeartbeat > staleThresholdMs transitions to STALE (staleTransitionTime recorded)
|
||||
- lifecycle: STALE agent where now - staleTransitionTime > deadThresholdMs transitions to DEAD
|
||||
- lifecycle: DEAD agent remains DEAD (no auto-purge)
|
||||
- findAll: returns all agents regardless of state
|
||||
- findByState: filters agents by AgentState
|
||||
- findById: returns null for unknown ID
|
||||
- addCommand: creates AgentCommand with PENDING status, returns command ID
|
||||
- acknowledgeCommand: transitions command from PENDING/DELIVERED to ACKNOWLEDGED
|
||||
- expireCommands: removes commands older than expiryMs with PENDING status
|
||||
- findPendingCommands: returns PENDING commands for given agentId
|
||||
</behavior>
|
||||
<action>
|
||||
Create the agent domain model in the core module (package com.cameleer3.server.core.agent):
|
||||
|
||||
1. **AgentState enum**: LIVE, STALE, DEAD
|
||||
|
||||
2. **CommandType enum**: CONFIG_UPDATE, DEEP_TRACE, REPLAY
|
||||
|
||||
3. **CommandStatus enum**: PENDING, DELIVERED, ACKNOWLEDGED, EXPIRED
|
||||
|
||||
4. **AgentInfo**: Mutable class (not record -- needs state transitions) with fields:
|
||||
- id (String), name (String), group (String), version (String)
|
||||
- routeIds (List<String>), capabilities (Map<String, Object>)
|
||||
- state (AgentState), registeredAt (Instant), lastHeartbeat (Instant)
|
||||
- staleTransitionTime (Instant, nullable -- set when transitioning to STALE)
|
||||
- Use synchronized methods or volatile fields for thread safety since ConcurrentHashMap only protects the map, not the values.
|
||||
- Actually, prefer immutable-style: store as records in the ConcurrentHashMap and use computeIfPresent to atomically swap. AgentInfo can be a record with wither-style methods (withState, withLastHeartbeat, etc.).
|
||||
|
||||
5. **AgentCommand**: Record with fields: id (String, UUID), type (CommandType), payload (String -- raw JSON), targetAgentId (String), createdAt (Instant), status (CommandStatus). Provide withStatus method.
|
||||
|
||||
6. **AgentEventListener**: Interface with methods `onCommandReady(String agentId, AgentCommand command)` -- this allows the SSE layer (Plan 02) to be notified when a command is added. The core module defines the interface; the app module implements it.
|
||||
|
||||
7. **AgentRegistryService**: Plain class (no Spring annotations), constructor takes staleThresholdMs (long), deadThresholdMs (long), commandExpiryMs (long). Uses ConcurrentHashMap<String, AgentInfo> for agents and ConcurrentHashMap<String, List<AgentCommand>> (or ConcurrentHashMap<String, ConcurrentLinkedQueue<AgentCommand>>) for pending commands per agent.
|
||||
|
||||
Methods:
|
||||
- `register(String id, String name, String group, String version, List<String> routeIds, Map<String, Object> capabilities)` -> AgentInfo
|
||||
- `heartbeat(String id)` -> boolean
|
||||
- `transitionState(String id, AgentState newState)` -> void (used by lifecycle monitor)
|
||||
- `checkLifecycle()` -> void (iterates all agents, applies LIVE->STALE and STALE->DEAD based on thresholds)
|
||||
- `findById(String id)` -> AgentInfo (nullable)
|
||||
- `findAll()` -> List<AgentInfo>
|
||||
- `findByState(AgentState state)` -> List<AgentInfo>
|
||||
- `addCommand(String agentId, CommandType type, String payload)` -> AgentCommand (creates with PENDING, calls eventListener.onCommandReady if set)
|
||||
- `acknowledgeCommand(String agentId, String commandId)` -> boolean
|
||||
- `findPendingCommands(String agentId)` -> List<AgentCommand>
|
||||
- `markDelivered(String agentId, String commandId)` -> void
|
||||
- `expireOldCommands()` -> void (sweep commands older than commandExpiryMs)
|
||||
- `setEventListener(AgentEventListener listener)` -> void (optional, for SSE integration)
|
||||
|
||||
Write tests FIRST (RED), then implement (GREEN). Test class: AgentRegistryServiceTest.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn test -pl cameleer3-server-core -Dtest=AgentRegistryServiceTest</automated>
|
||||
</verify>
|
||||
<done>All unit tests pass: registration (new + re-register), heartbeat (known + unknown), lifecycle transitions (LIVE->STALE->DEAD, heartbeat revives STALE), findAll/findByState/findById, command add/acknowledge/expire. AgentEventListener interface defined.</done>
|
||||
</task>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 2: Registration/heartbeat/list controllers, config, lifecycle monitor, integration tests</name>
|
||||
<files>
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryConfig.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java,
|
||||
cameleer3-server-app/src/main/resources/application.yml,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
|
||||
</files>
|
||||
<action>
|
||||
Wire the agent registry into the Spring Boot app and create REST endpoints:
|
||||
|
||||
1. **AgentRegistryConfig** (@ConfigurationProperties prefix "agent-registry"):
|
||||
- heartbeatIntervalMs (long, default 30000)
|
||||
- staleThresholdMs (long, default 90000)
|
||||
- deadThresholdMs (long, default 300000) -- this is 5 minutes from staleTransitionTime, NOT from lastHeartbeat
|
||||
- pingIntervalMs (long, default 15000)
|
||||
- commandExpiryMs (long, default 60000)
|
||||
- lifecycleCheckIntervalMs (long, default 10000)
|
||||
Follow IngestionConfig pattern: plain class with getters/setters.
|
||||
|
||||
2. **AgentRegistryBeanConfig** (@Configuration):
|
||||
- @Bean AgentRegistryService: `new AgentRegistryService(config.getStaleThresholdMs(), config.getDeadThresholdMs(), config.getCommandExpiryMs())`
|
||||
Follow IngestionBeanConfig pattern.
|
||||
|
||||
3. **Update Cameleer3ServerApplication**: Add AgentRegistryConfig.class to @EnableConfigurationProperties.
|
||||
|
||||
4. **Update application.yml**: Add agent-registry section with all defaults (see RESEARCH.md code example). Also add `spring.mvc.async.request-timeout: -1` for SSE support (Plan 02 needs it, but set it now).
|
||||
|
||||
5. **AgentLifecycleMonitor** (@Component):
|
||||
- Inject AgentRegistryService
|
||||
- @Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}") calls registryService.checkLifecycle() and registryService.expireOldCommands()
|
||||
- Follow ClickHouseFlushScheduler pattern but simpler (no SmartLifecycle needed -- agent state is ephemeral)
|
||||
|
||||
6. **AgentRegistrationController** (@RestController, @RequestMapping("/api/v1/agents")):
|
||||
- Inject AgentRegistryService, ObjectMapper
|
||||
- `POST /register`: Accept raw String body, parse JSON with ObjectMapper. Extract: agentId (required), name (required), group (default "default"), version, routeIds (default empty list), capabilities (default empty map). Call registryService.register(). Build response JSON: { agentId, sseEndpoint: "/api/v1/agents/{agentId}/events", heartbeatIntervalMs: from config, serverPublicKey: null (Phase 4 placeholder) }. Return 200.
|
||||
- `POST /{id}/heartbeat`: Call registryService.heartbeat(id). Return 200 if true, 404 if false.
|
||||
- `GET /`: Accept optional @RequestParam status. If status provided, parse to AgentState and call findByState. Otherwise call findAll. Serialize with ObjectMapper, return 200. Handle invalid status with 400.
|
||||
- Add @Tag(name = "Agent Management") and @Operation annotations for OpenAPI.
|
||||
|
||||
7. **AgentRegistrationControllerIT** (extends AbstractClickHouseIT):
|
||||
- Test register new agent: POST /api/v1/agents/register with valid payload, assert 200, response contains agentId and sseEndpoint
|
||||
- Test re-register same agent: register twice with same ID, assert second returns 200, state is LIVE
|
||||
- Test heartbeat known agent: register then heartbeat, assert 200
|
||||
- Test heartbeat unknown agent: heartbeat without register, assert 404
|
||||
- Test list all agents: register 2 agents, GET /api/v1/agents, assert both returned
|
||||
- Test list by status filter: register agent, GET /api/v1/agents?status=LIVE, assert filtered correctly
|
||||
- Test invalid status filter: GET /api/v1/agents?status=INVALID, assert 400
|
||||
- All requests must include X-Cameleer-Protocol-Version:1 header (ProtocolVersionInterceptor covers /api/v1/agents/**)
|
||||
- Use TestRestTemplate (already available from AbstractClickHouseIT's @SpringBootTest)
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn test -pl cameleer3-server-core,cameleer3-server-app -Dtest="Agent*"</automated>
|
||||
</verify>
|
||||
<done>POST /register returns 200 with agentId + sseEndpoint + heartbeatIntervalMs. POST /{id}/heartbeat returns 200 for known agents, 404 for unknown. GET /agents returns all agents with optional ?status= filter. AgentLifecycleMonitor runs on schedule. All integration tests pass. mvn clean verify passes.</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<verification>
|
||||
mvn clean verify -- full suite green (existing Phase 1+2 tests still pass, new agent tests pass)
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
- Agent registration flow works end-to-end via REST
|
||||
- Heartbeat updates agent state correctly
|
||||
- Lifecycle monitor transitions LIVE->STALE->DEAD based on configured thresholds
|
||||
- Agent list endpoint with optional status filter returns correct results
|
||||
- All 7+ integration tests pass
|
||||
- Existing test suite unbroken
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/03-agent-registry-sse-push/03-01-SUMMARY.md`
|
||||
</output>
|
||||
251
.planning/phases/03-agent-registry-sse-push/03-02-PLAN.md
Normal file
251
.planning/phases/03-agent-registry-sse-push/03-02-PLAN.md
Normal file
@@ -0,0 +1,251 @@
|
||||
---
|
||||
phase: 03-agent-registry-sse-push
|
||||
plan: 02
|
||||
type: execute
|
||||
wave: 2
|
||||
depends_on: ["03-01"]
|
||||
files_modified:
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java
|
||||
autonomous: true
|
||||
requirements:
|
||||
- AGNT-04
|
||||
- AGNT-05
|
||||
- AGNT-06
|
||||
- AGNT-07
|
||||
|
||||
must_haves:
|
||||
truths:
|
||||
- "Registered agent can open SSE stream at GET /api/v1/agents/{id}/events and receive events"
|
||||
- "Server pushes config-update events to a specific agent's SSE stream via POST /api/v1/agents/{id}/commands"
|
||||
- "Server pushes deep-trace commands to a specific agent's SSE stream with correlationId in payload"
|
||||
- "Server pushes replay commands to a specific agent's SSE stream"
|
||||
- "Server can target commands to all agents in a group via POST /api/v1/agents/groups/{group}/commands"
|
||||
- "Server can broadcast commands to all live agents via POST /api/v1/agents/commands"
|
||||
- "SSE stream receives ping keepalive comments every 15 seconds"
|
||||
- "SSE events include event ID for Last-Event-ID reconnection support (no replay of missed events)"
|
||||
- "Agent can acknowledge command receipt via POST /api/v1/agents/{id}/commands/{commandId}/ack"
|
||||
artifacts:
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java"
|
||||
provides: "Per-agent SseEmitter management, event sending, ping keepalive"
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java"
|
||||
provides: "GET /{id}/events SSE endpoint"
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java"
|
||||
provides: "POST command endpoints (single, group, broadcast) + ack endpoint"
|
||||
key_links:
|
||||
- from: "AgentCommandController"
|
||||
to: "SseConnectionManager"
|
||||
via: "sendEvent for command delivery"
|
||||
pattern: "connectionManager\\.sendEvent"
|
||||
- from: "AgentCommandController"
|
||||
to: "AgentRegistryService"
|
||||
via: "addCommand + findByState/findByGroup"
|
||||
pattern: "registryService\\.addCommand"
|
||||
- from: "SseConnectionManager"
|
||||
to: "AgentEventListener"
|
||||
via: "implements interface, receives command notifications"
|
||||
pattern: "implements AgentEventListener"
|
||||
- from: "AgentSseController"
|
||||
to: "SseConnectionManager"
|
||||
via: "connect() returns SseEmitter"
|
||||
pattern: "connectionManager\\.connect"
|
||||
---
|
||||
|
||||
<objective>
|
||||
Build SSE connection management and command push infrastructure for real-time agent communication.
|
||||
|
||||
Purpose: The server needs to push config-update, deep-trace, and replay commands to connected agents in real time via Server-Sent Events. This completes the bidirectional communication channel (agents POST data to server, server pushes commands via SSE).
|
||||
Output: SseConnectionManager, SSE endpoint, command controller (single/group/broadcast targeting), command acknowledgement, ping keepalive, Last-Event-ID support, integration tests.
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@.planning/PROJECT.md
|
||||
@.planning/ROADMAP.md
|
||||
@.planning/STATE.md
|
||||
@.planning/phases/03-agent-registry-sse-push/03-CONTEXT.md
|
||||
@.planning/phases/03-agent-registry-sse-push/03-RESEARCH.md
|
||||
@.planning/phases/03-agent-registry-sse-push/03-01-SUMMARY.md
|
||||
|
||||
@cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java
|
||||
@cameleer3-server-app/src/main/resources/application.yml
|
||||
@cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
|
||||
|
||||
<interfaces>
|
||||
<!-- From Plan 01 (must exist before this plan executes) -->
|
||||
|
||||
From cameleer3-server-core/.../agent/AgentInfo.java:
|
||||
```java
|
||||
// Record or class with fields:
|
||||
// id, name, group, version, routeIds, capabilities, state, registeredAt, lastHeartbeat, staleTransitionTime
|
||||
// Methods: withState(), withLastHeartbeat(), etc.
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/AgentState.java:
|
||||
```java
|
||||
public enum AgentState { LIVE, STALE, DEAD }
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/CommandType.java:
|
||||
```java
|
||||
public enum CommandType { CONFIG_UPDATE, DEEP_TRACE, REPLAY }
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/CommandStatus.java:
|
||||
```java
|
||||
public enum CommandStatus { PENDING, DELIVERED, ACKNOWLEDGED, EXPIRED }
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/AgentCommand.java:
|
||||
```java
|
||||
// Record: id (UUID string), type (CommandType), payload (String JSON), targetAgentId, createdAt, status
|
||||
// Method: withStatus()
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/AgentEventListener.java:
|
||||
```java
|
||||
public interface AgentEventListener {
|
||||
void onCommandReady(String agentId, AgentCommand command);
|
||||
}
|
||||
```
|
||||
|
||||
From cameleer3-server-core/.../agent/AgentRegistryService.java:
|
||||
```java
|
||||
// Key methods:
|
||||
// register(id, name, group, version, routeIds, capabilities) -> AgentInfo
|
||||
// heartbeat(id) -> boolean
|
||||
// findById(id) -> AgentInfo
|
||||
// findAll() -> List<AgentInfo>
|
||||
// findByState(state) -> List<AgentInfo>
|
||||
// addCommand(agentId, type, payload) -> AgentCommand
|
||||
// acknowledgeCommand(agentId, commandId) -> boolean
|
||||
// markDelivered(agentId, commandId) -> void
|
||||
// setEventListener(listener) -> void
|
||||
```
|
||||
|
||||
From cameleer3-server-app/.../config/AgentRegistryConfig.java:
|
||||
```java
|
||||
// @ConfigurationProperties(prefix = "agent-registry")
|
||||
// getPingIntervalMs(), getCommandExpiryMs(), etc.
|
||||
```
|
||||
</interfaces>
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 1: SseConnectionManager, SSE controller, and command controller</name>
|
||||
<files>
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/SseConnectionManager.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/WebConfig.java
|
||||
</files>
|
||||
<action>
|
||||
Build the SSE infrastructure and command delivery system:
|
||||
|
||||
1. **SseConnectionManager** (@Component, implements AgentEventListener):
|
||||
- ConcurrentHashMap<String, SseEmitter> emitters for per-agent connections
|
||||
- Inject AgentRegistryConfig for ping interval, inject AgentRegistryService (call setEventListener(this) in @PostConstruct)
|
||||
- `connect(String agentId)`: Create SseEmitter(Long.MAX_VALUE). Register onCompletion/onTimeout/onError callbacks that remove the emitter ONLY if the current map value is the same instance (reference equality via == check to avoid Pitfall 3 from research). Replace existing emitter with put(), complete() old one if exists. Return new emitter.
|
||||
- `sendEvent(String agentId, String eventId, String eventType, Object data)`: Get emitter from map, send SseEmitter.event().id(eventId).name(eventType).data(data, MediaType.APPLICATION_JSON). Catch IOException, remove emitter, return false. Return true on success.
|
||||
- `sendPingToAll()`: Iterate emitters, send comment("ping") to each. Remove on IOException.
|
||||
- `isConnected(String agentId)`: Check if emitter exists in map.
|
||||
- `onCommandReady(String agentId, AgentCommand command)`: Attempt sendEvent with command.id() as eventId, command.type().name().toLowerCase().replace('_', '-') as event name (config-update, deep-trace, replay), command.payload() as data. If successful, call registryService.markDelivered(agentId, command.id()). If agent not connected, command stays PENDING (caller can re-send or it expires).
|
||||
- @Scheduled(fixedDelayString = "${agent-registry.ping-interval-ms:15000}") pingAll(): calls sendPingToAll()
|
||||
|
||||
2. **Update AgentRegistryBeanConfig**: After creating AgentRegistryService bean, the SseConnectionManager (auto-scanned as @Component) will call setEventListener in @PostConstruct. No change needed in bean config if SseConnectionManager handles it. BUT -- to avoid circular dependency, SseConnectionManager should inject AgentRegistryService and call setEventListener(this) in @PostConstruct.
|
||||
|
||||
3. **AgentSseController** (@RestController, @RequestMapping("/api/v1/agents")):
|
||||
- Inject SseConnectionManager, AgentRegistryService
|
||||
- `GET /{id}/events` (produces TEXT_EVENT_STREAM_VALUE): Check agent exists via registryService.findById(id). If null, return 404 (throw ResponseStatusException). Read Last-Event-ID header (optional) -- log it at debug level but do NOT replay missed events (per locked decision). Call connectionManager.connect(id), return the SseEmitter.
|
||||
- Add @Tag(name = "Agent SSE") and @Operation annotations.
|
||||
|
||||
4. **AgentCommandController** (@RestController, @RequestMapping("/api/v1/agents")):
|
||||
- Inject AgentRegistryService, SseConnectionManager, ObjectMapper
|
||||
- `POST /{id}/commands`: Accept raw String body. Parse JSON: { "type": "config-update|deep-trace|replay", "payload": {...} }. Map type string to CommandType enum (config-update -> CONFIG_UPDATE, deep-trace -> DEEP_TRACE, replay -> REPLAY). Call registryService.addCommand(id, type, payloadJsonString). The AgentEventListener.onCommandReady in SseConnectionManager handles delivery. Return 202 with { commandId, status: "PENDING" or "DELIVERED" depending on whether agent is connected }.
|
||||
- `POST /groups/{group}/commands`: Same body parsing. Find all LIVE agents in group via registryService.findAll() filtered by group. For each, call registryService.addCommand(). Return 202 with { commandIds: [...], targetCount: N }.
|
||||
- `POST /commands`: Broadcast to all LIVE agents. Same pattern as group but uses registryService.findByState(LIVE). Return 202 with count.
|
||||
- `POST /{id}/commands/{commandId}/ack`: Call registryService.acknowledgeCommand(id, commandId). Return 200 if true, 404 if false.
|
||||
- Add @Tag(name = "Agent Commands") and @Operation annotations.
|
||||
|
||||
5. **Update WebConfig**: The SSE endpoint GET /api/v1/agents/{id}/events is already covered by the interceptor pattern "/api/v1/agents/**". Agents send the protocol version header on all requests (per research recommendation), so no exclusion needed. However, if the SSE GET causes issues because browsers/clients may not easily add custom headers to EventSource, add the SSE events path to excludePathPatterns: `/api/v1/agents/*/events`. This is a practical consideration -- add the exclusion to be safe.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn compile -pl cameleer3-server-core,cameleer3-server-app</automated>
|
||||
</verify>
|
||||
<done>SseConnectionManager, AgentSseController, and AgentCommandController compile. SSE endpoint returns SseEmitter. Command endpoints accept type/payload and deliver via SSE. Ping keepalive scheduled. WebConfig updated if needed.</done>
|
||||
</task>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 2: Integration tests for SSE, commands, and full flow</name>
|
||||
<files>
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java
|
||||
</files>
|
||||
<action>
|
||||
Write integration tests covering SSE connection, command delivery, ping, and acknowledgement:
|
||||
|
||||
**SSE Test Strategy** (from RESEARCH.md): Testing SSE with TestRestTemplate is non-trivial. Use one of these approaches:
|
||||
- Option A (preferred): Use raw HttpURLConnection or java.net.http.HttpClient to open the SSE stream in a separate thread, read lines, and assert event format.
|
||||
- Option B: Use Spring WebClient (from spring-boot-starter-webflux test dependency) -- BUT do not add webflux as a main dependency, only as test scope if needed.
|
||||
- Option C: Test at the service layer by calling SseConnectionManager.connect() directly, then sendEvent(), and reading from the SseEmitter via a custom handler.
|
||||
|
||||
Recommend Option A (HttpClient) for true end-to-end testing without adding dependencies.
|
||||
|
||||
1. **AgentSseControllerIT** (extends AbstractClickHouseIT):
|
||||
- Test SSE connect for registered agent: Register agent, open GET /{id}/events with Accept: text/event-stream. Assert 200 and content-type is text/event-stream.
|
||||
- Test SSE connect for unknown agent: GET /unknown-id/events, assert 404.
|
||||
- Test config-update delivery: Register agent, open SSE stream (background thread), POST /{id}/commands with {"type":"config-update","payload":{"key":"value"}}. Use Awaitility to assert SSE stream received event with name "config-update" and correct data.
|
||||
- Test deep-trace delivery: Same pattern with {"type":"deep-trace","payload":{"correlationId":"test-123"}}.
|
||||
- Test replay delivery: Same pattern with {"type":"replay","payload":{"exchangeId":"ex-456"}}.
|
||||
- Test ping keepalive: Open SSE stream, wait for ping comment (may need to set ping interval low in test config or use Awaitility with timeout). Assert ":ping" comment received.
|
||||
- Test Last-Event-ID header: Open SSE with Last-Event-ID header set. Assert connection succeeds (no replay, just acknowledges).
|
||||
- All POST requests include X-Cameleer-Protocol-Version:1 header. SSE GET may need the header excluded in WebConfig (test will reveal if this is an issue).
|
||||
- Use Awaitility with ignoreExceptions() for async assertions (established pattern).
|
||||
|
||||
2. **AgentCommandControllerIT** (extends AbstractClickHouseIT):
|
||||
- Test single agent command: Register agent, POST /{id}/commands, assert 202 with commandId.
|
||||
- Test group command: Register 2 agents in same group, POST /groups/{group}/commands, assert 202 with targetCount=2.
|
||||
- Test broadcast command: Register 3 agents, POST /commands, assert 202 with count of LIVE agents.
|
||||
- Test command ack: Send command, POST /{id}/commands/{commandId}/ack, assert 200.
|
||||
- Test ack unknown command: POST /{id}/commands/unknown-id/ack, assert 404.
|
||||
- Test command to unregistered agent: POST /nonexistent/commands, assert 404.
|
||||
|
||||
**Test configuration**: If ping interval needs to be shorter for tests, add to test application.yml or use @TestPropertySource with agent-registry.ping-interval-ms=1000.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn test -pl cameleer3-server-core,cameleer3-server-app -Dtest="Agent*"</automated>
|
||||
</verify>
|
||||
<done>All SSE integration tests pass: connect/disconnect, config-update/deep-trace/replay delivery via SSE, ping keepalive received, Last-Event-ID accepted, command targeting (single/group/broadcast), command acknowledgement. mvn clean verify passes with all existing tests still green.</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<verification>
|
||||
mvn clean verify -- full suite green (all Phase 1, 2, and 3 tests pass)
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
- SSE endpoint returns working event stream for registered agents
|
||||
- config-update, deep-trace, and replay commands delivered via SSE in real time
|
||||
- Group and broadcast targeting works correctly
|
||||
- Ping keepalive sent every 15 seconds
|
||||
- Last-Event-ID header accepted (no replay, per decision)
|
||||
- Command acknowledgement endpoint works
|
||||
- All integration tests pass
|
||||
- Full mvn clean verify passes
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/03-agent-registry-sse-push/03-02-SUMMARY.md`
|
||||
</output>
|
||||
Reference in New Issue
Block a user