test(01-02): add failing integration tests for ingestion endpoints
- ExecutionControllerIT: single/array POST, flush verification, unknown fields - DiagramControllerIT: single/array POST, flush verification - MetricsControllerIT: POST metrics, flush verification - BackpressureIT: buffer-full returns 503, buffered data not lost Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,110 @@
|
|||||||
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
|
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.test.context.TestPropertySource;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests backpressure behavior when write buffers are full.
|
||||||
|
* Uses a tiny buffer (capacity=5) to easily trigger backpressure.
|
||||||
|
*/
|
||||||
|
@TestPropertySource(properties = {
|
||||||
|
"ingestion.buffer-capacity=5",
|
||||||
|
"ingestion.flush-interval-ms=5000" // slow flush so buffer stays full
|
||||||
|
})
|
||||||
|
class BackpressureIT extends AbstractClickHouseIT {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate restTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenBufferFull_returns503WithRetryAfter() {
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
// Fill the buffer (capacity=5) by posting single executions
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
String json = String.format("""
|
||||||
|
{
|
||||||
|
"routeId": "bp-route-%d",
|
||||||
|
"exchangeId": "bp-exchange-%d",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"durationMs": 100,
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""", i, i);
|
||||||
|
|
||||||
|
restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next POST should get 503 since buffer is full
|
||||||
|
String overflowJson = """
|
||||||
|
{
|
||||||
|
"routeId": "bp-overflow",
|
||||||
|
"exchangeId": "bp-overflow-exchange",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"durationMs": 100,
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(overflowJson, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
|
||||||
|
assertThat(response.getHeaders().getFirst("Retry-After")).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void bufferedDataNotLost_afterBackpressure() {
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
// Post data that fills the buffer
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
String json = String.format("""
|
||||||
|
{
|
||||||
|
"routeId": "bp-persist-route",
|
||||||
|
"exchangeId": "bp-persist-%d",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"durationMs": 100,
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""", i);
|
||||||
|
|
||||||
|
restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for flush to happen (flush interval is 5s in this test, but we wait longer)
|
||||||
|
await().atMost(15, SECONDS).untilAsserted(() -> {
|
||||||
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
|
"SELECT count() FROM route_executions WHERE route_id = 'bp-persist-route'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isGreaterThanOrEqualTo(3);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,105 @@
|
|||||||
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
|
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
class DiagramControllerIT extends AbstractClickHouseIT {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate restTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postSingleDiagram_returns202() {
|
||||||
|
String json = """
|
||||||
|
{
|
||||||
|
"routeId": "diagram-route-1",
|
||||||
|
"description": "Test route",
|
||||||
|
"version": 1,
|
||||||
|
"nodes": [],
|
||||||
|
"edges": [],
|
||||||
|
"processorNodeMapping": {}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/diagrams",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postDiagram_dataAppearsInClickHouseAfterFlush() {
|
||||||
|
String json = """
|
||||||
|
{
|
||||||
|
"routeId": "diagram-flush-route",
|
||||||
|
"description": "Flush test",
|
||||||
|
"version": 1,
|
||||||
|
"nodes": [],
|
||||||
|
"edges": [],
|
||||||
|
"processorNodeMapping": {}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/diagrams",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||||
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
|
"SELECT count() FROM route_diagrams WHERE route_id = 'diagram-flush-route'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isGreaterThanOrEqualTo(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postArrayOfDiagrams_returns202() {
|
||||||
|
String json = """
|
||||||
|
[{
|
||||||
|
"routeId": "diagram-arr-1",
|
||||||
|
"version": 1,
|
||||||
|
"nodes": [],
|
||||||
|
"edges": []
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"routeId": "diagram-arr-2",
|
||||||
|
"version": 1,
|
||||||
|
"nodes": [],
|
||||||
|
"edges": []
|
||||||
|
}]
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/diagrams",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,146 @@
|
|||||||
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
|
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
class ExecutionControllerIT extends AbstractClickHouseIT {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate restTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postSingleExecution_returns202() {
|
||||||
|
String json = """
|
||||||
|
{
|
||||||
|
"routeId": "route-1",
|
||||||
|
"exchangeId": "exchange-1",
|
||||||
|
"correlationId": "corr-1",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"endTime": "2026-03-11T10:00:01Z",
|
||||||
|
"durationMs": 1000,
|
||||||
|
"errorMessage": "",
|
||||||
|
"errorStackTrace": "",
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postArrayOfExecutions_returns202() {
|
||||||
|
String json = """
|
||||||
|
[{
|
||||||
|
"routeId": "route-2",
|
||||||
|
"exchangeId": "exchange-2",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"endTime": "2026-03-11T10:00:01Z",
|
||||||
|
"durationMs": 1000,
|
||||||
|
"processors": []
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"routeId": "route-3",
|
||||||
|
"exchangeId": "exchange-3",
|
||||||
|
"status": "FAILED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"endTime": "2026-03-11T10:00:02Z",
|
||||||
|
"durationMs": 2000,
|
||||||
|
"errorMessage": "Something went wrong",
|
||||||
|
"processors": []
|
||||||
|
}]
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postExecution_dataAppearsInClickHouseAfterFlush() {
|
||||||
|
String json = """
|
||||||
|
{
|
||||||
|
"routeId": "flush-test-route",
|
||||||
|
"exchangeId": "flush-exchange-1",
|
||||||
|
"correlationId": "flush-corr-1",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"endTime": "2026-03-11T10:00:01Z",
|
||||||
|
"durationMs": 1000,
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||||
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
|
"SELECT count() FROM route_executions WHERE route_id = 'flush-test-route'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isGreaterThanOrEqualTo(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postExecution_unknownFieldsAccepted() {
|
||||||
|
String json = """
|
||||||
|
{
|
||||||
|
"routeId": "route-unk",
|
||||||
|
"exchangeId": "exchange-unk",
|
||||||
|
"status": "COMPLETED",
|
||||||
|
"startTime": "2026-03-11T10:00:00Z",
|
||||||
|
"durationMs": 500,
|
||||||
|
"unknownField": "should-be-ignored",
|
||||||
|
"anotherUnknown": 42,
|
||||||
|
"processors": []
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/executions",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,74 @@
|
|||||||
|
package com.cameleer3.server.app.controller;
|
||||||
|
|
||||||
|
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
class MetricsControllerIT extends AbstractClickHouseIT {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate restTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postMetrics_returns202() {
|
||||||
|
String json = """
|
||||||
|
[{
|
||||||
|
"agentId": "agent-1",
|
||||||
|
"collectedAt": "2026-03-11T10:00:00Z",
|
||||||
|
"metricName": "cpu.usage",
|
||||||
|
"metricValue": 75.5,
|
||||||
|
"tags": {"host": "server-1"}
|
||||||
|
}]
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/metrics",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void postMetrics_dataAppearsInClickHouseAfterFlush() {
|
||||||
|
String json = """
|
||||||
|
[{
|
||||||
|
"agentId": "agent-flush-test",
|
||||||
|
"collectedAt": "2026-03-11T10:00:00Z",
|
||||||
|
"metricName": "memory.used",
|
||||||
|
"metricValue": 1024.0,
|
||||||
|
"tags": {}
|
||||||
|
}]
|
||||||
|
""";
|
||||||
|
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||||
|
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||||
|
|
||||||
|
restTemplate.postForEntity(
|
||||||
|
"/api/v1/data/metrics",
|
||||||
|
new HttpEntity<>(json, headers),
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||||
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
|
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-flush-test'",
|
||||||
|
Integer.class);
|
||||||
|
assertThat(count).isGreaterThanOrEqualTo(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user