fix(test): REST-drive Diagram-linking and IngestionSchema ITs

Both tests extend AbstractPostgresIT and inherit the Postgres jdbcTemplate,
which they were using to query ClickHouse-resident tables (executions,
processor_executions, route_diagrams). Now:
- DiagramLinkingIT reads diagramContentHash off the execution-detail REST
  response (and tolerates JSON null by normalising to empty string, which
  matches how the ingestion service stamps un-linked executions).
- IngestionSchemaIT asserts the reconstructed processor tree through the
  execution-detail endpoint (covers both flattening on write and
  buildTree on read) and reads processor bodies via the processor-snapshot
  endpoint rather than raw processor_executions rows.

Both tests now use the ExecutionChunk envelope on POST /data/executions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-21 22:20:05 +02:00
parent 5684479938
commit d5adaaab72
2 changed files with 221 additions and 128 deletions

View File

@@ -2,20 +2,27 @@ package com.cameleer.server.app.storage;
import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Integration test proving that diagram_content_hash is populated during
* execution ingestion when a RouteGraph exists for the same route+agent.
* Integration test proving that diagram_content_hash is populated on
* executions when a RouteGraph exists for the same route+agent. All
* assertions go through the REST search + execution-detail endpoints
* (no raw SQL against ClickHouse).
*/
class DiagramLinkingIT extends AbstractPostgresIT {
@@ -25,16 +32,21 @@ class DiagramLinkingIT extends AbstractPostgresIT {
@Autowired
private TestSecurityHelper securityHelper;
private final ObjectMapper objectMapper = new ObjectMapper();
private HttpHeaders authHeaders;
private HttpHeaders viewerHeaders;
private final String agentId = "test-agent-diagram-linking-it";
@BeforeEach
void setUp() {
String jwt = securityHelper.registerTestAgent("test-agent-diagram-linking-it");
String jwt = securityHelper.registerTestAgent(agentId);
authHeaders = securityHelper.authHeaders(jwt);
viewerHeaders = securityHelper.authHeadersNoBody(securityHelper.viewerToken());
}
@Test
void diagramHashPopulated_whenRouteGraphExistsBeforeExecution() {
void diagramHashPopulated_whenRouteGraphExistsBeforeExecution() throws Exception {
String graphJson = """
{
"routeId": "diagram-link-route",
@@ -56,33 +68,43 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class);
assertThat(diagramResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
String diagramHash = jdbcTemplate.queryForObject(
"SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1",
String.class);
assertThat(diagramHash).isNotNull().isNotEmpty();
// Confirm the diagram is addressable via REST before we ingest the
// execution — otherwise the ingestion-service hash lookup could miss
// the not-yet-flushed graph and stamp an empty hash on the execution.
await().atMost(15, SECONDS).untilAsserted(() -> {
ResponseEntity<String> probe = restTemplate.exchange(
"/api/v1/environments/default/apps/test-group/routes/diagram-link-route/diagram",
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
assertThat(probe.getStatusCode()).isEqualTo(HttpStatus.OK);
});
String executionJson = """
{
"routeId": "diagram-link-route",
"exchangeId": "ex-diag-link-1",
"applicationId": "test-group",
"instanceId": "%s",
"routeId": "diagram-link-route",
"correlationId": "corr-diag-link-1",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [
{
"seq": 1,
"processorId": "proc-1",
"processorType": "bean",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"children": []
"durationMs": 500
}
]
}
""";
""".formatted(agentId);
ResponseEntity<String> execResponse = restTemplate.postForEntity(
"/api/v1/data/executions",
@@ -90,40 +112,44 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class);
assertThat(execResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
String hash = jdbcTemplate.queryForObject(
"SELECT diagram_content_hash FROM executions WHERE route_id = 'diagram-link-route'",
String.class);
assertThat(hash)
.isNotNull()
.isNotEmpty()
.hasSize(64)
.matches("[a-f0-9]{64}");
await().atMost(15, SECONDS).untilAsserted(() -> {
String hash = fetchDiagramContentHashByCorrelationId("corr-diag-link-1");
assertThat(hash)
.as("diagram_content_hash on linked execution")
.isNotNull()
.isNotEmpty()
.hasSize(64)
.matches("[a-f0-9]{64}");
});
}
@Test
void diagramHashEmpty_whenNoRouteGraphExists() {
void diagramHashEmpty_whenNoRouteGraphExists() throws Exception {
String executionJson = """
{
"routeId": "no-diagram-route",
"exchangeId": "ex-no-diag-1",
"applicationId": "test-group",
"instanceId": "%s",
"routeId": "no-diagram-route",
"correlationId": "corr-no-diag-1",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [
{
"seq": 1,
"processorId": "proc-no-diag",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"children": []
"durationMs": 500
}
]
}
""";
""".formatted(agentId);
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/data/executions",
@@ -131,11 +157,42 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
String hash = jdbcTemplate.queryForObject(
"SELECT diagram_content_hash FROM executions WHERE route_id = 'no-diagram-route'",
await().atMost(15, SECONDS).untilAsserted(() -> {
String hash = fetchDiagramContentHashByCorrelationId("corr-no-diag-1");
assertThat(hash)
.as("diagram_content_hash on un-linked execution")
.isNotNull()
.isEmpty();
});
}
/**
* Returns the {@code diagramContentHash} field off the execution-detail
* REST response, or null if the execution isn't visible yet. Forces the
* assertion pipeline to go controller→service→store rather than a raw
* SQL read against ClickHouse.
*/
private String fetchDiagramContentHashByCorrelationId(String correlationId) throws Exception {
ResponseEntity<String> search = restTemplate.exchange(
"/api/v1/environments/default/executions?correlationId=" + correlationId,
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
assertThat(hash)
.isNotNull()
.isEmpty();
if (search.getStatusCode() != HttpStatus.OK) return null;
JsonNode body = objectMapper.readTree(search.getBody());
if (body.get("total").asLong() < 1) return null;
String execId = body.get("data").get(0).get("executionId").asText();
ResponseEntity<String> detail = restTemplate.exchange(
"/api/v1/executions/" + execId,
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
if (detail.getStatusCode() != HttpStatus.OK) return null;
JsonNode detailBody = objectMapper.readTree(detail.getBody());
JsonNode field = detailBody.path("diagramContentHash");
// JSON null → empty string, mirroring how the ingestion service
// stamps "" on executions with no linked diagram.
return field.isMissingNode() || field.isNull() ? "" : field.asText();
}
}

View File

@@ -2,23 +2,28 @@ package com.cameleer.server.app.storage;
import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.util.List;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Integration test verifying that processor execution data is correctly populated
* during ingestion of route executions with nested processors and exchange data.
* Verifies the ingest→store→read pipeline preserves processor-tree shape and
* exchange bodies. All assertions go through the REST search + execution-
* detail endpoints — the processor tree returned there is reconstructed by
* DetailService.buildTree from the flat processor_executions rows, so it
* exercises both the write path (flattening) and the read path (tree build).
*/
class IngestionSchemaIT extends AbstractPostgresIT {
@@ -28,178 +33,209 @@ class IngestionSchemaIT extends AbstractPostgresIT {
@Autowired
private TestSecurityHelper securityHelper;
private final ObjectMapper objectMapper = new ObjectMapper();
private final String agentId = "test-agent-ingestion-schema-it";
private HttpHeaders authHeaders;
private HttpHeaders viewerHeaders;
@BeforeEach
void setUp() {
String jwt = securityHelper.registerTestAgent("test-agent-ingestion-schema-it");
String jwt = securityHelper.registerTestAgent(agentId);
authHeaders = securityHelper.authHeaders(jwt);
viewerHeaders = securityHelper.authHeadersNoBody(securityHelper.viewerToken());
}
@Test
void processorTreeMetadata_depthsAndParentIdsCorrect() {
void processorTreeMetadata_depthsAndParentIdsCorrect() throws Exception {
String json = """
{
"routeId": "schema-test-tree",
"exchangeId": "ex-tree-1",
"applicationId": "test-group",
"instanceId": "%s",
"routeId": "schema-test-tree",
"correlationId": "corr-tree-1",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [
{
"seq": 1,
"processorId": "root-proc",
"processorType": "bean",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"inputBody": "root-input",
"inputBody": "root-input",
"outputBody": "root-output",
"inputHeaders": {"Content-Type": "application/json"},
"outputHeaders": {"X-Result": "ok"},
"children": [
{
"processorId": "child-proc",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00.100Z",
"endTime": "2026-03-11T10:00:00.400Z",
"durationMs": 300,
"inputBody": "child-input",
"outputBody": "child-output",
"children": [
{
"processorId": "grandchild-proc",
"processorType": "setHeader",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00.200Z",
"endTime": "2026-03-11T10:00:00.300Z",
"durationMs": 100,
"children": []
}
]
}
]
"outputHeaders": {"X-Result": "ok"}
},
{
"seq": 2,
"parentSeq": 1,
"parentProcessorId": "root-proc",
"processorId": "child-proc",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00.100Z",
"durationMs": 300,
"inputBody": "child-input",
"outputBody": "child-output"
},
{
"seq": 3,
"parentSeq": 2,
"parentProcessorId": "child-proc",
"processorId": "grandchild-proc",
"processorType": "setHeader",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00.200Z",
"durationMs": 100
}
]
}
""";
""".formatted(agentId);
postExecution(json);
// Verify execution row exists
Integer execCount = jdbcTemplate.queryForObject(
"SELECT count(*) FROM executions WHERE execution_id = 'ex-tree-1'",
Integer.class);
assertThat(execCount).isEqualTo(1);
JsonNode detail = awaitExecutionDetail("corr-tree-1");
JsonNode processors = detail.get("processors");
assertThat(processors).isNotNull();
assertThat(processors).hasSize(1); // single root in the reconstructed tree
// Verify processors were flattened into processor_executions
List<Map<String, Object>> processors = jdbcTemplate.queryForList(
"SELECT processor_id, processor_type, depth, parent_processor_id, " +
"input_body, output_body, input_headers " +
"FROM processor_executions WHERE execution_id = 'ex-tree-1' " +
"ORDER BY depth, processor_id");
assertThat(processors).hasSize(3);
JsonNode root = processors.get(0);
assertThat(root.get("processorId").asText()).isEqualTo("root-proc");
assertThat(root.get("processorType").asText()).isEqualTo("bean");
assertThat(root.get("children")).hasSize(1);
// Root processor: depth=0, no parent
assertThat(processors.get(0).get("processor_id")).isEqualTo("root-proc");
assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0);
assertThat(processors.get(0).get("parent_processor_id")).isNull();
assertThat(processors.get(0).get("input_body")).isEqualTo("root-input");
assertThat(processors.get(0).get("output_body")).isEqualTo("root-output");
assertThat(processors.get(0).get("input_headers").toString()).contains("Content-Type");
JsonNode child = root.get("children").get(0);
assertThat(child.get("processorId").asText()).isEqualTo("child-proc");
assertThat(child.get("children")).hasSize(1);
// Child processor: depth=1, parent=root-proc
assertThat(processors.get(1).get("processor_id")).isEqualTo("child-proc");
assertThat(((Number) processors.get(1).get("depth")).intValue()).isEqualTo(1);
assertThat(processors.get(1).get("parent_processor_id")).isEqualTo("root-proc");
assertThat(processors.get(1).get("input_body")).isEqualTo("child-input");
assertThat(processors.get(1).get("output_body")).isEqualTo("child-output");
// Grandchild processor: depth=2, parent=child-proc
assertThat(processors.get(2).get("processor_id")).isEqualTo("grandchild-proc");
assertThat(((Number) processors.get(2).get("depth")).intValue()).isEqualTo(2);
assertThat(processors.get(2).get("parent_processor_id")).isEqualTo("child-proc");
JsonNode grandchild = child.get("children").get(0);
assertThat(grandchild.get("processorId").asText()).isEqualTo("grandchild-proc");
assertThat(grandchild.get("children")).isEmpty();
}
@Test
void exchangeBodiesStored() {
void exchangeBodiesStored() throws Exception {
String json = """
{
"routeId": "schema-test-bodies",
"exchangeId": "ex-bodies-1",
"applicationId": "test-group",
"instanceId": "%s",
"routeId": "schema-test-bodies",
"correlationId": "corr-bodies-1",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [
{
"seq": 1,
"processorId": "proc-1",
"processorType": "bean",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"inputBody": "processor-body-text",
"outputBody": "processor-output-text",
"children": []
"outputBody": "processor-output-text"
}
]
}
""";
""".formatted(agentId);
postExecution(json);
// Verify processor body data
List<Map<String, Object>> processors = jdbcTemplate.queryForList(
"SELECT input_body, output_body FROM processor_executions " +
"WHERE execution_id = 'ex-bodies-1'");
assertThat(processors).hasSize(1);
assertThat(processors.get(0).get("input_body")).isEqualTo("processor-body-text");
assertThat(processors.get(0).get("output_body")).isEqualTo("processor-output-text");
JsonNode detail = awaitExecutionDetail("corr-bodies-1");
String execId = detail.get("executionId").asText();
// Processor bodies are served via the detail processor-snapshot route
// (see rules: GET /api/v1/executions/{id}/processors/{seq}/snapshot).
ResponseEntity<String> snap = restTemplate.exchange(
"/api/v1/executions/" + execId + "/processors/0/snapshot",
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
assertThat(snap.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode snapBody = objectMapper.readTree(snap.getBody());
assertThat(snapBody.get("inputBody").asText()).isEqualTo("processor-body-text");
assertThat(snapBody.get("outputBody").asText()).isEqualTo("processor-output-text");
}
@Test
void nullSnapshots_insertSucceedsWithEmptyDefaults() {
void nullSnapshots_insertSucceedsWithEmptyDefaults() throws Exception {
String json = """
{
"routeId": "schema-test-null-snap",
"exchangeId": "ex-null-1",
"applicationId": "test-group",
"instanceId": "%s",
"routeId": "schema-test-null-snap",
"correlationId": "corr-null-1",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000,
"chunkSeq": 0,
"final": true,
"processors": [
{
"seq": 1,
"processorId": "proc-null",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:00.500Z",
"durationMs": 500,
"children": []
"durationMs": 500
}
]
}
""";
""".formatted(agentId);
postExecution(json);
// Verify execution exists
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM executions WHERE execution_id = 'ex-null-1'",
Integer.class);
assertThat(count).isEqualTo(1);
// Verify processor with null bodies inserted successfully
List<Map<String, Object>> processors = jdbcTemplate.queryForList(
"SELECT depth, parent_processor_id, input_body, output_body " +
"FROM processor_executions WHERE execution_id = 'ex-null-1'");
JsonNode detail = awaitExecutionDetail("corr-null-1");
JsonNode processors = detail.get("processors");
assertThat(processors).isNotNull();
assertThat(processors).hasSize(1);
assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0);
assertThat(processors.get(0).get("parent_processor_id")).isNull();
JsonNode root = processors.get(0);
assertThat(root.get("processorId").asText()).isEqualTo("proc-null");
// Root has no parent in the reconstructed tree.
assertThat(root.get("children")).isEmpty();
}
/**
* Poll the search + detail endpoints until the execution shows up, then
* return the execution-detail JSON. Drives both CH writes and reads
* through the full REST stack.
*/
private JsonNode awaitExecutionDetail(String correlationId) throws Exception {
JsonNode[] holder = new JsonNode[1];
await().atMost(15, SECONDS).untilAsserted(() -> {
ResponseEntity<String> search = restTemplate.exchange(
"/api/v1/environments/default/executions?correlationId=" + correlationId,
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
assertThat(search.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(search.getBody());
assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1);
String execId = body.get("data").get(0).get("executionId").asText();
ResponseEntity<String> detail = restTemplate.exchange(
"/api/v1/executions/" + execId,
HttpMethod.GET,
new HttpEntity<>(viewerHeaders),
String.class);
assertThat(detail.getStatusCode()).isEqualTo(HttpStatus.OK);
holder[0] = objectMapper.readTree(detail.getBody());
});
return holder[0];
}
private void postExecution(String json) {