Merge pull request 'feat: agent protocol v2 — engine levels, enriched acks, route snapshots' (#91) from fix/agent-protocol-v2 into main
Some checks failed
CI / build (push) Failing after 1m0s
CI / cleanup-branch (push) Has been skipped
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

Reviewed-on: cameleer/cameleer3-server#91
This commit is contained in:
2026-03-24 16:50:09 +01:00
7 changed files with 84 additions and 11 deletions

View File

@@ -1,10 +1,12 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.agent.SseConnectionManager;
import com.cameleer3.server.app.dto.CommandAckRequest;
import com.cameleer3.server.app.dto.CommandBroadcastResponse;
import com.cameleer3.server.app.dto.CommandRequest;
import com.cameleer3.server.app.dto.CommandSingleResponse;
import com.cameleer3.server.core.agent.AgentCommand;
import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
@@ -48,18 +50,21 @@ public class AgentCommandController {
private final AgentRegistryService registryService;
private final SseConnectionManager connectionManager;
private final ObjectMapper objectMapper;
private final AgentEventService agentEventService;
public AgentCommandController(AgentRegistryService registryService,
SseConnectionManager connectionManager,
ObjectMapper objectMapper) {
ObjectMapper objectMapper,
AgentEventService agentEventService) {
this.registryService = registryService;
this.connectionManager = connectionManager;
this.objectMapper = objectMapper;
this.agentEventService = agentEventService;
}
@PostMapping("/{id}/commands")
@Operation(summary = "Send command to a specific agent",
description = "Sends a config-update, deep-trace, or replay command to the specified agent")
description = "Sends a command to the specified agent via SSE")
@ApiResponse(responseCode = "202", description = "Command accepted")
@ApiResponse(responseCode = "400", description = "Invalid command payload")
@ApiResponse(responseCode = "404", description = "Agent not registered")
@@ -128,15 +133,26 @@ public class AgentCommandController {
@PostMapping("/{id}/commands/{commandId}/ack")
@Operation(summary = "Acknowledge command receipt",
description = "Agent acknowledges that it has received and processed a command")
description = "Agent acknowledges that it has received and processed a command, with result status and message")
@ApiResponse(responseCode = "200", description = "Command acknowledged")
@ApiResponse(responseCode = "404", description = "Command not found")
public ResponseEntity<Void> acknowledgeCommand(@PathVariable String id,
@PathVariable String commandId) {
@PathVariable String commandId,
@RequestBody(required = false) CommandAckRequest body) {
boolean acknowledged = registryService.acknowledgeCommand(id, commandId);
if (!acknowledged) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId);
}
// Record command result in agent event log
if (body != null && body.status() != null) {
AgentInfo agent = registryService.findById(id);
String application = agent != null ? agent.application() : "unknown";
agentEventService.recordEvent(id, application, "COMMAND_" + body.status(),
"Command " + commandId + ": " + body.message());
log.debug("Command {} ack from agent {}: {} - {}", commandId, id, body.status(), body.message());
}
return ResponseEntity.ok().build();
}
@@ -145,8 +161,9 @@ public class AgentCommandController {
case "config-update" -> CommandType.CONFIG_UPDATE;
case "deep-trace" -> CommandType.DEEP_TRACE;
case "replay" -> CommandType.REPLAY;
case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS;
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay");
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors");
};
}
}

View File

@@ -0,0 +1,10 @@
package com.cameleer3.server.app.dto;
/**
* Request body for command acknowledgment from agents.
* Contains the result status and message of the command execution.
*
* @param status "SUCCESS" or "FAILURE"
* @param message human-readable description of the result
*/
public record CommandAckRequest(String status, String message) {}

View File

@@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore {
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
engine_level, input_body, output_body, input_headers, output_headers,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore {
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level),
input_body = COALESCE(EXCLUDED.input_body, executions.input_body),
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore {
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
execution.durationMs(), execution.errorMessage(),
execution.errorStacktrace(), execution.diagramContentHash());
execution.errorStacktrace(), execution.diagramContentHash(),
execution.engineLevel(),
execution.inputBody(), execution.outputBody(),
execution.inputHeaders(), execution.outputHeaders());
}
@Override
@@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore {
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"));
rs.getString("diagram_content_hash"),
rs.getString("engine_level"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(

View File

@@ -0,0 +1,9 @@
-- Add engine level and route-level snapshot columns to executions table.
-- Required for REGULAR engine level where route-level payloads exist but
-- no processor execution records are created.
ALTER TABLE executions ADD COLUMN IF NOT EXISTS engine_level VARCHAR(16);
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_body TEXT;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_body TEXT;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_headers JSONB;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_headers JSONB;

View File

@@ -6,5 +6,6 @@ package com.cameleer3.server.core.agent;
public enum CommandType {
CONFIG_UPDATE,
DEEP_TRACE,
REPLAY
REPLAY,
SET_TRACED_PROCESSORS
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
@@ -77,6 +78,25 @@ public class IngestionService {
String diagramHash = diagramStore
.findContentHashForRoute(exec.getRouteId(), agentId)
.orElse("");
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
String inputBody = null;
String outputBody = null;
String inputHeaders = null;
String outputHeaders = null;
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
if (inputSnapshot != null) {
inputBody = truncateBody(inputSnapshot.getBody());
inputHeaders = toJson(inputSnapshot.getHeaders());
}
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
if (outputSnapshot != null) {
outputBody = truncateBody(outputSnapshot.getBody());
outputHeaders = toJson(outputSnapshot.getHeaders());
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -84,7 +104,9 @@ public class IngestionService {
exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(),
exec.getErrorMessage(), exec.getErrorStackTrace(),
diagramHash
diagramHash,
exec.getEngineLevel(),
inputBody, outputBody, inputHeaders, outputHeaders
);
}

View File

@@ -20,7 +20,9 @@ public interface ExecutionStore {
String executionId, String routeId, String agentId, String applicationName,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String diagramContentHash
String errorMessage, String errorStacktrace, String diagramContentHash,
String engineLevel,
String inputBody, String outputBody, String inputHeaders, String outputHeaders
) {}
record ProcessorRecord(