feat(sse): enforce signing handoff — fail visibility, capability surfacing, optional hard-gate

- SseConnectionManager: wrap signPayload in try/catch; on failure log at ERROR,
  increment cameleer.sse.signing.failures{command_type=...}, return early without
  delivering. ServerMetrics injected via @Lazy setter to break circular dep with
  the existing gauge that polls SseConnectionManager.getConnectionCount().
- ServerMetrics: add recordSseSigningFailure(commandType) and
  recordCommandToNonCapableAgent(commandType) using lazy ConcurrentHashMap counter
  cache (cameleer.sse.signing.failures / cameleer.sse.commands.non_capable_target).
- AgentInstanceResponse: expose requireSignedCommands as last record field so
  operators can see fleet signing-capability coverage via the agent list API.
- AgentCommandController: WARN + metric when targeting non-capable agent in
  sendCommand and sendGroupCommand; summary WARN for broadcast (no per-agent noise).
  Hard-gate (409) behind cameleer.server.security.enforce-signed-commands (default
  false). Group enforcement refuses the whole batch — loud failure preferred over
  silent partial fan-out. Broadcast skips enforcement (blocking an ops broadcast
  because one old agent is still in fleet is too disruptive; documented in comment).
- SecurityProperties: add enforceSignedCommands boolean (default false), bound from
  cameleer.server.security.enforce-signed-commands.

Tests: SseConnectionManagerSigningFailureIT (unit — mock signer throws, assert no
delivery + metric); AgentCommandEnforcementIT (IT — flag-off 202, flag-on non-capable
409, flag-on capable 202). All 13 targeted ITs + 299 unit tests green.

Per cameleer/docs/server-team-sse-command-signing.md handoff migration steps 2 and 4.
Default flag off keeps universal-signing rollout safe for mixed-fleet customers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-29 11:27:21 +02:00
parent ad4be0d7a6
commit ccf31a4067
7 changed files with 405 additions and 6 deletions

View File

@@ -1,6 +1,7 @@
package io.cameleer.server.app.agent;
import io.cameleer.server.app.config.AgentRegistryConfig;
import io.cameleer.server.app.metrics.ServerMetrics;
import io.cameleer.server.core.agent.AgentCommand;
import io.cameleer.server.core.agent.AgentEventListener;
import io.cameleer.server.core.agent.AgentRegistryService;
@@ -9,6 +10,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -35,6 +38,10 @@ public class SseConnectionManager implements AgentEventListener {
private final AgentRegistryConfig config;
private final SsePayloadSigner ssePayloadSigner;
private final ObjectMapper objectMapper;
// Injected via setter to break the ServerMetrics <-> SseConnectionManager circular dependency.
// ServerMetrics holds a gauge over SseConnectionManager.getConnectionCount(), so constructor
// injection in either direction creates a cycle. @Lazy proxy defers the actual bean lookup.
private ServerMetrics serverMetrics;
public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config,
SsePayloadSigner ssePayloadSigner, ObjectMapper objectMapper) {
@@ -44,6 +51,11 @@ public class SseConnectionManager implements AgentEventListener {
this.objectMapper = objectMapper;
}
@Autowired
public void setServerMetrics(@Lazy ServerMetrics serverMetrics) {
this.serverMetrics = serverMetrics;
}
@PostConstruct
void init() {
registryService.setEventListener(this);
@@ -159,7 +171,15 @@ public class SseConnectionManager implements AgentEventListener {
@Override
public void onCommandReady(String agentId, AgentCommand command) {
String eventType = command.type().name().toLowerCase().replace('_', '-');
String signedPayload = ssePayloadSigner.signPayload(command.payload());
String signedPayload;
try {
signedPayload = ssePayloadSigner.signPayload(command.payload());
} catch (RuntimeException e) {
log.error("SSE signing failed for command {} ({}) targeting agent {}: {}",
command.id(), eventType, agentId, e.getMessage());
serverMetrics.recordSseSigningFailure(eventType);
return; // command stays PENDING; expires per commandExpiryMs
}
// Parse to JsonNode so SseEmitter serializes the tree correctly (avoids double-quoting a raw string)
Object data;
try {

View File

@@ -2,7 +2,9 @@ package io.cameleer.server.app.controller;
import io.cameleer.server.app.agent.SseConnectionManager;
import io.cameleer.server.app.dto.CommandAckRequest;
import io.cameleer.server.app.metrics.ServerMetrics;
import io.cameleer.server.app.security.AgentOwnershipGuard;
import io.cameleer.server.app.security.SecurityProperties;
import io.cameleer.server.app.dto.CommandBroadcastResponse;
import io.cameleer.server.app.dto.CommandGroupResponse;
import io.cameleer.server.app.dto.CommandRequest;
@@ -70,19 +72,25 @@ public class AgentCommandController {
private final AgentEventService agentEventService;
private final AuditService auditService;
private final AgentOwnershipGuard ownershipGuard;
private final ServerMetrics serverMetrics;
private final SecurityProperties securityProperties;
public AgentCommandController(AgentRegistryService registryService,
SseConnectionManager connectionManager,
ObjectMapper objectMapper,
AgentEventService agentEventService,
AuditService auditService,
AgentOwnershipGuard ownershipGuard) {
AgentOwnershipGuard ownershipGuard,
ServerMetrics serverMetrics,
SecurityProperties securityProperties) {
this.registryService = registryService;
this.connectionManager = connectionManager;
this.objectMapper = objectMapper;
this.agentEventService = agentEventService;
this.auditService = auditService;
this.ownershipGuard = ownershipGuard;
this.serverMetrics = serverMetrics;
this.securityProperties = securityProperties;
}
@PostMapping("/{id}/commands")
@@ -100,6 +108,23 @@ public class AgentCommandController {
}
CommandType type = mapCommandType(request.type());
// E4: warn when targeting a non-capable agent (one that won't verify the signature)
if (!agent.requireSignedCommands()) {
log.warn("Operator sent {} command to agent {} which does not advertise requireSignedCommands=true. " +
"The agent will accept the signed payload but won't actually verify the signature. Upgrade the agent.",
type, id);
serverMetrics.recordCommandToNonCapableAgent(type.name());
}
// E5: hard-enforce when flag is enabled
if (securityProperties.isEnforceSignedCommands() && !agent.requireSignedCommands()) {
throw new ResponseStatusException(HttpStatus.CONFLICT,
"Refusing to send " + type + " command to agent " + id + ": agent does not advertise " +
"requireSignedCommands=true and cameleer.server.security.enforce-signed-commands is enabled. " +
"Upgrade the agent or disable enforcement.");
}
String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}";
AgentCommand command = registryService.addCommand(id, type, payloadJson);
@@ -123,6 +148,36 @@ public class AgentCommandController {
@RequestBody CommandRequest request,
HttpServletRequest httpRequest) throws JsonProcessingException {
CommandType type = mapCommandType(request.type());
// E4/E5: per-agent capability check before dispatching to the group.
// We resolve the group members the same way addGroupCommandWithReplies does:
// by applicationId (== group) and optional environmentId.
List<AgentInfo> groupMembers = registryService.findByState(AgentState.LIVE).stream()
.filter(a -> group.equals(a.applicationId()))
.filter(a -> environment == null || environment.equals(a.environmentId()))
.toList();
List<String> nonCapableIds = groupMembers.stream()
.filter(a -> !a.requireSignedCommands())
.map(AgentInfo::instanceId)
.toList();
if (!nonCapableIds.isEmpty()) {
log.warn("Operator sent {} command to group {} — {} of {} agents do not advertise " +
"requireSignedCommands=true (ids: {}). They will accept but not verify the signature. Upgrade the agents.",
type, group, nonCapableIds.size(), groupMembers.size(), nonCapableIds);
nonCapableIds.forEach(agentId -> serverMetrics.recordCommandToNonCapableAgent(type.name()));
}
// E5: refuse the whole batch — loud failure is better than silent partial fan-out
if (securityProperties.isEnforceSignedCommands() && !nonCapableIds.isEmpty()) {
throw new ResponseStatusException(HttpStatus.CONFLICT,
"Refusing to send " + type + " command to group " + group + ": " +
nonCapableIds.size() + " agent(s) do not advertise requireSignedCommands=true " +
"and cameleer.server.security.enforce-signed-commands is enabled. " +
"Non-capable agents: " + nonCapableIds + ". Upgrade the agents or disable enforcement.");
}
String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}";
Map<String, CompletableFuture<CommandReply>> futures =
@@ -190,6 +245,16 @@ public class AgentCommandController {
.toList();
}
// E4: emit a single summary warning for the whole broadcast scope; per-agent warns
// would be too noisy in large fleets. Broadcast skips E5 enforcement — blocking an
// ops broadcast because one old agent is still in the fleet is too disruptive.
long nonCapableBroadcastCount = liveAgents.stream().filter(a -> !a.requireSignedCommands()).count();
if (nonCapableBroadcastCount > 0) {
log.warn("Broadcast {} command targets {} live agent(s) of which {} do not advertise " +
"requireSignedCommands=true. They will accept but not verify the signature. Upgrade those agents.",
type, liveAgents.size(), nonCapableBroadcastCount);
}
List<String> commandIds = new ArrayList<>();
for (AgentInfo agent : liveAgents) {
AgentCommand command = registryService.addCommand(agent.instanceId(), type, payloadJson);

View File

@@ -26,7 +26,8 @@ public record AgentInstanceResponse(
int activeRoutes,
int totalRoutes,
long uptimeSeconds,
@Schema(description = "Recent average CPU usage (0.01.0), -1 if unavailable") double cpuUsage
@Schema(description = "Recent average CPU usage (0.01.0), -1 if unavailable") double cpuUsage,
@Schema(description = "True if the agent advertises support for verifying signed SSE commands") boolean requireSignedCommands
) {
public static AgentInstanceResponse from(AgentInfo info) {
long uptime = Duration.between(info.registeredAt(), Instant.now()).toSeconds();
@@ -38,7 +39,8 @@ public record AgentInstanceResponse(
info.version(), info.capabilities(),
0.0, 0.0,
0, info.routeIds() != null ? info.routeIds().size() : 0,
uptime, -1
uptime, -1,
info.requireSignedCommands()
);
}
@@ -47,7 +49,8 @@ public record AgentInstanceResponse(
instanceId, displayName, applicationId, environmentId,
status, routeIds, registeredAt, lastHeartbeat,
version, capabilities,
tps, errorRate, activeRoutes, totalRoutes, uptimeSeconds, cpuUsage
tps, errorRate, activeRoutes, totalRoutes, uptimeSeconds, cpuUsage,
requireSignedCommands
);
}
@@ -56,7 +59,8 @@ public record AgentInstanceResponse(
instanceId, displayName, applicationId, environmentId,
status, routeIds, registeredAt, lastHeartbeat,
version, capabilities,
tps, errorRate, activeRoutes, totalRoutes, uptimeSeconds, cpuUsage
tps, errorRate, activeRoutes, totalRoutes, uptimeSeconds, cpuUsage,
requireSignedCommands
);
}
}

View File

@@ -15,6 +15,7 @@ import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
@@ -187,4 +188,30 @@ public class ServerMetrics {
case "oidc_rejected" -> authFailuresOidc.increment();
}
}
// ── SSE signing failures ────────────────────────────────────────────
private final ConcurrentHashMap<String, Counter> sseSigningFailureCounters = new ConcurrentHashMap<>();
public void recordSseSigningFailure(String commandType) {
sseSigningFailureCounters.computeIfAbsent(commandType, ct ->
Counter.builder("cameleer.sse.signing.failures")
.description("Count of SSE command signing failures, by command type")
.tag("command_type", ct)
.register(registry)
).increment();
}
// ── Commands to non-capable agents ─────────────────────────────────
private final ConcurrentHashMap<String, Counter> nonCapableTargetCounters = new ConcurrentHashMap<>();
public void recordCommandToNonCapableAgent(String commandType) {
nonCapableTargetCounters.computeIfAbsent(commandType, ct ->
Counter.builder("cameleer.sse.commands.non_capable_target")
.description("Count of commands sent to agents that do not advertise requireSignedCommands=true")
.tag("command_type", ct)
.register(registry)
).increment();
}
}

View File

@@ -22,6 +22,7 @@ public class SecurityProperties {
private String uiOrigin;
private String jwtSecret;
private String corsAllowedOrigins;
private boolean enforceSignedCommands = false;
private Oidc oidc = new Oidc();
public static class Oidc {
@@ -58,6 +59,8 @@ public class SecurityProperties {
public void setJwtSecret(String jwtSecret) { this.jwtSecret = jwtSecret; }
public String getCorsAllowedOrigins() { return corsAllowedOrigins; }
public void setCorsAllowedOrigins(String corsAllowedOrigins) { this.corsAllowedOrigins = corsAllowedOrigins; }
public boolean isEnforceSignedCommands() { return enforceSignedCommands; }
public void setEnforceSignedCommands(boolean enforceSignedCommands) { this.enforceSignedCommands = enforceSignedCommands; }
public Oidc getOidc() { return oidc; }
public void setOidc(Oidc oidc) { this.oidc = oidc; }

View File

@@ -0,0 +1,143 @@
package io.cameleer.server.app.agent;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cameleer.server.app.config.AgentRegistryConfig;
import io.cameleer.server.app.metrics.ServerMetrics;
import io.cameleer.server.core.agent.AgentCommand;
import io.cameleer.server.core.agent.AgentRegistryService;
import io.cameleer.server.core.agent.CommandStatus;
import io.cameleer.server.core.agent.CommandType;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit-style test for SseConnectionManager signing failure handling (E1).
* <p>
* Verifies that when {@link SsePayloadSigner#signPayload} throws, the manager:
* <ul>
* <li>does NOT send any SSE event,</li>
* <li>does NOT call markDelivered,</li>
* <li>increments the {@code cameleer.sse.signing.failures} counter.</li>
* </ul>
*/
@ExtendWith(MockitoExtension.class)
class SseConnectionManagerSigningFailureIT {
@Mock
private AgentRegistryService registryService;
@Mock
private AgentRegistryConfig agentRegistryConfig;
@Mock
private SsePayloadSigner ssePayloadSigner;
@Mock
private ServerMetrics serverMetrics;
private SseConnectionManager manager;
@BeforeEach
void setUp() {
manager = new SseConnectionManager(
registryService, agentRegistryConfig, ssePayloadSigner, new ObjectMapper());
// Inject via setter (mirrors production wiring — breaks circular dep with ServerMetrics)
manager.setServerMetrics(serverMetrics);
}
@Test
void onCommandReady_whenSigningThrows_doesNotSendAndIncrementsCounter() {
// Arrange
AgentCommand command = new AgentCommand(
"cmd-fail-1",
CommandType.CONFIG_UPDATE,
"{\"key\":\"value\"}",
"agent-x",
Instant.now(),
CommandStatus.PENDING
);
when(ssePayloadSigner.signPayload(anyString()))
.thenThrow(new IllegalStateException("signing key not available"));
// Act
manager.onCommandReady("agent-x", command);
// Assert: no event sent (no emitter registered for agent-x anyway, but markDelivered must not be called)
verify(registryService, never()).markDelivered(anyString(), anyString());
// Assert: signing failure metric incremented with the correct command type tag
verify(serverMetrics).recordSseSigningFailure("config-update");
}
@Test
void onCommandReady_whenSigningThrowsIllegalArgument_doesNotDeliverAndCountsMetric() {
AgentCommand command = new AgentCommand(
"cmd-fail-2",
CommandType.DEEP_TRACE,
"{}",
"agent-y",
Instant.now(),
CommandStatus.PENDING
);
when(ssePayloadSigner.signPayload(anyString()))
.thenThrow(new IllegalArgumentException("null payload"));
manager.onCommandReady("agent-y", command);
verify(registryService, never()).markDelivered(anyString(), anyString());
verify(serverMetrics).recordSseSigningFailure("deep-trace");
}
/**
* Smoke test with a real SimpleMeterRegistry to verify the counter actually registers.
*/
@Test
void recordSseSigningFailure_incrementsRealCounter() {
SimpleMeterRegistry realRegistry = new SimpleMeterRegistry();
// Build a minimal ServerMetrics using only the fields needed for lazy counters.
// We re-use the existing recordSseSigningFailure method via a partial-real approach:
// construct ServerMetrics with all its dependencies mocked, but use a real registry.
io.cameleer.server.core.ingestion.WriteBuffer<io.cameleer.server.core.ingestion.MergedExecution> execBuf =
org.mockito.Mockito.mock(io.cameleer.server.core.ingestion.WriteBuffer.class);
io.cameleer.server.core.ingestion.WriteBuffer<io.cameleer.server.core.ingestion.ChunkAccumulator.ProcessorBatch> procBuf =
org.mockito.Mockito.mock(io.cameleer.server.core.ingestion.WriteBuffer.class);
io.cameleer.server.core.ingestion.WriteBuffer<io.cameleer.server.core.ingestion.BufferedLogEntry> logBuf =
org.mockito.Mockito.mock(io.cameleer.server.core.ingestion.WriteBuffer.class);
io.cameleer.server.core.ingestion.WriteBuffer<io.cameleer.server.core.storage.model.MetricsSnapshot> metBuf =
org.mockito.Mockito.mock(io.cameleer.server.core.ingestion.WriteBuffer.class);
io.cameleer.server.core.ingestion.ChunkAccumulator accumulator =
org.mockito.Mockito.mock(io.cameleer.server.core.ingestion.ChunkAccumulator.class);
SseConnectionManager dummyManager =
org.mockito.Mockito.mock(SseConnectionManager.class);
ServerMetrics realMetrics = new ServerMetrics(realRegistry, registryService, dummyManager,
execBuf, procBuf, logBuf, metBuf, accumulator);
realMetrics.recordSseSigningFailure("route-control");
realMetrics.recordSseSigningFailure("route-control");
Counter counter = realRegistry.find("cameleer.sse.signing.failures")
.tag("command_type", "route-control")
.counter();
assertThat(counter).isNotNull();
assertThat(counter.count()).isEqualTo(2.0);
}
}

View File

@@ -0,0 +1,137 @@
package io.cameleer.server.app.controller;
import io.cameleer.server.app.AbstractPostgresIT;
import io.cameleer.server.app.TestSecurityHelper;
import io.cameleer.server.core.agent.AgentRegistryService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests for E4 (operator warning) and E5 (hard enforcement) of the SSE signing handoff.
* <p>
* Two top-level classes share this file. Each spins up its own application context via
* {@code @TestPropertySource} to flip the {@code enforce-signed-commands} flag:
* <ul>
* <li>{@link EnforcementDisabled} — flag off (default): non-capable agent gets 202.</li>
* <li>{@link EnforcementEnabled} — flag on: non-capable gets 409, capable gets 202.</li>
* </ul>
*/
class AgentCommandEnforcementIT {
// ── Flag OFF ─────────────────────────────────────────────────────────────
@TestPropertySource(properties = "cameleer.server.security.enforce-signed-commands=false")
static class EnforcementDisabled extends AbstractPostgresIT {
@Autowired private TestRestTemplate restTemplate;
@Autowired private ObjectMapper objectMapper;
@Autowired private TestSecurityHelper securityHelper;
@Autowired private AgentRegistryService registryService;
private String operatorJwt;
@BeforeEach
void setUp() {
securityHelper.installSyntheticUnsignedLicense(Map.of("max_agents", 100));
operatorJwt = securityHelper.operatorToken();
}
@AfterEach
void tearDown() {
securityHelper.clearTestLicense();
}
@Test
void sendCommand_nonCapableAgent_returns202WhenEnforcementDisabled() throws Exception {
String agentId = "enforce-off-" + UUID.randomUUID().toString().substring(0, 8);
// requireSignedCommands=false (the default overload)
registryService.register(agentId, agentId, "test-app", "default", "1.0", List.of(), Map.of());
ResponseEntity<String> response = sendConfigUpdate(agentId);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.has("commandId")).isTrue();
}
private ResponseEntity<String> sendConfigUpdate(String agentId) {
return restTemplate.postForEntity(
"/api/v1/agents/" + agentId + "/commands",
new HttpEntity<>("""
{"type": "config-update", "payload": {"key": "value"}}
""", securityHelper.authHeaders(operatorJwt)),
String.class);
}
}
// ── Flag ON ──────────────────────────────────────────────────────────────
@TestPropertySource(properties = "cameleer.server.security.enforce-signed-commands=true")
static class EnforcementEnabled extends AbstractPostgresIT {
@Autowired private TestRestTemplate restTemplate;
@Autowired private ObjectMapper objectMapper;
@Autowired private TestSecurityHelper securityHelper;
@Autowired private AgentRegistryService registryService;
private String operatorJwt;
@BeforeEach
void setUp() {
securityHelper.installSyntheticUnsignedLicense(Map.of("max_agents", 100));
operatorJwt = securityHelper.operatorToken();
}
@AfterEach
void tearDown() {
securityHelper.clearTestLicense();
}
@Test
void sendCommand_nonCapableAgent_returns409WhenEnforcementEnabled() {
String agentId = "enforce-on-noncap-" + UUID.randomUUID().toString().substring(0, 8);
registryService.register(agentId, agentId, "test-app", "default", "1.0", List.of(), Map.of());
ResponseEntity<String> response = sendConfigUpdate(agentId);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.CONFLICT);
}
@Test
void sendCommand_capableAgent_returns202WhenEnforcementEnabled() throws Exception {
String agentId = "enforce-on-cap-" + UUID.randomUUID().toString().substring(0, 8);
// requireSignedCommands=true — agent has been upgraded to verify signatures
registryService.register(agentId, agentId, "test-app", "default", "2.0", List.of(), Map.of(), true);
ResponseEntity<String> response = sendConfigUpdate(agentId);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.has("commandId")).isTrue();
}
private ResponseEntity<String> sendConfigUpdate(String agentId) {
return restTemplate.postForEntity(
"/api/v1/agents/" + agentId + "/commands",
new HttpEntity<>("""
{"type": "config-update", "payload": {"key": "value"}}
""", securityHelper.authHeaders(operatorJwt)),
String.class);
}
}
}