Compare commits
2 Commits
b714d3363f
...
520b80444a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
520b80444a | ||
|
|
17aff5ef9d |
@@ -18,7 +18,6 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
|
|||||||
import com.cameleer3.server.core.agent.AgentState;
|
import com.cameleer3.server.core.agent.AgentState;
|
||||||
import com.cameleer3.server.core.agent.CommandReply;
|
import com.cameleer3.server.core.agent.CommandReply;
|
||||||
import com.cameleer3.server.core.agent.CommandType;
|
import com.cameleer3.server.core.agent.CommandType;
|
||||||
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
@@ -68,20 +67,17 @@ public class AgentCommandController {
|
|||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final AgentEventService agentEventService;
|
private final AgentEventService agentEventService;
|
||||||
private final AuditService auditService;
|
private final AuditService auditService;
|
||||||
private final RouteStateRegistry routeStateRegistry;
|
|
||||||
|
|
||||||
public AgentCommandController(AgentRegistryService registryService,
|
public AgentCommandController(AgentRegistryService registryService,
|
||||||
SseConnectionManager connectionManager,
|
SseConnectionManager connectionManager,
|
||||||
ObjectMapper objectMapper,
|
ObjectMapper objectMapper,
|
||||||
AgentEventService agentEventService,
|
AgentEventService agentEventService,
|
||||||
AuditService auditService,
|
AuditService auditService) {
|
||||||
RouteStateRegistry routeStateRegistry) {
|
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
this.connectionManager = connectionManager;
|
this.connectionManager = connectionManager;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.agentEventService = agentEventService;
|
this.agentEventService = agentEventService;
|
||||||
this.auditService = auditService;
|
this.auditService = auditService;
|
||||||
this.routeStateRegistry = routeStateRegistry;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/{id}/commands")
|
@PostMapping("/{id}/commands")
|
||||||
@@ -161,29 +157,6 @@ public class AgentCommandController {
|
|||||||
boolean allSuccess = timedOut.isEmpty() &&
|
boolean allSuccess = timedOut.isEmpty() &&
|
||||||
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
|
responses.stream().allMatch(r -> "SUCCESS".equals(r.status()));
|
||||||
|
|
||||||
// Update route state when all agents successfully ACK a route-control command
|
|
||||||
if (allSuccess && type == CommandType.ROUTE_CONTROL) {
|
|
||||||
try {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
var payloadMap = objectMapper.readValue(payloadJson, java.util.Map.class);
|
|
||||||
String routeId = (String) payloadMap.get("routeId");
|
|
||||||
String action = (String) payloadMap.get("action");
|
|
||||||
if (routeId != null && action != null) {
|
|
||||||
RouteStateRegistry.RouteState newState = switch (action) {
|
|
||||||
case "start", "resume" -> RouteStateRegistry.RouteState.STARTED;
|
|
||||||
case "stop" -> RouteStateRegistry.RouteState.STOPPED;
|
|
||||||
case "suspend" -> RouteStateRegistry.RouteState.SUSPENDED;
|
|
||||||
default -> null;
|
|
||||||
};
|
|
||||||
if (newState != null) {
|
|
||||||
routeStateRegistry.setState(group, routeId, newState);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Failed to parse route-control payload for state tracking", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
||||||
java.util.Map.of("type", request.type(), "agentCount", futures.size(),
|
java.util.Map.of("type", request.type(), "agentCount", futures.size(),
|
||||||
"responded", responses.size(), "timedOut", timedOut.size()),
|
"responded", responses.size(), "timedOut", timedOut.size()),
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import com.cameleer3.server.app.dto.AgentRefreshResponse;
|
|||||||
import com.cameleer3.server.app.dto.AgentRegistrationRequest;
|
import com.cameleer3.server.app.dto.AgentRegistrationRequest;
|
||||||
import com.cameleer3.server.app.dto.AgentRegistrationResponse;
|
import com.cameleer3.server.app.dto.AgentRegistrationResponse;
|
||||||
import com.cameleer3.server.app.dto.ErrorResponse;
|
import com.cameleer3.server.app.dto.ErrorResponse;
|
||||||
|
import com.cameleer3.server.app.dto.HeartbeatRequest;
|
||||||
import com.cameleer3.server.app.security.BootstrapTokenValidator;
|
import com.cameleer3.server.app.security.BootstrapTokenValidator;
|
||||||
import com.cameleer3.server.core.admin.AuditCategory;
|
import com.cameleer3.server.core.admin.AuditCategory;
|
||||||
import com.cameleer3.server.core.admin.AuditResult;
|
import com.cameleer3.server.core.admin.AuditResult;
|
||||||
@@ -15,6 +16,7 @@ import com.cameleer3.server.core.agent.AgentEventService;
|
|||||||
import com.cameleer3.server.core.agent.AgentInfo;
|
import com.cameleer3.server.core.agent.AgentInfo;
|
||||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||||
import com.cameleer3.server.core.agent.AgentState;
|
import com.cameleer3.server.core.agent.AgentState;
|
||||||
|
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||||
import com.cameleer3.server.core.security.Ed25519SigningService;
|
import com.cameleer3.server.core.security.Ed25519SigningService;
|
||||||
import com.cameleer3.server.core.security.InvalidTokenException;
|
import com.cameleer3.server.core.security.InvalidTokenException;
|
||||||
import com.cameleer3.server.core.security.JwtService;
|
import com.cameleer3.server.core.security.JwtService;
|
||||||
@@ -63,6 +65,7 @@ public class AgentRegistrationController {
|
|||||||
private final AgentEventService agentEventService;
|
private final AgentEventService agentEventService;
|
||||||
private final AuditService auditService;
|
private final AuditService auditService;
|
||||||
private final JdbcTemplate jdbc;
|
private final JdbcTemplate jdbc;
|
||||||
|
private final RouteStateRegistry routeStateRegistry;
|
||||||
|
|
||||||
public AgentRegistrationController(AgentRegistryService registryService,
|
public AgentRegistrationController(AgentRegistryService registryService,
|
||||||
AgentRegistryConfig config,
|
AgentRegistryConfig config,
|
||||||
@@ -71,7 +74,8 @@ public class AgentRegistrationController {
|
|||||||
Ed25519SigningService ed25519SigningService,
|
Ed25519SigningService ed25519SigningService,
|
||||||
AgentEventService agentEventService,
|
AgentEventService agentEventService,
|
||||||
AuditService auditService,
|
AuditService auditService,
|
||||||
@org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc) {
|
@org.springframework.beans.factory.annotation.Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc,
|
||||||
|
RouteStateRegistry routeStateRegistry) {
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.bootstrapTokenValidator = bootstrapTokenValidator;
|
this.bootstrapTokenValidator = bootstrapTokenValidator;
|
||||||
@@ -80,6 +84,7 @@ public class AgentRegistrationController {
|
|||||||
this.agentEventService = agentEventService;
|
this.agentEventService = agentEventService;
|
||||||
this.auditService = auditService;
|
this.auditService = auditService;
|
||||||
this.jdbc = jdbc;
|
this.jdbc = jdbc;
|
||||||
|
this.routeStateRegistry = routeStateRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/register")
|
@PostMapping("/register")
|
||||||
@@ -191,14 +196,38 @@ public class AgentRegistrationController {
|
|||||||
description = "Updates the agent's last heartbeat timestamp")
|
description = "Updates the agent's last heartbeat timestamp")
|
||||||
@ApiResponse(responseCode = "200", description = "Heartbeat accepted")
|
@ApiResponse(responseCode = "200", description = "Heartbeat accepted")
|
||||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||||
public ResponseEntity<Void> heartbeat(@PathVariable String id) {
|
public ResponseEntity<Void> heartbeat(@PathVariable String id,
|
||||||
|
@RequestBody(required = false) HeartbeatRequest request) {
|
||||||
boolean found = registryService.heartbeat(id);
|
boolean found = registryService.heartbeat(id);
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return ResponseEntity.notFound().build();
|
return ResponseEntity.notFound().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (request != null && request.routeStates() != null && !request.routeStates().isEmpty()) {
|
||||||
|
AgentInfo agent = registryService.findById(id);
|
||||||
|
if (agent != null) {
|
||||||
|
for (var entry : request.routeStates().entrySet()) {
|
||||||
|
RouteStateRegistry.RouteState state = parseRouteState(entry.getValue());
|
||||||
|
if (state != null) {
|
||||||
|
routeStateRegistry.setState(agent.applicationId(), entry.getKey(), state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ResponseEntity.ok().build();
|
return ResponseEntity.ok().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RouteStateRegistry.RouteState parseRouteState(String state) {
|
||||||
|
if (state == null) return null;
|
||||||
|
return switch (state) {
|
||||||
|
case "Started" -> RouteStateRegistry.RouteState.STARTED;
|
||||||
|
case "Stopped" -> RouteStateRegistry.RouteState.STOPPED;
|
||||||
|
case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||||
|
default -> null;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@PostMapping("/{id}/deregister")
|
@PostMapping("/{id}/deregister")
|
||||||
@Operation(summary = "Deregister agent",
|
@Operation(summary = "Deregister agent",
|
||||||
description = "Removes the agent from the registry. Called by agents during graceful shutdown.")
|
description = "Removes the agent from the registry. Called by agents during graceful shutdown.")
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import com.cameleer3.common.model.AgentEvent;
|
|||||||
import com.cameleer3.server.core.agent.AgentEventService;
|
import com.cameleer3.server.core.agent.AgentEventService;
|
||||||
import com.cameleer3.server.core.agent.AgentInfo;
|
import com.cameleer3.server.core.agent.AgentInfo;
|
||||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||||
|
import com.cameleer3.server.core.agent.RouteStateRegistry;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
@@ -19,6 +20,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
|||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ingestion endpoint for agent lifecycle events.
|
* Ingestion endpoint for agent lifecycle events.
|
||||||
@@ -37,13 +39,16 @@ public class EventIngestionController {
|
|||||||
private final AgentEventService agentEventService;
|
private final AgentEventService agentEventService;
|
||||||
private final AgentRegistryService registryService;
|
private final AgentRegistryService registryService;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
private final RouteStateRegistry routeStateRegistry;
|
||||||
|
|
||||||
public EventIngestionController(AgentEventService agentEventService,
|
public EventIngestionController(AgentEventService agentEventService,
|
||||||
AgentRegistryService registryService,
|
AgentRegistryService registryService,
|
||||||
ObjectMapper objectMapper) {
|
ObjectMapper objectMapper,
|
||||||
|
RouteStateRegistry routeStateRegistry) {
|
||||||
this.agentEventService = agentEventService;
|
this.agentEventService = agentEventService;
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
|
this.routeStateRegistry = routeStateRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/events")
|
@PostMapping("/events")
|
||||||
@@ -76,11 +81,37 @@ public class EventIngestionController {
|
|||||||
log.info("Agent {} reported graceful shutdown", instanceId);
|
log.info("Agent {} reported graceful shutdown", instanceId);
|
||||||
registryService.shutdown(instanceId);
|
registryService.shutdown(instanceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
|
||||||
|
Map<String, String> details = event.getDetails();
|
||||||
|
if (details != null) {
|
||||||
|
String routeId = details.get("routeId");
|
||||||
|
String newState = details.get("newState");
|
||||||
|
if (routeId != null && newState != null) {
|
||||||
|
RouteStateRegistry.RouteState state = parseRouteState(newState);
|
||||||
|
if (state != null) {
|
||||||
|
routeStateRegistry.setState(applicationId, routeId, state);
|
||||||
|
log.debug("Route state changed: {}/{} -> {} (reason: {})",
|
||||||
|
applicationId, routeId, newState, details.get("reason"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ResponseEntity.accepted().build();
|
return ResponseEntity.accepted().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RouteStateRegistry.RouteState parseRouteState(String state) {
|
||||||
|
if (state == null) return null;
|
||||||
|
return switch (state) {
|
||||||
|
case "Started" -> RouteStateRegistry.RouteState.STARTED;
|
||||||
|
case "Stopped" -> RouteStateRegistry.RouteState.STOPPED;
|
||||||
|
case "Suspended" -> RouteStateRegistry.RouteState.SUSPENDED;
|
||||||
|
default -> null;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private String extractInstanceId() {
|
private String extractInstanceId() {
|
||||||
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
|
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
|
||||||
return auth != null ? auth.getName() : "";
|
return auth != null ? auth.getName() : "";
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package com.cameleer3.server.app.dto;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public record HeartbeatRequest(
|
||||||
|
Map<String, String> routeStates
|
||||||
|
) {}
|
||||||
@@ -6,7 +6,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* In-memory registry tracking the operational state of routes.
|
* In-memory registry tracking the operational state of routes.
|
||||||
* State is inferred from successful route-control command ACKs.
|
* State is updated from agent heartbeats (routeStates map) and
|
||||||
|
* ROUTE_STATE_CHANGED lifecycle events.
|
||||||
* On server restart, all states reset to STARTED (default Camel behavior).
|
* On server restart, all states reset to STARTED (default Camel behavior).
|
||||||
*/
|
*/
|
||||||
public class RouteStateRegistry {
|
public class RouteStateRegistry {
|
||||||
|
|||||||
@@ -0,0 +1,283 @@
|
|||||||
|
# Protocol Extension: Route State Reporting
|
||||||
|
|
||||||
|
**Date:** 2026-04-02
|
||||||
|
**Scope:** Agent protocol v1 extension (backward-compatible)
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
Route operational state (Started/Stopped/Suspended) is currently inferred from command ACKs — the server guesses what state a route is in based on the last successful command. This fails when:
|
||||||
|
|
||||||
|
- An agent restarts (routes reset to Started, but server doesn't know)
|
||||||
|
- A route is stopped/suspended outside Cameleer (JMX, Hawtio, programmatic CamelContext)
|
||||||
|
- The server restarts (all state is lost)
|
||||||
|
- A new agent joins with routes already in non-default states
|
||||||
|
|
||||||
|
## Solution
|
||||||
|
|
||||||
|
Two complementary mechanisms, both backward-compatible:
|
||||||
|
|
||||||
|
1. **Heartbeat extension** — agent includes current route states in every heartbeat
|
||||||
|
2. **Route state change events** — agent emits an event immediately when any route changes state
|
||||||
|
|
||||||
|
The heartbeat is the steady-state source of truth. Events provide real-time updates between heartbeats.
|
||||||
|
|
||||||
|
## 1. Heartbeat Extension
|
||||||
|
|
||||||
|
### Current Protocol
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /api/v1/agents/{id}/heartbeat
|
||||||
|
Authorization: Bearer <token>
|
||||||
|
(empty body)
|
||||||
|
→ 200 OK
|
||||||
|
```
|
||||||
|
|
||||||
|
### Extended Protocol
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /api/v1/agents/{id}/heartbeat
|
||||||
|
Authorization: Bearer <token>
|
||||||
|
Content-Type: application/json
|
||||||
|
|
||||||
|
{
|
||||||
|
"routeStates": {
|
||||||
|
"file-processing": "Started",
|
||||||
|
"timer-heartbeat": "Started",
|
||||||
|
"error-handling-test": "Suspended",
|
||||||
|
"try-catch-test": "Stopped"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
| Field | Type | Required | Description |
|
||||||
|
|-------|------|----------|-------------|
|
||||||
|
| `routeStates` | `Map<String, String>` | No | Current state of each Camel route. Keys are route IDs, values are `"Started"`, `"Stopped"`, or `"Suspended"`. |
|
||||||
|
|
||||||
|
**Backward compatibility:** The body is optional. Agents that don't send it (older versions) continue working — the server falls back to ACK-based inference for those agents. The `Content-Type` header is only required when a body is present.
|
||||||
|
|
||||||
|
### Server Handling
|
||||||
|
|
||||||
|
When a heartbeat includes `routeStates`:
|
||||||
|
|
||||||
|
1. Update `RouteStateRegistry` with the reported states
|
||||||
|
2. For routes present in the map, set their state directly (this is authoritative)
|
||||||
|
3. For routes NOT in the map but previously tracked, mark as `STARTED` (agent didn't report them = default state)
|
||||||
|
4. Clear any ACK-inferred state for this agent's routes — heartbeat is the source of truth
|
||||||
|
|
||||||
|
**Conflict resolution:** If multiple agents for the same application report different states for the same route, use the **most restrictive** state: `STOPPED > SUSPENDED > STARTED`. This handles split-brain scenarios where a command partially succeeded.
|
||||||
|
|
||||||
|
### Heartbeat Request Model
|
||||||
|
|
||||||
|
```java
|
||||||
|
// New: optional heartbeat body
|
||||||
|
public record HeartbeatRequest(
|
||||||
|
Map<String, String> routeStates // routeId → "Started" | "Stopped" | "Suspended"
|
||||||
|
) {}
|
||||||
|
```
|
||||||
|
|
||||||
|
The server endpoint changes from accepting no body to optionally accepting `HeartbeatRequest`:
|
||||||
|
|
||||||
|
```java
|
||||||
|
@PostMapping("/{id}/heartbeat")
|
||||||
|
public ResponseEntity<Void> heartbeat(
|
||||||
|
@PathVariable String id,
|
||||||
|
@RequestBody(required = false) HeartbeatRequest request) {
|
||||||
|
boolean found = registryService.heartbeat(id);
|
||||||
|
if (!found) return ResponseEntity.notFound().build();
|
||||||
|
|
||||||
|
if (request != null && request.routeStates() != null) {
|
||||||
|
AgentInfo agent = registryService.findById(id);
|
||||||
|
if (agent != null) {
|
||||||
|
routeStateRegistry.updateFromHeartbeat(
|
||||||
|
agent.applicationId(), id, request.routeStates());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ResponseEntity.ok().build();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### RouteStateRegistry Extension
|
||||||
|
|
||||||
|
```java
|
||||||
|
/**
|
||||||
|
* Update route states from an agent heartbeat.
|
||||||
|
* Heartbeat-reported state is authoritative — it overrides ACK-inferred state.
|
||||||
|
*
|
||||||
|
* When multiple agents report different states for the same route,
|
||||||
|
* the most restrictive state wins (STOPPED > SUSPENDED > STARTED).
|
||||||
|
*/
|
||||||
|
public void updateFromHeartbeat(String applicationId, String agentId,
|
||||||
|
Map<String, String> routeStates) {
|
||||||
|
for (var entry : routeStates.entrySet()) {
|
||||||
|
String routeId = entry.getKey();
|
||||||
|
RouteState reported = parseState(entry.getValue());
|
||||||
|
RouteState current = getState(applicationId, routeId);
|
||||||
|
|
||||||
|
// Most restrictive wins across agents
|
||||||
|
if (reported.ordinal() > current.ordinal()) {
|
||||||
|
setState(applicationId, routeId, reported);
|
||||||
|
} else if (reported == RouteState.STARTED && current != RouteState.STARTED) {
|
||||||
|
// If this agent says STARTED but another said STOPPED,
|
||||||
|
// keep STOPPED until ALL agents report STARTED
|
||||||
|
// (track per-agent state to resolve this properly)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note on multi-agent conflict:** The simple "most restrictive wins" heuristic works for the common case. A more precise implementation would track per-agent route states and compute the aggregate. This can be deferred — single-agent-per-app deployments cover most early use cases.
|
||||||
|
|
||||||
|
## 2. Route State Change Events
|
||||||
|
|
||||||
|
### Event Type
|
||||||
|
|
||||||
|
Agents emit a `ROUTE_STATE_CHANGED` event via the existing event ingestion endpoint whenever a Camel route changes operational state.
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /api/v1/data/events
|
||||||
|
Authorization: Bearer <token>
|
||||||
|
Content-Type: application/json
|
||||||
|
X-Cameleer-Protocol-Version: 1
|
||||||
|
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"eventType": "ROUTE_STATE_CHANGED",
|
||||||
|
"timestamp": "2026-04-02T18:30:00Z",
|
||||||
|
"details": {
|
||||||
|
"routeId": "file-processing",
|
||||||
|
"previousState": "Started",
|
||||||
|
"newState": "Stopped",
|
||||||
|
"reason": "command",
|
||||||
|
"commandId": "a1b2c3d4-..."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
| Field | Type | Required | Description |
|
||||||
|
|-------|------|----------|-------------|
|
||||||
|
| `eventType` | `"ROUTE_STATE_CHANGED"` | Yes | Fixed event type |
|
||||||
|
| `timestamp` | ISO-8601 | Yes | When the state change occurred |
|
||||||
|
| `details.routeId` | `String` | Yes | The affected route |
|
||||||
|
| `details.previousState` | `String` | Yes | State before change: `"Started"`, `"Stopped"`, `"Suspended"` |
|
||||||
|
| `details.newState` | `String` | Yes | State after change |
|
||||||
|
| `details.reason` | `String` | No | What triggered the change: `"command"` (Cameleer command), `"external"` (JMX/Hawtio/programmatic), `"startup"` (route starting on agent boot), `"error"` (route stopped due to error) |
|
||||||
|
| `details.commandId` | `String` | No | If triggered by a Cameleer command, the command ID |
|
||||||
|
|
||||||
|
### Server Handling
|
||||||
|
|
||||||
|
When `EventIngestionController` receives a `ROUTE_STATE_CHANGED` event:
|
||||||
|
|
||||||
|
1. Store the event in the agent events table (existing behavior — all events are stored)
|
||||||
|
2. Additionally, update `RouteStateRegistry` immediately:
|
||||||
|
|
||||||
|
```java
|
||||||
|
// In EventIngestionController, after storing events:
|
||||||
|
for (AgentEvent event : events) {
|
||||||
|
if ("ROUTE_STATE_CHANGED".equals(event.getEventType())) {
|
||||||
|
Map<String, String> details = event.getDetails();
|
||||||
|
if (details != null) {
|
||||||
|
String routeId = details.get("routeId");
|
||||||
|
String newState = details.get("newState");
|
||||||
|
if (routeId != null && newState != null) {
|
||||||
|
routeStateRegistry.setState(
|
||||||
|
agent.applicationId(), routeId,
|
||||||
|
RouteStateRegistry.parseState(newState));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Agent-Side Detection
|
||||||
|
|
||||||
|
The agent detects route state changes using Camel's `EventNotifier` mechanism:
|
||||||
|
|
||||||
|
```java
|
||||||
|
// In the Cameleer3 agent (cameleer3 repo)
|
||||||
|
public class RouteStateEventNotifier extends EventNotifierSupport {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void notify(CamelEvent event) {
|
||||||
|
if (event instanceof RouteStartedEvent e) {
|
||||||
|
emit(e.getRoute().getRouteId(), "Started", previousState);
|
||||||
|
} else if (event instanceof RouteStoppedEvent e) {
|
||||||
|
emit(e.getRoute().getRouteId(), "Stopped", previousState);
|
||||||
|
} else if (event instanceof RouteSuspendedEvent e) {
|
||||||
|
emit(e.getRoute().getRouteId(), "Suspended", previousState);
|
||||||
|
} else if (event instanceof RouteResumedEvent e) {
|
||||||
|
emit(e.getRoute().getRouteId(), "Started", "Suspended");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emit(String routeId, String newState, String previousState) {
|
||||||
|
// Send ROUTE_STATE_CHANGED event to server
|
||||||
|
// Also update local state map used for heartbeat reporting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
For heartbeat route state reporting, the agent maintains an in-memory map of route states, updated by the same `EventNotifier`. The heartbeat sends this map on each tick.
|
||||||
|
|
||||||
|
## State Flow Diagram
|
||||||
|
|
||||||
|
```
|
||||||
|
Agent boots
|
||||||
|
│
|
||||||
|
├─ Camel routes start → EventNotifier fires RouteStartedEvent
|
||||||
|
│ ├─ Update local routeStates map: { "route1": "Started", ... }
|
||||||
|
│ └─ Emit ROUTE_STATE_CHANGED event → Server updates RouteStateRegistry
|
||||||
|
│
|
||||||
|
├─ First heartbeat (30s) → includes routeStates
|
||||||
|
│ └─ Server: RouteStateRegistry.updateFromHeartbeat() — authoritative snapshot
|
||||||
|
│
|
||||||
|
├─ User sends STOP command via Cameleer UI
|
||||||
|
│ ├─ Server dispatches to all agents via SSE
|
||||||
|
│ ├─ Agent stops route → EventNotifier fires RouteStoppedEvent
|
||||||
|
│ │ ├─ Update local routeStates map: { "route1": "Stopped" }
|
||||||
|
│ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
|
||||||
|
│ └─ Agent ACKs command → Server also updates from ACK (redundant, both are fine)
|
||||||
|
│
|
||||||
|
├─ Operator stops route via JMX (outside Cameleer)
|
||||||
|
│ ├─ EventNotifier fires RouteStoppedEvent
|
||||||
|
│ │ ├─ Update local routeStates map: { "route1": "Stopped" }
|
||||||
|
│ │ └─ Emit ROUTE_STATE_CHANGED event → Server updates immediately
|
||||||
|
│ └─ Next heartbeat confirms state
|
||||||
|
│
|
||||||
|
├─ Agent restarts
|
||||||
|
│ ├─ All routes start in default state
|
||||||
|
│ ├─ EventNotifier fires RouteStartedEvent for each route
|
||||||
|
│ ├─ Re-registration includes routeIds (existing protocol)
|
||||||
|
│ └─ First heartbeat after restart: routeStates = all Started
|
||||||
|
│ └─ Server: RouteStateRegistry cleared for this agent's routes
|
||||||
|
│
|
||||||
|
└─ Server restarts
|
||||||
|
├─ RouteStateRegistry is empty (in-memory, no persistence)
|
||||||
|
└─ Next heartbeat from each agent restores accurate state
|
||||||
|
```
|
||||||
|
|
||||||
|
## Migration Path
|
||||||
|
|
||||||
|
### Phase 1: Server-side (this repo)
|
||||||
|
1. Accept optional `HeartbeatRequest` body on heartbeat endpoint
|
||||||
|
2. Handle `ROUTE_STATE_CHANGED` events in `EventIngestionController`
|
||||||
|
3. Update `RouteStateRegistry` from both sources
|
||||||
|
4. Remove ACK-based inference from `AgentCommandController` (heartbeat/events are authoritative)
|
||||||
|
|
||||||
|
### Phase 2: Agent-side (cameleer3 repo)
|
||||||
|
1. Add `RouteStateEventNotifier` to detect state changes
|
||||||
|
2. Maintain local `routeStates` map
|
||||||
|
3. Include `routeStates` in heartbeat payload
|
||||||
|
4. Emit `ROUTE_STATE_CHANGED` events on state transitions
|
||||||
|
|
||||||
|
### Backward Compatibility
|
||||||
|
|
||||||
|
- Agents without this extension: heartbeat body is empty, no events emitted. Server falls back to ACK-based inference (current behavior).
|
||||||
|
- Server without this extension: ignores unknown heartbeat body fields and stores `ROUTE_STATE_CHANGED` events as generic events (no registry update).
|
||||||
|
- Protocol version stays at `1` — this is an additive extension, not a breaking change.
|
||||||
|
|
||||||
|
## Not In Scope
|
||||||
|
|
||||||
|
- **Persistence of route state** — intentionally omitted. Route state is runtime state that resets with the application. Persisting it would create stale data after app restarts.
|
||||||
|
- **Per-agent state tracking** — the registry tracks aggregate state per application+route. Per-agent granularity can be added later if multi-agent split-brain scenarios are common.
|
||||||
|
- **Route state in SSE push** — the server does not push state changes to the UI via SSE. The UI polls the catalog API (existing 15s refresh interval) which includes route state.
|
||||||
Reference in New Issue
Block a user