feat: add TEST_EXPRESSION command with request-reply infrastructure
Adds CompletableFuture-based request-reply mechanism for commands that need synchronous results. CommandReply record in core, pendingReplies map in AgentRegistryService, test-expression endpoint on config controller with 5s timeout. CommandAckRequest extended with optional data field. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -166,6 +166,12 @@ public class AgentCommandController {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId);
|
||||
}
|
||||
|
||||
// Complete any pending reply future (for synchronous request-reply commands like TEST_EXPRESSION)
|
||||
registryService.completeReply(commandId,
|
||||
body != null ? body.status() : "SUCCESS",
|
||||
body != null ? body.message() : null,
|
||||
body != null ? body.data() : null);
|
||||
|
||||
// Record command result in agent event log
|
||||
if (body != null && body.status() != null) {
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
@@ -184,8 +190,9 @@ public class AgentCommandController {
|
||||
case "deep-trace" -> CommandType.DEEP_TRACE;
|
||||
case "replay" -> CommandType.REPLAY;
|
||||
case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS;
|
||||
case "test-expression" -> CommandType.TEST_EXPRESSION;
|
||||
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
|
||||
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors");
|
||||
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors, test-expression");
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.common.model.ApplicationConfig;
|
||||
import com.cameleer3.server.app.dto.TestExpressionRequest;
|
||||
import com.cameleer3.server.app.dto.TestExpressionResponse;
|
||||
import com.cameleer3.server.app.storage.PostgresApplicationConfigRepository;
|
||||
import com.cameleer3.server.core.admin.AuditCategory;
|
||||
import com.cameleer3.server.core.admin.AuditResult;
|
||||
@@ -9,6 +11,7 @@ import com.cameleer3.server.core.agent.AgentCommand;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
import com.cameleer3.server.core.agent.CommandReply;
|
||||
import com.cameleer3.server.core.agent.CommandType;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@@ -18,12 +21,17 @@ import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Per-application configuration management.
|
||||
@@ -95,6 +103,63 @@ public class ApplicationConfigController {
|
||||
return ResponseEntity.ok(saved);
|
||||
}
|
||||
|
||||
@PostMapping("/{application}/test-expression")
|
||||
@Operation(summary = "Test a tap expression against sample data via a live agent")
|
||||
@ApiResponse(responseCode = "200", description = "Expression evaluated successfully")
|
||||
@ApiResponse(responseCode = "404", description = "No live agent available for this application")
|
||||
@ApiResponse(responseCode = "504", description = "Agent did not respond in time")
|
||||
public ResponseEntity<TestExpressionResponse> testExpression(
|
||||
@PathVariable String application,
|
||||
@RequestBody TestExpressionRequest request) {
|
||||
// Find a LIVE agent for this application
|
||||
AgentInfo agent = registryService.findAll().stream()
|
||||
.filter(a -> application.equals(a.application()))
|
||||
.filter(a -> a.state() == AgentState.LIVE)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
|
||||
if (agent == null) {
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND)
|
||||
.body(new TestExpressionResponse(null, "No live agent available for application: " + application));
|
||||
}
|
||||
|
||||
// Build payload JSON
|
||||
String payloadJson;
|
||||
try {
|
||||
payloadJson = objectMapper.writeValueAsString(Map.of(
|
||||
"expression", request.expression() != null ? request.expression() : "",
|
||||
"language", request.language() != null ? request.language() : "",
|
||||
"body", request.body() != null ? request.body() : "",
|
||||
"target", request.target() != null ? request.target() : ""
|
||||
));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to serialize test-expression payload", e);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
|
||||
.body(new TestExpressionResponse(null, "Failed to serialize request"));
|
||||
}
|
||||
|
||||
// Send command and await reply
|
||||
CompletableFuture<CommandReply> future = registryService.addCommandWithReply(
|
||||
agent.id(), CommandType.TEST_EXPRESSION, payloadJson);
|
||||
|
||||
try {
|
||||
CommandReply reply = future.orTimeout(5, TimeUnit.SECONDS).join();
|
||||
if ("SUCCESS".equals(reply.status())) {
|
||||
return ResponseEntity.ok(new TestExpressionResponse(reply.data(), null));
|
||||
} else {
|
||||
return ResponseEntity.ok(new TestExpressionResponse(null, reply.message()));
|
||||
}
|
||||
} catch (CompletionException e) {
|
||||
if (e.getCause() instanceof TimeoutException) {
|
||||
return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
|
||||
.body(new TestExpressionResponse(null, "Agent did not respond within 5 seconds"));
|
||||
}
|
||||
log.error("Error awaiting test-expression reply from agent {}", agent.id(), e);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
|
||||
.body(new TestExpressionResponse(null, "Internal error: " + e.getCause().getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
private int pushConfigToAgents(String application, ApplicationConfig config) {
|
||||
String payloadJson;
|
||||
try {
|
||||
|
||||
@@ -6,5 +6,6 @@ package com.cameleer3.server.app.dto;
|
||||
*
|
||||
* @param status "SUCCESS" or "FAILURE"
|
||||
* @param message human-readable description of the result
|
||||
* @param data optional structured JSON data returned by the agent (e.g. expression evaluation results)
|
||||
*/
|
||||
public record CommandAckRequest(String status, String message) {}
|
||||
public record CommandAckRequest(String status, String message, String data) {}
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.cameleer3.server.app.dto;
|
||||
|
||||
/**
|
||||
* Request body for testing a tap expression against sample data via a live agent.
|
||||
*
|
||||
* @param expression the expression to evaluate (e.g. Simple, JSONPath, XPath)
|
||||
* @param language the expression language identifier
|
||||
* @param body sample message body to evaluate the expression against
|
||||
* @param target what the expression targets (e.g. "body", "header", "property")
|
||||
*/
|
||||
public record TestExpressionRequest(String expression, String language, String body, String target) {}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.cameleer3.server.app.dto;
|
||||
|
||||
/**
|
||||
* Response from testing a tap expression against sample data.
|
||||
*
|
||||
* @param result the evaluation result (null if an error occurred)
|
||||
* @param error error message if evaluation failed (null on success)
|
||||
*/
|
||||
public record TestExpressionResponse(String result, String error) {}
|
||||
@@ -9,6 +9,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -30,6 +31,7 @@ public class AgentRegistryService {
|
||||
|
||||
private final ConcurrentHashMap<String, AgentInfo> agents = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<AgentCommand>> commands = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, CompletableFuture<CommandReply>> pendingReplies = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile AgentEventListener eventListener;
|
||||
|
||||
@@ -279,6 +281,31 @@ public class AgentRegistryService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a command that expects a synchronous reply from the agent.
|
||||
* Returns a CompletableFuture that will be completed when the agent ACKs the command.
|
||||
* Auto-cleans up from the pending map on completion or timeout.
|
||||
*/
|
||||
public CompletableFuture<CommandReply> addCommandWithReply(String agentId, CommandType type, String payload) {
|
||||
AgentCommand command = addCommand(agentId, type, payload);
|
||||
CompletableFuture<CommandReply> future = new CompletableFuture<>();
|
||||
pendingReplies.put(command.id(), future);
|
||||
future.whenComplete((result, ex) -> pendingReplies.remove(command.id()));
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a pending reply future for a command.
|
||||
* Called when an agent ACKs a command that was registered via {@link #addCommandWithReply}.
|
||||
* No-op if no pending future exists for the given command ID.
|
||||
*/
|
||||
public void completeReply(String commandId, String status, String message, String data) {
|
||||
CompletableFuture<CommandReply> future = pendingReplies.remove(commandId);
|
||||
if (future != null) {
|
||||
future.complete(new CommandReply(status, message, data));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the event listener for command notifications.
|
||||
* The SSE layer in the app module implements this interface.
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.cameleer3.server.core.agent;
|
||||
|
||||
/**
|
||||
* Represents the reply data from an agent command acknowledgment.
|
||||
* Used for synchronous request-reply command patterns (e.g. TEST_EXPRESSION).
|
||||
*
|
||||
* @param status "SUCCESS" or "FAILURE"
|
||||
* @param message human-readable description of the result
|
||||
* @param data optional structured JSON data returned by the agent
|
||||
*/
|
||||
public record CommandReply(String status, String message, String data) {}
|
||||
@@ -7,5 +7,6 @@ public enum CommandType {
|
||||
CONFIG_UPDATE,
|
||||
DEEP_TRACE,
|
||||
REPLAY,
|
||||
SET_TRACED_PROCESSORS
|
||||
SET_TRACED_PROCESSORS,
|
||||
TEST_EXPRESSION
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user