diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/DiagramLinkingIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/DiagramLinkingIT.java index 5da644d8..b44b634d 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/DiagramLinkingIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/DiagramLinkingIT.java @@ -2,20 +2,27 @@ package com.cameleer.server.app.storage; import com.cameleer.server.app.AbstractPostgresIT; import com.cameleer.server.app.TestSecurityHelper; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; /** - * Integration test proving that diagram_content_hash is populated during - * execution ingestion when a RouteGraph exists for the same route+agent. + * Integration test proving that diagram_content_hash is populated on + * executions when a RouteGraph exists for the same route+agent. All + * assertions go through the REST search + execution-detail endpoints + * (no raw SQL against ClickHouse). */ class DiagramLinkingIT extends AbstractPostgresIT { @@ -25,16 +32,21 @@ class DiagramLinkingIT extends AbstractPostgresIT { @Autowired private TestSecurityHelper securityHelper; + private final ObjectMapper objectMapper = new ObjectMapper(); + private HttpHeaders authHeaders; + private HttpHeaders viewerHeaders; + private final String agentId = "test-agent-diagram-linking-it"; @BeforeEach void setUp() { - String jwt = securityHelper.registerTestAgent("test-agent-diagram-linking-it"); + String jwt = securityHelper.registerTestAgent(agentId); authHeaders = securityHelper.authHeaders(jwt); + viewerHeaders = securityHelper.authHeadersNoBody(securityHelper.viewerToken()); } @Test - void diagramHashPopulated_whenRouteGraphExistsBeforeExecution() { + void diagramHashPopulated_whenRouteGraphExistsBeforeExecution() throws Exception { String graphJson = """ { "routeId": "diagram-link-route", @@ -56,33 +68,43 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(diagramResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - String diagramHash = jdbcTemplate.queryForObject( - "SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1", - String.class); - assertThat(diagramHash).isNotNull().isNotEmpty(); + // Confirm the diagram is addressable via REST before we ingest the + // execution — otherwise the ingestion-service hash lookup could miss + // the not-yet-flushed graph and stamp an empty hash on the execution. + await().atMost(15, SECONDS).untilAsserted(() -> { + ResponseEntity probe = restTemplate.exchange( + "/api/v1/environments/default/apps/test-group/routes/diagram-link-route/diagram", + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), + String.class); + assertThat(probe.getStatusCode()).isEqualTo(HttpStatus.OK); + }); String executionJson = """ { - "routeId": "diagram-link-route", "exchangeId": "ex-diag-link-1", + "applicationId": "test-group", + "instanceId": "%s", + "routeId": "diagram-link-route", "correlationId": "corr-diag-link-1", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, + "chunkSeq": 0, + "final": true, "processors": [ { + "seq": 1, "processorId": "proc-1", "processorType": "bean", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", - "endTime": "2026-03-11T10:00:00.500Z", - "durationMs": 500, - "children": [] + "durationMs": 500 } ] } - """; + """.formatted(agentId); ResponseEntity execResponse = restTemplate.postForEntity( "/api/v1/data/executions", @@ -90,40 +112,44 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(execResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - String hash = jdbcTemplate.queryForObject( - "SELECT diagram_content_hash FROM executions WHERE route_id = 'diagram-link-route'", - String.class); - assertThat(hash) - .isNotNull() - .isNotEmpty() - .hasSize(64) - .matches("[a-f0-9]{64}"); + await().atMost(15, SECONDS).untilAsserted(() -> { + String hash = fetchDiagramContentHashByCorrelationId("corr-diag-link-1"); + assertThat(hash) + .as("diagram_content_hash on linked execution") + .isNotNull() + .isNotEmpty() + .hasSize(64) + .matches("[a-f0-9]{64}"); + }); } @Test - void diagramHashEmpty_whenNoRouteGraphExists() { + void diagramHashEmpty_whenNoRouteGraphExists() throws Exception { String executionJson = """ { - "routeId": "no-diagram-route", "exchangeId": "ex-no-diag-1", + "applicationId": "test-group", + "instanceId": "%s", + "routeId": "no-diagram-route", "correlationId": "corr-no-diag-1", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, + "chunkSeq": 0, + "final": true, "processors": [ { + "seq": 1, "processorId": "proc-no-diag", "processorType": "log", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", - "endTime": "2026-03-11T10:00:00.500Z", - "durationMs": 500, - "children": [] + "durationMs": 500 } ] } - """; + """.formatted(agentId); ResponseEntity response = restTemplate.postForEntity( "/api/v1/data/executions", @@ -131,11 +157,42 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - String hash = jdbcTemplate.queryForObject( - "SELECT diagram_content_hash FROM executions WHERE route_id = 'no-diagram-route'", + await().atMost(15, SECONDS).untilAsserted(() -> { + String hash = fetchDiagramContentHashByCorrelationId("corr-no-diag-1"); + assertThat(hash) + .as("diagram_content_hash on un-linked execution") + .isNotNull() + .isEmpty(); + }); + } + + /** + * Returns the {@code diagramContentHash} field off the execution-detail + * REST response, or null if the execution isn't visible yet. Forces the + * assertion pipeline to go controller→service→store rather than a raw + * SQL read against ClickHouse. + */ + private String fetchDiagramContentHashByCorrelationId(String correlationId) throws Exception { + ResponseEntity search = restTemplate.exchange( + "/api/v1/environments/default/executions?correlationId=" + correlationId, + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), String.class); - assertThat(hash) - .isNotNull() - .isEmpty(); + if (search.getStatusCode() != HttpStatus.OK) return null; + JsonNode body = objectMapper.readTree(search.getBody()); + if (body.get("total").asLong() < 1) return null; + String execId = body.get("data").get(0).get("executionId").asText(); + + ResponseEntity detail = restTemplate.exchange( + "/api/v1/executions/" + execId, + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), + String.class); + if (detail.getStatusCode() != HttpStatus.OK) return null; + JsonNode detailBody = objectMapper.readTree(detail.getBody()); + JsonNode field = detailBody.path("diagramContentHash"); + // JSON null → empty string, mirroring how the ingestion service + // stamps "" on executions with no linked diagram. + return field.isMissingNode() || field.isNull() ? "" : field.asText(); } } diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/IngestionSchemaIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/IngestionSchemaIT.java index 414a700d..3219a191 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/IngestionSchemaIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/IngestionSchemaIT.java @@ -2,23 +2,28 @@ package com.cameleer.server.app.storage; import com.cameleer.server.app.AbstractPostgresIT; import com.cameleer.server.app.TestSecurityHelper; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import java.util.List; -import java.util.Map; - +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; /** - * Integration test verifying that processor execution data is correctly populated - * during ingestion of route executions with nested processors and exchange data. + * Verifies the ingest→store→read pipeline preserves processor-tree shape and + * exchange bodies. All assertions go through the REST search + execution- + * detail endpoints — the processor tree returned there is reconstructed by + * DetailService.buildTree from the flat processor_executions rows, so it + * exercises both the write path (flattening) and the read path (tree build). */ class IngestionSchemaIT extends AbstractPostgresIT { @@ -28,178 +33,209 @@ class IngestionSchemaIT extends AbstractPostgresIT { @Autowired private TestSecurityHelper securityHelper; + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final String agentId = "test-agent-ingestion-schema-it"; private HttpHeaders authHeaders; + private HttpHeaders viewerHeaders; @BeforeEach void setUp() { - String jwt = securityHelper.registerTestAgent("test-agent-ingestion-schema-it"); + String jwt = securityHelper.registerTestAgent(agentId); authHeaders = securityHelper.authHeaders(jwt); + viewerHeaders = securityHelper.authHeadersNoBody(securityHelper.viewerToken()); } @Test - void processorTreeMetadata_depthsAndParentIdsCorrect() { + void processorTreeMetadata_depthsAndParentIdsCorrect() throws Exception { String json = """ { - "routeId": "schema-test-tree", "exchangeId": "ex-tree-1", + "applicationId": "test-group", + "instanceId": "%s", + "routeId": "schema-test-tree", "correlationId": "corr-tree-1", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, + "chunkSeq": 0, + "final": true, "processors": [ { + "seq": 1, "processorId": "root-proc", "processorType": "bean", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", - "endTime": "2026-03-11T10:00:00.500Z", "durationMs": 500, - "inputBody": "root-input", + "inputBody": "root-input", "outputBody": "root-output", "inputHeaders": {"Content-Type": "application/json"}, - "outputHeaders": {"X-Result": "ok"}, - "children": [ - { - "processorId": "child-proc", - "processorType": "log", - "status": "COMPLETED", - "startTime": "2026-03-11T10:00:00.100Z", - "endTime": "2026-03-11T10:00:00.400Z", - "durationMs": 300, - "inputBody": "child-input", - "outputBody": "child-output", - "children": [ - { - "processorId": "grandchild-proc", - "processorType": "setHeader", - "status": "COMPLETED", - "startTime": "2026-03-11T10:00:00.200Z", - "endTime": "2026-03-11T10:00:00.300Z", - "durationMs": 100, - "children": [] - } - ] - } - ] + "outputHeaders": {"X-Result": "ok"} + }, + { + "seq": 2, + "parentSeq": 1, + "parentProcessorId": "root-proc", + "processorId": "child-proc", + "processorType": "log", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00.100Z", + "durationMs": 300, + "inputBody": "child-input", + "outputBody": "child-output" + }, + { + "seq": 3, + "parentSeq": 2, + "parentProcessorId": "child-proc", + "processorId": "grandchild-proc", + "processorType": "setHeader", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00.200Z", + "durationMs": 100 } ] } - """; + """.formatted(agentId); postExecution(json); - // Verify execution row exists - Integer execCount = jdbcTemplate.queryForObject( - "SELECT count(*) FROM executions WHERE execution_id = 'ex-tree-1'", - Integer.class); - assertThat(execCount).isEqualTo(1); + JsonNode detail = awaitExecutionDetail("corr-tree-1"); + JsonNode processors = detail.get("processors"); + assertThat(processors).isNotNull(); + assertThat(processors).hasSize(1); // single root in the reconstructed tree - // Verify processors were flattened into processor_executions - List> processors = jdbcTemplate.queryForList( - "SELECT processor_id, processor_type, depth, parent_processor_id, " + - "input_body, output_body, input_headers " + - "FROM processor_executions WHERE execution_id = 'ex-tree-1' " + - "ORDER BY depth, processor_id"); - assertThat(processors).hasSize(3); + JsonNode root = processors.get(0); + assertThat(root.get("processorId").asText()).isEqualTo("root-proc"); + assertThat(root.get("processorType").asText()).isEqualTo("bean"); + assertThat(root.get("children")).hasSize(1); - // Root processor: depth=0, no parent - assertThat(processors.get(0).get("processor_id")).isEqualTo("root-proc"); - assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0); - assertThat(processors.get(0).get("parent_processor_id")).isNull(); - assertThat(processors.get(0).get("input_body")).isEqualTo("root-input"); - assertThat(processors.get(0).get("output_body")).isEqualTo("root-output"); - assertThat(processors.get(0).get("input_headers").toString()).contains("Content-Type"); + JsonNode child = root.get("children").get(0); + assertThat(child.get("processorId").asText()).isEqualTo("child-proc"); + assertThat(child.get("children")).hasSize(1); - // Child processor: depth=1, parent=root-proc - assertThat(processors.get(1).get("processor_id")).isEqualTo("child-proc"); - assertThat(((Number) processors.get(1).get("depth")).intValue()).isEqualTo(1); - assertThat(processors.get(1).get("parent_processor_id")).isEqualTo("root-proc"); - assertThat(processors.get(1).get("input_body")).isEqualTo("child-input"); - assertThat(processors.get(1).get("output_body")).isEqualTo("child-output"); - - // Grandchild processor: depth=2, parent=child-proc - assertThat(processors.get(2).get("processor_id")).isEqualTo("grandchild-proc"); - assertThat(((Number) processors.get(2).get("depth")).intValue()).isEqualTo(2); - assertThat(processors.get(2).get("parent_processor_id")).isEqualTo("child-proc"); + JsonNode grandchild = child.get("children").get(0); + assertThat(grandchild.get("processorId").asText()).isEqualTo("grandchild-proc"); + assertThat(grandchild.get("children")).isEmpty(); } @Test - void exchangeBodiesStored() { + void exchangeBodiesStored() throws Exception { String json = """ { - "routeId": "schema-test-bodies", "exchangeId": "ex-bodies-1", + "applicationId": "test-group", + "instanceId": "%s", + "routeId": "schema-test-bodies", + "correlationId": "corr-bodies-1", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, + "chunkSeq": 0, + "final": true, "processors": [ { + "seq": 1, "processorId": "proc-1", "processorType": "bean", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", - "endTime": "2026-03-11T10:00:00.500Z", "durationMs": 500, "inputBody": "processor-body-text", - "outputBody": "processor-output-text", - "children": [] + "outputBody": "processor-output-text" } ] } - """; + """.formatted(agentId); postExecution(json); - // Verify processor body data - List> processors = jdbcTemplate.queryForList( - "SELECT input_body, output_body FROM processor_executions " + - "WHERE execution_id = 'ex-bodies-1'"); - assertThat(processors).hasSize(1); - assertThat(processors.get(0).get("input_body")).isEqualTo("processor-body-text"); - assertThat(processors.get(0).get("output_body")).isEqualTo("processor-output-text"); + JsonNode detail = awaitExecutionDetail("corr-bodies-1"); + String execId = detail.get("executionId").asText(); + + // Processor bodies are served via the detail processor-snapshot route + // (see rules: GET /api/v1/executions/{id}/processors/{seq}/snapshot). + ResponseEntity snap = restTemplate.exchange( + "/api/v1/executions/" + execId + "/processors/0/snapshot", + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), + String.class); + assertThat(snap.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode snapBody = objectMapper.readTree(snap.getBody()); + assertThat(snapBody.get("inputBody").asText()).isEqualTo("processor-body-text"); + assertThat(snapBody.get("outputBody").asText()).isEqualTo("processor-output-text"); } @Test - void nullSnapshots_insertSucceedsWithEmptyDefaults() { + void nullSnapshots_insertSucceedsWithEmptyDefaults() throws Exception { String json = """ { - "routeId": "schema-test-null-snap", "exchangeId": "ex-null-1", + "applicationId": "test-group", + "instanceId": "%s", + "routeId": "schema-test-null-snap", + "correlationId": "corr-null-1", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, + "chunkSeq": 0, + "final": true, "processors": [ { + "seq": 1, "processorId": "proc-null", "processorType": "log", "status": "COMPLETED", "startTime": "2026-03-11T10:00:00Z", - "endTime": "2026-03-11T10:00:00.500Z", - "durationMs": 500, - "children": [] + "durationMs": 500 } ] } - """; + """.formatted(agentId); postExecution(json); - // Verify execution exists - Integer count = jdbcTemplate.queryForObject( - "SELECT count(*) FROM executions WHERE execution_id = 'ex-null-1'", - Integer.class); - assertThat(count).isEqualTo(1); - - // Verify processor with null bodies inserted successfully - List> processors = jdbcTemplate.queryForList( - "SELECT depth, parent_processor_id, input_body, output_body " + - "FROM processor_executions WHERE execution_id = 'ex-null-1'"); + JsonNode detail = awaitExecutionDetail("corr-null-1"); + JsonNode processors = detail.get("processors"); + assertThat(processors).isNotNull(); assertThat(processors).hasSize(1); - assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0); - assertThat(processors.get(0).get("parent_processor_id")).isNull(); + JsonNode root = processors.get(0); + assertThat(root.get("processorId").asText()).isEqualTo("proc-null"); + // Root has no parent in the reconstructed tree. + assertThat(root.get("children")).isEmpty(); + } + + /** + * Poll the search + detail endpoints until the execution shows up, then + * return the execution-detail JSON. Drives both CH writes and reads + * through the full REST stack. + */ + private JsonNode awaitExecutionDetail(String correlationId) throws Exception { + JsonNode[] holder = new JsonNode[1]; + await().atMost(15, SECONDS).untilAsserted(() -> { + ResponseEntity search = restTemplate.exchange( + "/api/v1/environments/default/executions?correlationId=" + correlationId, + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), + String.class); + assertThat(search.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode body = objectMapper.readTree(search.getBody()); + assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1); + String execId = body.get("data").get(0).get("executionId").asText(); + + ResponseEntity detail = restTemplate.exchange( + "/api/v1/executions/" + execId, + HttpMethod.GET, + new HttpEntity<>(viewerHeaders), + String.class); + assertThat(detail.getStatusCode()).isEqualTo(HttpStatus.OK); + holder[0] = objectMapper.readTree(detail.getBody()); + }); + return holder[0]; } private void postExecution(String json) {