From 4cd7ed9e9a53beef0121e600ad599a9a05942ec6 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:28:28 +0100 Subject: [PATCH] test(03-01): add failing tests for agent registry service - 23 unit tests covering registration, heartbeat, lifecycle, queries, commands - Domain types: AgentInfo, AgentState, AgentCommand, CommandStatus, CommandType - AgentEventListener interface for SSE bridge - AgentRegistryService stub with UnsupportedOperationException Co-Authored-By: Claude Opus 4.6 --- .../server/core/agent/AgentCommand.java | 27 ++ .../server/core/agent/AgentEventListener.java | 19 + .../server/core/agent/AgentInfo.java | 63 ++++ .../core/agent/AgentRegistryService.java | 76 ++++ .../server/core/agent/AgentState.java | 10 + .../server/core/agent/CommandStatus.java | 11 + .../server/core/agent/CommandType.java | 10 + .../core/agent/AgentRegistryServiceTest.java | 326 ++++++++++++++++++ 8 files changed, 542 insertions(+) create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java create mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java new file mode 100644 index 00000000..1123b08f --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentCommand.java @@ -0,0 +1,27 @@ +package com.cameleer3.server.core.agent; + +import java.time.Instant; + +/** + * Immutable record representing a command to be pushed to an agent. + * + * @param id unique command identifier (UUID) + * @param type command type + * @param payload raw JSON payload + * @param targetAgentId target agent identifier + * @param createdAt when the command was created + * @param status current delivery status + */ +public record AgentCommand( + String id, + CommandType type, + String payload, + String targetAgentId, + Instant createdAt, + CommandStatus status +) { + + public AgentCommand withStatus(CommandStatus newStatus) { + return new AgentCommand(id, type, payload, targetAgentId, createdAt, newStatus); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java new file mode 100644 index 00000000..ea2f0e9f --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentEventListener.java @@ -0,0 +1,19 @@ +package com.cameleer3.server.core.agent; + +/** + * Listener interface for agent registry events. + *

+ * Defined in the core module so the registry service can notify listeners + * without depending on Spring or SSE classes. The app module provides the + * implementation that bridges to the SSE connection manager. + */ +public interface AgentEventListener { + + /** + * Called when a new command is ready to be delivered to an agent. + * + * @param agentId the target agent identifier + * @param command the command to deliver + */ + void onCommandReady(String agentId, AgentCommand command); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java new file mode 100644 index 00000000..d8a883dd --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java @@ -0,0 +1,63 @@ +package com.cameleer3.server.core.agent; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * Immutable record representing a registered agent's current state. + *

+ * Stored in a {@link java.util.concurrent.ConcurrentHashMap} and atomically swapped + * via {@code computeIfPresent} for thread-safe state transitions. Wither-style methods + * return new instances with the specified field changed. + * + * @param id agent-provided persistent identifier + * @param name human-readable agent name + * @param group logical grouping (e.g., "order-service-prod") + * @param version agent software version + * @param routeIds list of Camel route IDs managed by this agent + * @param capabilities agent-declared capabilities (free-form) + * @param state current lifecycle state + * @param registeredAt when the agent first registered (or re-registered) + * @param lastHeartbeat last heartbeat timestamp + * @param staleTransitionTime when the agent transitioned to STALE (null if not STALE/DEAD) + */ +public record AgentInfo( + String id, + String name, + String group, + String version, + List routeIds, + Map capabilities, + AgentState state, + Instant registeredAt, + Instant lastHeartbeat, + Instant staleTransitionTime +) { + + public AgentInfo withState(AgentState newState) { + return new AgentInfo(id, name, group, version, routeIds, capabilities, + newState, registeredAt, lastHeartbeat, staleTransitionTime); + } + + public AgentInfo withLastHeartbeat(Instant newLastHeartbeat) { + return new AgentInfo(id, name, group, version, routeIds, capabilities, + state, registeredAt, newLastHeartbeat, staleTransitionTime); + } + + public AgentInfo withRegisteredAt(Instant newRegisteredAt) { + return new AgentInfo(id, name, group, version, routeIds, capabilities, + state, newRegisteredAt, lastHeartbeat, staleTransitionTime); + } + + public AgentInfo withStaleTransitionTime(Instant newStaleTransitionTime) { + return new AgentInfo(id, name, group, version, routeIds, capabilities, + state, registeredAt, lastHeartbeat, newStaleTransitionTime); + } + + public AgentInfo withMetadata(String name, String group, String version, + List routeIds, Map capabilities) { + return new AgentInfo(id, name, group, version, routeIds, capabilities, + state, registeredAt, lastHeartbeat, staleTransitionTime); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java new file mode 100644 index 00000000..3f54d7c0 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java @@ -0,0 +1,76 @@ +package com.cameleer3.server.core.agent; + +import java.util.List; +import java.util.Map; + +/** + * In-memory agent registry managing agent lifecycle, heartbeats, and commands. + *

+ * Plain class (no Spring annotations) -- wired as a bean by the app module. + * Stub implementation for TDD RED phase. + */ +public class AgentRegistryService { + + private final long staleThresholdMs; + private final long deadThresholdMs; + private final long commandExpiryMs; + + public AgentRegistryService(long staleThresholdMs, long deadThresholdMs, long commandExpiryMs) { + this.staleThresholdMs = staleThresholdMs; + this.deadThresholdMs = deadThresholdMs; + this.commandExpiryMs = commandExpiryMs; + } + + public AgentInfo register(String id, String name, String group, String version, + List routeIds, Map capabilities) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public boolean heartbeat(String id) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public void transitionState(String id, AgentState newState) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public void checkLifecycle() { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public AgentInfo findById(String id) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public List findAll() { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public List findByState(AgentState state) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public AgentCommand addCommand(String agentId, CommandType type, String payload) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public boolean acknowledgeCommand(String agentId, String commandId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public List findPendingCommands(String agentId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public void markDelivered(String agentId, String commandId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public void expireOldCommands() { + throw new UnsupportedOperationException("Not yet implemented"); + } + + public void setEventListener(AgentEventListener listener) { + throw new UnsupportedOperationException("Not yet implemented"); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java new file mode 100644 index 00000000..88b3a46e --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentState.java @@ -0,0 +1,10 @@ +package com.cameleer3.server.core.agent; + +/** + * Lifecycle states for a connected agent. + */ +public enum AgentState { + LIVE, + STALE, + DEAD +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java new file mode 100644 index 00000000..4a13e78a --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandStatus.java @@ -0,0 +1,11 @@ +package com.cameleer3.server.core.agent; + +/** + * Delivery status of a command pushed to an agent. + */ +public enum CommandStatus { + PENDING, + DELIVERED, + ACKNOWLEDGED, + EXPIRED +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java new file mode 100644 index 00000000..f9295fd1 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java @@ -0,0 +1,10 @@ +package com.cameleer3.server.core.agent; + +/** + * Types of commands that can be pushed to agents. + */ +public enum CommandType { + CONFIG_UPDATE, + DEEP_TRACE, + REPLAY +} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java new file mode 100644 index 00000000..3f46edd0 --- /dev/null +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java @@ -0,0 +1,326 @@ +package com.cameleer3.server.core.agent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +class AgentRegistryServiceTest { + + // Thresholds: 90s stale, 300s dead, 60s command expiry + private AgentRegistryService registry; + + @BeforeEach + void setUp() { + registry = new AgentRegistryService(90_000, 300_000, 60_000); + } + + @Nested + class Registration { + + @Test + void registerNewAgent_createsWithLiveState() { + AgentInfo agent = registry.register("agent-1", "Order Agent", "order-svc", + "1.0.0", List.of("route1", "route2"), Map.of("feature", "tracing")); + + assertThat(agent).isNotNull(); + assertThat(agent.id()).isEqualTo("agent-1"); + assertThat(agent.name()).isEqualTo("Order Agent"); + assertThat(agent.group()).isEqualTo("order-svc"); + assertThat(agent.version()).isEqualTo("1.0.0"); + assertThat(agent.routeIds()).containsExactly("route1", "route2"); + assertThat(agent.capabilities()).containsEntry("feature", "tracing"); + assertThat(agent.state()).isEqualTo(AgentState.LIVE); + assertThat(agent.registeredAt()).isNotNull(); + assertThat(agent.lastHeartbeat()).isNotNull(); + assertThat(agent.staleTransitionTime()).isNull(); + } + + @Test + void reRegisterSameId_updatesMetadataAndTransitionsToLive() { + registry.register("agent-1", "Old Name", "old-group", + "1.0.0", List.of("route1"), Map.of()); + + AgentInfo updated = registry.register("agent-1", "New Name", "new-group", + "2.0.0", List.of("route1", "route2"), Map.of("new", "cap")); + + assertThat(updated.id()).isEqualTo("agent-1"); + assertThat(updated.name()).isEqualTo("New Name"); + assertThat(updated.group()).isEqualTo("new-group"); + assertThat(updated.version()).isEqualTo("2.0.0"); + assertThat(updated.routeIds()).containsExactly("route1", "route2"); + assertThat(updated.capabilities()).containsEntry("new", "cap"); + assertThat(updated.state()).isEqualTo(AgentState.LIVE); + assertThat(updated.staleTransitionTime()).isNull(); + } + + @Test + void reRegisterSameId_updatesRegisteredAtAndLastHeartbeat() { + AgentInfo first = registry.register("agent-1", "Name", "group", + "1.0.0", List.of(), Map.of()); + Instant firstRegisteredAt = first.registeredAt(); + + AgentInfo second = registry.register("agent-1", "Name", "group", + "1.0.0", List.of(), Map.of()); + + assertThat(second.registeredAt()).isAfterOrEqualTo(firstRegisteredAt); + assertThat(second.lastHeartbeat()).isAfterOrEqualTo(first.lastHeartbeat()); + } + } + + @Nested + class Heartbeat { + + @Test + void heartbeatKnownAgent_returnsTrue() { + registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + + boolean result = registry.heartbeat("agent-1"); + + assertThat(result).isTrue(); + } + + @Test + void heartbeatKnownAgent_updatesLastHeartbeat() { + registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + Instant before = registry.findById("agent-1").lastHeartbeat(); + + registry.heartbeat("agent-1"); + + AgentInfo after = registry.findById("agent-1"); + assertThat(after.lastHeartbeat()).isAfterOrEqualTo(before); + } + + @Test + void heartbeatUnknownAgent_returnsFalse() { + boolean result = registry.heartbeat("unknown-agent"); + + assertThat(result).isFalse(); + } + + @Test + void heartbeatStaleAgent_transitionsToLive() { + registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + registry.transitionState("agent-1", AgentState.STALE); + + assertThat(registry.findById("agent-1").state()).isEqualTo(AgentState.STALE); + + registry.heartbeat("agent-1"); + + assertThat(registry.findById("agent-1").state()).isEqualTo(AgentState.LIVE); + assertThat(registry.findById("agent-1").staleTransitionTime()).isNull(); + } + } + + @Nested + class LifecycleTransitions { + + @Test + void liveAgentBeyondStaleThreshold_transitionsToStale() { + // Use very short thresholds for test + AgentRegistryService shortRegistry = new AgentRegistryService(1, 300_000, 60_000); + shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + + // Wait briefly to exceed 1ms threshold + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + + shortRegistry.checkLifecycle(); + + AgentInfo agent = shortRegistry.findById("agent-1"); + assertThat(agent.state()).isEqualTo(AgentState.STALE); + assertThat(agent.staleTransitionTime()).isNotNull(); + } + + @Test + void staleAgentBeyondDeadThreshold_transitionsToDead() { + // Use very short thresholds for test: 1ms stale, 1ms dead + AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); + shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + shortRegistry.checkLifecycle(); // LIVE -> STALE + + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + shortRegistry.checkLifecycle(); // STALE -> DEAD + + assertThat(shortRegistry.findById("agent-1").state()).isEqualTo(AgentState.DEAD); + } + + @Test + void deadAgentRemainsDead() { + AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); + shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + shortRegistry.checkLifecycle(); + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + shortRegistry.checkLifecycle(); + + assertThat(shortRegistry.findById("agent-1").state()).isEqualTo(AgentState.DEAD); + + // Additional lifecycle check should not change state + shortRegistry.checkLifecycle(); + assertThat(shortRegistry.findById("agent-1").state()).isEqualTo(AgentState.DEAD); + } + + @Test + void transitionState_setsStaleTransitionTimeWhenGoingStale() { + registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + + registry.transitionState("agent-1", AgentState.STALE); + + AgentInfo agent = registry.findById("agent-1"); + assertThat(agent.state()).isEqualTo(AgentState.STALE); + assertThat(agent.staleTransitionTime()).isNotNull(); + } + } + + @Nested + class Queries { + + @Test + void findAll_returnsAllAgents() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); + + List all = registry.findAll(); + + assertThat(all).hasSize(2); + assertThat(all).extracting(AgentInfo::id).containsExactlyInAnyOrder("agent-1", "agent-2"); + } + + @Test + void findByState_filtersCorrectly() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); + registry.transitionState("agent-2", AgentState.STALE); + + List live = registry.findByState(AgentState.LIVE); + List stale = registry.findByState(AgentState.STALE); + + assertThat(live).hasSize(1).extracting(AgentInfo::id).containsExactly("agent-1"); + assertThat(stale).hasSize(1).extracting(AgentInfo::id).containsExactly("agent-2"); + } + + @Test + void findById_unknownReturnsNull() { + AgentInfo result = registry.findById("nonexistent"); + + assertThat(result).isNull(); + } + + @Test + void findById_knownReturnsAgent() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + + AgentInfo result = registry.findById("agent-1"); + + assertThat(result).isNotNull(); + assertThat(result.id()).isEqualTo("agent-1"); + } + } + + @Nested + class Commands { + + @Test + void addCommand_createsPendingCommand() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + + AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{\"key\":\"val\"}"); + + assertThat(cmd).isNotNull(); + assertThat(cmd.id()).isNotNull(); + assertThat(cmd.type()).isEqualTo(CommandType.CONFIG_UPDATE); + assertThat(cmd.payload()).isEqualTo("{\"key\":\"val\"}"); + assertThat(cmd.targetAgentId()).isEqualTo("agent-1"); + assertThat(cmd.status()).isEqualTo(CommandStatus.PENDING); + assertThat(cmd.createdAt()).isNotNull(); + } + + @Test + void addCommand_notifiesEventListener() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + + AtomicReference received = new AtomicReference<>(); + registry.setEventListener((agentId, command) -> received.set(command)); + + AgentCommand cmd = registry.addCommand("agent-1", CommandType.DEEP_TRACE, "{}"); + + assertThat(received.get()).isNotNull(); + assertThat(received.get().id()).isEqualTo(cmd.id()); + } + + @Test + void acknowledgeCommand_transitionsStatus() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + AgentCommand cmd = registry.addCommand("agent-1", CommandType.REPLAY, "{}"); + + boolean acked = registry.acknowledgeCommand("agent-1", cmd.id()); + + assertThat(acked).isTrue(); + } + + @Test + void acknowledgeCommand_unknownReturnsFalse() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + + boolean acked = registry.acknowledgeCommand("agent-1", "nonexistent-cmd"); + + assertThat(acked).isFalse(); + } + + @Test + void findPendingCommands_returnsOnlyPending() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + AgentCommand cmd1 = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); + AgentCommand cmd2 = registry.addCommand("agent-1", CommandType.DEEP_TRACE, "{}"); + registry.acknowledgeCommand("agent-1", cmd1.id()); + + List pending = registry.findPendingCommands("agent-1"); + + assertThat(pending).hasSize(1); + assertThat(pending.get(0).id()).isEqualTo(cmd2.id()); + } + + @Test + void markDelivered_updatesStatus() { + registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); + + registry.markDelivered("agent-1", cmd.id()); + + // After marking delivered, the command should no longer be PENDING + List pending = registry.findPendingCommands("agent-1"); + assertThat(pending).isEmpty(); + } + + @Test + void expireOldCommands_removesExpiredPendingCommands() { + // Use 1ms expiry for test + AgentRegistryService shortRegistry = new AgentRegistryService(90_000, 300_000, 1); + shortRegistry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + shortRegistry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); + + try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + + shortRegistry.expireOldCommands(); + + List pending = shortRegistry.findPendingCommands("agent-1"); + assertThat(pending).isEmpty(); + } + + @Test + void findPendingCommands_emptyForUnknownAgent() { + List pending = registry.findPendingCommands("unknown"); + + assertThat(pending).isEmpty(); + } + } +}