fix(test): update Forward-compat / Protocol-version / Backpressure ITs

- ForwardCompatIT: send a valid ExecutionChunk envelope with extra
  unknown fields instead of a bare {futureField}. Was being parsed into
  an empty/degenerate chunk and rejected with 400.
- ProtocolVersionIT.requestWithCorrectProtocolVersionPassesInterceptor:
  same shape fix — minimal valid chunk so the controller's 400 is not
  an ambiguous signal for interceptor-passthrough.
- BackpressureIT:
  * TestPropertySource keys were "ingestion.*" but IngestionConfig is
    bound under "cameleer.server.ingestion.*" — overrides were ignored
    and the buffer stayed at its default 50_000, so the 503 overflow
    branch was unreachable. Corrected the keys.
  * MetricsFlushScheduler's @Scheduled uses a *different* key again
    ("ingestion.flush-interval-ms"), so we override that separately to
    stop the default 1s flush from draining the buffer mid-test.
  * executionIngestion_isSynchronous_returnsAccepted now uses the
    chunked envelope format.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-21 22:26:48 +02:00
parent 8283d531f6
commit 95f90f43dc
3 changed files with 62 additions and 7 deletions

View File

@@ -22,9 +22,17 @@ import static org.assertj.core.api.Assertions.assertThat;
* Only the metrics pipeline still uses a write buffer with backpressure. * Only the metrics pipeline still uses a write buffer with backpressure.
*/ */
@TestPropertySource(properties = { @TestPropertySource(properties = {
"ingestion.buffer-capacity=5", // Property keys must match the IngestionConfig @ConfigurationProperties
"ingestion.batch-size=5", // prefix (cameleer.server.ingestion) exactly — the old "ingestion.*"
"ingestion.flush-interval-ms=60000" // 60s -- effectively no flush during test // form was silently ignored and the metrics buffer stayed at its
// default of 50_000, which made the 503 overflow scenario unreachable.
"cameleer.server.ingestion.buffercapacity=5",
"cameleer.server.ingestion.batchsize=5",
"cameleer.server.ingestion.flushintervalms=60000",
// MetricsFlushScheduler's @Scheduled reads ingestion.flush-interval-ms
// (a separate key) — override both so the test doesn't race against
// the default 1s flush during the fill-then-overflow scenario.
"ingestion.flush-interval-ms=60000"
}) })
class BackpressureIT extends AbstractPostgresIT { class BackpressureIT extends AbstractPostgresIT {
@@ -81,7 +89,19 @@ class BackpressureIT extends AbstractPostgresIT {
@Test @Test
void executionIngestion_isSynchronous_returnsAccepted() { void executionIngestion_isSynchronous_returnsAccepted() {
String json = """ String json = """
{"routeId":"bp-sync","exchangeId":"bp-sync-e","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]} {
"exchangeId": "bp-sync-e",
"applicationId": "test-group",
"instanceId": "test-agent-backpressure-it",
"routeId": "bp-sync",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.100Z",
"durationMs": 100,
"chunkSeq": 0,
"final": true,
"processors": []
}
"""; """;
ResponseEntity<String> response = restTemplate.postForEntity( ResponseEntity<String> response = restTemplate.postForEntity(

View File

@@ -33,10 +33,26 @@ class ForwardCompatIT extends AbstractPostgresIT {
@Test @Test
void unknownFieldsInRequestBodyDoNotCauseError() { void unknownFieldsInRequestBodyDoNotCauseError() {
// Valid ExecutionChunk plus extra fields a future agent version
// might send. Jackson is configured with FAIL_ON_UNKNOWN_PROPERTIES
// = false on ChunkIngestionController, so the extras must be ignored
// and the envelope accepted with 202.
String jsonWithUnknownFields = """ String jsonWithUnknownFields = """
{ {
"futureField": "value", "exchangeId": "fwd-compat-1",
"anotherUnknown": 42 "applicationId": "test-group",
"instanceId": "test-agent-forward-compat-it",
"routeId": "fwd-compat-route",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [],
"futureField": "value",
"anotherUnknown": 42,
"someNested": {"key": "v"}
} }
"""; """;

View File

@@ -65,7 +65,26 @@ class ProtocolVersionIT extends AbstractPostgresIT {
headers.setContentType(MediaType.APPLICATION_JSON); headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", "Bearer " + jwt); headers.set("Authorization", "Bearer " + jwt);
headers.set("X-Cameleer-Protocol-Version", "1"); headers.set("X-Cameleer-Protocol-Version", "1");
var entity = new HttpEntity<>("{}", headers); // Minimal valid ExecutionChunk envelope so the controller can accept
// it; the prior {} body was treated by the chunk pipeline as an empty
// envelope and rejected with 400, which made the interceptor-passed
// signal ambiguous.
String chunk = """
{
"exchangeId": "protocol-version-1",
"applicationId": "test-group",
"instanceId": "test-agent-protocol-it",
"routeId": "protocol-version-route",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1,
"chunkSeq": 0,
"final": true,
"processors": []
}
""";
var entity = new HttpEntity<>(chunk, headers);
var response = restTemplate.exchange( var response = restTemplate.exchange(
"/api/v1/data/executions", HttpMethod.POST, entity, String.class); "/api/v1/data/executions", HttpMethod.POST, entity, String.class);