From 61f39021b3a91eb7195ae9e5539cf14047aac1bc Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:30:02 +0100 Subject: [PATCH] feat(03-01): implement agent registry service and domain types - AgentRegistryService: register, heartbeat, lifecycle, commands - ConcurrentHashMap with atomic record-swapping for thread safety - LIVE->STALE->DEAD lifecycle transitions via checkLifecycle() - Heartbeat revives STALE agents back to LIVE - Command queue with PENDING/DELIVERED/ACKNOWLEDGED/EXPIRED states - AgentEventListener callback for SSE bridge integration - All 23 unit tests pass Co-Authored-By: Claude Opus 4.6 --- .../core/agent/AgentRegistryService.java | 232 ++++++++++++++++-- 1 file changed, 218 insertions(+), 14 deletions(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java index 3f54d7c0..07e4e231 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java @@ -1,76 +1,280 @@ package com.cameleer3.server.core.agent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; /** * In-memory agent registry managing agent lifecycle, heartbeats, and commands. *

* Plain class (no Spring annotations) -- wired as a bean by the app module. - * Stub implementation for TDD RED phase. + * Uses {@link ConcurrentHashMap} for thread-safe agent storage with atomic + * record-swapping via {@code compute} operations. */ public class AgentRegistryService { + private static final Logger log = LoggerFactory.getLogger(AgentRegistryService.class); + private final long staleThresholdMs; private final long deadThresholdMs; private final long commandExpiryMs; + private final ConcurrentHashMap agents = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> commands = new ConcurrentHashMap<>(); + + private volatile AgentEventListener eventListener; + public AgentRegistryService(long staleThresholdMs, long deadThresholdMs, long commandExpiryMs) { this.staleThresholdMs = staleThresholdMs; this.deadThresholdMs = deadThresholdMs; this.commandExpiryMs = commandExpiryMs; } + /** + * Register a new agent or re-register an existing one. + * Re-registration updates metadata, transitions state to LIVE, and resets timestamps. + */ public AgentInfo register(String id, String name, String group, String version, List routeIds, Map capabilities) { - throw new UnsupportedOperationException("Not yet implemented"); + Instant now = Instant.now(); + AgentInfo newAgent = new AgentInfo(id, name, group, version, + List.copyOf(routeIds), Map.copyOf(capabilities), + AgentState.LIVE, now, now, null); + + AgentInfo result = agents.compute(id, (key, existing) -> { + if (existing != null) { + // Re-registration: update metadata, reset to LIVE + log.info("Agent {} re-registering (was {})", id, existing.state()); + return existing + .withMetadata(name, group, version, List.copyOf(routeIds), Map.copyOf(capabilities)) + .withState(AgentState.LIVE) + .withLastHeartbeat(now) + .withRegisteredAt(now) + .withStaleTransitionTime(null); + } + log.info("Agent {} registered (name={}, group={})", id, name, group); + return newAgent; + }); + + return result; } + /** + * Process a heartbeat from an agent. + * Updates lastHeartbeat and transitions STALE agents back to LIVE. + * + * @return true if the agent is known, false otherwise + */ public boolean heartbeat(String id) { - throw new UnsupportedOperationException("Not yet implemented"); + AgentInfo updated = agents.computeIfPresent(id, (key, existing) -> { + Instant now = Instant.now(); + AgentInfo result = existing.withLastHeartbeat(now); + if (existing.state() == AgentState.STALE) { + result = result.withState(AgentState.LIVE).withStaleTransitionTime(null); + log.info("Agent {} revived from STALE to LIVE via heartbeat", id); + } + return result; + }); + return updated != null; } + /** + * Manually transition an agent to a new state. + * Sets staleTransitionTime when transitioning to STALE. + */ public void transitionState(String id, AgentState newState) { - throw new UnsupportedOperationException("Not yet implemented"); + agents.computeIfPresent(id, (key, existing) -> { + AgentInfo result = existing.withState(newState); + if (newState == AgentState.STALE) { + result = result.withStaleTransitionTime(Instant.now()); + } else if (newState == AgentState.LIVE) { + result = result.withStaleTransitionTime(null); + } + log.debug("Agent {} transitioned {} -> {}", id, existing.state(), newState); + return result; + }); } + /** + * Check all agents and apply lifecycle transitions: + * LIVE -> STALE when lastHeartbeat exceeds staleThresholdMs, + * STALE -> DEAD when staleTransitionTime exceeds deadThresholdMs. + */ public void checkLifecycle() { - throw new UnsupportedOperationException("Not yet implemented"); + Instant now = Instant.now(); + for (Map.Entry entry : agents.entrySet()) { + AgentInfo agent = entry.getValue(); + + if (agent.state() == AgentState.LIVE) { + Duration sinceHeartbeat = Duration.between(agent.lastHeartbeat(), now); + if (sinceHeartbeat.toMillis() > staleThresholdMs) { + agents.computeIfPresent(entry.getKey(), (k, current) -> { + if (current.state() == AgentState.LIVE) { + log.info("Agent {} LIVE -> STALE (no heartbeat for {}ms)", k, sinceHeartbeat.toMillis()); + return current.withState(AgentState.STALE) + .withStaleTransitionTime(now); + } + return current; + }); + } + } else if (agent.state() == AgentState.STALE && agent.staleTransitionTime() != null) { + Duration sinceStale = Duration.between(agent.staleTransitionTime(), now); + if (sinceStale.toMillis() > deadThresholdMs) { + agents.computeIfPresent(entry.getKey(), (k, current) -> { + if (current.state() == AgentState.STALE) { + log.info("Agent {} STALE -> DEAD (stale for {}ms)", k, sinceStale.toMillis()); + return current.withState(AgentState.DEAD); + } + return current; + }); + } + } + // DEAD agents remain DEAD (no auto-purge) + } } + /** + * Find an agent by its ID. + * + * @return the agent info, or null if not found + */ public AgentInfo findById(String id) { - throw new UnsupportedOperationException("Not yet implemented"); + return agents.get(id); } + /** + * Return all registered agents regardless of state. + */ public List findAll() { - throw new UnsupportedOperationException("Not yet implemented"); + return new ArrayList<>(agents.values()); } + /** + * Return all agents in the specified state. + */ public List findByState(AgentState state) { - throw new UnsupportedOperationException("Not yet implemented"); + return agents.values().stream() + .filter(a -> a.state() == state) + .collect(Collectors.toList()); } + /** + * Add a command to an agent's pending queue. + * Notifies the event listener if one is set. + * + * @return the created command + */ public AgentCommand addCommand(String agentId, CommandType type, String payload) { - throw new UnsupportedOperationException("Not yet implemented"); + AgentCommand command = new AgentCommand( + UUID.randomUUID().toString(), + type, + payload, + agentId, + Instant.now(), + CommandStatus.PENDING + ); + + commands.computeIfAbsent(agentId, k -> new ConcurrentLinkedQueue<>()).add(command); + log.debug("Command {} ({}) added for agent {}", command.id(), type, agentId); + + AgentEventListener listener = this.eventListener; + if (listener != null) { + listener.onCommandReady(agentId, command); + } + + return command; } + /** + * Acknowledge a command, transitioning it from PENDING/DELIVERED to ACKNOWLEDGED. + * + * @return true if the command was found and acknowledged + */ public boolean acknowledgeCommand(String agentId, String commandId) { - throw new UnsupportedOperationException("Not yet implemented"); + ConcurrentLinkedQueue queue = commands.get(agentId); + if (queue == null) { + return false; + } + + // Replace the command in the queue with an acknowledged version + boolean[] found = {false}; + ConcurrentLinkedQueue newQueue = new ConcurrentLinkedQueue<>(); + for (AgentCommand cmd : queue) { + if (cmd.id().equals(commandId) && !found[0]) { + newQueue.add(cmd.withStatus(CommandStatus.ACKNOWLEDGED)); + found[0] = true; + } else { + newQueue.add(cmd); + } + } + + if (found[0]) { + commands.put(agentId, newQueue); + log.debug("Command {} acknowledged by agent {}", commandId, agentId); + } + + return found[0]; } + /** + * Return all PENDING commands for the specified agent. + */ public List findPendingCommands(String agentId) { - throw new UnsupportedOperationException("Not yet implemented"); + ConcurrentLinkedQueue queue = commands.get(agentId); + if (queue == null) { + return List.of(); + } + return queue.stream() + .filter(cmd -> cmd.status() == CommandStatus.PENDING) + .collect(Collectors.toList()); } + /** + * Mark a command as DELIVERED (sent via SSE but not yet acknowledged). + */ public void markDelivered(String agentId, String commandId) { - throw new UnsupportedOperationException("Not yet implemented"); + ConcurrentLinkedQueue queue = commands.get(agentId); + if (queue == null) { + return; + } + + ConcurrentLinkedQueue newQueue = new ConcurrentLinkedQueue<>(); + for (AgentCommand cmd : queue) { + if (cmd.id().equals(commandId) && cmd.status() == CommandStatus.PENDING) { + newQueue.add(cmd.withStatus(CommandStatus.DELIVERED)); + } else { + newQueue.add(cmd); + } + } + commands.put(agentId, newQueue); } + /** + * Expire PENDING commands older than commandExpiryMs. + */ public void expireOldCommands() { - throw new UnsupportedOperationException("Not yet implemented"); + Instant cutoff = Instant.now().minusMillis(commandExpiryMs); + for (Map.Entry> entry : commands.entrySet()) { + ConcurrentLinkedQueue queue = entry.getValue(); + queue.removeIf(cmd -> + cmd.status() == CommandStatus.PENDING && cmd.createdAt().isBefore(cutoff)); + } } + /** + * Set the event listener for command notifications. + * The SSE layer in the app module implements this interface. + */ public void setEventListener(AgentEventListener listener) { - throw new UnsupportedOperationException("Not yet implemented"); + this.eventListener = listener; } }