From a1909baad62a9c907f2554c76c9986cd33f4c35c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 11 Mar 2026 19:16:25 +0100 Subject: [PATCH] test(03-02): integration tests for SSE and command endpoints - AgentSseControllerIT: connect, 404 unknown, config-update/deep-trace/replay delivery, ping keepalive, Last-Event-ID - AgentCommandControllerIT: single/group/broadcast commands, ack, ack-unknown, command-to-unregistered - Test config with 1s ping interval for faster SSE keepalive testing - All 71 tests pass with mvn clean verify Co-Authored-By: Claude Opus 4.6 --- .../controller/AgentCommandControllerIT.java | 181 ++++++++++++ .../app/controller/AgentSseControllerIT.java | 266 ++++++++++++++++++ .../src/test/resources/application-test.yml | 3 + 3 files changed, 450 insertions(+) create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java new file mode 100644 index 00000000..d89d9995 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java @@ -0,0 +1,181 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +class AgentCommandControllerIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private ObjectMapper objectMapper; + + private HttpHeaders protocolHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + return headers; + } + + private HttpHeaders protocolHeadersNoBody() { + HttpHeaders headers = new HttpHeaders(); + headers.set("X-Cameleer-Protocol-Version", "1"); + return headers; + } + + private ResponseEntity registerAgent(String agentId, String name, String group) { + String json = """ + { + "agentId": "%s", + "name": "%s", + "group": "%s", + "version": "1.0.0", + "routeIds": ["route-1"], + "capabilities": {} + } + """.formatted(agentId, name, group); + + return restTemplate.postForEntity( + "/api/v1/agents/register", + new HttpEntity<>(json, protocolHeaders()), + String.class); + } + + @Test + void sendCommandToAgent_returns202WithCommandId() throws Exception { + String agentId = "cmd-it-single-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Command Agent", "test-group"); + + String commandJson = """ + {"type": "config-update", "payload": {"key": "value"}} + """; + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/agents/" + agentId + "/commands", + new HttpEntity<>(commandJson, protocolHeaders()), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.has("commandId")).isTrue(); + assertThat(body.get("commandId").asText()).isNotBlank(); + assertThat(body.get("status").asText()).isIn("PENDING", "DELIVERED"); + } + + @Test + void sendGroupCommand_returns202WithTargetCount() throws Exception { + String group = "cmd-it-group-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent("agent-g1-" + group, "Group Agent 1", group); + registerAgent("agent-g2-" + group, "Group Agent 2", group); + + String commandJson = """ + {"type": "deep-trace", "payload": {"correlationId": "group-trace-1"}} + """; + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/agents/groups/" + group + "/commands", + new HttpEntity<>(commandJson, protocolHeaders()), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.get("targetCount").asInt()).isEqualTo(2); + assertThat(body.get("commandIds").isArray()).isTrue(); + assertThat(body.get("commandIds").size()).isEqualTo(2); + } + + @Test + void broadcastCommand_returns202WithLiveAgentCount() throws Exception { + // Register at least one agent (others may exist from other tests) + String agentId = "cmd-it-broadcast-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Broadcast Agent", "broadcast-group"); + + String commandJson = """ + {"type": "replay", "payload": {"exchangeId": "ex-broadcast"}} + """; + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/agents/commands", + new HttpEntity<>(commandJson, protocolHeaders()), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.get("targetCount").asInt()).isGreaterThanOrEqualTo(1); + assertThat(body.get("commandIds").isArray()).isTrue(); + } + + @Test + void acknowledgeCommand_returns200() throws Exception { + String agentId = "cmd-it-ack-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Ack Agent", "test-group"); + + // Send a command first + String commandJson = """ + {"type": "config-update", "payload": {"key": "ack-test"}} + """; + + ResponseEntity cmdResponse = restTemplate.postForEntity( + "/api/v1/agents/" + agentId + "/commands", + new HttpEntity<>(commandJson, protocolHeaders()), + String.class); + + JsonNode cmdBody = objectMapper.readTree(cmdResponse.getBody()); + String commandId = cmdBody.get("commandId").asText(); + + // Acknowledge the command + ResponseEntity ackResponse = restTemplate.exchange( + "/api/v1/agents/" + agentId + "/commands/" + commandId + "/ack", + HttpMethod.POST, + new HttpEntity<>(protocolHeadersNoBody()), + Void.class); + + assertThat(ackResponse.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @Test + void acknowledgeUnknownCommand_returns404() { + String agentId = "cmd-it-ack-unknown-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Ack Unknown Agent", "test-group"); + + ResponseEntity response = restTemplate.exchange( + "/api/v1/agents/" + agentId + "/commands/nonexistent-cmd-id/ack", + HttpMethod.POST, + new HttpEntity<>(protocolHeadersNoBody()), + Void.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + } + + @Test + void sendCommandToUnregisteredAgent_returns404() { + String commandJson = """ + {"type": "config-update", "payload": {"key": "value"}} + """; + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/agents/nonexistent-agent-xyz/commands", + new HttpEntity<>(commandJson, protocolHeaders()), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java new file mode 100644 index 00000000..7769c40a --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java @@ -0,0 +1,266 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class AgentSseControllerIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private ObjectMapper objectMapper; + + @LocalServerPort + private int port; + + private HttpHeaders protocolHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + return headers; + } + + private ResponseEntity registerAgent(String agentId, String name, String group) { + String json = """ + { + "agentId": "%s", + "name": "%s", + "group": "%s", + "version": "1.0.0", + "routeIds": ["route-1"], + "capabilities": {} + } + """.formatted(agentId, name, group); + + return restTemplate.postForEntity( + "/api/v1/agents/register", + new HttpEntity<>(json, protocolHeaders()), + String.class); + } + + private ResponseEntity sendCommand(String agentId, String type, String payloadJson) { + String json = """ + {"type": "%s", "payload": %s} + """.formatted(type, payloadJson); + + return restTemplate.postForEntity( + "/api/v1/agents/" + agentId + "/commands", + new HttpEntity<>(json, protocolHeaders()), + String.class); + } + + /** + * Opens an SSE stream via java.net.http.HttpClient and collects lines in a list. + * Uses async API to avoid blocking the test thread. + */ + private SseStream openSseStream(String agentId) { + return openSseStream(agentId, null); + } + + private SseStream openSseStream(String agentId, String lastEventId) { + List lines = new ArrayList<>(); + CountDownLatch connected = new CountDownLatch(1); + AtomicInteger statusCode = new AtomicInteger(0); + + HttpClient client = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(5)) + .build(); + + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events")) + .header("Accept", "text/event-stream") + .GET(); + + if (lastEventId != null) { + requestBuilder.header("Last-Event-ID", lastEventId); + } + + HttpRequest request = requestBuilder.build(); + + CompletableFuture future = client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()) + .thenAccept(response -> { + statusCode.set(response.statusCode()); + connected.countDown(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.body()))) { + String line; + while ((line = reader.readLine()) != null) { + synchronized (lines) { + lines.add(line); + } + } + } catch (Exception e) { + // Stream closed -- expected + } + }); + + return new SseStream(lines, future, connected, statusCode); + } + + private record SseStream(List lines, CompletableFuture future, + CountDownLatch connected, AtomicInteger statusCode) { + List snapshot() { + synchronized (lines) { + return new ArrayList<>(lines); + } + } + + boolean awaitConnection(long timeoutMs) throws InterruptedException { + return connected.await(timeoutMs, TimeUnit.MILLISECONDS); + } + } + + @Test + void sseConnect_registeredAgent_returnsEventStream() throws Exception { + String agentId = "sse-it-connect-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "SSE Connect Agent", "test-group"); + + SseStream stream = openSseStream(agentId); + + // Wait for the connection to be established + assertThat(stream.awaitConnection(5000)).isTrue(); + assertThat(stream.statusCode().get()).isEqualTo(200); + } + + @Test + void sseConnect_unknownAgent_returns404() throws Exception { + HttpClient client = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(5)) + .build(); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + port + "/api/v1/agents/unknown-sse-agent/events")) + .header("Accept", "text/event-stream") + .GET() + .build(); + + CompletableFuture statusFuture = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenApply(HttpResponse::statusCode); + + int status = statusFuture.get(5, TimeUnit.SECONDS); + assertThat(status).isEqualTo(404); + } + + @Test + void configUpdateDelivery_receivedViaSseStream() throws Exception { + String agentId = "sse-it-config-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Config Update Agent", "test-group"); + + SseStream stream = openSseStream(agentId); + stream.awaitConnection(5000); + + // Give the SSE stream a moment to fully establish + await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until(() -> { + sendCommand(agentId, "config-update", "{\"key\":\"value\"}"); + List lines = stream.snapshot(); + return lines.stream().anyMatch(l -> l.contains("event:config-update")); + }); + + List lines = stream.snapshot(); + assertThat(lines).anyMatch(l -> l.contains("event:config-update")); + assertThat(lines).anyMatch(l -> l.startsWith("id:")); + assertThat(lines).anyMatch(l -> l.contains("\"key\":\"value\"")); + } + + @Test + void deepTraceDelivery_receivedViaSseStream() throws Exception { + String agentId = "sse-it-trace-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Deep Trace Agent", "test-group"); + + SseStream stream = openSseStream(agentId); + stream.awaitConnection(5000); + + await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until(() -> { + sendCommand(agentId, "deep-trace", "{\"correlationId\":\"test-123\"}"); + List lines = stream.snapshot(); + return lines.stream().anyMatch(l -> l.contains("event:deep-trace")); + }); + + List lines = stream.snapshot(); + assertThat(lines).anyMatch(l -> l.contains("event:deep-trace")); + assertThat(lines).anyMatch(l -> l.contains("test-123")); + } + + @Test + void replayDelivery_receivedViaSseStream() throws Exception { + String agentId = "sse-it-replay-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Replay Agent", "test-group"); + + SseStream stream = openSseStream(agentId); + stream.awaitConnection(5000); + + await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until(() -> { + sendCommand(agentId, "replay", "{\"exchangeId\":\"ex-456\"}"); + List lines = stream.snapshot(); + return lines.stream().anyMatch(l -> l.contains("event:replay")); + }); + + List lines = stream.snapshot(); + assertThat(lines).anyMatch(l -> l.contains("event:replay")); + assertThat(lines).anyMatch(l -> l.contains("ex-456")); + } + + @Test + void pingKeepalive_receivedViaSseStream() throws Exception { + String agentId = "sse-it-ping-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Ping Agent", "test-group"); + + SseStream stream = openSseStream(agentId); + stream.awaitConnection(5000); + + // Wait for a ping comment (sent every 1 second in test config) + await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until(() -> { + List lines = stream.snapshot(); + return lines.stream().anyMatch(l -> l.contains(":ping")); + }); + + List lines = stream.snapshot(); + assertThat(lines).anyMatch(l -> l.contains(":ping")); + } + + @Test + void lastEventIdHeader_connectionSucceeds() throws Exception { + String agentId = "sse-it-lastid-" + UUID.randomUUID().toString().substring(0, 8); + registerAgent(agentId, "Last-Event-ID Agent", "test-group"); + + SseStream stream = openSseStream(agentId, "some-previous-event-id"); + + // Just verify the connection succeeds (no replay expected) + assertThat(stream.awaitConnection(5000)).isTrue(); + assertThat(stream.statusCode().get()).isEqualTo(200); + } +} diff --git a/cameleer3-server-app/src/test/resources/application-test.yml b/cameleer3-server-app/src/test/resources/application-test.yml index cb294b0f..8777cc5f 100644 --- a/cameleer3-server-app/src/test/resources/application-test.yml +++ b/cameleer3-server-app/src/test/resources/application-test.yml @@ -9,3 +9,6 @@ ingestion: buffer-capacity: 100 batch-size: 10 flush-interval-ms: 100 + +agent-registry: + ping-interval-ms: 1000