diff --git a/docs/superpowers/specs/2026-04-02-route-state-protocol-extension.md b/docs/superpowers/specs/2026-04-02-route-state-protocol-extension.md new file mode 100644 index 00000000..db492acf --- /dev/null +++ b/docs/superpowers/specs/2026-04-02-route-state-protocol-extension.md @@ -0,0 +1,283 @@ +# Protocol Extension: Route State Reporting + +**Date:** 2026-04-02 +**Scope:** Agent protocol v1 extension (backward-compatible) + +## Problem + +Route operational state (Started/Stopped/Suspended) is currently inferred from command ACKs — the server guesses what state a route is in based on the last successful command. This fails when: + +- An agent restarts (routes reset to Started, but server doesn't know) +- A route is stopped/suspended outside Cameleer (JMX, Hawtio, programmatic CamelContext) +- The server restarts (all state is lost) +- A new agent joins with routes already in non-default states + +## Solution + +Two complementary mechanisms, both backward-compatible: + +1. **Heartbeat extension** — agent includes current route states in every heartbeat +2. **Route state change events** — agent emits an event immediately when any route changes state + +The heartbeat is the steady-state source of truth. Events provide real-time updates between heartbeats. + +## 1. Heartbeat Extension + +### Current Protocol + +``` +POST /api/v1/agents/{id}/heartbeat +Authorization: Bearer +(empty body) +→ 200 OK +``` + +### Extended Protocol + +``` +POST /api/v1/agents/{id}/heartbeat +Authorization: Bearer +Content-Type: application/json + +{ + "routeStates": { + "file-processing": "Started", + "timer-heartbeat": "Started", + "error-handling-test": "Suspended", + "try-catch-test": "Stopped" + } +} +``` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `routeStates` | `Map` | No | Current state of each Camel route. Keys are route IDs, values are `"Started"`, `"Stopped"`, or `"Suspended"`. | + +**Backward compatibility:** The body is optional. Agents that don't send it (older versions) continue working — the server falls back to ACK-based inference for those agents. The `Content-Type` header is only required when a body is present. + +### Server Handling + +When a heartbeat includes `routeStates`: + +1. Update `RouteStateRegistry` with the reported states +2. For routes present in the map, set their state directly (this is authoritative) +3. For routes NOT in the map but previously tracked, mark as `STARTED` (agent didn't report them = default state) +4. Clear any ACK-inferred state for this agent's routes — heartbeat is the source of truth + +**Conflict resolution:** If multiple agents for the same application report different states for the same route, use the **most restrictive** state: `STOPPED > SUSPENDED > STARTED`. This handles split-brain scenarios where a command partially succeeded. + +### Heartbeat Request Model + +```java +// New: optional heartbeat body +public record HeartbeatRequest( + Map routeStates // routeId → "Started" | "Stopped" | "Suspended" +) {} +``` + +The server endpoint changes from accepting no body to optionally accepting `HeartbeatRequest`: + +```java +@PostMapping("/{id}/heartbeat") +public ResponseEntity heartbeat( + @PathVariable String id, + @RequestBody(required = false) HeartbeatRequest request) { + boolean found = registryService.heartbeat(id); + if (!found) return ResponseEntity.notFound().build(); + + if (request != null && request.routeStates() != null) { + AgentInfo agent = registryService.findById(id); + if (agent != null) { + routeStateRegistry.updateFromHeartbeat( + agent.applicationId(), id, request.routeStates()); + } + } + return ResponseEntity.ok().build(); +} +``` + +### RouteStateRegistry Extension + +```java +/** + * Update route states from an agent heartbeat. + * Heartbeat-reported state is authoritative — it overrides ACK-inferred state. + * + * When multiple agents report different states for the same route, + * the most restrictive state wins (STOPPED > SUSPENDED > STARTED). + */ +public void updateFromHeartbeat(String applicationId, String agentId, + Map routeStates) { + for (var entry : routeStates.entrySet()) { + String routeId = entry.getKey(); + RouteState reported = parseState(entry.getValue()); + RouteState current = getState(applicationId, routeId); + + // Most restrictive wins across agents + if (reported.ordinal() > current.ordinal()) { + setState(applicationId, routeId, reported); + } else if (reported == RouteState.STARTED && current != RouteState.STARTED) { + // If this agent says STARTED but another said STOPPED, + // keep STOPPED until ALL agents report STARTED + // (track per-agent state to resolve this properly) + } + } +} +``` + +**Note on multi-agent conflict:** The simple "most restrictive wins" heuristic works for the common case. A more precise implementation would track per-agent route states and compute the aggregate. This can be deferred — single-agent-per-app deployments cover most early use cases. + +## 2. Route State Change Events + +### Event Type + +Agents emit a `ROUTE_STATE_CHANGED` event via the existing event ingestion endpoint whenever a Camel route changes operational state. + +``` +POST /api/v1/data/events +Authorization: Bearer +Content-Type: application/json +X-Cameleer-Protocol-Version: 1 + +[ + { + "eventType": "ROUTE_STATE_CHANGED", + "timestamp": "2026-04-02T18:30:00Z", + "details": { + "routeId": "file-processing", + "previousState": "Started", + "newState": "Stopped", + "reason": "command", + "commandId": "a1b2c3d4-..." + } + } +] +``` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `eventType` | `"ROUTE_STATE_CHANGED"` | Yes | Fixed event type | +| `timestamp` | ISO-8601 | Yes | When the state change occurred | +| `details.routeId` | `String` | Yes | The affected route | +| `details.previousState` | `String` | Yes | State before change: `"Started"`, `"Stopped"`, `"Suspended"` | +| `details.newState` | `String` | Yes | State after change | +| `details.reason` | `String` | No | What triggered the change: `"command"` (Cameleer command), `"external"` (JMX/Hawtio/programmatic), `"startup"` (route starting on agent boot), `"error"` (route stopped due to error) | +| `details.commandId` | `String` | No | If triggered by a Cameleer command, the command ID | + +### Server Handling + +When `EventIngestionController` receives a `ROUTE_STATE_CHANGED` event: + +1. Store the event in the agent events table (existing behavior — all events are stored) +2. Additionally, update `RouteStateRegistry` immediately: + +```java +// In EventIngestionController, after storing events: +for (AgentEvent event : events) { + if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) { + Map details = event.getDetails(); + if (details != null) { + String routeId = details.get("routeId"); + String newState = details.get("newState"); + if (routeId != null && newState != null) { + routeStateRegistry.setState( + agent.applicationId(), routeId, + RouteStateRegistry.parseState(newState)); + } + } + } +} +``` + +### Agent-Side Detection + +The agent detects route state changes using Camel's `EventNotifier` mechanism: + +```java +// In the Cameleer3 agent (cameleer3 repo) +public class RouteStateEventNotifier extends EventNotifierSupport { + + @Override + public void notify(CamelEvent event) { + if (event instanceof RouteStartedEvent e) { + emit(e.getRoute().getRouteId(), "Started", previousState); + } else if (event instanceof RouteStoppedEvent e) { + emit(e.getRoute().getRouteId(), "Stopped", previousState); + } else if (event instanceof RouteSuspendedEvent e) { + emit(e.getRoute().getRouteId(), "Suspended", previousState); + } else if (event instanceof RouteResumedEvent e) { + emit(e.getRoute().getRouteId(), "Started", "Suspended"); + } + } + + private void emit(String routeId, String newState, String previousState) { + // Send ROUTE_STATE_CHANGED event to server + // Also update local state map used for heartbeat reporting + } +} +``` + +For heartbeat route state reporting, the agent maintains an in-memory map of route states, updated by the same `EventNotifier`. The heartbeat sends this map on each tick. + +## State Flow Diagram + +``` +Agent boots + │ + ├─ Camel routes start → EventNotifier fires RouteStartedEvent + │ ├─ Update local routeStates map: { "route1": "Started", ... } + │ └─ Emit ROUTE_STATE_CHANGED event → Server updates RouteStateRegistry + │ + ├─ First heartbeat (30s) → includes routeStates + │ └─ Server: RouteStateRegistry.updateFromHeartbeat() — authoritative snapshot + │ + ├─ User sends STOP command via Cameleer UI + │ ├─ Server dispatches to all agents via SSE + │ ├─ Agent stops route → EventNotifier fires RouteStoppedEvent + │ │ ├─ Update local routeStates map: { "route1": "Stopped" } + │ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately + │ └─ Agent ACKs command → Server also updates from ACK (redundant, both are fine) + │ + ├─ Operator stops route via JMX (outside Cameleer) + │ ├─ EventNotifier fires RouteStoppedEvent + │ │ ├─ Update local routeStates map: { "route1": "Stopped" } + │ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately + │ └─ Next heartbeat confirms state + │ + ├─ Agent restarts + │ ├─ All routes start in default state + │ ├─ EventNotifier fires RouteStartedEvent for each route + │ ├─ Re-registration includes routeIds (existing protocol) + │ └─ First heartbeat after restart: routeStates = all Started + │ └─ Server: RouteStateRegistry cleared for this agent's routes + │ + └─ Server restarts + ├─ RouteStateRegistry is empty (in-memory, no persistence) + └─ Next heartbeat from each agent restores accurate state +``` + +## Migration Path + +### Phase 1: Server-side (this repo) +1. Accept optional `HeartbeatRequest` body on heartbeat endpoint +2. Handle `ROUTE_STATE_CHANGED` events in `EventIngestionController` +3. Update `RouteStateRegistry` from both sources +4. Remove ACK-based inference from `AgentCommandController` (heartbeat/events are authoritative) + +### Phase 2: Agent-side (cameleer3 repo) +1. Add `RouteStateEventNotifier` to detect state changes +2. Maintain local `routeStates` map +3. Include `routeStates` in heartbeat payload +4. Emit `ROUTE_STATE_CHANGED` events on state transitions + +### Backward Compatibility + +- Agents without this extension: heartbeat body is empty, no events emitted. Server falls back to ACK-based inference (current behavior). +- Server without this extension: ignores unknown heartbeat body fields and stores `ROUTE_STATE_CHANGED` events as generic events (no registry update). +- Protocol version stays at `1` — this is an additive extension, not a breaking change. + +## Not In Scope + +- **Persistence of route state** — intentionally omitted. Route state is runtime state that resets with the application. Persisting it would create stale data after app restarts. +- **Per-agent state tracking** — the registry tracks aggregate state per application+route. Per-agent granularity can be added later if multi-agent split-brain scenarios are common. +- **Route state in SSE push** — the server does not push state changes to the UI via SSE. The UI polls the catalog API (existing 15s refresh interval) which includes route state.