diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index e71dbc3b..41186e0b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -166,6 +166,12 @@ public class AgentCommandController { throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId); } + // Complete any pending reply future (for synchronous request-reply commands like TEST_EXPRESSION) + registryService.completeReply(commandId, + body != null ? body.status() : "SUCCESS", + body != null ? body.message() : null, + body != null ? body.data() : null); + // Record command result in agent event log if (body != null && body.status() != null) { AgentInfo agent = registryService.findById(id); @@ -184,8 +190,9 @@ public class AgentCommandController { case "deep-trace" -> CommandType.DEEP_TRACE; case "replay" -> CommandType.REPLAY; case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS; + case "test-expression" -> CommandType.TEST_EXPRESSION; default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, - "Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors"); + "Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors, test-expression"); }; } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java index 594aff5d..bcade2b8 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ApplicationConfigController.java @@ -1,6 +1,8 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.ApplicationConfig; +import com.cameleer3.server.app.dto.TestExpressionRequest; +import com.cameleer3.server.app.dto.TestExpressionResponse; import com.cameleer3.server.app.storage.PostgresApplicationConfigRepository; import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditResult; @@ -9,6 +11,7 @@ import com.cameleer3.server.core.agent.AgentCommand; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; +import com.cameleer3.server.core.agent.CommandReply; import com.cameleer3.server.core.agent.CommandType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -18,12 +21,17 @@ import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.Authentication; import org.springframework.web.bind.annotation.*; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Per-application configuration management. @@ -95,6 +103,63 @@ public class ApplicationConfigController { return ResponseEntity.ok(saved); } + @PostMapping("/{application}/test-expression") + @Operation(summary = "Test a tap expression against sample data via a live agent") + @ApiResponse(responseCode = "200", description = "Expression evaluated successfully") + @ApiResponse(responseCode = "404", description = "No live agent available for this application") + @ApiResponse(responseCode = "504", description = "Agent did not respond in time") + public ResponseEntity testExpression( + @PathVariable String application, + @RequestBody TestExpressionRequest request) { + // Find a LIVE agent for this application + AgentInfo agent = registryService.findAll().stream() + .filter(a -> application.equals(a.application())) + .filter(a -> a.state() == AgentState.LIVE) + .findFirst() + .orElse(null); + + if (agent == null) { + return ResponseEntity.status(HttpStatus.NOT_FOUND) + .body(new TestExpressionResponse(null, "No live agent available for application: " + application)); + } + + // Build payload JSON + String payloadJson; + try { + payloadJson = objectMapper.writeValueAsString(Map.of( + "expression", request.expression() != null ? request.expression() : "", + "language", request.language() != null ? request.language() : "", + "body", request.body() != null ? request.body() : "", + "target", request.target() != null ? request.target() : "" + )); + } catch (JsonProcessingException e) { + log.error("Failed to serialize test-expression payload", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(new TestExpressionResponse(null, "Failed to serialize request")); + } + + // Send command and await reply + CompletableFuture future = registryService.addCommandWithReply( + agent.id(), CommandType.TEST_EXPRESSION, payloadJson); + + try { + CommandReply reply = future.orTimeout(5, TimeUnit.SECONDS).join(); + if ("SUCCESS".equals(reply.status())) { + return ResponseEntity.ok(new TestExpressionResponse(reply.data(), null)); + } else { + return ResponseEntity.ok(new TestExpressionResponse(null, reply.message())); + } + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT) + .body(new TestExpressionResponse(null, "Agent did not respond within 5 seconds")); + } + log.error("Error awaiting test-expression reply from agent {}", agent.id(), e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(new TestExpressionResponse(null, "Internal error: " + e.getCause().getMessage())); + } + } + private int pushConfigToAgents(String application, ApplicationConfig config) { String payloadJson; try { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java index 81ff245b..2e420b11 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java @@ -6,5 +6,6 @@ package com.cameleer3.server.app.dto; * * @param status "SUCCESS" or "FAILURE" * @param message human-readable description of the result + * @param data optional structured JSON data returned by the agent (e.g. expression evaluation results) */ -public record CommandAckRequest(String status, String message) {} +public record CommandAckRequest(String status, String message, String data) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionRequest.java new file mode 100644 index 00000000..d5ba7f6d --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionRequest.java @@ -0,0 +1,11 @@ +package com.cameleer3.server.app.dto; + +/** + * Request body for testing a tap expression against sample data via a live agent. + * + * @param expression the expression to evaluate (e.g. Simple, JSONPath, XPath) + * @param language the expression language identifier + * @param body sample message body to evaluate the expression against + * @param target what the expression targets (e.g. "body", "header", "property") + */ +public record TestExpressionRequest(String expression, String language, String body, String target) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionResponse.java new file mode 100644 index 00000000..e2e93aa3 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/TestExpressionResponse.java @@ -0,0 +1,9 @@ +package com.cameleer3.server.app.dto; + +/** + * Response from testing a tap expression against sample data. + * + * @param result the evaluation result (null if an error occurred) + * @param error error message if evaluation failed (null on success) + */ +public record TestExpressionResponse(String result, String error) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java index a3b50692..3f42f06e 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; @@ -30,6 +31,7 @@ public class AgentRegistryService { private final ConcurrentHashMap agents = new ConcurrentHashMap<>(); private final ConcurrentHashMap> commands = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> pendingReplies = new ConcurrentHashMap<>(); private volatile AgentEventListener eventListener; @@ -279,6 +281,31 @@ public class AgentRegistryService { } } + /** + * Register a command that expects a synchronous reply from the agent. + * Returns a CompletableFuture that will be completed when the agent ACKs the command. + * Auto-cleans up from the pending map on completion or timeout. + */ + public CompletableFuture addCommandWithReply(String agentId, CommandType type, String payload) { + AgentCommand command = addCommand(agentId, type, payload); + CompletableFuture future = new CompletableFuture<>(); + pendingReplies.put(command.id(), future); + future.whenComplete((result, ex) -> pendingReplies.remove(command.id())); + return future; + } + + /** + * Complete a pending reply future for a command. + * Called when an agent ACKs a command that was registered via {@link #addCommandWithReply}. + * No-op if no pending future exists for the given command ID. + */ + public void completeReply(String commandId, String status, String message, String data) { + CompletableFuture future = pendingReplies.remove(commandId); + if (future != null) { + future.complete(new CommandReply(status, message, data)); + } + } + /** * Set the event listener for command notifications. * The SSE layer in the app module implements this interface. diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandReply.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandReply.java new file mode 100644 index 00000000..3538d3d3 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandReply.java @@ -0,0 +1,11 @@ +package com.cameleer3.server.core.agent; + +/** + * Represents the reply data from an agent command acknowledgment. + * Used for synchronous request-reply command patterns (e.g. TEST_EXPRESSION). + * + * @param status "SUCCESS" or "FAILURE" + * @param message human-readable description of the result + * @param data optional structured JSON data returned by the agent + */ +public record CommandReply(String status, String message, String data) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java index 585239b9..0e0000d8 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java @@ -7,5 +7,6 @@ public enum CommandType { CONFIG_UPDATE, DEEP_TRACE, REPLAY, - SET_TRACED_PROCESSORS + SET_TRACED_PROCESSORS, + TEST_EXPRESSION }