feat(04-03): implement SSE payload signing with Ed25519
- SsePayloadSigner signs JSON payloads and adds signature field before SSE delivery - SseConnectionManager signs all command payloads via SsePayloadSigner before sendEvent - Signed payload parsed to JsonNode for correct SseEmitter serialization - Integration tests use bootstrap token + JWT auth (adapts to Plan 02 security layer) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,8 @@ import com.cameleer3.server.app.config.AgentRegistryConfig;
|
|||||||
import com.cameleer3.server.core.agent.AgentCommand;
|
import com.cameleer3.server.core.agent.AgentCommand;
|
||||||
import com.cameleer3.server.core.agent.AgentEventListener;
|
import com.cameleer3.server.core.agent.AgentEventListener;
|
||||||
import com.cameleer3.server.core.agent.AgentRegistryService;
|
import com.cameleer3.server.core.agent.AgentRegistryService;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -31,10 +33,15 @@ public class SseConnectionManager implements AgentEventListener {
|
|||||||
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
|
||||||
private final AgentRegistryService registryService;
|
private final AgentRegistryService registryService;
|
||||||
private final AgentRegistryConfig config;
|
private final AgentRegistryConfig config;
|
||||||
|
private final SsePayloadSigner ssePayloadSigner;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config) {
|
public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config,
|
||||||
|
SsePayloadSigner ssePayloadSigner, ObjectMapper objectMapper) {
|
||||||
this.registryService = registryService;
|
this.registryService = registryService;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.ssePayloadSigner = ssePayloadSigner;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
@@ -136,7 +143,16 @@ public class SseConnectionManager implements AgentEventListener {
|
|||||||
@Override
|
@Override
|
||||||
public void onCommandReady(String agentId, AgentCommand command) {
|
public void onCommandReady(String agentId, AgentCommand command) {
|
||||||
String eventType = command.type().name().toLowerCase().replace('_', '-');
|
String eventType = command.type().name().toLowerCase().replace('_', '-');
|
||||||
boolean sent = sendEvent(agentId, command.id(), eventType, command.payload());
|
String signedPayload = ssePayloadSigner.signPayload(command.payload());
|
||||||
|
// Parse to JsonNode so SseEmitter serializes the tree correctly (avoids double-quoting a raw string)
|
||||||
|
Object data;
|
||||||
|
try {
|
||||||
|
data = objectMapper.readTree(signedPayload);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to parse signed payload as JSON, sending raw string", e);
|
||||||
|
data = signedPayload;
|
||||||
|
}
|
||||||
|
boolean sent = sendEvent(agentId, command.id(), eventType, data);
|
||||||
if (sent) {
|
if (sent) {
|
||||||
registryService.markDelivered(agentId, command.id());
|
registryService.markDelivered(agentId, command.id());
|
||||||
log.debug("Command {} ({}) delivered to agent {} via SSE", command.id(), eventType, agentId);
|
log.debug("Command {} ({}) delivered to agent {} via SSE", command.id(), eventType, agentId);
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
package com.cameleer3.server.app.agent;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.security.Ed25519SigningService;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signs SSE command payloads with Ed25519 before delivery.
|
||||||
|
* <p>
|
||||||
|
* The signature is computed over the original JSON payload string (without the
|
||||||
|
* signature field). The resulting Base64-encoded signature is added as a
|
||||||
|
* {@code "signature"} field to the JSON before returning.
|
||||||
|
* <p>
|
||||||
|
* Agents verify the signature by:
|
||||||
|
* <ol>
|
||||||
|
* <li>Extracting and removing the {@code "signature"} field from the received JSON</li>
|
||||||
|
* <li>Serializing the remaining fields back to a JSON string</li>
|
||||||
|
* <li>Verifying the signature against that string using the server's Ed25519 public key</li>
|
||||||
|
* </ol>
|
||||||
|
* In practice, agents should verify against the original payload — the signature is
|
||||||
|
* computed over the exact JSON string as received by the server.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class SsePayloadSigner {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SsePayloadSigner.class);
|
||||||
|
|
||||||
|
private final Ed25519SigningService ed25519SigningService;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public SsePayloadSigner(Ed25519SigningService ed25519SigningService, ObjectMapper objectMapper) {
|
||||||
|
this.ed25519SigningService = ed25519SigningService;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signs the given JSON payload and returns a new JSON string with a {@code "signature"} field added.
|
||||||
|
* <p>
|
||||||
|
* The signature is computed over the original payload string (before adding the signature field).
|
||||||
|
*
|
||||||
|
* @param jsonPayload the JSON string to sign
|
||||||
|
* @return the signed JSON string with a "signature" field, or the original payload if null/empty/blank
|
||||||
|
*/
|
||||||
|
public String signPayload(String jsonPayload) {
|
||||||
|
if (jsonPayload == null) {
|
||||||
|
log.warn("Attempted to sign null payload, returning null");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (jsonPayload.isEmpty() || jsonPayload.isBlank()) {
|
||||||
|
log.warn("Attempted to sign empty/blank payload, returning as-is");
|
||||||
|
return jsonPayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. Sign the original payload string
|
||||||
|
String signatureBase64 = ed25519SigningService.sign(jsonPayload);
|
||||||
|
|
||||||
|
// 2. Parse payload, add signature field, serialize back
|
||||||
|
JsonNode node = objectMapper.readTree(jsonPayload);
|
||||||
|
if (node instanceof ObjectNode objectNode) {
|
||||||
|
objectNode.put("signature", signatureBase64);
|
||||||
|
return objectMapper.writeValueAsString(objectNode);
|
||||||
|
} else {
|
||||||
|
// Payload is not a JSON object (e.g., array or primitive) -- cannot add field
|
||||||
|
log.warn("Payload is not a JSON object, returning unsigned: {}", jsonPayload);
|
||||||
|
return jsonPayload;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to sign payload, returning unsigned", e);
|
||||||
|
return jsonPayload;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -40,12 +40,9 @@ import static org.awaitility.Awaitility.await;
|
|||||||
/**
|
/**
|
||||||
* Integration test verifying that SSE command events carry valid Ed25519 signatures.
|
* Integration test verifying that SSE command events carry valid Ed25519 signatures.
|
||||||
* <p>
|
* <p>
|
||||||
* Flow: register agent -> open SSE stream -> push config-update command ->
|
* Flow: register agent (with bootstrap token) -> extract JWT + public key from response ->
|
||||||
* receive SSE event -> verify signature field against server's public key.
|
* open SSE stream (with JWT query param) -> push config-update command (with JWT) ->
|
||||||
* <p>
|
* receive SSE event -> verify signature field against server's Ed25519 public key.
|
||||||
* NOTE: Uses TestSecurityConfig (permit-all) since Plan 02 (Spring Security filter chain)
|
|
||||||
* may not yet be complete. When Plan 02 is done, this test should be updated to use
|
|
||||||
* bootstrap token for registration and JWT for SSE connection.
|
|
||||||
*/
|
*/
|
||||||
class SseSigningIT extends AbstractClickHouseIT {
|
class SseSigningIT extends AbstractClickHouseIT {
|
||||||
|
|
||||||
@@ -68,7 +65,23 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
return headers;
|
return headers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResponseEntity<String> registerAgent(String agentId) {
|
private HttpHeaders authProtocolHeaders(String accessToken) {
|
||||||
|
HttpHeaders headers = protocolHeaders();
|
||||||
|
headers.set("Authorization", "Bearer " + accessToken);
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
private HttpHeaders bootstrapHeaders() {
|
||||||
|
HttpHeaders headers = protocolHeaders();
|
||||||
|
headers.set("Authorization", "Bearer test-bootstrap-token");
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers an agent using the bootstrap token and returns the registration response.
|
||||||
|
* The response contains: agentId, sseEndpoint, accessToken, refreshToken, serverPublicKey.
|
||||||
|
*/
|
||||||
|
private JsonNode registerAgentWithAuth(String agentId) throws Exception {
|
||||||
String json = """
|
String json = """
|
||||||
{
|
{
|
||||||
"agentId": "%s",
|
"agentId": "%s",
|
||||||
@@ -80,24 +93,27 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
}
|
}
|
||||||
""".formatted(agentId);
|
""".formatted(agentId);
|
||||||
|
|
||||||
return restTemplate.postForEntity(
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
"/api/v1/agents/register",
|
"/api/v1/agents/register",
|
||||||
new HttpEntity<>(json, protocolHeaders()),
|
new HttpEntity<>(json, bootstrapHeaders()),
|
||||||
String.class);
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode().value()).isEqualTo(200);
|
||||||
|
return objectMapper.readTree(response.getBody());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResponseEntity<String> sendCommand(String agentId, String type, String payloadJson) {
|
private ResponseEntity<String> sendCommand(String agentId, String type, String payloadJson, String accessToken) {
|
||||||
String json = """
|
String json = """
|
||||||
{"type": "%s", "payload": %s}
|
{"type": "%s", "payload": %s}
|
||||||
""".formatted(type, payloadJson);
|
""".formatted(type, payloadJson);
|
||||||
|
|
||||||
return restTemplate.postForEntity(
|
return restTemplate.postForEntity(
|
||||||
"/api/v1/agents/" + agentId + "/commands",
|
"/api/v1/agents/" + agentId + "/commands",
|
||||||
new HttpEntity<>(json, protocolHeaders()),
|
new HttpEntity<>(json, authProtocolHeaders(accessToken)),
|
||||||
String.class);
|
String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SseStream openSseStream(String agentId) {
|
private SseStream openSseStream(String agentId, String accessToken) {
|
||||||
List<String> lines = new ArrayList<>();
|
List<String> lines = new ArrayList<>();
|
||||||
CountDownLatch connected = new CountDownLatch(1);
|
CountDownLatch connected = new CountDownLatch(1);
|
||||||
AtomicInteger statusCode = new AtomicInteger(0);
|
AtomicInteger statusCode = new AtomicInteger(0);
|
||||||
@@ -107,7 +123,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
HttpRequest request = HttpRequest.newBuilder()
|
HttpRequest request = HttpRequest.newBuilder()
|
||||||
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events"))
|
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events?token=" + accessToken))
|
||||||
.header("Accept", "text/event-stream")
|
.header("Accept", "text/event-stream")
|
||||||
.GET()
|
.GET()
|
||||||
.build();
|
.build();
|
||||||
@@ -147,18 +163,21 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
@Test
|
@Test
|
||||||
void configUpdateEvent_containsValidEd25519Signature() throws Exception {
|
void configUpdateEvent_containsValidEd25519Signature() throws Exception {
|
||||||
String agentId = "sse-sign-it-" + UUID.randomUUID().toString().substring(0, 8);
|
String agentId = "sse-sign-it-" + UUID.randomUUID().toString().substring(0, 8);
|
||||||
registerAgent(agentId);
|
JsonNode registration = registerAgentWithAuth(agentId);
|
||||||
|
String accessToken = registration.get("accessToken").asText();
|
||||||
|
String serverPublicKey = registration.get("serverPublicKey").asText();
|
||||||
|
|
||||||
SseStream stream = openSseStream(agentId);
|
SseStream stream = openSseStream(agentId, accessToken);
|
||||||
stream.awaitConnection(5000);
|
assertThat(stream.awaitConnection(5000)).isTrue();
|
||||||
|
assertThat(stream.statusCode().get()).isEqualTo(200);
|
||||||
|
|
||||||
String originalPayload = "{\"key\":\"value\",\"setting\":\"enabled\"}";
|
String originalPayload = "{\"key\":\"value\",\"setting\":\"enabled\"}";
|
||||||
|
|
||||||
// Send config-update and wait for SSE event
|
// Send config-update and wait for SSE event
|
||||||
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
await().atMost(10, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||||
.ignoreExceptions()
|
.ignoreExceptions()
|
||||||
.until(() -> {
|
.until(() -> {
|
||||||
sendCommand(agentId, "config-update", originalPayload);
|
sendCommand(agentId, "config-update", originalPayload, accessToken);
|
||||||
List<String> lines = stream.snapshot();
|
List<String> lines = stream.snapshot();
|
||||||
return lines.stream().anyMatch(l -> l.contains("event:config-update"));
|
return lines.stream().anyMatch(l -> l.contains("event:config-update"));
|
||||||
});
|
});
|
||||||
@@ -187,7 +206,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
|
|
||||||
// Verify signature against original payload using server's public key
|
// Verify signature against original payload using server's public key
|
||||||
byte[] signatureBytes = Base64.getDecoder().decode(signatureBase64);
|
byte[] signatureBytes = Base64.getDecoder().decode(signatureBase64);
|
||||||
PublicKey publicKey = loadPublicKey(ed25519SigningService.getPublicKeyBase64());
|
PublicKey publicKey = loadPublicKey(serverPublicKey);
|
||||||
|
|
||||||
Signature verifier = Signature.getInstance("Ed25519");
|
Signature verifier = Signature.getInstance("Ed25519");
|
||||||
verifier.initVerify(publicKey);
|
verifier.initVerify(publicKey);
|
||||||
@@ -200,17 +219,20 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
@Test
|
@Test
|
||||||
void deepTraceEvent_containsValidSignature() throws Exception {
|
void deepTraceEvent_containsValidSignature() throws Exception {
|
||||||
String agentId = "sse-sign-trace-" + UUID.randomUUID().toString().substring(0, 8);
|
String agentId = "sse-sign-trace-" + UUID.randomUUID().toString().substring(0, 8);
|
||||||
registerAgent(agentId);
|
JsonNode registration = registerAgentWithAuth(agentId);
|
||||||
|
String accessToken = registration.get("accessToken").asText();
|
||||||
|
String serverPublicKey = registration.get("serverPublicKey").asText();
|
||||||
|
|
||||||
SseStream stream = openSseStream(agentId);
|
SseStream stream = openSseStream(agentId, accessToken);
|
||||||
stream.awaitConnection(5000);
|
assertThat(stream.awaitConnection(5000)).isTrue();
|
||||||
|
assertThat(stream.statusCode().get()).isEqualTo(200);
|
||||||
|
|
||||||
String originalPayload = "{\"correlationId\":\"trace-123\"}";
|
String originalPayload = "{\"correlationId\":\"trace-123\"}";
|
||||||
|
|
||||||
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
await().atMost(10, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||||
.ignoreExceptions()
|
.ignoreExceptions()
|
||||||
.until(() -> {
|
.until(() -> {
|
||||||
sendCommand(agentId, "deep-trace", originalPayload);
|
sendCommand(agentId, "deep-trace", originalPayload, accessToken);
|
||||||
List<String> lines = stream.snapshot();
|
List<String> lines = stream.snapshot();
|
||||||
return lines.stream().anyMatch(l -> l.contains("event:deep-trace"));
|
return lines.stream().anyMatch(l -> l.contains("event:deep-trace"));
|
||||||
});
|
});
|
||||||
@@ -226,7 +248,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
|||||||
|
|
||||||
// Verify signature
|
// Verify signature
|
||||||
byte[] signatureBytes = Base64.getDecoder().decode(eventNode.get("signature").asText());
|
byte[] signatureBytes = Base64.getDecoder().decode(eventNode.get("signature").asText());
|
||||||
PublicKey publicKey = loadPublicKey(ed25519SigningService.getPublicKeyBase64());
|
PublicKey publicKey = loadPublicKey(serverPublicKey);
|
||||||
|
|
||||||
Signature verifier = Signature.getInstance("Ed25519");
|
Signature verifier = Signature.getInstance("Ed25519");
|
||||||
verifier.initVerify(publicKey);
|
verifier.initVerify(publicKey);
|
||||||
|
|||||||
Reference in New Issue
Block a user