From 292a38fe3082d859784355e34fe5fb42f3444d13 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:10:21 +0100 Subject: [PATCH 1/7] feat: add SET_TRACED_PROCESSORS command type for per-processor overrides --- .../main/java/com/cameleer3/server/core/agent/CommandType.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java index f9295fd1..585239b9 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/CommandType.java @@ -6,5 +6,6 @@ package com.cameleer3.server.core.agent; public enum CommandType { CONFIG_UPDATE, DEEP_TRACE, - REPLAY + REPLAY, + SET_TRACED_PROCESSORS } From 4d9a9ff8518fdfb40f1b2c07e1e1299d21d6700c Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:10:27 +0100 Subject: [PATCH 2/7] feat: add CommandAckRequest DTO for enriched command acknowledgments --- .../cameleer3/server/app/dto/CommandAckRequest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java new file mode 100644 index 00000000..81ff245b --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/CommandAckRequest.java @@ -0,0 +1,10 @@ +package com.cameleer3.server.app.dto; + +/** + * Request body for command acknowledgment from agents. + * Contains the result status and message of the command execution. + * + * @param status "SUCCESS" or "FAILURE" + * @param message human-readable description of the result + */ +public record CommandAckRequest(String status, String message) {} From 4a99e6cf6bee815a18e6ca9e046e4aecdf829c35 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:11:04 +0100 Subject: [PATCH 3/7] feat: support enriched command ack with status/message + set-traced-processors command type - Add @RequestBody(required=false) CommandAckRequest to ack endpoint for receiving agent command results (backward compat with old agents) - Record command results in agent event log via AgentEventService - Add set-traced-processors to mapCommandType switch - Inject AgentEventService dependency --- .../controller/AgentCommandController.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java index 9d34cb7d..6fd1a449 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentCommandController.java @@ -1,10 +1,12 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.agent.SseConnectionManager; +import com.cameleer3.server.app.dto.CommandAckRequest; import com.cameleer3.server.app.dto.CommandBroadcastResponse; import com.cameleer3.server.app.dto.CommandRequest; import com.cameleer3.server.app.dto.CommandSingleResponse; import com.cameleer3.server.core.agent.AgentCommand; +import com.cameleer3.server.core.agent.AgentEventService; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentState; @@ -48,18 +50,21 @@ public class AgentCommandController { private final AgentRegistryService registryService; private final SseConnectionManager connectionManager; private final ObjectMapper objectMapper; + private final AgentEventService agentEventService; public AgentCommandController(AgentRegistryService registryService, SseConnectionManager connectionManager, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + AgentEventService agentEventService) { this.registryService = registryService; this.connectionManager = connectionManager; this.objectMapper = objectMapper; + this.agentEventService = agentEventService; } @PostMapping("/{id}/commands") @Operation(summary = "Send command to a specific agent", - description = "Sends a config-update, deep-trace, or replay command to the specified agent") + description = "Sends a command to the specified agent via SSE") @ApiResponse(responseCode = "202", description = "Command accepted") @ApiResponse(responseCode = "400", description = "Invalid command payload") @ApiResponse(responseCode = "404", description = "Agent not registered") @@ -128,15 +133,26 @@ public class AgentCommandController { @PostMapping("/{id}/commands/{commandId}/ack") @Operation(summary = "Acknowledge command receipt", - description = "Agent acknowledges that it has received and processed a command") + description = "Agent acknowledges that it has received and processed a command, with result status and message") @ApiResponse(responseCode = "200", description = "Command acknowledged") @ApiResponse(responseCode = "404", description = "Command not found") public ResponseEntity acknowledgeCommand(@PathVariable String id, - @PathVariable String commandId) { + @PathVariable String commandId, + @RequestBody(required = false) CommandAckRequest body) { boolean acknowledged = registryService.acknowledgeCommand(id, commandId); if (!acknowledged) { throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Command not found: " + commandId); } + + // Record command result in agent event log + if (body != null && body.status() != null) { + AgentInfo agent = registryService.findById(id); + String application = agent != null ? agent.application() : "unknown"; + agentEventService.recordEvent(id, application, "COMMAND_" + body.status(), + "Command " + commandId + ": " + body.message()); + log.debug("Command {} ack from agent {}: {} - {}", commandId, id, body.status(), body.message()); + } + return ResponseEntity.ok().build(); } @@ -145,8 +161,9 @@ public class AgentCommandController { case "config-update" -> CommandType.CONFIG_UPDATE; case "deep-trace" -> CommandType.DEEP_TRACE; case "replay" -> CommandType.REPLAY; + case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS; default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, - "Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay"); + "Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors"); }; } } From ed65b87af27258e5e09637db017c89d47afef608 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:11:26 +0100 Subject: [PATCH 4/7] feat: add engineLevel and route-level snapshot fields to ExecutionRecord Adds engineLevel (NONE/MINIMAL/REGULAR/COMPLETE) and inputBody/outputBody/ inputHeaders/outputHeaders to ExecutionRecord so REGULAR mode route-level payloads are persisted (previously only processor-level records had payloads). --- .../com/cameleer3/server/core/storage/ExecutionStore.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index 000a6acb..d335c9c2 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -20,7 +20,9 @@ public interface ExecutionStore { String executionId, String routeId, String agentId, String applicationName, String status, String correlationId, String exchangeId, Instant startTime, Instant endTime, Long durationMs, - String errorMessage, String errorStacktrace, String diagramContentHash + String errorMessage, String errorStacktrace, String diagramContentHash, + String engineLevel, + String inputBody, String outputBody, String inputHeaders, String outputHeaders ) {} record ProcessorRecord( From e7835e1100e3e5fb39678e1e9aa5ed97869989ad Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:11:55 +0100 Subject: [PATCH 5/7] feat: map engineLevel and route-level snapshots in IngestionService Extract inputBody/outputBody/inputHeaders/outputHeaders from RouteExecution snapshots and pass to ExecutionRecord. Maps engineLevel field. Critical for REGULAR mode where no processor records exist but route-level payloads do. --- .../core/ingestion/IngestionService.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 3d02fec0..119defcc 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -1,5 +1,6 @@ package com.cameleer3.server.core.ingestion; +import com.cameleer3.common.model.ExchangeSnapshot; import com.cameleer3.common.model.ProcessorExecution; import com.cameleer3.common.model.RouteExecution; import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent; @@ -77,6 +78,25 @@ public class IngestionService { String diagramHash = diagramStore .findContentHashForRoute(exec.getRouteId(), agentId) .orElse(""); + + // Extract route-level snapshots (critical for REGULAR mode where no processors are recorded) + String inputBody = null; + String outputBody = null; + String inputHeaders = null; + String outputHeaders = null; + + ExchangeSnapshot inputSnapshot = exec.getInputSnapshot(); + if (inputSnapshot != null) { + inputBody = truncateBody(inputSnapshot.getBody()); + inputHeaders = toJson(inputSnapshot.getHeaders()); + } + + ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot(); + if (outputSnapshot != null) { + outputBody = truncateBody(outputSnapshot.getBody()); + outputHeaders = toJson(outputSnapshot.getHeaders()); + } + return new ExecutionRecord( exec.getExchangeId(), exec.getRouteId(), agentId, applicationName, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", @@ -84,7 +104,9 @@ public class IngestionService { exec.getStartTime(), exec.getEndTime(), exec.getDurationMs(), exec.getErrorMessage(), exec.getErrorStackTrace(), - diagramHash + diagramHash, + exec.getEngineLevel(), + inputBody, outputBody, inputHeaders, outputHeaders ); } From b1679b110c1335afe2e356adb6d6e0ae8ce38f33 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:12:46 +0100 Subject: [PATCH 6/7] feat: add engine_level and route-level snapshot columns to PostgresExecutionStore Add engine_level, input_body, output_body, input_headers, output_headers to the executions INSERT/SELECT/UPSERT and row mapper. Required for REGULAR mode where route-level payloads exist but no processor records. Note: requires ALTER TABLE migration to add the new columns. --- .../app/storage/PostgresExecutionStore.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 53535d1d..3f81fa3c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -27,8 +27,9 @@ public class PostgresExecutionStore implements ExecutionStore { INSERT INTO executions (execution_id, route_id, agent_id, application_name, status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, diagram_content_hash, + engine_level, input_body, output_body, input_headers, output_headers, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now()) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, now(), now()) ON CONFLICT (execution_id, start_time) DO UPDATE SET status = CASE WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') @@ -42,6 +43,11 @@ public class PostgresExecutionStore implements ExecutionStore { error_message = COALESCE(EXCLUDED.error_message, executions.error_message), error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace), diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash), + engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level), + input_body = COALESCE(EXCLUDED.input_body, executions.input_body), + output_body = COALESCE(EXCLUDED.output_body, executions.output_body), + input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers), + output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers), updated_at = now() """, execution.executionId(), execution.routeId(), execution.agentId(), @@ -50,7 +56,10 @@ public class PostgresExecutionStore implements ExecutionStore { Timestamp.from(execution.startTime()), execution.endTime() != null ? Timestamp.from(execution.endTime()) : null, execution.durationMs(), execution.errorMessage(), - execution.errorStacktrace(), execution.diagramContentHash()); + execution.errorStacktrace(), execution.diagramContentHash(), + execution.engineLevel(), + execution.inputBody(), execution.outputBody(), + execution.inputHeaders(), execution.outputHeaders()); } @Override @@ -109,7 +118,10 @@ public class PostgresExecutionStore implements ExecutionStore { toInstant(rs, "start_time"), toInstant(rs, "end_time"), rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, rs.getString("error_message"), rs.getString("error_stacktrace"), - rs.getString("diagram_content_hash")); + rs.getString("diagram_content_hash"), + rs.getString("engine_level"), + rs.getString("input_body"), rs.getString("output_body"), + rs.getString("input_headers"), rs.getString("output_headers")); private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord( From 2887fe95991f1f080ca841832151ccf88f5a2c65 Mon Sep 17 00:00:00 2001 From: claude Date: Tue, 24 Mar 2026 16:13:11 +0100 Subject: [PATCH 7/7] feat: add V3 migration for engine_level and route-level snapshot columns --- .../db/migration/V3__engine_level_and_snapshots.sql | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql diff --git a/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql b/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql new file mode 100644 index 00000000..a8d65e14 --- /dev/null +++ b/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql @@ -0,0 +1,9 @@ +-- Add engine level and route-level snapshot columns to executions table. +-- Required for REGULAR engine level where route-level payloads exist but +-- no processor execution records are created. + +ALTER TABLE executions ADD COLUMN IF NOT EXISTS engine_level VARCHAR(16); +ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_body TEXT; +ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_body TEXT; +ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_headers JSONB; +ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_headers JSONB;