test(03-02): integration tests for SSE and command endpoints

- AgentSseControllerIT: connect, 404 unknown, config-update/deep-trace/replay delivery, ping keepalive, Last-Event-ID
- AgentCommandControllerIT: single/group/broadcast commands, ack, ack-unknown, command-to-unregistered
- Test config with 1s ping interval for faster SSE keepalive testing
- All 71 tests pass with mvn clean verify

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 19:16:25 +01:00
parent 5746886a0b
commit a1909baad6
3 changed files with 450 additions and 0 deletions

View File

@@ -0,0 +1,181 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.AbstractClickHouseIT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
class AgentCommandControllerIT extends AbstractClickHouseIT {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private ObjectMapper objectMapper;
private HttpHeaders protocolHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Cameleer-Protocol-Version", "1");
return headers;
}
private HttpHeaders protocolHeadersNoBody() {
HttpHeaders headers = new HttpHeaders();
headers.set("X-Cameleer-Protocol-Version", "1");
return headers;
}
private ResponseEntity<String> registerAgent(String agentId, String name, String group) {
String json = """
{
"agentId": "%s",
"name": "%s",
"group": "%s",
"version": "1.0.0",
"routeIds": ["route-1"],
"capabilities": {}
}
""".formatted(agentId, name, group);
return restTemplate.postForEntity(
"/api/v1/agents/register",
new HttpEntity<>(json, protocolHeaders()),
String.class);
}
@Test
void sendCommandToAgent_returns202WithCommandId() throws Exception {
String agentId = "cmd-it-single-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Command Agent", "test-group");
String commandJson = """
{"type": "config-update", "payload": {"key": "value"}}
""";
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/agents/" + agentId + "/commands",
new HttpEntity<>(commandJson, protocolHeaders()),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.has("commandId")).isTrue();
assertThat(body.get("commandId").asText()).isNotBlank();
assertThat(body.get("status").asText()).isIn("PENDING", "DELIVERED");
}
@Test
void sendGroupCommand_returns202WithTargetCount() throws Exception {
String group = "cmd-it-group-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent("agent-g1-" + group, "Group Agent 1", group);
registerAgent("agent-g2-" + group, "Group Agent 2", group);
String commandJson = """
{"type": "deep-trace", "payload": {"correlationId": "group-trace-1"}}
""";
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/agents/groups/" + group + "/commands",
new HttpEntity<>(commandJson, protocolHeaders()),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("targetCount").asInt()).isEqualTo(2);
assertThat(body.get("commandIds").isArray()).isTrue();
assertThat(body.get("commandIds").size()).isEqualTo(2);
}
@Test
void broadcastCommand_returns202WithLiveAgentCount() throws Exception {
// Register at least one agent (others may exist from other tests)
String agentId = "cmd-it-broadcast-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Broadcast Agent", "broadcast-group");
String commandJson = """
{"type": "replay", "payload": {"exchangeId": "ex-broadcast"}}
""";
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/agents/commands",
new HttpEntity<>(commandJson, protocolHeaders()),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("targetCount").asInt()).isGreaterThanOrEqualTo(1);
assertThat(body.get("commandIds").isArray()).isTrue();
}
@Test
void acknowledgeCommand_returns200() throws Exception {
String agentId = "cmd-it-ack-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Ack Agent", "test-group");
// Send a command first
String commandJson = """
{"type": "config-update", "payload": {"key": "ack-test"}}
""";
ResponseEntity<String> cmdResponse = restTemplate.postForEntity(
"/api/v1/agents/" + agentId + "/commands",
new HttpEntity<>(commandJson, protocolHeaders()),
String.class);
JsonNode cmdBody = objectMapper.readTree(cmdResponse.getBody());
String commandId = cmdBody.get("commandId").asText();
// Acknowledge the command
ResponseEntity<Void> ackResponse = restTemplate.exchange(
"/api/v1/agents/" + agentId + "/commands/" + commandId + "/ack",
HttpMethod.POST,
new HttpEntity<>(protocolHeadersNoBody()),
Void.class);
assertThat(ackResponse.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
void acknowledgeUnknownCommand_returns404() {
String agentId = "cmd-it-ack-unknown-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Ack Unknown Agent", "test-group");
ResponseEntity<Void> response = restTemplate.exchange(
"/api/v1/agents/" + agentId + "/commands/nonexistent-cmd-id/ack",
HttpMethod.POST,
new HttpEntity<>(protocolHeadersNoBody()),
Void.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
void sendCommandToUnregisteredAgent_returns404() {
String commandJson = """
{"type": "config-update", "payload": {"key": "value"}}
""";
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/agents/nonexistent-agent-xyz/commands",
new HttpEntity<>(commandJson, protocolHeaders()),
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
}

View File

@@ -0,0 +1,266 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.AbstractClickHouseIT;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
class AgentSseControllerIT extends AbstractClickHouseIT {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private ObjectMapper objectMapper;
@LocalServerPort
private int port;
private HttpHeaders protocolHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Cameleer-Protocol-Version", "1");
return headers;
}
private ResponseEntity<String> registerAgent(String agentId, String name, String group) {
String json = """
{
"agentId": "%s",
"name": "%s",
"group": "%s",
"version": "1.0.0",
"routeIds": ["route-1"],
"capabilities": {}
}
""".formatted(agentId, name, group);
return restTemplate.postForEntity(
"/api/v1/agents/register",
new HttpEntity<>(json, protocolHeaders()),
String.class);
}
private ResponseEntity<String> sendCommand(String agentId, String type, String payloadJson) {
String json = """
{"type": "%s", "payload": %s}
""".formatted(type, payloadJson);
return restTemplate.postForEntity(
"/api/v1/agents/" + agentId + "/commands",
new HttpEntity<>(json, protocolHeaders()),
String.class);
}
/**
* Opens an SSE stream via java.net.http.HttpClient and collects lines in a list.
* Uses async API to avoid blocking the test thread.
*/
private SseStream openSseStream(String agentId) {
return openSseStream(agentId, null);
}
private SseStream openSseStream(String agentId, String lastEventId) {
List<String> lines = new ArrayList<>();
CountDownLatch connected = new CountDownLatch(1);
AtomicInteger statusCode = new AtomicInteger(0);
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/" + agentId + "/events"))
.header("Accept", "text/event-stream")
.GET();
if (lastEventId != null) {
requestBuilder.header("Last-Event-ID", lastEventId);
}
HttpRequest request = requestBuilder.build();
CompletableFuture<Void> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
.thenAccept(response -> {
statusCode.set(response.statusCode());
connected.countDown();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.body()))) {
String line;
while ((line = reader.readLine()) != null) {
synchronized (lines) {
lines.add(line);
}
}
} catch (Exception e) {
// Stream closed -- expected
}
});
return new SseStream(lines, future, connected, statusCode);
}
private record SseStream(List<String> lines, CompletableFuture<Void> future,
CountDownLatch connected, AtomicInteger statusCode) {
List<String> snapshot() {
synchronized (lines) {
return new ArrayList<>(lines);
}
}
boolean awaitConnection(long timeoutMs) throws InterruptedException {
return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
}
}
@Test
void sseConnect_registeredAgent_returnsEventStream() throws Exception {
String agentId = "sse-it-connect-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "SSE Connect Agent", "test-group");
SseStream stream = openSseStream(agentId);
// Wait for the connection to be established
assertThat(stream.awaitConnection(5000)).isTrue();
assertThat(stream.statusCode().get()).isEqualTo(200);
}
@Test
void sseConnect_unknownAgent_returns404() throws Exception {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + port + "/api/v1/agents/unknown-sse-agent/events"))
.header("Accept", "text/event-stream")
.GET()
.build();
CompletableFuture<Integer> statusFuture = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::statusCode);
int status = statusFuture.get(5, TimeUnit.SECONDS);
assertThat(status).isEqualTo(404);
}
@Test
void configUpdateDelivery_receivedViaSseStream() throws Exception {
String agentId = "sse-it-config-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Config Update Agent", "test-group");
SseStream stream = openSseStream(agentId);
stream.awaitConnection(5000);
// Give the SSE stream a moment to fully establish
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
.ignoreExceptions()
.until(() -> {
sendCommand(agentId, "config-update", "{\"key\":\"value\"}");
List<String> lines = stream.snapshot();
return lines.stream().anyMatch(l -> l.contains("event:config-update"));
});
List<String> lines = stream.snapshot();
assertThat(lines).anyMatch(l -> l.contains("event:config-update"));
assertThat(lines).anyMatch(l -> l.startsWith("id:"));
assertThat(lines).anyMatch(l -> l.contains("\"key\":\"value\""));
}
@Test
void deepTraceDelivery_receivedViaSseStream() throws Exception {
String agentId = "sse-it-trace-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Deep Trace Agent", "test-group");
SseStream stream = openSseStream(agentId);
stream.awaitConnection(5000);
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
.ignoreExceptions()
.until(() -> {
sendCommand(agentId, "deep-trace", "{\"correlationId\":\"test-123\"}");
List<String> lines = stream.snapshot();
return lines.stream().anyMatch(l -> l.contains("event:deep-trace"));
});
List<String> lines = stream.snapshot();
assertThat(lines).anyMatch(l -> l.contains("event:deep-trace"));
assertThat(lines).anyMatch(l -> l.contains("test-123"));
}
@Test
void replayDelivery_receivedViaSseStream() throws Exception {
String agentId = "sse-it-replay-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Replay Agent", "test-group");
SseStream stream = openSseStream(agentId);
stream.awaitConnection(5000);
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
.ignoreExceptions()
.until(() -> {
sendCommand(agentId, "replay", "{\"exchangeId\":\"ex-456\"}");
List<String> lines = stream.snapshot();
return lines.stream().anyMatch(l -> l.contains("event:replay"));
});
List<String> lines = stream.snapshot();
assertThat(lines).anyMatch(l -> l.contains("event:replay"));
assertThat(lines).anyMatch(l -> l.contains("ex-456"));
}
@Test
void pingKeepalive_receivedViaSseStream() throws Exception {
String agentId = "sse-it-ping-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Ping Agent", "test-group");
SseStream stream = openSseStream(agentId);
stream.awaitConnection(5000);
// Wait for a ping comment (sent every 1 second in test config)
await().atMost(5, TimeUnit.SECONDS).pollInterval(200, TimeUnit.MILLISECONDS)
.ignoreExceptions()
.until(() -> {
List<String> lines = stream.snapshot();
return lines.stream().anyMatch(l -> l.contains(":ping"));
});
List<String> lines = stream.snapshot();
assertThat(lines).anyMatch(l -> l.contains(":ping"));
}
@Test
void lastEventIdHeader_connectionSucceeds() throws Exception {
String agentId = "sse-it-lastid-" + UUID.randomUUID().toString().substring(0, 8);
registerAgent(agentId, "Last-Event-ID Agent", "test-group");
SseStream stream = openSseStream(agentId, "some-previous-event-id");
// Just verify the connection succeeds (no replay expected)
assertThat(stream.awaitConnection(5000)).isTrue();
assertThat(stream.statusCode().get()).isEqualTo(200);
}
}

View File

@@ -9,3 +9,6 @@ ingestion:
buffer-capacity: 100
batch-size: 10
flush-interval-ms: 100
agent-registry:
ping-interval-ms: 1000