feat(03-01): add agent registration controller, config, lifecycle monitor
- AgentRegistryConfig: heartbeat, stale, dead, ping, command expiry settings
- AgentRegistryBeanConfig: wires AgentRegistryService as Spring bean
- AgentLifecycleMonitor: @Scheduled lifecycle check + command expiry sweep
- AgentRegistrationController: POST /register, POST /{id}/heartbeat, GET /agents
- Updated Cameleer3ServerApplication with AgentRegistryConfig
- Updated application.yml with agent-registry section and async timeout
- 7 integration tests: register, re-register, heartbeat, list, filter, invalid status
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
# Phase 3 Deferred Items
|
||||
|
||||
## Pre-existing Test Flakiness
|
||||
|
||||
- **DiagramRenderControllerIT.seedDiagram** - EmptyResultDataAccess error (expects 1 row, gets 0). This is a pre-existing ClickHouse timing issue not caused by Phase 3 changes. The test relies on data being flushed and available before the assertion, which can fail under timing pressure.
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.cameleer3.server.app;
|
||||
|
||||
import com.cameleer3.server.app.config.AgentRegistryConfig;
|
||||
import com.cameleer3.server.app.config.IngestionConfig;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
@@ -16,7 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
"com.cameleer3.server.core"
|
||||
})
|
||||
@EnableScheduling
|
||||
@EnableConfigurationProperties(IngestionConfig.class)
|
||||
@EnableConfigurationProperties({IngestionConfig.class, AgentRegistryConfig.class})
|
||||
public class Cameleer3ServerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.cameleer3.server.app.agent;
|
||||
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Periodic task that checks agent lifecycle and expires old commands.
|
||||
* <p>
|
||||
* Runs on a configurable fixed delay (default 10 seconds). Transitions
|
||||
* agents LIVE -> STALE -> DEAD based on heartbeat timing, and removes
|
||||
* expired pending commands.
|
||||
*/
|
||||
@Component
|
||||
public class AgentLifecycleMonitor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AgentLifecycleMonitor.class);
|
||||
|
||||
private final AgentRegistryService registryService;
|
||||
|
||||
public AgentLifecycleMonitor(AgentRegistryService registryService) {
|
||||
this.registryService = registryService;
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}")
|
||||
public void checkLifecycle() {
|
||||
try {
|
||||
registryService.checkLifecycle();
|
||||
registryService.expireOldCommands();
|
||||
} catch (Exception e) {
|
||||
log.error("Error during agent lifecycle check", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.cameleer3.server.app.config;
|
||||
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* Creates the {@link AgentRegistryService} bean.
|
||||
* <p>
|
||||
* Follows the established pattern: core module plain class, app module bean config.
|
||||
*/
|
||||
@Configuration
|
||||
public class AgentRegistryBeanConfig {
|
||||
|
||||
@Bean
|
||||
public AgentRegistryService agentRegistryService(AgentRegistryConfig config) {
|
||||
return new AgentRegistryService(
|
||||
config.getStaleThresholdMs(),
|
||||
config.getDeadThresholdMs(),
|
||||
config.getCommandExpiryMs()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.cameleer3.server.app.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Configuration properties for the agent registry.
|
||||
* Bound from the {@code agent-registry.*} namespace in application.yml.
|
||||
* <p>
|
||||
* Registered via {@code @EnableConfigurationProperties} on the application class.
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "agent-registry")
|
||||
public class AgentRegistryConfig {
|
||||
|
||||
private long heartbeatIntervalMs = 30_000;
|
||||
private long staleThresholdMs = 90_000;
|
||||
private long deadThresholdMs = 300_000;
|
||||
private long pingIntervalMs = 15_000;
|
||||
private long commandExpiryMs = 60_000;
|
||||
private long lifecycleCheckIntervalMs = 10_000;
|
||||
|
||||
public long getHeartbeatIntervalMs() {
|
||||
return heartbeatIntervalMs;
|
||||
}
|
||||
|
||||
public void setHeartbeatIntervalMs(long heartbeatIntervalMs) {
|
||||
this.heartbeatIntervalMs = heartbeatIntervalMs;
|
||||
}
|
||||
|
||||
public long getStaleThresholdMs() {
|
||||
return staleThresholdMs;
|
||||
}
|
||||
|
||||
public void setStaleThresholdMs(long staleThresholdMs) {
|
||||
this.staleThresholdMs = staleThresholdMs;
|
||||
}
|
||||
|
||||
public long getDeadThresholdMs() {
|
||||
return deadThresholdMs;
|
||||
}
|
||||
|
||||
public void setDeadThresholdMs(long deadThresholdMs) {
|
||||
this.deadThresholdMs = deadThresholdMs;
|
||||
}
|
||||
|
||||
public long getPingIntervalMs() {
|
||||
return pingIntervalMs;
|
||||
}
|
||||
|
||||
public void setPingIntervalMs(long pingIntervalMs) {
|
||||
this.pingIntervalMs = pingIntervalMs;
|
||||
}
|
||||
|
||||
public long getCommandExpiryMs() {
|
||||
return commandExpiryMs;
|
||||
}
|
||||
|
||||
public void setCommandExpiryMs(long commandExpiryMs) {
|
||||
this.commandExpiryMs = commandExpiryMs;
|
||||
}
|
||||
|
||||
public long getLifecycleCheckIntervalMs() {
|
||||
return lifecycleCheckIntervalMs;
|
||||
}
|
||||
|
||||
public void setLifecycleCheckIntervalMs(long lifecycleCheckIntervalMs) {
|
||||
this.lifecycleCheckIntervalMs = lifecycleCheckIntervalMs;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.config.AgentRegistryConfig;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Agent registration, heartbeat, and listing endpoints.
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/agents")
|
||||
@Tag(name = "Agent Management", description = "Agent registration and lifecycle endpoints")
|
||||
public class AgentRegistrationController {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AgentRegistrationController.class);
|
||||
|
||||
private final AgentRegistryService registryService;
|
||||
private final AgentRegistryConfig config;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public AgentRegistrationController(AgentRegistryService registryService,
|
||||
AgentRegistryConfig config,
|
||||
ObjectMapper objectMapper) {
|
||||
this.registryService = registryService;
|
||||
this.config = config;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@PostMapping("/register")
|
||||
@Operation(summary = "Register an agent",
|
||||
description = "Registers a new agent or re-registers an existing one")
|
||||
@ApiResponse(responseCode = "200", description = "Agent registered successfully")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid registration payload")
|
||||
public ResponseEntity<String> register(@RequestBody String body) throws JsonProcessingException {
|
||||
JsonNode node = objectMapper.readTree(body);
|
||||
|
||||
String agentId = getRequiredField(node, "agentId");
|
||||
String name = getRequiredField(node, "name");
|
||||
if (agentId == null || name == null) {
|
||||
return ResponseEntity.badRequest()
|
||||
.body("{\"error\":\"agentId and name are required\"}");
|
||||
}
|
||||
|
||||
String group = node.has("group") ? node.get("group").asText() : "default";
|
||||
String version = node.has("version") ? node.get("version").asText() : null;
|
||||
|
||||
List<String> routeIds = new ArrayList<>();
|
||||
if (node.has("routeIds") && node.get("routeIds").isArray()) {
|
||||
for (JsonNode rid : node.get("routeIds")) {
|
||||
routeIds.add(rid.asText());
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> capabilities = Collections.emptyMap();
|
||||
if (node.has("capabilities") && node.get("capabilities").isObject()) {
|
||||
capabilities = new LinkedHashMap<>();
|
||||
Iterator<Map.Entry<String, JsonNode>> fields = node.get("capabilities").fields();
|
||||
while (fields.hasNext()) {
|
||||
Map.Entry<String, JsonNode> field = fields.next();
|
||||
capabilities.put(field.getKey(), parseJsonValue(field.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
AgentInfo agent = registryService.register(agentId, name, group, version, routeIds, capabilities);
|
||||
log.info("Agent registered: {} (name={}, group={})", agentId, name, group);
|
||||
|
||||
Map<String, Object> response = new LinkedHashMap<>();
|
||||
response.put("agentId", agent.id());
|
||||
response.put("sseEndpoint", "/api/v1/agents/" + agentId + "/events");
|
||||
response.put("heartbeatIntervalMs", config.getHeartbeatIntervalMs());
|
||||
response.put("serverPublicKey", null);
|
||||
|
||||
return ResponseEntity.ok(objectMapper.writeValueAsString(response));
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/heartbeat")
|
||||
@Operation(summary = "Agent heartbeat ping",
|
||||
description = "Updates the agent's last heartbeat timestamp")
|
||||
@ApiResponse(responseCode = "200", description = "Heartbeat accepted")
|
||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||
public ResponseEntity<Void> heartbeat(@PathVariable String id) {
|
||||
boolean found = registryService.heartbeat(id);
|
||||
if (!found) {
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@GetMapping
|
||||
@Operation(summary = "List all agents",
|
||||
description = "Returns all registered agents, optionally filtered by status")
|
||||
@ApiResponse(responseCode = "200", description = "Agent list returned")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid status filter")
|
||||
public ResponseEntity<String> listAgents(
|
||||
@RequestParam(required = false) String status) throws JsonProcessingException {
|
||||
List<AgentInfo> agents;
|
||||
|
||||
if (status != null) {
|
||||
try {
|
||||
AgentState stateFilter = AgentState.valueOf(status.toUpperCase());
|
||||
agents = registryService.findByState(stateFilter);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return ResponseEntity.badRequest()
|
||||
.body("{\"error\":\"Invalid status: " + status + ". Valid values: LIVE, STALE, DEAD\"}");
|
||||
}
|
||||
} else {
|
||||
agents = registryService.findAll();
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(objectMapper.writeValueAsString(agents));
|
||||
}
|
||||
|
||||
private String getRequiredField(JsonNode node, String fieldName) {
|
||||
if (!node.has(fieldName) || node.get(fieldName).isNull() || node.get(fieldName).asText().isBlank()) {
|
||||
return null;
|
||||
}
|
||||
return node.get(fieldName).asText();
|
||||
}
|
||||
|
||||
private Object parseJsonValue(JsonNode node) {
|
||||
if (node.isBoolean()) return node.asBoolean();
|
||||
if (node.isInt()) return node.asInt();
|
||||
if (node.isLong()) return node.asLong();
|
||||
if (node.isDouble()) return node.asDouble();
|
||||
if (node.isTextual()) return node.asText();
|
||||
return node.toString();
|
||||
}
|
||||
}
|
||||
@@ -7,12 +7,23 @@ spring:
|
||||
username: cameleer
|
||||
password: cameleer_dev
|
||||
driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
|
||||
mvc:
|
||||
async:
|
||||
request-timeout: -1
|
||||
jackson:
|
||||
serialization:
|
||||
write-dates-as-timestamps: false
|
||||
deserialization:
|
||||
fail-on-unknown-properties: false
|
||||
|
||||
agent-registry:
|
||||
heartbeat-interval-ms: 30000
|
||||
stale-threshold-ms: 90000
|
||||
dead-threshold-ms: 300000
|
||||
ping-interval-ms: 15000
|
||||
command-expiry-ms: 60000
|
||||
lifecycle-check-interval-ms: 10000
|
||||
|
||||
ingestion:
|
||||
buffer-capacity: 50000
|
||||
batch-size: 5000
|
||||
|
||||
@@ -0,0 +1,155 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class AgentRegistrationControllerIT extends AbstractClickHouseIT {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private HttpHeaders protocolHeaders() {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||
return headers;
|
||||
}
|
||||
|
||||
private HttpHeaders protocolHeadersNoBody() {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||
return headers;
|
||||
}
|
||||
|
||||
private ResponseEntity<String> registerAgent(String agentId, String name) {
|
||||
String json = """
|
||||
{
|
||||
"agentId": "%s",
|
||||
"name": "%s",
|
||||
"group": "test-group",
|
||||
"version": "1.0.0",
|
||||
"routeIds": ["route-1", "route-2"],
|
||||
"capabilities": {"tracing": true}
|
||||
}
|
||||
""".formatted(agentId, name);
|
||||
|
||||
return restTemplate.postForEntity(
|
||||
"/api/v1/agents/register",
|
||||
new HttpEntity<>(json, protocolHeaders()),
|
||||
String.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void registerNewAgent_returns200WithAgentIdAndSseEndpoint() throws Exception {
|
||||
ResponseEntity<String> response = registerAgent("agent-it-1", "IT Agent 1");
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.get("agentId").asText()).isEqualTo("agent-it-1");
|
||||
assertThat(body.get("sseEndpoint").asText()).isEqualTo("/api/v1/agents/agent-it-1/events");
|
||||
assertThat(body.get("heartbeatIntervalMs").asLong()).isGreaterThan(0);
|
||||
assertThat(body.has("serverPublicKey")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void reRegisterSameAgent_returns200WithLiveState() throws Exception {
|
||||
registerAgent("agent-it-reregister", "First Registration");
|
||||
|
||||
ResponseEntity<String> response = registerAgent("agent-it-reregister", "Second Registration");
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.get("agentId").asText()).isEqualTo("agent-it-reregister");
|
||||
}
|
||||
|
||||
@Test
|
||||
void heartbeatKnownAgent_returns200() {
|
||||
registerAgent("agent-it-hb", "HB Agent");
|
||||
|
||||
ResponseEntity<Void> response = restTemplate.exchange(
|
||||
"/api/v1/agents/agent-it-hb/heartbeat",
|
||||
HttpMethod.POST,
|
||||
new HttpEntity<>(protocolHeadersNoBody()),
|
||||
Void.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
}
|
||||
|
||||
@Test
|
||||
void heartbeatUnknownAgent_returns404() {
|
||||
ResponseEntity<Void> response = restTemplate.exchange(
|
||||
"/api/v1/agents/unknown-agent-xyz/heartbeat",
|
||||
HttpMethod.POST,
|
||||
new HttpEntity<>(protocolHeadersNoBody()),
|
||||
Void.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
|
||||
}
|
||||
|
||||
@Test
|
||||
void listAllAgents_returnsBothAgents() throws Exception {
|
||||
registerAgent("agent-it-list-1", "List Agent 1");
|
||||
registerAgent("agent-it-list-2", "List Agent 2");
|
||||
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/agents",
|
||||
HttpMethod.GET,
|
||||
new HttpEntity<>(protocolHeadersNoBody()),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.isArray()).isTrue();
|
||||
// At minimum, our two agents should be present (may have more from other tests)
|
||||
assertThat(body.size()).isGreaterThanOrEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void listAgentsByStatus_filtersCorrectly() throws Exception {
|
||||
registerAgent("agent-it-filter", "Filter Agent");
|
||||
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/agents?status=LIVE",
|
||||
HttpMethod.GET,
|
||||
new HttpEntity<>(protocolHeadersNoBody()),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.isArray()).isTrue();
|
||||
// All returned agents should be LIVE
|
||||
for (JsonNode agent : body) {
|
||||
assertThat(agent.get("state").asText()).isEqualTo("LIVE");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void listAgentsWithInvalidStatus_returns400() {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/agents?status=INVALID",
|
||||
HttpMethod.GET,
|
||||
new HttpEntity<>(protocolHeadersNoBody()),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user