Rename Java packages from com.cameleer3 to com.cameleer, module directories from cameleer3-* to cameleer-*, and all references throughout workflows, Dockerfiles, docs, migrations, and pom.xml. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
11 KiB
Protocol Extension: Route State Reporting
Date: 2026-04-02 Scope: Agent protocol v1 extension (backward-compatible)
Problem
Route operational state (Started/Stopped/Suspended) is currently inferred from command ACKs — the server guesses what state a route is in based on the last successful command. This fails when:
- An agent restarts (routes reset to Started, but server doesn't know)
- A route is stopped/suspended outside Cameleer (JMX, Hawtio, programmatic CamelContext)
- The server restarts (all state is lost)
- A new agent joins with routes already in non-default states
Solution
Two complementary mechanisms, both backward-compatible:
- Heartbeat extension — agent includes current route states in every heartbeat
- Route state change events — agent emits an event immediately when any route changes state
The heartbeat is the steady-state source of truth. Events provide real-time updates between heartbeats.
1. Heartbeat Extension
Current Protocol
POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>
(empty body)
→ 200 OK
Extended Protocol
POST /api/v1/agents/{id}/heartbeat
Authorization: Bearer <token>
Content-Type: application/json
{
"routeStates": {
"file-processing": "Started",
"timer-heartbeat": "Started",
"error-handling-test": "Suspended",
"try-catch-test": "Stopped"
}
}
| Field | Type | Required | Description |
|---|---|---|---|
routeStates |
Map<String, String> |
No | Current state of each Camel route. Keys are route IDs, values are "Started", "Stopped", or "Suspended". |
Backward compatibility: The body is optional. Agents that don't send it (older versions) continue working — the server falls back to ACK-based inference for those agents. The Content-Type header is only required when a body is present.
Server Handling
When a heartbeat includes routeStates:
- Update
RouteStateRegistrywith the reported states - For routes present in the map, set their state directly (this is authoritative)
- For routes NOT in the map but previously tracked, mark as
STARTED(agent didn't report them = default state) - Clear any ACK-inferred state for this agent's routes — heartbeat is the source of truth
Conflict resolution: If multiple agents for the same application report different states for the same route, use the most restrictive state: STOPPED > SUSPENDED > STARTED. This handles split-brain scenarios where a command partially succeeded.
Heartbeat Request Model
// New: optional heartbeat body
public record HeartbeatRequest(
Map<String, String> routeStates // routeId → "Started" | "Stopped" | "Suspended"
) {}
The server endpoint changes from accepting no body to optionally accepting HeartbeatRequest:
@PostMapping("/{id}/heartbeat")
public ResponseEntity<Void> heartbeat(
@PathVariable String id,
@RequestBody(required = false) HeartbeatRequest request) {
boolean found = registryService.heartbeat(id);
if (!found) return ResponseEntity.notFound().build();
if (request != null && request.routeStates() != null) {
AgentInfo agent = registryService.findById(id);
if (agent != null) {
routeStateRegistry.updateFromHeartbeat(
agent.applicationId(), id, request.routeStates());
}
}
return ResponseEntity.ok().build();
}
RouteStateRegistry Extension
/**
* Update route states from an agent heartbeat.
* Heartbeat-reported state is authoritative — it overrides ACK-inferred state.
*
* When multiple agents report different states for the same route,
* the most restrictive state wins (STOPPED > SUSPENDED > STARTED).
*/
public void updateFromHeartbeat(String applicationId, String agentId,
Map<String, String> routeStates) {
for (var entry : routeStates.entrySet()) {
String routeId = entry.getKey();
RouteState reported = parseState(entry.getValue());
RouteState current = getState(applicationId, routeId);
// Most restrictive wins across agents
if (reported.ordinal() > current.ordinal()) {
setState(applicationId, routeId, reported);
} else if (reported == RouteState.STARTED && current != RouteState.STARTED) {
// If this agent says STARTED but another said STOPPED,
// keep STOPPED until ALL agents report STARTED
// (track per-agent state to resolve this properly)
}
}
}
Note on multi-agent conflict: The simple "most restrictive wins" heuristic works for the common case. A more precise implementation would track per-agent route states and compute the aggregate. This can be deferred — single-agent-per-app deployments cover most early use cases.
2. Route State Change Events
Event Type
Agents emit a ROUTE_STATE_CHANGED event via the existing event ingestion endpoint whenever a Camel route changes operational state.
POST /api/v1/data/events
Authorization: Bearer <token>
Content-Type: application/json
X-Cameleer-Protocol-Version: 1
[
{
"eventType": "ROUTE_STATE_CHANGED",
"timestamp": "2026-04-02T18:30:00Z",
"details": {
"routeId": "file-processing",
"previousState": "Started",
"newState": "Stopped",
"reason": "command",
"commandId": "a1b2c3d4-..."
}
}
]
| Field | Type | Required | Description |
|---|---|---|---|
eventType |
"ROUTE_STATE_CHANGED" |
Yes | Fixed event type |
timestamp |
ISO-8601 | Yes | When the state change occurred |
details.routeId |
String |
Yes | The affected route |
details.previousState |
String |
Yes | State before change: "Started", "Stopped", "Suspended" |
details.newState |
String |
Yes | State after change |
details.reason |
String |
No | What triggered the change: "command" (Cameleer command), "external" (JMX/Hawtio/programmatic), "startup" (route starting on agent boot), "error" (route stopped due to error) |
details.commandId |
String |
No | If triggered by a Cameleer command, the command ID |
Server Handling
When EventIngestionController receives a ROUTE_STATE_CHANGED event:
- Store the event in the agent events table (existing behavior — all events are stored)
- Additionally, update
RouteStateRegistryimmediately:
// In EventIngestionController, after storing events:
for (AgentEvent event : events) {
if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
Map<String, String> details = event.getDetails();
if (details != null) {
String routeId = details.get("routeId");
String newState = details.get("newState");
if (routeId != null && newState != null) {
routeStateRegistry.setState(
agent.applicationId(), routeId,
RouteStateRegistry.parseState(newState));
}
}
}
}
Agent-Side Detection
The agent detects route state changes using Camel's EventNotifier mechanism:
// In the Cameleer agent (cameleer repo)
public class RouteStateEventNotifier extends EventNotifierSupport {
@Override
public void notify(CamelEvent event) {
if (event instanceof RouteStartedEvent e) {
emit(e.getRoute().getRouteId(), "Started", previousState);
} else if (event instanceof RouteStoppedEvent e) {
emit(e.getRoute().getRouteId(), "Stopped", previousState);
} else if (event instanceof RouteSuspendedEvent e) {
emit(e.getRoute().getRouteId(), "Suspended", previousState);
} else if (event instanceof RouteResumedEvent e) {
emit(e.getRoute().getRouteId(), "Started", "Suspended");
}
}
private void emit(String routeId, String newState, String previousState) {
// Send ROUTE_STATE_CHANGED event to server
// Also update local state map used for heartbeat reporting
}
}
For heartbeat route state reporting, the agent maintains an in-memory map of route states, updated by the same EventNotifier. The heartbeat sends this map on each tick.
State Flow Diagram
Agent boots
│
├─ Camel routes start → EventNotifier fires RouteStartedEvent
│ ├─ Update local routeStates map: { "route1": "Started", ... }
│ └─ Emit ROUTE_STATE_CHANGED event → Server updates RouteStateRegistry
│
├─ First heartbeat (30s) → includes routeStates
│ └─ Server: RouteStateRegistry.updateFromHeartbeat() — authoritative snapshot
│
├─ User sends STOP command via Cameleer UI
│ ├─ Server dispatches to all agents via SSE
│ ├─ Agent stops route → EventNotifier fires RouteStoppedEvent
│ │ ├─ Update local routeStates map: { "route1": "Stopped" }
│ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
│ └─ Agent ACKs command → Server also updates from ACK (redundant, both are fine)
│
├─ Operator stops route via JMX (outside Cameleer)
│ ├─ EventNotifier fires RouteStoppedEvent
│ │ ├─ Update local routeStates map: { "route1": "Stopped" }
│ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
│ └─ Next heartbeat confirms state
│
├─ Agent restarts
│ ├─ All routes start in default state
│ ├─ EventNotifier fires RouteStartedEvent for each route
│ ├─ Re-registration includes routeIds (existing protocol)
│ └─ First heartbeat after restart: routeStates = all Started
│ └─ Server: RouteStateRegistry cleared for this agent's routes
│
└─ Server restarts
├─ RouteStateRegistry is empty (in-memory, no persistence)
└─ Next heartbeat from each agent restores accurate state
Migration Path
Phase 1: Server-side (this repo)
- Accept optional
HeartbeatRequestbody on heartbeat endpoint - Handle
ROUTE_STATE_CHANGEDevents inEventIngestionController - Update
RouteStateRegistryfrom both sources - Remove ACK-based inference from
AgentCommandController(heartbeat/events are authoritative)
Phase 2: Agent-side (cameleer repo)
- Add
RouteStateEventNotifierto detect state changes - Maintain local
routeStatesmap - Include
routeStatesin heartbeat payload - Emit
ROUTE_STATE_CHANGEDevents on state transitions
Backward Compatibility
- Agents without this extension: heartbeat body is empty, no events emitted. Server falls back to ACK-based inference (current behavior).
- Server without this extension: ignores unknown heartbeat body fields and stores
ROUTE_STATE_CHANGEDevents as generic events (no registry update). - Protocol version stays at
1— this is an additive extension, not a breaking change.
Not In Scope
- Persistence of route state — intentionally omitted. Route state is runtime state that resets with the application. Persisting it would create stale data after app restarts.
- Per-agent state tracking — the registry tracks aggregate state per application+route. Per-agent granularity can be added later if multi-agent split-brain scenarios are common.
- Route state in SSE push — the server does not push state changes to the UI via SSE. The UI polls the catalog API (existing 15s refresh interval) which includes route state.