# Protocol Extension: Route State Reporting

**Date:** 2026-04-02

**Scope:** Agent protocol v1 extension (backward-compatible)

## Problem

Route operational state (Started/Stopped/Suspended) is currently inferred from command ACKs: the server guesses a route's state from the last successful command. This fails when:

- An agent restarts (routes reset to Started, but the server doesn't know)
- A route is stopped or suspended outside Cameleer (JMX, Hawtio, programmatic CamelContext)
- The server restarts (all state is lost)
- A new agent joins with routes already in non-default states

## Solution

Two complementary mechanisms, both backward-compatible:

1. **Heartbeat extension**: the agent includes current route states in every heartbeat
2. **Route state change events**: the agent emits an event immediately when any route changes state

The heartbeat is the steady-state source of truth. Events provide real-time updates between heartbeats.

## 1. Heartbeat Extension

### Current Protocol

```
POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>

(empty body)
→ 200 OK
```

### Extended Protocol

```
POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>
Content-Type: application/json

{
  "routeStates": {
    "file-processing": "Started",
    "timer-heartbeat": "Started",
    "error-handling-test": "Suspended",
    "try-catch-test": "Stopped"
  }
}
```

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `routeStates` | `Map<String, String>` | No | Current state of each Camel route. Keys are route IDs, values are `"Started"`, `"Stopped"`, or `"Suspended"`. |

**Backward compatibility:** The body is optional. Agents that don't send it (older versions) continue working; the server falls back to ACK-based inference for those agents. The `Content-Type` header is only required when a body is present.

### Server Handling

When a heartbeat includes `routeStates`:

1. Update `RouteStateRegistry` with the reported states
2. For routes present in the map, set their state directly (this is authoritative)
3. For routes NOT in the map but previously tracked, mark as `STARTED` (agent didn't report them = default state)
4. Clear any ACK-inferred state for this agent's routes, because the heartbeat is the source of truth

**Conflict resolution:** If multiple agents for the same application report different states for the same route, use the **most restrictive** state: `STOPPED > SUSPENDED > STARTED`. This handles split-brain scenarios where a command partially succeeded.

### Heartbeat Request Model

```java
// New: optional heartbeat body
public record HeartbeatRequest(
    Map<String, String> routeStates  // routeId → "Started" | "Stopped" | "Suspended"
) {}
```

The server endpoint changes from accepting no body to optionally accepting `HeartbeatRequest`:

```java
@PostMapping("/{id}/heartbeat")
public ResponseEntity<Void> heartbeat(
        @PathVariable String id,
        @RequestBody(required = false) HeartbeatRequest request) {
    boolean found = registryService.heartbeat(id);
    if (!found) return ResponseEntity.notFound().build();

    // Older agents send no body; only update the registry when states are reported
    if (request != null && request.routeStates() != null) {
        AgentInfo agent = registryService.findById(id);
        if (agent != null) {
            routeStateRegistry.updateFromHeartbeat(
                agent.applicationId(), id, request.routeStates());
        }
    }

    return ResponseEntity.ok().build();
}
```

### RouteStateRegistry Extension

```java
/**
 * Update route states from an agent heartbeat.
 * Heartbeat-reported state is authoritative: it overrides ACK-inferred state.
 *
 * When multiple agents report different states for the same route,
 * the most restrictive state wins (STOPPED > SUSPENDED > STARTED).
 */
public void updateFromHeartbeat(String applicationId, String agentId,
                                Map<String, String> routeStates) {
    for (var entry : routeStates.entrySet()) {
        String routeId = entry.getKey();
        RouteState reported = parseState(entry.getValue());
        RouteState current = getState(applicationId, routeId);

        // Most restrictive wins across agents
        // (relies on RouteState being declared STARTED, SUSPENDED, STOPPED)
        if (reported.ordinal() > current.ordinal()) {
            setState(applicationId, routeId, reported);
        } else if (reported == RouteState.STARTED && current != RouteState.STARTED) {
            // If this agent says STARTED but another said STOPPED,
            // keep STOPPED until ALL agents report STARTED
            // (track per-agent state to resolve this properly)
        }
    }
}
```

**Note on multi-agent conflict:** The simple "most restrictive wins" heuristic works for the common case. A more precise implementation would track per-agent route states and compute the aggregate. This can be deferred, since single-agent-per-app deployments cover most early use cases.

## 2. Route State Change Events

### Event Type

Agents emit a `ROUTE_STATE_CHANGED` event via the existing event ingestion endpoint whenever a Camel route changes operational state.

```
POST /api/v1/data/events
Authorization: Bearer <token>
Content-Type: application/json
X-Cameleer-Protocol-Version: 1

[
  {
    "eventType": "ROUTE_STATE_CHANGED",
    "timestamp": "2026-04-02T18:30:00Z",
    "details": {
      "routeId": "file-processing",
      "previousState": "Started",
      "newState": "Stopped",
      "reason": "command",
      "commandId": "a1b2c3d4-..."
    }
  }
]
```

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `eventType` | `"ROUTE_STATE_CHANGED"` | Yes | Fixed event type |
| `timestamp` | ISO-8601 | Yes | When the state change occurred |
| `details.routeId` | `String` | Yes | The affected route |
| `details.previousState` | `String` | Yes | State before the change: `"Started"`, `"Stopped"`, or `"Suspended"` |
| `details.newState` | `String` | Yes | State after the change: `"Started"`, `"Stopped"`, or `"Suspended"` |
| `details.reason` | `String` | No | What triggered the change: `"command"` (Cameleer command), `"external"` (JMX/Hawtio/programmatic), `"startup"` (route starting on agent boot), `"error"` (route stopped due to error) |
| `details.commandId` | `String` | No | If triggered by a Cameleer command, the command ID |

### Server Handling

When `EventIngestionController` receives a `ROUTE_STATE_CHANGED` event:

1. Store the event in the agent events table (existing behavior: all events are stored)
2. Additionally, update `RouteStateRegistry` immediately:

```java
// In EventIngestionController, after storing events:
for (AgentEvent event : events) {
    if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
        Map<String, String> details = event.getDetails();
        if (details != null) {
            String routeId = details.get("routeId");
            String newState = details.get("newState");
            // Skip malformed events instead of failing the whole batch
            if (routeId != null && newState != null) {
                routeStateRegistry.setState(
                    agent.applicationId(), routeId,
                    RouteStateRegistry.parseState(newState));
            }
        }
    }
}
```

### Agent-Side Detection

The agent detects route state changes using Camel's `EventNotifier` mechanism:

```java
// In the Cameleer3 agent (cameleer3 repo)
public class RouteStateEventNotifier extends EventNotifierSupport {

    // Local state map, also read when building the heartbeat payload
    private final Map<String, String> routeStates = new ConcurrentHashMap<>();

    @Override
    public void notify(CamelEvent event) {
        if (event instanceof RouteStartedEvent e) {
            emit(e.getRoute().getRouteId(), "Started");
        } else if (event instanceof RouteStoppedEvent e) {
            emit(e.getRoute().getRouteId(), "Stopped");
        } else if (event instanceof RouteSuspendedEvent e) {
            emit(e.getRoute().getRouteId(), "Suspended");
        } else if (event instanceof RouteResumedEvent e) {
            emit(e.getRoute().getRouteId(), "Started");
        }
    }

    private void emit(String routeId, String newState) {
        // Update the local map; the value it held before is the previous state
        // (null on a route's first event)
        String previousState = routeStates.put(routeId, newState);
        // Send ROUTE_STATE_CHANGED event (routeId, previousState, newState) to server
    }
}
```

For heartbeat route state reporting, the agent maintains an in-memory map of route states, updated by the same `EventNotifier`. The heartbeat sends this map on each tick.

## State Flow Diagram

```
Agent boots
  │
  ├─ Camel routes start → EventNotifier fires RouteStartedEvent
  │   ├─ Update local routeStates map: { "route1": "Started", ... }
  │   └─ Emit ROUTE_STATE_CHANGED event → Server updates RouteStateRegistry
  │
  ├─ First heartbeat (30s) → includes routeStates
  │   └─ Server: RouteStateRegistry.updateFromHeartbeat() (authoritative snapshot)
  │
  ├─ User sends STOP command via Cameleer UI
  │   ├─ Server dispatches to all agents via SSE
  │   ├─ Agent stops route → EventNotifier fires RouteStoppedEvent
  │   │   ├─ Update local routeStates map: { "route1": "Stopped" }
  │   │   └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
  │   └─ Agent ACKs command → Server also updates from ACK (redundant, both are fine)
  │
  ├─ Operator stops route via JMX (outside Cameleer)
  │   ├─ EventNotifier fires RouteStoppedEvent
  │   │   ├─ Update local routeStates map: { "route1": "Stopped" }
  │   │   └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
  │   └─ Next heartbeat confirms state
  │
  ├─ Agent restarts
  │   ├─ All routes start in default state
  │   ├─ EventNotifier fires RouteStartedEvent for each route
  │   ├─ Re-registration includes routeIds (existing protocol)
  │   └─ First heartbeat after restart: routeStates = all Started
  │       └─ Server: RouteStateRegistry cleared for this agent's routes
  │
  └─ Server restarts
      ├─ RouteStateRegistry is empty (in-memory, no persistence)
      └─ Next heartbeat from each agent restores accurate state
```

## Migration Path

### Phase 1: Server-side (this repo)

1. Accept optional `HeartbeatRequest` body on heartbeat endpoint
2. Handle `ROUTE_STATE_CHANGED` events in `EventIngestionController`
3. Update `RouteStateRegistry` from both sources
4. Remove ACK-based inference from `AgentCommandController` for agents that report route state (heartbeat/events are authoritative for them)

### Phase 2: Agent-side (cameleer3 repo)

1. Add `RouteStateEventNotifier` to detect state changes
2. Maintain local `routeStates` map
3. Include `routeStates` in heartbeat payload
4. Emit `ROUTE_STATE_CHANGED` events on state transitions

### Backward Compatibility

- Agents without this extension: heartbeat body is empty, no events emitted. Server falls back to ACK-based inference (current behavior).
- Server without this extension: ignores unknown heartbeat body fields and stores `ROUTE_STATE_CHANGED` events as generic events (no registry update).
- Protocol version stays at `1`: this is an additive extension, not a breaking change.

## Not In Scope

- **Persistence of route state**: intentionally omitted. Route state is runtime state that resets with the application. Persisting it would create stale data after app restarts.
- **Per-agent state tracking**: the registry tracks aggregate state per application+route. Per-agent granularity can be added later if multi-agent split-brain scenarios are common.
- **Route state in SSE push**: the server does not push state changes to the UI via SSE. The UI polls the catalog API (existing 15s refresh interval), which includes route state.
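
If per-agent tracking is picked up later, one possible shape is sketched below. This is only an illustration under stated assumptions, not the planned implementation: the class `PerAgentRouteStates` and its methods `removeAgent` and `aggregate` are hypothetical names, the enum is assumed to be declared from least to most restrictive, and a route that an agent omits from its heartbeat is treated as `STARTED`, as in the server handling rules for the heartbeat.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: per-agent route states with a most-restrictive aggregate. */
class PerAgentRouteStates {

    /** Assumed order: least to most restrictive, so ordinal() can be compared. */
    enum RouteState { STARTED, SUSPENDED, STOPPED }

    // applicationId -> agentId -> (routeId -> state); one snapshot per agent
    private final Map<String, Map<String, Map<String, RouteState>>> byApp =
            new ConcurrentHashMap<>();

    /** Maps wire values such as "Started" to the enum; unknown values throw. */
    static RouteState parseState(String wire) {
        return RouteState.valueOf(wire.toUpperCase(Locale.ROOT));
    }

    /** Replaces the stored snapshot for one agent with its latest heartbeat. */
    void updateFromHeartbeat(String applicationId, String agentId,
                             Map<String, String> routeStates) {
        Map<String, RouteState> parsed = new ConcurrentHashMap<>();
        routeStates.forEach((routeId, wire) -> parsed.put(routeId, parseState(wire)));
        byApp.computeIfAbsent(applicationId, k -> new ConcurrentHashMap<>())
             .put(agentId, parsed);
    }

    /** Drops an agent's snapshot, for example when it deregisters. */
    void removeAgent(String applicationId, String agentId) {
        Map<String, Map<String, RouteState>> agents = byApp.get(applicationId);
        if (agents != null) {
            agents.remove(agentId);
        }
    }

    /** Most restrictive state across agents; unreported routes count as STARTED. */
    RouteState aggregate(String applicationId, String routeId) {
        RouteState result = RouteState.STARTED;
        for (Map<String, RouteState> snapshot
                : byApp.getOrDefault(applicationId, Map.of()).values()) {
            RouteState state = snapshot.getOrDefault(routeId, RouteState.STARTED);
            if (state.ordinal() > result.ordinal()) {
                result = state;
            }
        }
        return result;
    }
}
```

Because each heartbeat replaces that agent's whole snapshot and the aggregate is recomputed on read, a route returns to `STARTED` as soon as every agent reports it as started, which is the case the compare-and-raise loop in `updateFromHeartbeat` leaves open.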