cameleer-server/docs/superpowers/specs/2026-04-02-route-state-protocol-extension.md
hsiegeln 17aff5ef9d docs: route state protocol extension spec
Defines two backward-compatible mechanisms for accurate route state
tracking: heartbeat extension (routeStates map in heartbeat body)
and ROUTE_STATE_CHANGED events for real-time updates. Covers
agent-side detection via Camel EventNotifier, server-side handling,
multi-agent conflict resolution, and migration path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 19:26:38 +02:00


Protocol Extension: Route State Reporting

Date: 2026-04-02
Scope: Agent protocol v1 extension (backward-compatible)

Problem

Route operational state (Started/Stopped/Suspended) is currently inferred from command ACKs — the server guesses what state a route is in based on the last successful command. This fails when:

  • An agent restarts (routes reset to Started, but server doesn't know)
  • A route is stopped/suspended outside Cameleer (JMX, Hawtio, programmatic CamelContext)
  • The server restarts (all state is lost)
  • A new agent joins with routes already in non-default states

Solution

Two complementary mechanisms, both backward-compatible:

  1. Heartbeat extension — agent includes current route states in every heartbeat
  2. Route state change events — agent emits an event immediately when any route changes state

The heartbeat is the steady-state source of truth. Events provide real-time updates between heartbeats.

1. Heartbeat Extension

Current Protocol

POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>
(empty body)
→ 200 OK

Extended Protocol

POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>
Content-Type: application/json

{
  "routeStates": {
    "file-processing": "Started",
    "timer-heartbeat": "Started",
    "error-handling-test": "Suspended",
    "try-catch-test": "Stopped"
  }
}

| Field | Type | Required | Description |
|---|---|---|---|
| routeStates | Map<String, String> | No | Current state of each Camel route. Keys are route IDs, values are "Started", "Stopped", or "Suspended". |

Backward compatibility: The body is optional. Agents that don't send it (older versions) continue working — the server falls back to ACK-based inference for those agents. The Content-Type header is only required when a body is present.
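
To illustrate the optional body, here is a minimal sketch of how an agent might serialize its route states. The class name `HeartbeatBody` and the hand-rolled JSON writer are assumptions made for illustration only; a real agent would presumably reuse whatever JSON library it already ships with.

```java
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Cameleer codebase.
final class HeartbeatBody {
    private HeartbeatBody() {}

    /**
     * Returns the JSON heartbeat body, or empty when there is nothing to
     * report, in which case the agent sends the legacy empty-body heartbeat.
     */
    static Optional<String> toJson(Map<String, String> routeStates) {
        if (routeStates == null) {
            return Optional.empty();
        }
        String entries = routeStates.entrySet().stream()
            .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
            .collect(Collectors.joining(","));
        return Optional.of("{\"routeStates\":{" + entries + "}}");
    }

    // Escapes backslashes and double quotes only; enough for a sketch.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\"") .replace("\"", "\\\"") + "\"";
    }
}
```

When `toJson` returns an empty `Optional`, the caller would omit both the body and the Content-Type header, which matches what older agents send today.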

Server Handling

When a heartbeat includes routeStates:

  1. Update RouteStateRegistry with the reported states
  2. For routes present in the map, set their state directly (this is authoritative)
  3. For routes NOT in the map but previously tracked, mark as STARTED (agent didn't report them = default state)
  4. Clear any ACK-inferred state for this agent's routes — heartbeat is the source of truth
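
The four steps above can be sketched for the single-agent case as follows. `HeartbeatSnapshot` and the `tracked` map are hypothetical names, and the sketch assumes ACK-inferred and heartbeat-reported state live in the same map, so overwriting an entry also clears the inferred value (step 4).

```java
import java.util.Map;

enum RouteState { STARTED, SUSPENDED, STOPPED }

// Hypothetical helper showing snapshot semantics for one agent's routes.
final class HeartbeatSnapshot {
    private HeartbeatSnapshot() {}

    /** Mutates {@code tracked} so that it mirrors the heartbeat snapshot. */
    static void apply(Map<String, RouteState> tracked,
                      Map<String, RouteState> reported) {
        // Step 3: every previously tracked route falls back to the default...
        tracked.replaceAll((routeId, old) -> RouteState.STARTED);
        // Steps 1, 2 and 4: ...then reported routes overwrite whatever was there.
        tracked.putAll(reported);
    }
}
```

Resetting first and overwriting second keeps the logic to two map operations; the multi-agent case needs the conflict rule described next.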

Conflict resolution: If multiple agents for the same application report different states for the same route, use the most restrictive state: STOPPED > SUSPENDED > STARTED. This handles split-brain scenarios where a command partially succeeded.
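
The ordering rule can be made concrete with a small sketch. This spec does not define the `RouteState` enum or `parseState`, so the declaration order, the case-insensitive parsing, and the `mostRestrictive` helper below are all assumptions.

```java
import java.util.Collection;
import java.util.Locale;

enum RouteState {
    // Declared in ascending order of restrictiveness, so compareTo()
    // (which follows declaration order) implements STOPPED > SUSPENDED > STARTED.
    STARTED, SUSPENDED, STOPPED;

    /** Maps a wire value such as "Suspended" to the enum constant. */
    static RouteState parseState(String wire) {
        // Throws IllegalArgumentException for values outside the three known states.
        return valueOf(wire.trim().toUpperCase(Locale.ROOT));
    }

    /** Returns the most restrictive state, or STARTED when nothing was reported. */
    static RouteState mostRestrictive(Collection<RouteState> reported) {
        RouteState result = STARTED;
        for (RouteState state : reported) {
            if (state.compareTo(result) > 0) {
                result = state;
            }
        }
        return result;
    }
}
```

Tying the rule to declaration order is compact but fragile: reordering the constants would silently change conflict resolution, so an explicit rank field may be preferable in production code.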

Heartbeat Request Model

// New: optional heartbeat body
public record HeartbeatRequest(
    Map<String, String> routeStates   // routeId → "Started" | "Stopped" | "Suspended"
) {}

The server endpoint changes from accepting no body to optionally accepting HeartbeatRequest:

@PostMapping("/{id}/heartbeat")
public ResponseEntity<Void> heartbeat(
        @PathVariable String id,
        @RequestBody(required = false) HeartbeatRequest request) {
    boolean found = registryService.heartbeat(id);
    if (!found) return ResponseEntity.notFound().build();

    if (request != null && request.routeStates() != null) {
        AgentInfo agent = registryService.findById(id);
        if (agent != null) {
            routeStateRegistry.updateFromHeartbeat(
                agent.applicationId(), id, request.routeStates());
        }
    }
    return ResponseEntity.ok().build();
}

RouteStateRegistry Extension

/**
 * Update route states from an agent heartbeat.
 * Heartbeat-reported state is authoritative — it overrides ACK-inferred state.
 *
 * When multiple agents report different states for the same route,
 * the most restrictive state wins (STOPPED > SUSPENDED > STARTED).
 */
public void updateFromHeartbeat(String applicationId, String agentId,
                                 Map<String, String> routeStates) {
    for (var entry : routeStates.entrySet()) {
        String routeId = entry.getKey();
        RouteState reported = parseState(entry.getValue());
        RouteState current = getState(applicationId, routeId);

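        // Most restrictive wins across agents. This relies on RouteState being
        // declared in the order STARTED, SUSPENDED, STOPPED, so that a higher
        // ordinal means a more restrictive state.
        if (reported.ordinal() > current.ordinal()) {
            setState(applicationId, routeId, reported);
        }
        // Known limitation: a less restrictive report (e.g. STARTED while the
        // registry holds STOPPED) is ignored here. Without per-agent tracking
        // the registry cannot tell whether another agent still reports the
        // more restrictive state. agentId is reserved for that tracking.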
    }
}

Note on multi-agent conflict: The simple "most restrictive wins" heuristic works for the common case. A more precise implementation would track per-agent route states and compute the aggregate. This can be deferred — single-agent-per-app deployments cover most early use cases.
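
For reference, the deferred per-agent variant might look roughly like this. It is a sketch only: `PerAgentRouteStates`, `RouteKey`, and the method names are hypothetical, and the enum order is assumed to run from least to most restrictive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum RouteState { STARTED, SUSPENDED, STOPPED }

record RouteKey(String applicationId, String routeId) {}

// Hypothetical registry that remembers what each agent last reported.
final class PerAgentRouteStates {
    // (application, route) -> (agentId -> last reported state)
    private final Map<RouteKey, Map<String, RouteState>> byRoute = new ConcurrentHashMap<>();

    void report(String applicationId, String agentId, String routeId, RouteState state) {
        byRoute.computeIfAbsent(new RouteKey(applicationId, routeId),
                                key -> new ConcurrentHashMap<>())
               .put(agentId, state);
    }

    /** Most restrictive state across all agents; STARTED if nobody has reported. */
    RouteState aggregate(String applicationId, String routeId) {
        Map<String, RouteState> perAgent =
            byRoute.getOrDefault(new RouteKey(applicationId, routeId), Map.of());
        RouteState result = RouteState.STARTED;
        for (RouteState state : perAgent.values()) {
            if (state.compareTo(result) > 0) {
                result = state;
            }
        }
        return result;
    }
}
```

Because each agent overwrites only its own entry, the aggregate returns to STARTED as soon as every agent reports STARTED, which the simple heuristic cannot express. A complete version would also need to drop entries for agents that deregister or time out.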

2. Route State Change Events

Event Type

Agents emit a ROUTE_STATE_CHANGED event via the existing event ingestion endpoint whenever a Camel route changes operational state.

POST /api/v1/data/events
Authorization: Bearer <token>
Content-Type: application/json
X-Cameleer-Protocol-Version: 1

[
  {
    "eventType": "ROUTE_STATE_CHANGED",
    "timestamp": "2026-04-02T18:30:00Z",
    "details": {
      "routeId": "file-processing",
      "previousState": "Started",
      "newState": "Stopped",
      "reason": "command",
      "commandId": "a1b2c3d4-..."
    }
  }
]

| Field | Type | Required | Description |
|---|---|---|---|
| eventType | "ROUTE_STATE_CHANGED" | Yes | Fixed event type |
| timestamp | ISO-8601 | Yes | When the state change occurred |
| details.routeId | String | Yes | The affected route |
| details.previousState | String | Yes | State before change: "Started", "Stopped", "Suspended" |
| details.newState | String | Yes | State after change |
| details.reason | String | No | What triggered the change: "command" (Cameleer command), "external" (JMX/Hawtio/programmatic), "startup" (route starting on agent boot), "error" (route stopped due to error) |
| details.commandId | String | No | If triggered by a Cameleer command, the command ID |
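
As an illustration of the required and optional fields, an agent could assemble the `details` map along these lines. `RouteStateChangedDetails` is a hypothetical helper, not an existing class in either repository.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical builder for the "details" object of a ROUTE_STATE_CHANGED event.
final class RouteStateChangedDetails {
    private RouteStateChangedDetails() {}

    static Map<String, String> of(String routeId, String previousState, String newState,
                                  String reason, String commandId) {
        Map<String, String> details = new LinkedHashMap<>();
        // Required fields: fail fast if the caller omits one.
        details.put("routeId", Objects.requireNonNull(routeId, "routeId"));
        details.put("previousState", Objects.requireNonNull(previousState, "previousState"));
        details.put("newState", Objects.requireNonNull(newState, "newState"));
        // Optional fields: left out entirely rather than sent as null.
        if (reason != null) {
            details.put("reason", reason);
        }
        if (commandId != null) {
            details.put("commandId", commandId);
        }
        return details;
    }
}
```

A route stopped through JMX would then be described with `of("file-processing", "Started", "Stopped", "external", null)`, producing a map without a `commandId` key.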

Server Handling

When EventIngestionController receives a ROUTE_STATE_CHANGED event:

  1. Store the event in the agent events table (existing behavior — all events are stored)
  2. Additionally, update RouteStateRegistry immediately:

// In EventIngestionController, after storing events:
for (AgentEvent event : events) {
    if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
        Map<String, String> details = event.getDetails();
        if (details != null) {
            String routeId = details.get("routeId");
            String newState = details.get("newState");
            if (routeId != null && newState != null) {
                routeStateRegistry.setState(
                    agent.applicationId(), routeId,
                    RouteStateRegistry.parseState(newState));
            }
        }
    }
}

Agent-Side Detection

The agent detects route state changes using Camel's EventNotifier mechanism:

// In the Cameleer3 agent (cameleer3 repo)
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RouteStateEventNotifier extends EventNotifierSupport {

    // Last known state per route ID. The same map backs the heartbeat's routeStates.
    private final Map<String, String> routeStates = new ConcurrentHashMap<>();

    @Override
    public void notify(CamelEvent event) {
        if (event instanceof RouteStartedEvent e) {
            emit(e.getRoute().getRouteId(), "Started");
        } else if (event instanceof RouteStoppedEvent e) {
            emit(e.getRoute().getRouteId(), "Stopped");
        } else if (event instanceof RouteSuspendedEvent e) {
            emit(e.getRoute().getRouteId(), "Suspended");
        } else if (event instanceof RouteResumedEvent e) {
            // A resumed route is reported as Started
            emit(e.getRoute().getRouteId(), "Started");
        }
    }

    private void emit(String routeId, String newState) {
        // put() returns the prior value, which is the previous state
        // (null if this route has not been seen since the agent booted)
        String previousState = routeStates.put(routeId, newState);
        // Send ROUTE_STATE_CHANGED event (routeId, previousState, newState) to server
    }
}

For heartbeat route state reporting, the agent maintains an in-memory map of route states, updated by the same EventNotifier. The heartbeat sends this map on each tick.

State Flow Diagram

Agent boots
  │
  ├─ Camel routes start → EventNotifier fires RouteStartedEvent
  │   ├─ Update local routeStates map: { "route1": "Started", ... }
  │   └─ Emit ROUTE_STATE_CHANGED event → Server updates RouteStateRegistry
  │
  ├─ First heartbeat (30s) → includes routeStates
  │   └─ Server: RouteStateRegistry.updateFromHeartbeat() — authoritative snapshot
  │
  ├─ User sends STOP command via Cameleer UI
  │   ├─ Server dispatches to all agents via SSE
  │   ├─ Agent stops route → EventNotifier fires RouteStoppedEvent
  │   │   ├─ Update local routeStates map: { "route1": "Stopped" }
  │   │   └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
  │   └─ Agent ACKs command → Server also updates from ACK (redundant, both are fine)
  │
  ├─ Operator stops route via JMX (outside Cameleer)
  │   ├─ EventNotifier fires RouteStoppedEvent
  │   │   ├─ Update local routeStates map: { "route1": "Stopped" }
  │   │   └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
  │   └─ Next heartbeat confirms state
  │
  ├─ Agent restarts
  │   ├─ All routes start in default state
  │   ├─ EventNotifier fires RouteStartedEvent for each route
  │   ├─ Re-registration includes routeIds (existing protocol)
  │   └─ First heartbeat after restart: routeStates = all Started
  │       └─ Server: RouteStateRegistry cleared for this agent's routes
  │
  └─ Server restarts
      ├─ RouteStateRegistry is empty (in-memory, no persistence)
      └─ Next heartbeat from each agent restores accurate state

Migration Path

Phase 1: Server-side (this repo)

  1. Accept optional HeartbeatRequest body on heartbeat endpoint
  2. Handle ROUTE_STATE_CHANGED events in EventIngestionController
  3. Update RouteStateRegistry from both sources
  4. Stop applying ACK-based inference in AgentCommandController for agents that report route state (heartbeat/events are authoritative); keep it only as the fallback for older agents

Phase 2: Agent-side (cameleer3 repo)

  1. Add RouteStateEventNotifier to detect state changes
  2. Maintain local routeStates map
  3. Include routeStates in heartbeat payload
  4. Emit ROUTE_STATE_CHANGED events on state transitions

Backward Compatibility

  • Agents without this extension: heartbeat body is empty, no events emitted. Server falls back to ACK-based inference (current behavior).
  • Server without this extension: ignores unknown heartbeat body fields and stores ROUTE_STATE_CHANGED events as generic events (no registry update).
  • Protocol version stays at 1 — this is an additive extension, not a breaking change.

Not In Scope

  • Persistence of route state — intentionally omitted. Route state is runtime state that resets with the application. Persisting it would create stale data after app restarts.
  • Per-agent state tracking — the registry tracks aggregate state per application+route. Per-agent granularity can be added later if multi-agent split-brain scenarios are common.
  • Route state in SSE push — the server does not push state changes to the UI via SSE. The UI polls the catalog API (existing 15s refresh interval) which includes route state.