From 26f5a2ce3b0bd6cd443762b912213aa0c5b7a191 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 16 Mar 2026 22:03:29 +0100 Subject: [PATCH] fix: update remaining ITs for synchronous ingestion and PostgreSQL storage - SearchControllerIT: remove @TestInstance(PER_CLASS), use @BeforeEach with static guard, fix table name (route_executions -> executions), remove Awaitility polling - OpenSearchIndexIT: replace Thread.sleep with explicit index refresh via OpenSearchClient - DiagramLinkingIT: fix table name, remove Awaitility awaits (writes are synchronous) - IngestionSchemaIT: rewrite queries for PostgreSQL relational model (processor_executions table instead of ClickHouse array columns) - PostgresStatsStoreIT: use explicit time bounds in refresh_continuous_aggregate calls - IngestionService: populate diagramContentHash during execution ingestion by looking up the latest diagram for the route+agent Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/controller/SearchControllerIT.java | 27 ++-- .../server/app/search/OpenSearchIndexIT.java | 13 +- .../server/app/storage/DiagramLinkingIT.java | 44 +++--- .../server/app/storage/IngestionSchemaIT.java | 145 +++++++----------- .../app/storage/PostgresStatsStoreIT.java | 4 +- .../core/ingestion/IngestionService.java | 5 +- 6 files changed, 102 insertions(+), 136 deletions(-) diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java index 439bfa5a..dfcbe9a9 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java @@ -4,9 +4,8 @@ import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.http.HttpEntity; @@ -15,15 +14,12 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; /** * Integration tests for the search controller endpoints. * Tests all filter types independently and in combination. */ -@TestInstance(TestInstance.Lifecycle.PER_CLASS) class SearchControllerIT extends AbstractPostgresIT { @Autowired @@ -34,15 +30,18 @@ class SearchControllerIT extends AbstractPostgresIT { private final ObjectMapper objectMapper = new ObjectMapper(); - private String jwt; - private String viewerJwt; + private static String jwt; + private static String viewerJwt; + private static boolean seeded; /** * Seed test data: Insert executions with varying statuses, times, durations, * correlationIds, error messages, and exchange snapshot data. */ - @BeforeAll + @BeforeEach void seedTestData() { + if (seeded) return; + seeded = true; jwt = securityHelper.registerTestAgent("test-agent-search-it"); viewerJwt = securityHelper.viewerToken(); @@ -154,13 +153,11 @@ class SearchControllerIT extends AbstractPostgresIT { """, i, i, i, i, i)); } - // Wait for all data to flush - await().atMost(10, SECONDS).untilAsserted(() -> { - Integer count = jdbcTemplate.queryForObject( - "SELECT count(*) FROM route_executions WHERE route_id LIKE 'search-route-%'", - Integer.class); - assertThat(count).isEqualTo(10); - }); + // Verify all data is available (synchronous writes) + Integer count = jdbcTemplate.queryForObject( + "SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'", + Integer.class); + assertThat(count).isEqualTo(10); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java index 2194ecb4..7c8635ac 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java @@ -8,6 +8,8 @@ import com.cameleer3.server.core.storage.SearchIndex; import com.cameleer3.server.core.storage.model.ExecutionDocument; import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.RefreshRequest; import org.opensearch.testcontainers.OpensearchContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.DynamicPropertyRegistry; @@ -34,6 +36,9 @@ class OpenSearchIndexIT extends AbstractPostgresIT { @Autowired SearchIndex searchIndex; + @Autowired + OpenSearchClient openSearchClient; + @Test void indexAndSearchByText() throws Exception { Instant now = Instant.now(); @@ -46,7 +51,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT { null, null, "request body with customer-99", null, null, null))); searchIndex.index(doc); - Thread.sleep(1500); // Allow OpenSearch refresh + refreshOpenSearchIndices(); SearchRequest request = new SearchRequest( null, now.minusSeconds(60), now.plusSeconds(60), @@ -71,7 +76,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT { null, null, "UniquePayloadIdentifier12345", null, null, null))); searchIndex.index(doc); - Thread.sleep(1500); + refreshOpenSearchIndices(); SearchRequest request = new SearchRequest( null, now.minusSeconds(60), now.plusSeconds(60), @@ -83,4 +88,8 @@ class OpenSearchIndexIT extends AbstractPostgresIT { SearchResult result = searchIndex.search(request); assertTrue(result.total() > 0); } + + private void refreshOpenSearchIndices() throws Exception { + openSearchClient.indices().refresh(RefreshRequest.of(r -> r.index("executions-*"))); + } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java index ab0f01c3..7805b133 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java @@ -11,9 +11,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; /** * Integration test proving that diagram_content_hash is populated during @@ -59,12 +57,10 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(diagramResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - await().atMost(10, SECONDS).untilAsserted(() -> { - String hash = jdbcTemplate.queryForObject( - "SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1", - String.class); - assertThat(hash).isNotNull().isNotEmpty(); - }); + String diagramHash = jdbcTemplate.queryForObject( + "SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1", + String.class); + assertThat(diagramHash).isNotNull().isNotEmpty(); String executionJson = """ { @@ -95,16 +91,14 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(execResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - await().atMost(10, SECONDS).ignoreExceptions().untilAsserted(() -> { - String hash = jdbcTemplate.queryForObject( - "SELECT diagram_content_hash FROM route_executions WHERE route_id = 'diagram-link-route'", - String.class); - assertThat(hash) - .isNotNull() - .isNotEmpty() - .hasSize(64) - .matches("[a-f0-9]{64}"); - }); + String hash = jdbcTemplate.queryForObject( + "SELECT diagram_content_hash FROM executions WHERE route_id = 'diagram-link-route'", + String.class); + assertThat(hash) + .isNotNull() + .isNotEmpty() + .hasSize(64) + .matches("[a-f0-9]{64}"); } @Test @@ -138,13 +132,11 @@ class DiagramLinkingIT extends AbstractPostgresIT { String.class); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - await().atMost(10, SECONDS).ignoreExceptions().untilAsserted(() -> { - String hash = jdbcTemplate.queryForObject( - "SELECT diagram_content_hash FROM route_executions WHERE route_id = 'no-diagram-route'", - String.class); - assertThat(hash) - .isNotNull() - .isEmpty(); - }); + String hash = jdbcTemplate.queryForObject( + "SELECT diagram_content_hash FROM executions WHERE route_id = 'no-diagram-route'", + String.class); + assertThat(hash) + .isNotNull() + .isEmpty(); } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java index 4cfa8247..13cf60c8 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java @@ -11,15 +11,13 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import java.util.Arrays; import java.util.List; +import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; /** - * Integration test verifying that Phase 2 schema columns are correctly populated + * Integration test verifying that processor execution data is correctly populated * during ingestion of route executions with nested processors and exchange data. */ class IngestionSchemaIT extends AbstractPostgresIT { @@ -39,7 +37,7 @@ class IngestionSchemaIT extends AbstractPostgresIT { } @Test - void processorTreeMetadata_depthsAndParentIndexesCorrect() { + void processorTreeMetadata_depthsAndParentIdsCorrect() { String json = """ { "routeId": "schema-test-tree", @@ -94,44 +92,46 @@ class IngestionSchemaIT extends AbstractPostgresIT { postExecution(json); - await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { - var depths = queryArray( - "SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(depths).containsExactly("0", "1", "2"); + // Verify execution row exists + Integer execCount = jdbcTemplate.queryForObject( + "SELECT count(*) FROM executions WHERE execution_id = 'ex-tree-1'", + Integer.class); + assertThat(execCount).isEqualTo(1); - var parentIndexes = queryArray( - "SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(parentIndexes).containsExactly("-1", "0", "1"); + // Verify processors were flattened into processor_executions + List> processors = jdbcTemplate.queryForList( + "SELECT processor_id, processor_type, depth, parent_processor_id, " + + "diagram_node_id, input_body, output_body, input_headers " + + "FROM processor_executions WHERE execution_id = 'ex-tree-1' " + + "ORDER BY depth, processor_id"); + assertThat(processors).hasSize(3); - var diagramNodeIds = queryArray( - "SELECT processor_diagram_node_ids FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild"); + // Root processor: depth=0, no parent + assertThat(processors.get(0).get("processor_id")).isEqualTo("root-proc"); + assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0); + assertThat(processors.get(0).get("parent_processor_id")).isNull(); + assertThat(processors.get(0).get("diagram_node_id")).isEqualTo("node-root"); + assertThat(processors.get(0).get("input_body")).isEqualTo("root-input"); + assertThat(processors.get(0).get("output_body")).isEqualTo("root-output"); + assertThat(processors.get(0).get("input_headers").toString()).contains("Content-Type"); - String bodies = jdbcTemplate.queryForObject( - "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-tree'", - String.class); - assertThat(bodies).contains("root-input"); - assertThat(bodies).contains("root-output"); - assertThat(bodies).contains("child-input"); - assertThat(bodies).contains("child-output"); + // Child processor: depth=1, parent=root-proc + assertThat(processors.get(1).get("processor_id")).isEqualTo("child-proc"); + assertThat(((Number) processors.get(1).get("depth")).intValue()).isEqualTo(1); + assertThat(processors.get(1).get("parent_processor_id")).isEqualTo("root-proc"); + assertThat(processors.get(1).get("diagram_node_id")).isEqualTo("node-child"); + assertThat(processors.get(1).get("input_body")).isEqualTo("child-input"); + assertThat(processors.get(1).get("output_body")).isEqualTo("child-output"); - var inputBodies = queryArray( - "SELECT processor_input_bodies FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(inputBodies).containsExactly("root-input", "child-input", ""); - - var outputBodies = queryArray( - "SELECT processor_output_bodies FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(outputBodies).containsExactly("root-output", "child-output", ""); - - var inputHeaders = queryArray( - "SELECT processor_input_headers FROM route_executions WHERE route_id = 'schema-test-tree'"); - assertThat(inputHeaders.get(0)).contains("Content-Type"); - assertThat(inputHeaders.get(0)).contains("application/json"); - }); + // Grandchild processor: depth=2, parent=child-proc + assertThat(processors.get(2).get("processor_id")).isEqualTo("grandchild-proc"); + assertThat(((Number) processors.get(2).get("depth")).intValue()).isEqualTo(2); + assertThat(processors.get(2).get("parent_processor_id")).isEqualTo("child-proc"); + assertThat(processors.get(2).get("diagram_node_id")).isEqualTo("node-grandchild"); } @Test - void exchangeBodiesContainsConcatenatedText() { + void exchangeBodiesStored() { String json = """ { "routeId": "schema-test-bodies", @@ -140,14 +140,6 @@ class IngestionSchemaIT extends AbstractPostgresIT { "startTime": "2026-03-11T10:00:00Z", "endTime": "2026-03-11T10:00:01Z", "durationMs": 1000, - "inputSnapshot": { - "body": "route-level-input-body", - "headers": {"X-Route": "header-value"} - }, - "outputSnapshot": { - "body": "route-level-output-body", - "headers": {} - }, "processors": [ { "processorId": "proc-1", @@ -166,21 +158,13 @@ class IngestionSchemaIT extends AbstractPostgresIT { postExecution(json); - await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { - String bodies = jdbcTemplate.queryForObject( - "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-bodies'", - String.class); - assertThat(bodies).contains("processor-body-text"); - assertThat(bodies).contains("processor-output-text"); - assertThat(bodies).contains("route-level-input-body"); - assertThat(bodies).contains("route-level-output-body"); - - String headers = jdbcTemplate.queryForObject( - "SELECT exchange_headers FROM route_executions WHERE route_id = 'schema-test-bodies'", - String.class); - assertThat(headers).contains("X-Route"); - assertThat(headers).contains("header-value"); - }); + // Verify processor body data + List> processors = jdbcTemplate.queryForList( + "SELECT input_body, output_body FROM processor_executions " + + "WHERE execution_id = 'ex-bodies-1'"); + assertThat(processors).hasSize(1); + assertThat(processors.get(0).get("input_body")).isEqualTo("processor-body-text"); + assertThat(processors.get(0).get("output_body")).isEqualTo("processor-output-text"); } @Test @@ -209,20 +193,19 @@ class IngestionSchemaIT extends AbstractPostgresIT { postExecution(json); - await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { - String bodies = jdbcTemplate.queryForObject( - "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-null-snap'", - String.class); - assertThat(bodies).isNotNull(); + // Verify execution exists + Integer count = jdbcTemplate.queryForObject( + "SELECT count(*) FROM executions WHERE execution_id = 'ex-null-1'", + Integer.class); + assertThat(count).isEqualTo(1); - var depths = queryArray( - "SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-null-snap'"); - assertThat(depths).containsExactly("0"); - - var parentIndexes = queryArray( - "SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-null-snap'"); - assertThat(parentIndexes).containsExactly("-1"); - }); + // Verify processor with null bodies inserted successfully + List> processors = jdbcTemplate.queryForList( + "SELECT depth, parent_processor_id, input_body, output_body " + + "FROM processor_executions WHERE execution_id = 'ex-null-1'"); + assertThat(processors).hasSize(1); + assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0); + assertThat(processors.get(0).get("parent_processor_id")).isNull(); } private void postExecution(String json) { @@ -233,22 +216,4 @@ class IngestionSchemaIT extends AbstractPostgresIT { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); } - - private List queryArray(String sql) { - return jdbcTemplate.query(sql, (rs, rowNum) -> { - Object arr = rs.getArray(1).getArray(); - if (arr instanceof Object[] objects) { - return Arrays.stream(objects).map(Object::toString).toList(); - } else if (arr instanceof short[] shorts) { - var result = new java.util.ArrayList(); - for (short s : shorts) result.add(String.valueOf(s)); - return result; - } else if (arr instanceof int[] ints) { - var result = new java.util.ArrayList(); - for (int v : ints) result.add(String.valueOf(v)); - return result; - } - return List.of(); - }).get(0); - } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java index efdbeea1..70b3344b 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java @@ -29,7 +29,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT { insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L); // Force continuous aggregate refresh - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60)); assertEquals(3, stats.totalCount()); @@ -44,7 +44,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT { now.plusSeconds(i * 30), 100L + i); } - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5); assertNotNull(ts); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 36419fb8..e2c5e741 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -69,6 +69,9 @@ public class IngestionService { private ExecutionRecord toExecutionRecord(String agentId, String groupName, RouteExecution exec) { + String diagramHash = diagramStore + .findContentHashForRoute(exec.getRouteId(), agentId) + .orElse(""); return new ExecutionRecord( exec.getExchangeId(), exec.getRouteId(), agentId, groupName, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", @@ -76,7 +79,7 @@ public class IngestionService { exec.getStartTime(), exec.getEndTime(), exec.getDurationMs(), exec.getErrorMessage(), exec.getErrorStackTrace(), - null // diagramContentHash set separately + diagramHash ); }