feat(04-03): implement SSE payload signing with Ed25519
- SsePayloadSigner signs JSON payloads and adds signature field before SSE delivery - SseConnectionManager signs all command payloads via SsePayloadSigner before sendEvent - Signed payload parsed to JsonNode for correct SseEmitter serialization - Integration tests use bootstrap token + JWT auth (adapts to Plan 02 security layer) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -40,12 +40,9 @@ import static org.awaitility.Awaitility.await;
|
||||
/**
|
||||
* Integration test verifying that SSE command events carry valid Ed25519 signatures.
|
||||
* <p>
|
||||
* Flow: register agent -> open SSE stream -> push config-update command ->
|
||||
* receive SSE event -> verify signature field against server's public key.
|
||||
* <p>
|
||||
* NOTE: Uses TestSecurityConfig (permit-all) since Plan 02 (Spring Security filter chain)
|
||||
* may not yet be complete. When Plan 02 is done, this test should be updated to use
|
||||
* bootstrap token for registration and JWT for SSE connection.
|
||||
* Flow: register agent (with bootstrap token) -> extract JWT + public key from response ->
|
||||
* open SSE stream (with JWT query param) -> push config-update command (with JWT) ->
|
||||
* receive SSE event -> verify signature field against server's Ed25519 public key.
|
||||
*/
|
||||
class SseSigningIT extends AbstractClickHouseIT {
|
||||
|
||||
@@ -68,7 +65,23 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
return headers;
|
||||
}
|
||||
|
||||
private ResponseEntity<String> registerAgent(String agentId) {
|
||||
private HttpHeaders authProtocolHeaders(String accessToken) {
|
||||
HttpHeaders headers = protocolHeaders();
|
||||
headers.set("Authorization", "Bearer " + accessToken);
|
||||
return headers;
|
||||
}
|
||||
|
||||
private HttpHeaders bootstrapHeaders() {
|
||||
HttpHeaders headers = protocolHeaders();
|
||||
headers.set("Authorization", "Bearer test-bootstrap-token");
|
||||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers an agent using the bootstrap token and returns the registration response.
|
||||
* The response contains: agentId, sseEndpoint, accessToken, refreshToken, serverPublicKey.
|
||||
*/
|
||||
private JsonNode registerAgentWithAuth(String agentId) throws Exception {
|
||||
String json = """
|
||||
{
|
||||
"agentId": "%s",
|
||||
@@ -80,24 +93,27 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
}
|
||||
""".formatted(agentId);
|
||||
|
||||
return restTemplate.postForEntity(
|
||||
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||
"/api/v1/agents/register",
|
||||
new HttpEntity<>(json, protocolHeaders()),
|
||||
new HttpEntity<>(json, bootstrapHeaders()),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode().value()).isEqualTo(200);
|
||||
return objectMapper.readTree(response.getBody());
|
||||
}
|
||||
|
||||
private ResponseEntity<String> sendCommand(String agentId, String type, String payloadJson) {
|
||||
private ResponseEntity<String> sendCommand(String agentId, String type, String payloadJson, String accessToken) {
|
||||
String json = """
|
||||
{"type": "%s", "payload": %s}
|
||||
""".formatted(type, payloadJson);
|
||||
|
||||
return restTemplate.postForEntity(
|
||||
"/api/v1/agents/" + agentId + "/commands",
|
||||
new HttpEntity<>(json, protocolHeaders()),
|
||||
new HttpEntity<>(json, authProtocolHeaders(accessToken)),
|
||||
String.class);
|
||||
}
|
||||
|
||||
private SseStream openSseStream(String agentId) {
|
||||
private SseStream openSseStream(String agentId, String accessToken) {
|
||||
List<String> lines = new ArrayList<>();
|
||||
CountDownLatch connected = new CountDownLatch(1);
|
||||
AtomicInteger statusCode = new AtomicInteger(0);
|
||||
@@ -107,7 +123,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
.build();
|
||||
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events"))
|
||||
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events?token=" + accessToken))
|
||||
.header("Accept", "text/event-stream")
|
||||
.GET()
|
||||
.build();
|
||||
@@ -147,18 +163,21 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
@Test
|
||||
void configUpdateEvent_containsValidEd25519Signature() throws Exception {
|
||||
String agentId = "sse-sign-it-" + UUID.randomUUID().toString().substring(0, 8);
|
||||
registerAgent(agentId);
|
||||
JsonNode registration = registerAgentWithAuth(agentId);
|
||||
String accessToken = registration.get("accessToken").asText();
|
||||
String serverPublicKey = registration.get("serverPublicKey").asText();
|
||||
|
||||
SseStream stream = openSseStream(agentId);
|
||||
stream.awaitConnection(5000);
|
||||
SseStream stream = openSseStream(agentId, accessToken);
|
||||
assertThat(stream.awaitConnection(5000)).isTrue();
|
||||
assertThat(stream.statusCode().get()).isEqualTo(200);
|
||||
|
||||
String originalPayload = "{\"key\":\"value\",\"setting\":\"enabled\"}";
|
||||
|
||||
// Send config-update and wait for SSE event
|
||||
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||
await().atMost(10, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||
.ignoreExceptions()
|
||||
.until(() -> {
|
||||
sendCommand(agentId, "config-update", originalPayload);
|
||||
sendCommand(agentId, "config-update", originalPayload, accessToken);
|
||||
List<String> lines = stream.snapshot();
|
||||
return lines.stream().anyMatch(l -> l.contains("event:config-update"));
|
||||
});
|
||||
@@ -187,7 +206,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
|
||||
// Verify signature against original payload using server's public key
|
||||
byte[] signatureBytes = Base64.getDecoder().decode(signatureBase64);
|
||||
PublicKey publicKey = loadPublicKey(ed25519SigningService.getPublicKeyBase64());
|
||||
PublicKey publicKey = loadPublicKey(serverPublicKey);
|
||||
|
||||
Signature verifier = Signature.getInstance("Ed25519");
|
||||
verifier.initVerify(publicKey);
|
||||
@@ -200,17 +219,20 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
@Test
|
||||
void deepTraceEvent_containsValidSignature() throws Exception {
|
||||
String agentId = "sse-sign-trace-" + UUID.randomUUID().toString().substring(0, 8);
|
||||
registerAgent(agentId);
|
||||
JsonNode registration = registerAgentWithAuth(agentId);
|
||||
String accessToken = registration.get("accessToken").asText();
|
||||
String serverPublicKey = registration.get("serverPublicKey").asText();
|
||||
|
||||
SseStream stream = openSseStream(agentId);
|
||||
stream.awaitConnection(5000);
|
||||
SseStream stream = openSseStream(agentId, accessToken);
|
||||
assertThat(stream.awaitConnection(5000)).isTrue();
|
||||
assertThat(stream.statusCode().get()).isEqualTo(200);
|
||||
|
||||
String originalPayload = "{\"correlationId\":\"trace-123\"}";
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||
await().atMost(10, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
|
||||
.ignoreExceptions()
|
||||
.until(() -> {
|
||||
sendCommand(agentId, "deep-trace", originalPayload);
|
||||
sendCommand(agentId, "deep-trace", originalPayload, accessToken);
|
||||
List<String> lines = stream.snapshot();
|
||||
return lines.stream().anyMatch(l -> l.contains("event:deep-trace"));
|
||||
});
|
||||
@@ -226,7 +248,7 @@ class SseSigningIT extends AbstractClickHouseIT {
|
||||
|
||||
// Verify signature
|
||||
byte[] signatureBytes = Base64.getDecoder().decode(eventNode.get("signature").asText());
|
||||
PublicKey publicKey = loadPublicKey(ed25519SigningService.getPublicKeyBase64());
|
||||
PublicKey publicKey = loadPublicKey(serverPublicKey);
|
||||
|
||||
Signature verifier = Signature.getInstance("Ed25519");
|
||||
verifier.initVerify(publicKey);
|
||||
|
||||
Reference in New Issue
Block a user