feat(03-01): implement agent registry service and domain types

- AgentRegistryService: register, heartbeat, lifecycle, commands
- ConcurrentHashMap with atomic record-swapping for thread safety
- LIVE->STALE->DEAD lifecycle transitions via checkLifecycle()
- Heartbeat revives STALE agents back to LIVE
- Command queue with PENDING/DELIVERED/ACKNOWLEDGED/EXPIRED states
- AgentEventListener callback for SSE bridge integration
- All 23 unit tests pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 18:30:02 +01:00
parent 4cd7ed9e9a
commit 61f39021b3

View File

@@ -1,76 +1,280 @@
package com.cameleer3.server.core.agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
/**
* In-memory agent registry managing agent lifecycle, heartbeats, and commands.
* <p>
* Plain class (no Spring annotations) -- wired as a bean by the app module.
* Stub implementation for TDD RED phase.
* Uses {@link ConcurrentHashMap} for thread-safe agent storage with atomic
* record-swapping via {@code compute} operations.
*/
public class AgentRegistryService {
private static final Logger log = LoggerFactory.getLogger(AgentRegistryService.class);
private final long staleThresholdMs;
private final long deadThresholdMs;
private final long commandExpiryMs;
private final ConcurrentHashMap<String, AgentInfo> agents = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<AgentCommand>> commands = new ConcurrentHashMap<>();
private volatile AgentEventListener eventListener;
public AgentRegistryService(long staleThresholdMs, long deadThresholdMs, long commandExpiryMs) {
this.staleThresholdMs = staleThresholdMs;
this.deadThresholdMs = deadThresholdMs;
this.commandExpiryMs = commandExpiryMs;
}
/**
* Register a new agent or re-register an existing one.
* Re-registration updates metadata, transitions state to LIVE, and resets timestamps.
*/
public AgentInfo register(String id, String name, String group, String version,
List<String> routeIds, Map<String, Object> capabilities) {
throw new UnsupportedOperationException("Not yet implemented");
Instant now = Instant.now();
AgentInfo newAgent = new AgentInfo(id, name, group, version,
List.copyOf(routeIds), Map.copyOf(capabilities),
AgentState.LIVE, now, now, null);
AgentInfo result = agents.compute(id, (key, existing) -> {
if (existing != null) {
// Re-registration: update metadata, reset to LIVE
log.info("Agent {} re-registering (was {})", id, existing.state());
return existing
.withMetadata(name, group, version, List.copyOf(routeIds), Map.copyOf(capabilities))
.withState(AgentState.LIVE)
.withLastHeartbeat(now)
.withRegisteredAt(now)
.withStaleTransitionTime(null);
}
log.info("Agent {} registered (name={}, group={})", id, name, group);
return newAgent;
});
return result;
}
/**
* Process a heartbeat from an agent.
* Updates lastHeartbeat and transitions STALE agents back to LIVE.
*
* @return true if the agent is known, false otherwise
*/
public boolean heartbeat(String id) {
throw new UnsupportedOperationException("Not yet implemented");
AgentInfo updated = agents.computeIfPresent(id, (key, existing) -> {
Instant now = Instant.now();
AgentInfo result = existing.withLastHeartbeat(now);
if (existing.state() == AgentState.STALE) {
result = result.withState(AgentState.LIVE).withStaleTransitionTime(null);
log.info("Agent {} revived from STALE to LIVE via heartbeat", id);
}
return result;
});
return updated != null;
}
/**
* Manually transition an agent to a new state.
* Sets staleTransitionTime when transitioning to STALE.
*/
public void transitionState(String id, AgentState newState) {
throw new UnsupportedOperationException("Not yet implemented");
agents.computeIfPresent(id, (key, existing) -> {
AgentInfo result = existing.withState(newState);
if (newState == AgentState.STALE) {
result = result.withStaleTransitionTime(Instant.now());
} else if (newState == AgentState.LIVE) {
result = result.withStaleTransitionTime(null);
}
log.debug("Agent {} transitioned {} -> {}", id, existing.state(), newState);
return result;
});
}
/**
* Check all agents and apply lifecycle transitions:
* LIVE -> STALE when lastHeartbeat exceeds staleThresholdMs,
* STALE -> DEAD when staleTransitionTime exceeds deadThresholdMs.
*/
public void checkLifecycle() {
throw new UnsupportedOperationException("Not yet implemented");
Instant now = Instant.now();
for (Map.Entry<String, AgentInfo> entry : agents.entrySet()) {
AgentInfo agent = entry.getValue();
if (agent.state() == AgentState.LIVE) {
Duration sinceHeartbeat = Duration.between(agent.lastHeartbeat(), now);
if (sinceHeartbeat.toMillis() > staleThresholdMs) {
agents.computeIfPresent(entry.getKey(), (k, current) -> {
if (current.state() == AgentState.LIVE) {
log.info("Agent {} LIVE -> STALE (no heartbeat for {}ms)", k, sinceHeartbeat.toMillis());
return current.withState(AgentState.STALE)
.withStaleTransitionTime(now);
}
return current;
});
}
} else if (agent.state() == AgentState.STALE && agent.staleTransitionTime() != null) {
Duration sinceStale = Duration.between(agent.staleTransitionTime(), now);
if (sinceStale.toMillis() > deadThresholdMs) {
agents.computeIfPresent(entry.getKey(), (k, current) -> {
if (current.state() == AgentState.STALE) {
log.info("Agent {} STALE -> DEAD (stale for {}ms)", k, sinceStale.toMillis());
return current.withState(AgentState.DEAD);
}
return current;
});
}
}
// DEAD agents remain DEAD (no auto-purge)
}
}
/**
* Find an agent by its ID.
*
* @return the agent info, or null if not found
*/
public AgentInfo findById(String id) {
throw new UnsupportedOperationException("Not yet implemented");
return agents.get(id);
}
/**
* Return all registered agents regardless of state.
*/
public List<AgentInfo> findAll() {
throw new UnsupportedOperationException("Not yet implemented");
return new ArrayList<>(agents.values());
}
/**
* Return all agents in the specified state.
*/
public List<AgentInfo> findByState(AgentState state) {
throw new UnsupportedOperationException("Not yet implemented");
return agents.values().stream()
.filter(a -> a.state() == state)
.collect(Collectors.toList());
}
/**
* Add a command to an agent's pending queue.
* Notifies the event listener if one is set.
*
* @return the created command
*/
public AgentCommand addCommand(String agentId, CommandType type, String payload) {
throw new UnsupportedOperationException("Not yet implemented");
AgentCommand command = new AgentCommand(
UUID.randomUUID().toString(),
type,
payload,
agentId,
Instant.now(),
CommandStatus.PENDING
);
commands.computeIfAbsent(agentId, k -> new ConcurrentLinkedQueue<>()).add(command);
log.debug("Command {} ({}) added for agent {}", command.id(), type, agentId);
AgentEventListener listener = this.eventListener;
if (listener != null) {
listener.onCommandReady(agentId, command);
}
return command;
}
/**
* Acknowledge a command, transitioning it from PENDING/DELIVERED to ACKNOWLEDGED.
*
* @return true if the command was found and acknowledged
*/
public boolean acknowledgeCommand(String agentId, String commandId) {
throw new UnsupportedOperationException("Not yet implemented");
ConcurrentLinkedQueue<AgentCommand> queue = commands.get(agentId);
if (queue == null) {
return false;
}
// Replace the command in the queue with an acknowledged version
boolean[] found = {false};
ConcurrentLinkedQueue<AgentCommand> newQueue = new ConcurrentLinkedQueue<>();
for (AgentCommand cmd : queue) {
if (cmd.id().equals(commandId) && !found[0]) {
newQueue.add(cmd.withStatus(CommandStatus.ACKNOWLEDGED));
found[0] = true;
} else {
newQueue.add(cmd);
}
}
if (found[0]) {
commands.put(agentId, newQueue);
log.debug("Command {} acknowledged by agent {}", commandId, agentId);
}
return found[0];
}
/**
* Return all PENDING commands for the specified agent.
*/
public List<AgentCommand> findPendingCommands(String agentId) {
throw new UnsupportedOperationException("Not yet implemented");
ConcurrentLinkedQueue<AgentCommand> queue = commands.get(agentId);
if (queue == null) {
return List.of();
}
return queue.stream()
.filter(cmd -> cmd.status() == CommandStatus.PENDING)
.collect(Collectors.toList());
}
/**
* Mark a command as DELIVERED (sent via SSE but not yet acknowledged).
*/
public void markDelivered(String agentId, String commandId) {
throw new UnsupportedOperationException("Not yet implemented");
ConcurrentLinkedQueue<AgentCommand> queue = commands.get(agentId);
if (queue == null) {
return;
}
ConcurrentLinkedQueue<AgentCommand> newQueue = new ConcurrentLinkedQueue<>();
for (AgentCommand cmd : queue) {
if (cmd.id().equals(commandId) && cmd.status() == CommandStatus.PENDING) {
newQueue.add(cmd.withStatus(CommandStatus.DELIVERED));
} else {
newQueue.add(cmd);
}
}
commands.put(agentId, newQueue);
}
/**
* Expire PENDING commands older than commandExpiryMs.
*/
public void expireOldCommands() {
throw new UnsupportedOperationException("Not yet implemented");
Instant cutoff = Instant.now().minusMillis(commandExpiryMs);
for (Map.Entry<String, ConcurrentLinkedQueue<AgentCommand>> entry : commands.entrySet()) {
ConcurrentLinkedQueue<AgentCommand> queue = entry.getValue();
queue.removeIf(cmd ->
cmd.status() == CommandStatus.PENDING && cmd.createdAt().isBefore(cutoff));
}
}
/**
* Set the event listener for command notifications.
* The SSE layer in the app module implements this interface.
*/
public void setEventListener(AgentEventListener listener) {
throw new UnsupportedOperationException("Not yet implemented");
this.eventListener = listener;
}
}