diff --git a/.planning/phases/03-agent-registry-sse-push/deferred-items.md b/.planning/phases/03-agent-registry-sse-push/deferred-items.md
new file mode 100644
index 00000000..e7d89000
--- /dev/null
+++ b/.planning/phases/03-agent-registry-sse-push/deferred-items.md
@@ -0,0 +1,5 @@
+# Phase 3 Deferred Items
+
+## Pre-existing Test Flakiness
+
+- **DiagramRenderControllerIT.seedDiagram** - fails intermittently with `EmptyResultDataAccessException` (the query expects 1 row but gets 0). This is a pre-existing ClickHouse timing issue, not caused by Phase 3 changes. The test relies on data being flushed and available before the assertion, which can fail under timing pressure.
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java
index 3ce2e43f..bf271ffc 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/Cameleer3ServerApplication.java
@@ -1,5 +1,6 @@
 package com.cameleer3.server.app;
 
+import com.cameleer3.server.app.config.AgentRegistryConfig;
 import com.cameleer3.server.app.config.IngestionConfig;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -16,7 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
         "com.cameleer3.server.core"
 })
 @EnableScheduling
-@EnableConfigurationProperties(IngestionConfig.class)
+@EnableConfigurationProperties({IngestionConfig.class, AgentRegistryConfig.class})
 public class Cameleer3ServerApplication {
 
     public static void main(String[] args) {
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java
new file mode 100644
index 00000000..36d48205
--- /dev/null
+++ 
b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/agent/AgentLifecycleMonitor.java
@@ -0,0 +1,54 @@
+package com.cameleer3.server.app.agent;
+
+import com.cameleer3.server.core.agent.AgentRegistryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * Periodic task that checks agent lifecycle and expires old commands.
+ * <p>
+ * Runs on a configurable fixed delay (default 10 seconds). Transitions
+ * agents {@code LIVE -> STALE -> DEAD} based on heartbeat timing, and removes
+ * expired pending commands.
+ * <p>
+ * The two maintenance steps are isolated from each other: a failure in one is
+ * logged and does not prevent the other from running in the same tick.
+ */
+@Component
+public class AgentLifecycleMonitor {
+
+    private static final Logger log = LoggerFactory.getLogger(AgentLifecycleMonitor.class);
+
+    private final AgentRegistryService registryService;
+
+    /**
+     * @param registryService registry whose lifecycle and command-expiry checks are driven by this task
+     */
+    public AgentLifecycleMonitor(AgentRegistryService registryService) {
+        this.registryService = registryService;
+    }
+
+    /**
+     * Scheduled entry point; the delay between runs is read from
+     * {@code agent-registry.lifecycle-check-interval-ms} (10000 ms when unset).
+     * Exceptions are caught and logged here rather than propagated to the scheduler.
+     */
+    @Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}")
+    public void checkLifecycle() {
+        try {
+            registryService.checkLifecycle();
+        } catch (Exception e) {
+            log.error("Error during agent lifecycle check", e);
+        }
+
+        // Run command expiry even if the lifecycle check above failed, so a persistent
+        // error in one step cannot starve the other.
+        try {
+            registryService.expireOldCommands();
+        } catch (Exception e) {
+            log.error("Error expiring old agent commands", e);
+        }
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java
new file mode 100644
index 00000000..f59e536f
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryBeanConfig.java
@@ -0,0 +1,23 @@
+package com.cameleer3.server.app.config;
+
+import com.cameleer3.server.core.agent.AgentRegistryService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Creates the {@link AgentRegistryService} bean.
+ * <p>
+ * Follows the established pattern: core module plain class, app module bean config.
+ */
+@Configuration
+public class AgentRegistryBeanConfig {
+
+    @Bean
+    public AgentRegistryService agentRegistryService(AgentRegistryConfig config) {
+        // Thresholds and command expiry are handed over as plain millisecond values.
+        long staleThresholdMs = config.getStaleThresholdMs();
+        long deadThresholdMs = config.getDeadThresholdMs();
+        long commandExpiryMs = config.getCommandExpiryMs();
+        return new AgentRegistryService(staleThresholdMs, deadThresholdMs, commandExpiryMs);
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryConfig.java
new file mode 100644
index 00000000..7861abd2
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/AgentRegistryConfig.java
@@ -0,0 +1,68 @@
+package com.cameleer3.server.app.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Configuration properties for the agent registry.
+ * Bound from the {@code agent-registry.*} namespace in application.yml.
+ * <p>
+ * Registered via {@code @EnableConfigurationProperties} on the application class.
+ */
+@ConfigurationProperties(prefix = "agent-registry")
+public class AgentRegistryConfig {
+
+    // Returned to agents as heartbeatIntervalMs in the registration response.
+    private long heartbeatIntervalMs = 30_000;
+    // Both thresholds are handed to AgentRegistryService by AgentRegistryBeanConfig.
+    private long staleThresholdMs = 90_000;
+    private long deadThresholdMs = 300_000;
+    // NOTE(review): not read anywhere in this change set; presumably the SSE keep-alive ping - confirm.
+    private long pingIntervalMs = 15_000;
+    // Handed to AgentRegistryService by AgentRegistryBeanConfig.
+    private long commandExpiryMs = 60_000;
+    // AgentLifecycleMonitor reads this property through its own @Scheduled placeholder
+    // (default 10000), not through this getter; keep the two defaults in sync.
+    private long lifecycleCheckIntervalMs = 10_000;
+
+    public long getHeartbeatIntervalMs() {
+        return heartbeatIntervalMs;
+    }
+    public void setHeartbeatIntervalMs(long heartbeatIntervalMs) {
+        this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    public long getStaleThresholdMs() {
+        return staleThresholdMs;
+    }
+    public void setStaleThresholdMs(long staleThresholdMs) {
+        this.staleThresholdMs = staleThresholdMs;
+    }
+
+    public long getDeadThresholdMs() {
+        return deadThresholdMs;
+    }
+    public void setDeadThresholdMs(long deadThresholdMs) {
+        this.deadThresholdMs = deadThresholdMs;
+    }
+
+    public long getPingIntervalMs() {
+        return pingIntervalMs;
+    }
+    public void setPingIntervalMs(long pingIntervalMs) {
+        this.pingIntervalMs = pingIntervalMs;
+    }
+
+    public long getCommandExpiryMs() {
+        return commandExpiryMs;
+    }
+    public void setCommandExpiryMs(long commandExpiryMs) {
+        this.commandExpiryMs = commandExpiryMs;
+    }
+
+    public long getLifecycleCheckIntervalMs() {
+        return lifecycleCheckIntervalMs;
+    }
+    public void setLifecycleCheckIntervalMs(long lifecycleCheckIntervalMs) {
+        this.lifecycleCheckIntervalMs = lifecycleCheckIntervalMs;
+    }
+}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java
new file mode 100644
index 00000000..1007d686
--- /dev/null
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java
@@ -0,0 +1,217 @@
+package com.cameleer3.server.app.controller;
+
+import com.cameleer3.server.app.config.AgentRegistryConfig;
+import com.cameleer3.server.core.agent.AgentInfo;
+import com.cameleer3.server.core.agent.AgentRegistryService;
+import com.cameleer3.server.core.agent.AgentState;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Agent registration, heartbeat, and listing endpoints.
+ * <p>
+ * Bodies are exchanged as raw JSON strings (de)serialized with the injected
+ * {@link ObjectMapper}; every JSON response is sent as {@code application/json}.
+ */
+@RestController
+@RequestMapping("/api/v1/agents")
+@Tag(name = "Agent Management", description = "Agent registration and lifecycle endpoints")
+public class AgentRegistrationController {
+
+    private static final Logger log = LoggerFactory.getLogger(AgentRegistrationController.class);
+
+    private final AgentRegistryService registryService;
+    private final AgentRegistryConfig config;
+    private final ObjectMapper objectMapper;
+
+    public AgentRegistrationController(AgentRegistryService registryService,
+                                       AgentRegistryConfig config,
+                                       ObjectMapper objectMapper) {
+        this.registryService = registryService;
+        this.config = config;
+        this.objectMapper = objectMapper;
+    }
+
+    /**
+     * Registers an agent, or re-registers an existing one, from a JSON payload.
+     * <p>
+     * {@code agentId} and {@code name} are required and must be non-blank. {@code group}
+     * falls back to {@code "default"} and {@code version} to {@code null} when absent or
+     * JSON {@code null}; {@code routeIds} and {@code capabilities} are optional.
+     *
+     * @param body raw JSON request body
+     * @return 200 with {@code agentId}, {@code sseEndpoint}, {@code heartbeatIntervalMs} and
+     *         {@code serverPublicKey}; 400 if the payload is not a JSON object or lacks a
+     *         required field
+     * @throws JsonProcessingException if a response body cannot be serialized
+     */
+    @PostMapping("/register")
+    @Operation(summary = "Register an agent",
+            description = "Registers a new agent or re-registers an existing one")
+    @ApiResponse(responseCode = "200", description = "Agent registered successfully")
+    @ApiResponse(responseCode = "400", description = "Invalid registration payload")
+    public ResponseEntity<String> register(@RequestBody String body) throws JsonProcessingException {
+        JsonNode node;
+        try {
+            node = objectMapper.readTree(body);
+        } catch (JsonProcessingException e) {
+            // Unparseable input is the client's fault: answer with the documented 400.
+            return badRequest("Malformed JSON payload");
+        }
+        if (node == null || !node.isObject()) {
+            return badRequest("Registration payload must be a JSON object");
+        }
+
+        String agentId = getRequiredField(node, "agentId");
+        String name = getRequiredField(node, "name");
+        if (agentId == null || name == null) {
+            return badRequest("agentId and name are required");
+        }
+
+        String group = getOptionalField(node, "group", "default");
+        String version = getOptionalField(node, "version", null);
+
+        List<String> routeIds = new ArrayList<>();
+        JsonNode routeIdsNode = node.get("routeIds");
+        if (routeIdsNode != null && routeIdsNode.isArray()) {
+            for (JsonNode rid : routeIdsNode) {
+                routeIds.add(rid.asText());
+            }
+        }
+
+        Map<String, Object> capabilities = Collections.emptyMap();
+        JsonNode capabilitiesNode = node.get("capabilities");
+        if (capabilitiesNode != null && capabilitiesNode.isObject()) {
+            capabilities = new LinkedHashMap<>();
+            Iterator<Map.Entry<String, JsonNode>> fields = capabilitiesNode.fields();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                capabilities.put(field.getKey(), parseJsonValue(field.getValue()));
+            }
+        }
+
+        AgentInfo agent = registryService.register(agentId, name, group, version, routeIds, capabilities);
+        log.info("Agent registered: {} (name={}, group={})", agentId, name, group);
+
+        Map<String, Object> response = new LinkedHashMap<>();
+        response.put("agentId", agent.id());
+        response.put("sseEndpoint", "/api/v1/agents/" + agentId + "/events");
+        response.put("heartbeatIntervalMs", config.getHeartbeatIntervalMs());
+        response.put("serverPublicKey", null);
+
+        return json(ResponseEntity.ok(), response);
+    }
+
+    /**
+     * Records a heartbeat for the given agent.
+     *
+     * @param id agent identifier from the path
+     * @return 200 if the registry accepted the heartbeat, 404 if it reported the agent as unknown
+     */
+    @PostMapping("/{id}/heartbeat")
+    @Operation(summary = "Agent heartbeat ping",
+            description = "Updates the agent's last heartbeat timestamp")
+    @ApiResponse(responseCode = "200", description = "Heartbeat accepted")
+    @ApiResponse(responseCode = "404", description = "Agent not registered")
+    public ResponseEntity<Void> heartbeat(@PathVariable String id) {
+        boolean found = registryService.heartbeat(id);
+        if (!found) {
+            return ResponseEntity.notFound().build();
+        }
+        return ResponseEntity.ok().build();
+    }
+
+    /**
+     * Lists registered agents, optionally restricted to one lifecycle state.
+     *
+     * @param status optional, case-insensitive {@link AgentState} name
+     * @return 200 with the agents as a JSON array, or 400 if {@code status} is not a valid state
+     * @throws JsonProcessingException if the response body cannot be serialized
+     */
+    @GetMapping
+    @Operation(summary = "List all agents",
+            description = "Returns all registered agents, optionally filtered by status")
+    @ApiResponse(responseCode = "200", description = "Agent list returned")
+    @ApiResponse(responseCode = "400", description = "Invalid status filter")
+    public ResponseEntity<String> listAgents(
+            @RequestParam(required = false) String status) throws JsonProcessingException {
+        List<AgentInfo> agents;
+
+        if (status != null) {
+            AgentState stateFilter;
+            try {
+                // Locale.ROOT: default-locale upper-casing (e.g. Turkish dotted I) would reject "live".
+                stateFilter = AgentState.valueOf(status.toUpperCase(Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                return badRequest("Invalid status: " + status + ". Valid values: LIVE, STALE, DEAD");
+            }
+            agents = registryService.findByState(stateFilter);
+        } else {
+            agents = registryService.findAll();
+        }
+
+        return json(ResponseEntity.ok(), agents);
+    }
+
+    /** Builds a 400 response with body {@code {"error": message}}; the mapper escapes the message. */
+    private ResponseEntity<String> badRequest(String message) throws JsonProcessingException {
+        return json(ResponseEntity.badRequest(), Collections.singletonMap("error", message));
+    }
+
+    /** Serializes {@code payload} with the mapper and labels the response {@code application/json}. */
+    private ResponseEntity<String> json(ResponseEntity.BodyBuilder builder, Object payload)
+            throws JsonProcessingException {
+        return builder.contentType(MediaType.APPLICATION_JSON)
+                .body(objectMapper.writeValueAsString(payload));
+    }
+
+    /** Returns the field's text, or {@code null} if it is missing, JSON {@code null}, or blank. */
+    private String getRequiredField(JsonNode node, String fieldName) {
+        JsonNode value = node.get(fieldName);
+        if (value == null || value.isNull() || value.asText().isBlank()) {
+            return null;
+        }
+        return value.asText();
+    }
+
+    /** Returns the field's text, or {@code defaultValue} if it is missing or JSON {@code null}. */
+    private String getOptionalField(JsonNode node, String fieldName, String defaultValue) {
+        JsonNode value = node.get(fieldName);
+        return (value == null || value.isNull()) ? defaultValue : value.asText();
+    }
+
+    /**
+     * Maps a capability value to a plain Java value: booleans, numbers and strings keep their
+     * type; anything else (objects, arrays, JSON {@code null}) is kept as its JSON text.
+     */
+    private Object parseJsonValue(JsonNode node) {
+        if (node.isBoolean()) return node.asBoolean();
+        // numberValue() also covers BigInteger/BigDecimal/float nodes, not only int/long/double.
+        if (node.isNumber()) return node.numberValue();
+        if (node.isTextual()) return node.asText();
+        return node.toString();
+    }
+}
diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml
index 024dc99d..7ce15062 100644
--- a/cameleer3-server-app/src/main/resources/application.yml
+++ b/cameleer3-server-app/src/main/resources/application.yml
@@ -7,12 +7,23 @@ spring:
     username: cameleer
     password: cameleer_dev
     driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
+  mvc:
+    async:
+      request-timeout: -1
   jackson:
     serialization:
       write-dates-as-timestamps: false
     deserialization:
       fail-on-unknown-properties: false
 
+agent-registry:
+  heartbeat-interval-ms: 30000
+  stale-threshold-ms: 90000
+  dead-threshold-ms: 300000
+  ping-interval-ms: 15000
+  command-expiry-ms: 60000
+  lifecycle-check-interval-ms: 10000
+
 ingestion:
   buffer-capacity: 50000
   batch-size: 5000
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
new file mode 100644
index 00000000..8cc25379
--- /dev/null
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
@@ -0,0 +1,179 @@
+package com.cameleer3.server.app.controller;
+
+import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for the agent registration, heartbeat and listing endpoints
+ * under {@code /api/v1/agents}.
+ * <p>
+ * Each test registers agents under its own ids. Agents registered by other tests
+ * may still be present, so list assertions use lower bounds rather than exact sizes.
+ */
+class AgentRegistrationControllerIT extends AbstractClickHouseIT {
+
+    @Autowired
+    private TestRestTemplate restTemplate;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    private HttpHeaders protocolHeaders() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        headers.set("X-Cameleer-Protocol-Version", "1");
+        return headers;
+    }
+
+    private HttpHeaders protocolHeadersNoBody() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.set("X-Cameleer-Protocol-Version", "1");
+        return headers;
+    }
+
+    private ResponseEntity<String> registerAgent(String agentId, String name) {
+        String json = """
+                {
+                  "agentId": "%s",
+                  "name": "%s",
+                  "group": "test-group",
+                  "version": "1.0.0",
+                  "routeIds": ["route-1", "route-2"],
+                  "capabilities": {"tracing": true}
+                }
+                """.formatted(agentId, name);
+
+        return restTemplate.postForEntity(
+                "/api/v1/agents/register",
+                new HttpEntity<>(json, protocolHeaders()),
+                String.class);
+    }
+
+    @Test
+    void registerNewAgent_returns200WithAgentIdAndSseEndpoint() throws Exception {
+        ResponseEntity<String> response = registerAgent("agent-it-1", "IT Agent 1");
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+        JsonNode body = objectMapper.readTree(response.getBody());
+        assertThat(body.get("agentId").asText()).isEqualTo("agent-it-1");
+        assertThat(body.get("sseEndpoint").asText()).isEqualTo("/api/v1/agents/agent-it-1/events");
+        assertThat(body.get("heartbeatIntervalMs").asLong()).isGreaterThan(0);
+        assertThat(body.has("serverPublicKey")).isTrue();
+    }
+
+    @Test
+    void registerWithoutName_returns400() {
+        String json = """
+                {"agentId": "agent-it-no-name"}
+                """;
+
+        ResponseEntity<String> response = restTemplate.postForEntity(
+                "/api/v1/agents/register",
+                new HttpEntity<>(json, protocolHeaders()),
+                String.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
+    }
+
+    @Test
+    void reRegisterSameAgent_returns200WithLiveState() throws Exception {
+        registerAgent("agent-it-reregister", "First Registration");
+
+        ResponseEntity<String> response = registerAgent("agent-it-reregister", "Second Registration");
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+        JsonNode body = objectMapper.readTree(response.getBody());
+        assertThat(body.get("agentId").asText()).isEqualTo("agent-it-reregister");
+    }
+
+    @Test
+    void heartbeatKnownAgent_returns200() {
+        registerAgent("agent-it-hb", "HB Agent");
+
+        ResponseEntity<Void> response = restTemplate.exchange(
+                "/api/v1/agents/agent-it-hb/heartbeat",
+                HttpMethod.POST,
+                new HttpEntity<>(protocolHeadersNoBody()),
+                Void.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+    }
+
+    @Test
+    void heartbeatUnknownAgent_returns404() {
+        ResponseEntity<Void> response = restTemplate.exchange(
+                "/api/v1/agents/unknown-agent-xyz/heartbeat",
+                HttpMethod.POST,
+                new HttpEntity<>(protocolHeadersNoBody()),
+                Void.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
+    }
+
+    @Test
+    void listAllAgents_returnsBothAgents() throws Exception {
+        registerAgent("agent-it-list-1", "List Agent 1");
+        registerAgent("agent-it-list-2", "List Agent 2");
+
+        ResponseEntity<String> response = restTemplate.exchange(
+                "/api/v1/agents",
+                HttpMethod.GET,
+                new HttpEntity<>(protocolHeadersNoBody()),
+                String.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+        JsonNode body = objectMapper.readTree(response.getBody());
+        assertThat(body.isArray()).isTrue();
+        // At minimum, our two agents should be present (may have more from other tests)
+        assertThat(body.size()).isGreaterThanOrEqualTo(2);
+    }
+
+    @Test
+    void listAgentsByStatus_filtersCorrectly() throws Exception {
+        registerAgent("agent-it-filter", "Filter Agent");
+
+        ResponseEntity<String> response = restTemplate.exchange(
+                "/api/v1/agents?status=LIVE",
+                HttpMethod.GET,
+                new HttpEntity<>(protocolHeadersNoBody()),
+                String.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+        JsonNode body = objectMapper.readTree(response.getBody());
+        assertThat(body.isArray()).isTrue();
+        // Guard against a vacuous pass: the agent registered above is expected to be LIVE,
+        // so an empty result would mean the filter (or registration) is broken.
+        assertThat(body.size()).isGreaterThan(0);
+        // All returned agents should be LIVE
+        for (JsonNode agent : body) {
+            assertThat(agent.get("state").asText()).isEqualTo("LIVE");
+        }
+    }
+
+    @Test
+    void listAgentsWithInvalidStatus_returns400() {
+        ResponseEntity<String> response = restTemplate.exchange(
+                "/api/v1/agents?status=INVALID",
+                HttpMethod.GET,
+                new HttpEntity<>(protocolHeadersNoBody()),
+                String.class);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
+    }
+}