diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java new file mode 100644 index 00000000..6ed392b2 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java @@ -0,0 +1,110 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.TestPropertySource; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Tests backpressure behavior when write buffers are full. + * Uses a tiny buffer (capacity=5) to easily trigger backpressure. + */ +@TestPropertySource(properties = { + "ingestion.buffer-capacity=5", + "ingestion.flush-interval-ms=5000" // slow flush so buffer stays full +}) +class BackpressureIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void whenBufferFull_returns503WithRetryAfter() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + // Fill the buffer (capacity=5) by posting single executions + for (int i = 0; i < 5; i++) { + String json = String.format(""" + { + "routeId": "bp-route-%d", + "exchangeId": "bp-exchange-%d", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "durationMs": 100, + "processors": [] + } + """, i, i); + + restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + } + + // Next POST should get 503 since buffer is full + String overflowJson = """ + { + "routeId": "bp-overflow", + "exchangeId": "bp-overflow-exchange", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "durationMs": 100, + "processors": [] + } + """; + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(overflowJson, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE); + assertThat(response.getHeaders().getFirst("Retry-After")).isNotNull(); + } + + @Test + void bufferedDataNotLost_afterBackpressure() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + // Post data that fills the buffer + for (int i = 0; i < 3; i++) { + String json = String.format(""" + { + "routeId": "bp-persist-route", + "exchangeId": "bp-persist-%d", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "durationMs": 100, + "processors": [] + } + """, i); + + restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + } + + // Wait for flush to happen (flush interval is 5s in this test, but we wait longer) + await().atMost(15, SECONDS).untilAsserted(() -> { + Integer count = jdbcTemplate.queryForObject( + "SELECT count() FROM route_executions WHERE route_id = 'bp-persist-route'", + Integer.class); + assertThat(count).isGreaterThanOrEqualTo(3); + }); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java new file mode 100644 index 00000000..0b0fb2f4 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java @@ -0,0 +1,105 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class DiagramControllerIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void postSingleDiagram_returns202() { + String json = """ + { + "routeId": "diagram-route-1", + "description": "Test route", + "version": 1, + "nodes": [], + "edges": [], + "processorNodeMapping": {} + } + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/diagrams", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + + @Test + void postDiagram_dataAppearsInClickHouseAfterFlush() { + String json = """ + { + "routeId": "diagram-flush-route", + "description": "Flush test", + "version": 1, + "nodes": [], + "edges": [], + "processorNodeMapping": {} + } + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + restTemplate.postForEntity( + "/api/v1/data/diagrams", + new HttpEntity<>(json, headers), + String.class); + + await().atMost(10, SECONDS).untilAsserted(() -> { + Integer count = jdbcTemplate.queryForObject( + "SELECT count() FROM route_diagrams WHERE route_id = 'diagram-flush-route'", + Integer.class); + assertThat(count).isGreaterThanOrEqualTo(1); + }); + } + + @Test + void postArrayOfDiagrams_returns202() { + String json = """ + [{ + "routeId": "diagram-arr-1", + "version": 1, + "nodes": [], + "edges": [] + }, + { + "routeId": "diagram-arr-2", + "version": 1, + "nodes": [], + "edges": [] + }] + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/diagrams", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java new file mode 100644 index 00000000..0358ba74 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java @@ -0,0 +1,146 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.jdbc.core.JdbcTemplate; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class ExecutionControllerIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void postSingleExecution_returns202() { + String json = """ + { + "routeId": "route-1", + "exchangeId": "exchange-1", + "correlationId": "corr-1", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "errorMessage": "", + "errorStackTrace": "", + "processors": [] + } + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + + @Test + void postArrayOfExecutions_returns202() { + String json = """ + [{ + "routeId": "route-2", + "exchangeId": "exchange-2", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "processors": [] + }, + { + "routeId": "route-3", + "exchangeId": "exchange-3", + "status": "FAILED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:02Z", + "durationMs": 2000, + "errorMessage": "Something went wrong", + "processors": [] + }] + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + + @Test + void postExecution_dataAppearsInClickHouseAfterFlush() { + String json = """ + { + "routeId": "flush-test-route", + "exchangeId": "flush-exchange-1", + "correlationId": "flush-corr-1", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "processors": [] + } + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + + await().atMost(10, SECONDS).untilAsserted(() -> { + Integer count = jdbcTemplate.queryForObject( + "SELECT count() FROM route_executions WHERE route_id = 'flush-test-route'", + Integer.class); + assertThat(count).isGreaterThanOrEqualTo(1); + }); + } + + @Test + void postExecution_unknownFieldsAccepted() { + String json = """ + { + "routeId": "route-unk", + "exchangeId": "exchange-unk", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "durationMs": 500, + "unknownField": "should-be-ignored", + "anotherUnknown": 42, + "processors": [] + } + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java new file mode 100644 index 00000000..2d3b5c71 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java @@ -0,0 +1,74 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractClickHouseIT; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class MetricsControllerIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void postMetrics_returns202() { + String json = """ + [{ + "agentId": "agent-1", + "collectedAt": "2026-03-11T10:00:00Z", + "metricName": "cpu.usage", + "metricValue": 75.5, + "tags": {"host": "server-1"} + }] + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/metrics", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + + @Test + void postMetrics_dataAppearsInClickHouseAfterFlush() { + String json = """ + [{ + "agentId": "agent-flush-test", + "collectedAt": "2026-03-11T10:00:00Z", + "metricName": "memory.used", + "metricValue": 1024.0, + "tags": {} + }] + """; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + restTemplate.postForEntity( + "/api/v1/data/metrics", + new HttpEntity<>(json, headers), + String.class); + + await().atMost(10, SECONDS).untilAsserted(() -> { + Integer count = jdbcTemplate.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-flush-test'", + Integer.class); + assertThat(count).isGreaterThanOrEqualTo(1); + }); + } +}