Compare commits

...

8 Commits

Author SHA1 Message Date
aa3d9f375b Merge pull request 'feat: agent protocol v2 — engine levels, enriched acks, route snapshots' (#91) from fix/agent-protocol-v2 into main
Some checks failed
CI / build (push) Failing after 1m0s
CI / cleanup-branch (push) Has been skipped
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
Reviewed-on: cameleer/cameleer3-server#91
2026-03-24 16:50:09 +01:00
2887fe9599 feat: add V3 migration for engine_level and route-level snapshot columns
Some checks failed
CI / build (push) Failing after 51s
CI / cleanup-branch (push) Has been skipped
CI / build (pull_request) Failing after 52s
CI / cleanup-branch (pull_request) Has been skipped
CI / docker (push) Has been skipped
CI / docker (pull_request) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
CI / deploy (pull_request) Has been skipped
CI / deploy-feature (pull_request) Has been skipped
2026-03-24 16:13:11 +01:00
b1679b110c feat: add engine_level and route-level snapshot columns to PostgresExecutionStore
Some checks failed
CI / docker (push) Has been cancelled
CI / build (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
Add engine_level, input_body, output_body, input_headers, output_headers
to the executions INSERT/SELECT/UPSERT and row mapper. Required for
REGULAR mode where route-level payloads exist but no processor records.

Note: requires ALTER TABLE migration to add the new columns.
2026-03-24 16:12:46 +01:00
e7835e1100 feat: map engineLevel and route-level snapshots in IngestionService
Some checks failed
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
CI / build (push) Has been cancelled
Extract inputBody/outputBody/inputHeaders/outputHeaders from RouteExecution
snapshots and pass to ExecutionRecord. Maps engineLevel field. Critical for
REGULAR mode where no processor records exist but route-level payloads do.
2026-03-24 16:11:55 +01:00
ed65b87af2 feat: add engineLevel and route-level snapshot fields to ExecutionRecord
Some checks failed
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
CI / build (push) Has been cancelled
Adds engineLevel (NONE/MINIMAL/REGULAR/COMPLETE) and inputBody/outputBody/
inputHeaders/outputHeaders to ExecutionRecord so REGULAR mode route-level
payloads are persisted (previously only processor-level records had payloads).
2026-03-24 16:11:26 +01:00
4a99e6cf6b feat: support enriched command ack with status/message + set-traced-processors command type
Some checks failed
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
CI / build (push) Has been cancelled
- Add @RequestBody(required=false) CommandAckRequest to ack endpoint for
  receiving agent command results (backward compat with old agents)
- Record command results in agent event log via AgentEventService
- Add set-traced-processors to mapCommandType switch
- Inject AgentEventService dependency
2026-03-24 16:11:04 +01:00
4d9a9ff851 feat: add CommandAckRequest DTO for enriched command acknowledgments
Some checks failed
CI / build (push) Has started running
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
2026-03-24 16:10:27 +01:00
292a38fe30 feat: add SET_TRACED_PROCESSORS command type for per-processor overrides
Some checks failed
CI / docker (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled
CI / cleanup-branch (push) Has been cancelled
CI / build (push) Has been cancelled
2026-03-24 16:10:21 +01:00
7 changed files with 84 additions and 11 deletions

View File

@@ -1,10 +1,12 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.agent.SseConnectionManager;
import com.cameleer3.server.app.dto.CommandAckRequest;
import com.cameleer3.server.app.dto.CommandBroadcastResponse;
import com.cameleer3.server.app.dto.CommandRequest;
import com.cameleer3.server.app.dto.CommandSingleResponse;
import com.cameleer3.server.core.agent.AgentCommand;
import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
@@ -48,18 +50,21 @@ public class AgentCommandController {
private final AgentRegistryService registryService;
private final SseConnectionManager connectionManager;
private final ObjectMapper objectMapper;
private final AgentEventService agentEventService;
public AgentCommandController(AgentRegistryService registryService,
SseConnectionManager connectionManager,
ObjectMapper objectMapper) {
ObjectMapper objectMapper,
AgentEventService agentEventService) {
this.registryService = registryService;
this.connectionManager = connectionManager;
this.objectMapper = objectMapper;
this.agentEventService = agentEventService;
}
@PostMapping("/{id}/commands")
@Operation(summary = "Send command to a specific agent",
description = "Sends a config-update, deep-trace, or replay command to the specified agent")
description = "Sends a command to the specified agent via SSE")
@ApiResponse(responseCode = "202", description = "Command accepted")
@ApiResponse(responseCode = "400", description = "Invalid command payload")
@ApiResponse(responseCode = "404", description = "Agent not registered")
@@ -128,15 +133,26 @@ public class AgentCommandController {
@PostMapping("/{id}/commands/{commandId}/ack")
@Operation(summary = "Acknowledge command receipt",
description = "Agent acknowledges that it has received and processed a command")
description = "Agent acknowledges that it has received and processed a command, with result status and message")
@ApiResponse(responseCode = "200", description = "Command acknowledged")
@ApiResponse(responseCode = "404", description = "Command not found")
public ResponseEntity<Void> acknowledgeCommand(@PathVariable String id,
@PathVariable String commandId) {
@PathVariable String commandId,
@RequestBody(required = false) CommandAckRequest body) {
boolean acknowledged = registryService.acknowledgeCommand(id, commandId);
if (!acknowledged) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId);
}
// Record command result in agent event log
if (body != null && body.status() != null) {
AgentInfo agent = registryService.findById(id);
String application = agent != null ? agent.application() : "unknown";
agentEventService.recordEvent(id, application, "COMMAND_" + body.status(),
"Command " + commandId + ": " + body.message());
log.debug("Command {} ack from agent {}: {} - {}", commandId, id, body.status(), body.message());
}
return ResponseEntity.ok().build();
}
@@ -145,8 +161,9 @@ public class AgentCommandController {
case "config-update" -> CommandType.CONFIG_UPDATE;
case "deep-trace" -> CommandType.DEEP_TRACE;
case "replay" -> CommandType.REPLAY;
case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS;
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay");
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors");
};
}
}

View File

@@ -0,0 +1,10 @@
package com.cameleer3.server.app.dto;
/**
* Request body for command acknowledgment from agents.
* Contains the result status and message of the command execution.
*
* @param status "SUCCESS" or "FAILURE"
* @param message human-readable description of the result
*/
public record CommandAckRequest(String status, String message) {}

View File

@@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore {
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
engine_level, input_body, output_body, input_headers, output_headers,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore {
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level),
input_body = COALESCE(EXCLUDED.input_body, executions.input_body),
output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore {
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
execution.durationMs(), execution.errorMessage(),
execution.errorStacktrace(), execution.diagramContentHash());
execution.errorStacktrace(), execution.diagramContentHash(),
execution.engineLevel(),
execution.inputBody(), execution.outputBody(),
execution.inputHeaders(), execution.outputHeaders());
}
@Override
@@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore {
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"));
rs.getString("diagram_content_hash"),
rs.getString("engine_level"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(

View File

@@ -0,0 +1,9 @@
-- Add engine level and route-level snapshot columns to executions table.
-- Required for REGULAR engine level where route-level payloads exist but
-- no processor execution records are created.
ALTER TABLE executions ADD COLUMN IF NOT EXISTS engine_level VARCHAR(16);
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_body TEXT;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_body TEXT;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_headers JSONB;
ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_headers JSONB;

View File

@@ -6,5 +6,6 @@ package com.cameleer3.server.core.agent;
public enum CommandType {
CONFIG_UPDATE,
DEEP_TRACE,
REPLAY
REPLAY,
SET_TRACED_PROCESSORS
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
@@ -77,6 +78,25 @@ public class IngestionService {
String diagramHash = diagramStore
.findContentHashForRoute(exec.getRouteId(), agentId)
.orElse("");
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
String inputBody = null;
String outputBody = null;
String inputHeaders = null;
String outputHeaders = null;
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
if (inputSnapshot != null) {
inputBody = truncateBody(inputSnapshot.getBody());
inputHeaders = toJson(inputSnapshot.getHeaders());
}
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
if (outputSnapshot != null) {
outputBody = truncateBody(outputSnapshot.getBody());
outputHeaders = toJson(outputSnapshot.getHeaders());
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -84,7 +104,9 @@ public class IngestionService {
exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(),
exec.getErrorMessage(), exec.getErrorStackTrace(),
diagramHash
diagramHash,
exec.getEngineLevel(),
inputBody, outputBody, inputHeaders, outputHeaders
);
}

View File

@@ -20,7 +20,9 @@ public interface ExecutionStore {
String executionId, String routeId, String agentId, String applicationName,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String diagramContentHash
String errorMessage, String errorStacktrace, String diagramContentHash,
String engineLevel,
String inputBody, String outputBody, String inputHeaders, String outputHeaders
) {}
record ProcessorRecord(