Compare commits
8 Commits
e54d20bcb7
...
aa3d9f375b
| Author | SHA1 | Date | |
|---|---|---|---|
| aa3d9f375b | |||
| 2887fe9599 | |||
| b1679b110c | |||
| e7835e1100 | |||
| ed65b87af2 | |||
| 4a99e6cf6b | |||
| 4d9a9ff851 | |||
| 292a38fe30 |
@@ -1,10 +1,12 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.agent.SseConnectionManager;
|
||||
import com.cameleer3.server.app.dto.CommandAckRequest;
|
||||
import com.cameleer3.server.app.dto.CommandBroadcastResponse;
|
||||
import com.cameleer3.server.app.dto.CommandRequest;
|
||||
import com.cameleer3.server.app.dto.CommandSingleResponse;
|
||||
import com.cameleer3.server.core.agent.AgentCommand;
|
||||
import com.cameleer3.server.core.agent.AgentEventService;
|
||||
import com.cameleer3.server.core.agent.AgentInfo;
|
||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer3.server.core.agent.AgentState;
|
||||
@@ -48,18 +50,21 @@ public class AgentCommandController {
|
||||
private final AgentRegistryService registryService;
|
||||
private final SseConnectionManager connectionManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final AgentEventService agentEventService;
|
||||
|
||||
public AgentCommandController(AgentRegistryService registryService,
|
||||
SseConnectionManager connectionManager,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper,
|
||||
AgentEventService agentEventService) {
|
||||
this.registryService = registryService;
|
||||
this.connectionManager = connectionManager;
|
||||
this.objectMapper = objectMapper;
|
||||
this.agentEventService = agentEventService;
|
||||
}
|
||||
|
||||
@PostMapping("/{id}/commands")
|
||||
@Operation(summary = "Send command to a specific agent",
|
||||
description = "Sends a config-update, deep-trace, or replay command to the specified agent")
|
||||
description = "Sends a command to the specified agent via SSE")
|
||||
@ApiResponse(responseCode = "202", description = "Command accepted")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
@ApiResponse(responseCode = "404", description = "Agent not registered")
|
||||
@@ -128,15 +133,26 @@ public class AgentCommandController {
|
||||
|
||||
@PostMapping("/{id}/commands/{commandId}/ack")
|
||||
@Operation(summary = "Acknowledge command receipt",
|
||||
description = "Agent acknowledges that it has received and processed a command")
|
||||
description = "Agent acknowledges that it has received and processed a command, with result status and message")
|
||||
@ApiResponse(responseCode = "200", description = "Command acknowledged")
|
||||
@ApiResponse(responseCode = "404", description = "Command not found")
|
||||
public ResponseEntity<Void> acknowledgeCommand(@PathVariable String id,
|
||||
@PathVariable String commandId) {
|
||||
@PathVariable String commandId,
|
||||
@RequestBody(required = false) CommandAckRequest body) {
|
||||
boolean acknowledged = registryService.acknowledgeCommand(id, commandId);
|
||||
if (!acknowledged) {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId);
|
||||
}
|
||||
|
||||
// Record command result in agent event log
|
||||
if (body != null && body.status() != null) {
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
String application = agent != null ? agent.application() : "unknown";
|
||||
agentEventService.recordEvent(id, application, "COMMAND_" + body.status(),
|
||||
"Command " + commandId + ": " + body.message());
|
||||
log.debug("Command {} ack from agent {}: {} - {}", commandId, id, body.status(), body.message());
|
||||
}
|
||||
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@@ -145,8 +161,9 @@ public class AgentCommandController {
|
||||
case "config-update" -> CommandType.CONFIG_UPDATE;
|
||||
case "deep-trace" -> CommandType.DEEP_TRACE;
|
||||
case "replay" -> CommandType.REPLAY;
|
||||
case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS;
|
||||
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
|
||||
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay");
|
||||
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors");
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.cameleer3.server.app.dto;
|
||||
|
||||
/**
|
||||
* Request body for command acknowledgment from agents.
|
||||
* Contains the result status and message of the command execution.
|
||||
*
|
||||
* @param status "SUCCESS" or "FAILURE"
|
||||
* @param message human-readable description of the result
|
||||
*/
|
||||
public record CommandAckRequest(String status, String message) {}
|
||||
@@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
|
||||
status, correlation_id, exchange_id, start_time, end_time,
|
||||
duration_ms, error_message, error_stacktrace, diagram_content_hash,
|
||||
engine_level, input_body, output_body, input_headers, output_headers,
|
||||
created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now())
|
||||
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
||||
status = CASE
|
||||
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
||||
@@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
|
||||
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
|
||||
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
|
||||
engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level),
|
||||
input_body = COALESCE(EXCLUDED.input_body, executions.input_body),
|
||||
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
|
||||
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
|
||||
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
|
||||
updated_at = now()
|
||||
""",
|
||||
execution.executionId(), execution.routeId(), execution.agentId(),
|
||||
@@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
Timestamp.from(execution.startTime()),
|
||||
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
|
||||
execution.durationMs(), execution.errorMessage(),
|
||||
execution.errorStacktrace(), execution.diagramContentHash());
|
||||
execution.errorStacktrace(), execution.diagramContentHash(),
|
||||
execution.engineLevel(),
|
||||
execution.inputBody(), execution.outputBody(),
|
||||
execution.inputHeaders(), execution.outputHeaders());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
|
||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||
rs.getString("error_message"), rs.getString("error_stacktrace"),
|
||||
rs.getString("diagram_content_hash"));
|
||||
rs.getString("diagram_content_hash"),
|
||||
rs.getString("engine_level"),
|
||||
rs.getString("input_body"), rs.getString("output_body"),
|
||||
rs.getString("input_headers"), rs.getString("output_headers"));
|
||||
|
||||
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
||||
new ProcessorRecord(
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
-- Add engine level and route-level snapshot columns to executions table.
|
||||
-- Required for REGULAR engine level where route-level payloads exist but
|
||||
-- no processor execution records are created.
|
||||
|
||||
ALTER TABLE executions ADD COLUMN IF NOT EXISTS engine_level VARCHAR(16);
|
||||
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_body TEXT;
|
||||
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_body TEXT;
|
||||
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_headers JSONB;
|
||||
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_headers JSONB;
|
||||
@@ -6,5 +6,6 @@ package com.cameleer3.server.core.agent;
|
||||
public enum CommandType {
|
||||
CONFIG_UPDATE,
|
||||
DEEP_TRACE,
|
||||
REPLAY
|
||||
REPLAY,
|
||||
SET_TRACED_PROCESSORS
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.cameleer3.server.core.ingestion;
|
||||
|
||||
import com.cameleer3.common.model.ExchangeSnapshot;
|
||||
import com.cameleer3.common.model.ProcessorExecution;
|
||||
import com.cameleer3.common.model.RouteExecution;
|
||||
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
|
||||
@@ -77,6 +78,25 @@ public class IngestionService {
|
||||
String diagramHash = diagramStore
|
||||
.findContentHashForRoute(exec.getRouteId(), agentId)
|
||||
.orElse("");
|
||||
|
||||
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
|
||||
String inputBody = null;
|
||||
String outputBody = null;
|
||||
String inputHeaders = null;
|
||||
String outputHeaders = null;
|
||||
|
||||
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
|
||||
if (inputSnapshot != null) {
|
||||
inputBody = truncateBody(inputSnapshot.getBody());
|
||||
inputHeaders = toJson(inputSnapshot.getHeaders());
|
||||
}
|
||||
|
||||
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
|
||||
if (outputSnapshot != null) {
|
||||
outputBody = truncateBody(outputSnapshot.getBody());
|
||||
outputHeaders = toJson(outputSnapshot.getHeaders());
|
||||
}
|
||||
|
||||
return new ExecutionRecord(
|
||||
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
|
||||
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
|
||||
@@ -84,7 +104,9 @@ public class IngestionService {
|
||||
exec.getStartTime(), exec.getEndTime(),
|
||||
exec.getDurationMs(),
|
||||
exec.getErrorMessage(), exec.getErrorStackTrace(),
|
||||
diagramHash
|
||||
diagramHash,
|
||||
exec.getEngineLevel(),
|
||||
inputBody, outputBody, inputHeaders, outputHeaders
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,9 @@ public interface ExecutionStore {
|
||||
String executionId, String routeId, String agentId, String applicationName,
|
||||
String status, String correlationId, String exchangeId,
|
||||
Instant startTime, Instant endTime, Long durationMs,
|
||||
String errorMessage, String errorStacktrace, String diagramContentHash
|
||||
String errorMessage, String errorStacktrace, String diagramContentHash,
|
||||
String engineLevel,
|
||||
String inputBody, String outputBody, String inputHeaders, String outputHeaders
|
||||
) {}
|
||||
|
||||
record ProcessorRecord(
|
||||
|
||||
Reference in New Issue
Block a user