diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java
index 3f54d7c0..07e4e231 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java
@@ -1,76 +1,280 @@
package com.cameleer3.server.core.agent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
/**
* In-memory agent registry managing agent lifecycle, heartbeats, and commands.
*
* Plain class (no Spring annotations) -- wired as a bean by the app module.
- * Stub implementation for TDD RED phase.
+ * Uses {@link ConcurrentHashMap} for thread-safe agent storage with atomic
+ * record-swapping via {@code compute} operations.
*/
public class AgentRegistryService {
+ private static final Logger log = LoggerFactory.getLogger(AgentRegistryService.class);
+
private final long staleThresholdMs;
private final long deadThresholdMs;
private final long commandExpiryMs;
+ private final ConcurrentHashMap agents = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap> commands = new ConcurrentHashMap<>();
+
+ private volatile AgentEventListener eventListener;
+
public AgentRegistryService(long staleThresholdMs, long deadThresholdMs, long commandExpiryMs) {
this.staleThresholdMs = staleThresholdMs;
this.deadThresholdMs = deadThresholdMs;
this.commandExpiryMs = commandExpiryMs;
}
+ /**
+ * Register a new agent or re-register an existing one.
+ * Re-registration updates metadata, transitions state to LIVE, and resets timestamps.
+ */
public AgentInfo register(String id, String name, String group, String version,
List routeIds, Map capabilities) {
- throw new UnsupportedOperationException("Not yet implemented");
+ Instant now = Instant.now();
+ AgentInfo newAgent = new AgentInfo(id, name, group, version,
+ List.copyOf(routeIds), Map.copyOf(capabilities),
+ AgentState.LIVE, now, now, null);
+
+ AgentInfo result = agents.compute(id, (key, existing) -> {
+ if (existing != null) {
+ // Re-registration: update metadata, reset to LIVE
+ log.info("Agent {} re-registering (was {})", id, existing.state());
+ return existing
+ .withMetadata(name, group, version, List.copyOf(routeIds), Map.copyOf(capabilities))
+ .withState(AgentState.LIVE)
+ .withLastHeartbeat(now)
+ .withRegisteredAt(now)
+ .withStaleTransitionTime(null);
+ }
+ log.info("Agent {} registered (name={}, group={})", id, name, group);
+ return newAgent;
+ });
+
+ return result;
}
+ /**
+ * Process a heartbeat from an agent.
+ * Updates lastHeartbeat and transitions STALE agents back to LIVE.
+ *
+ * @return true if the agent is known, false otherwise
+ */
public boolean heartbeat(String id) {
- throw new UnsupportedOperationException("Not yet implemented");
+ AgentInfo updated = agents.computeIfPresent(id, (key, existing) -> {
+ Instant now = Instant.now();
+ AgentInfo result = existing.withLastHeartbeat(now);
+ if (existing.state() == AgentState.STALE) {
+ result = result.withState(AgentState.LIVE).withStaleTransitionTime(null);
+ log.info("Agent {} revived from STALE to LIVE via heartbeat", id);
+ }
+ return result;
+ });
+ return updated != null;
}
+ /**
+ * Manually transition an agent to a new state.
+ * Sets staleTransitionTime when transitioning to STALE.
+ */
public void transitionState(String id, AgentState newState) {
- throw new UnsupportedOperationException("Not yet implemented");
+ agents.computeIfPresent(id, (key, existing) -> {
+ AgentInfo result = existing.withState(newState);
+ if (newState == AgentState.STALE) {
+ result = result.withStaleTransitionTime(Instant.now());
+ } else if (newState == AgentState.LIVE) {
+ result = result.withStaleTransitionTime(null);
+ }
+ log.debug("Agent {} transitioned {} -> {}", id, existing.state(), newState);
+ return result;
+ });
}
+ /**
+ * Check all agents and apply lifecycle transitions:
+ * LIVE -> STALE when lastHeartbeat exceeds staleThresholdMs,
+ * STALE -> DEAD when staleTransitionTime exceeds deadThresholdMs.
+ */
public void checkLifecycle() {
- throw new UnsupportedOperationException("Not yet implemented");
+ Instant now = Instant.now();
+ for (Map.Entry entry : agents.entrySet()) {
+ AgentInfo agent = entry.getValue();
+
+ if (agent.state() == AgentState.LIVE) {
+ Duration sinceHeartbeat = Duration.between(agent.lastHeartbeat(), now);
+ if (sinceHeartbeat.toMillis() > staleThresholdMs) {
+ agents.computeIfPresent(entry.getKey(), (k, current) -> {
+ if (current.state() == AgentState.LIVE) {
+ log.info("Agent {} LIVE -> STALE (no heartbeat for {}ms)", k, sinceHeartbeat.toMillis());
+ return current.withState(AgentState.STALE)
+ .withStaleTransitionTime(now);
+ }
+ return current;
+ });
+ }
+ } else if (agent.state() == AgentState.STALE && agent.staleTransitionTime() != null) {
+ Duration sinceStale = Duration.between(agent.staleTransitionTime(), now);
+ if (sinceStale.toMillis() > deadThresholdMs) {
+ agents.computeIfPresent(entry.getKey(), (k, current) -> {
+ if (current.state() == AgentState.STALE) {
+ log.info("Agent {} STALE -> DEAD (stale for {}ms)", k, sinceStale.toMillis());
+ return current.withState(AgentState.DEAD);
+ }
+ return current;
+ });
+ }
+ }
+ // DEAD agents remain DEAD (no auto-purge)
+ }
}
+ /**
+ * Find an agent by its ID.
+ *
+ * @return the agent info, or null if not found
+ */
public AgentInfo findById(String id) {
- throw new UnsupportedOperationException("Not yet implemented");
+ return agents.get(id);
}
+ /**
+ * Return all registered agents regardless of state.
+ */
public List findAll() {
- throw new UnsupportedOperationException("Not yet implemented");
+ return new ArrayList<>(agents.values());
}
+ /**
+ * Return all agents in the specified state.
+ */
public List findByState(AgentState state) {
- throw new UnsupportedOperationException("Not yet implemented");
+ return agents.values().stream()
+ .filter(a -> a.state() == state)
+ .collect(Collectors.toList());
}
+ /**
+ * Add a command to an agent's pending queue.
+ * Notifies the event listener if one is set.
+ *
+ * @return the created command
+ */
public AgentCommand addCommand(String agentId, CommandType type, String payload) {
- throw new UnsupportedOperationException("Not yet implemented");
+ AgentCommand command = new AgentCommand(
+ UUID.randomUUID().toString(),
+ type,
+ payload,
+ agentId,
+ Instant.now(),
+ CommandStatus.PENDING
+ );
+
+ commands.computeIfAbsent(agentId, k -> new ConcurrentLinkedQueue<>()).add(command);
+ log.debug("Command {} ({}) added for agent {}", command.id(), type, agentId);
+
+ AgentEventListener listener = this.eventListener;
+ if (listener != null) {
+ listener.onCommandReady(agentId, command);
+ }
+
+ return command;
}
+ /**
+ * Acknowledge a command, transitioning it from PENDING/DELIVERED to ACKNOWLEDGED.
+ *
+ * @return true if the command was found and acknowledged
+ */
public boolean acknowledgeCommand(String agentId, String commandId) {
- throw new UnsupportedOperationException("Not yet implemented");
+ ConcurrentLinkedQueue queue = commands.get(agentId);
+ if (queue == null) {
+ return false;
+ }
+
+ // Replace the command in the queue with an acknowledged version
+ boolean[] found = {false};
+ ConcurrentLinkedQueue newQueue = new ConcurrentLinkedQueue<>();
+ for (AgentCommand cmd : queue) {
+ if (cmd.id().equals(commandId) && !found[0]) {
+ newQueue.add(cmd.withStatus(CommandStatus.ACKNOWLEDGED));
+ found[0] = true;
+ } else {
+ newQueue.add(cmd);
+ }
+ }
+
+ if (found[0]) {
+ commands.put(agentId, newQueue);
+ log.debug("Command {} acknowledged by agent {}", commandId, agentId);
+ }
+
+ return found[0];
}
+ /**
+ * Return all PENDING commands for the specified agent.
+ */
public List findPendingCommands(String agentId) {
- throw new UnsupportedOperationException("Not yet implemented");
+ ConcurrentLinkedQueue queue = commands.get(agentId);
+ if (queue == null) {
+ return List.of();
+ }
+ return queue.stream()
+ .filter(cmd -> cmd.status() == CommandStatus.PENDING)
+ .collect(Collectors.toList());
}
+ /**
+ * Mark a command as DELIVERED (sent via SSE but not yet acknowledged).
+ */
public void markDelivered(String agentId, String commandId) {
- throw new UnsupportedOperationException("Not yet implemented");
+ ConcurrentLinkedQueue queue = commands.get(agentId);
+ if (queue == null) {
+ return;
+ }
+
+ ConcurrentLinkedQueue newQueue = new ConcurrentLinkedQueue<>();
+ for (AgentCommand cmd : queue) {
+ if (cmd.id().equals(commandId) && cmd.status() == CommandStatus.PENDING) {
+ newQueue.add(cmd.withStatus(CommandStatus.DELIVERED));
+ } else {
+ newQueue.add(cmd);
+ }
+ }
+ commands.put(agentId, newQueue);
}
+ /**
+ * Expire PENDING commands older than commandExpiryMs.
+ */
public void expireOldCommands() {
- throw new UnsupportedOperationException("Not yet implemented");
+ Instant cutoff = Instant.now().minusMillis(commandExpiryMs);
+ for (Map.Entry> entry : commands.entrySet()) {
+ ConcurrentLinkedQueue queue = entry.getValue();
+ queue.removeIf(cmd ->
+ cmd.status() == CommandStatus.PENDING && cmd.createdAt().isBefore(cutoff));
+ }
}
+ /**
+ * Set the event listener for command notifications.
+ * The SSE layer in the app module implements this interface.
+ */
public void setEventListener(AgentEventListener listener) {
- throw new UnsupportedOperationException("Not yet implemented");
+ this.eventListener = listener;
}
}