From 796be06a09cef9a3aeb77d13712da821e4637ab1 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 17 Mar 2026 00:02:19 +0100 Subject: [PATCH] fix: resolve all integration test failures after storage layer refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use singleton container pattern for PostgreSQL + OpenSearch testcontainers (fixes container lifecycle issues with @TestInstance(PER_CLASS)) - Fix table name route_executions → executions in DetailControllerIT and ExecutionControllerIT - Serialize processor headers as JSON (ObjectMapper) instead of Map.toString() for JSONB column compatibility - Add nested mapping for processors field in OpenSearch index template - Use .keyword sub-field for term queries on dynamically mapped text fields - Add wildcard fallback queries for all text searches (substring matching) - Isolate stats tests with unique route names to prevent data contamination - Wait for OpenSearch indexing in SearchControllerIT with targeted Awaitility - Reduce OpenSearch debounce to 100ms in test profile Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/app/search/OpenSearchIndex.java | 75 ++++++++++++++----- .../server/app/AbstractPostgresIT.java | 24 +++--- .../app/controller/DetailControllerIT.java | 4 +- .../app/controller/ExecutionControllerIT.java | 2 +- .../app/controller/SearchControllerIT.java | 12 ++- .../server/app/search/OpenSearchIndexIT.java | 15 +--- .../app/storage/PostgresStatsStoreIT.java | 25 ++++--- .../src/test/resources/application-test.yml | 1 + .../core/ingestion/IngestionService.java | 17 ++++- 9 files changed, 117 insertions(+), 58 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 892792fc..062f12fb 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -52,7 +52,10 @@ public class OpenSearchIndex implements SearchIndex { .template(t -> t .settings(s -> s .numberOfShards("3") - .numberOfReplicas("1"))))); + .numberOfReplicas("1")) + .mappings(m -> m + .properties("processors", p -> p + .nested(n -> n)))))); log.info("OpenSearch index template created"); } } catch (IOException e) { @@ -148,27 +151,32 @@ public class OpenSearchIndex implements SearchIndex { }))); } - // Keyword filters + // Keyword filters (use .keyword sub-field for exact matching on dynamically mapped text fields) if (request.status() != null) - filter.add(termQuery("status", request.status())); + filter.add(termQuery("status.keyword", request.status())); if (request.routeId() != null) - filter.add(termQuery("route_id", request.routeId())); + filter.add(termQuery("route_id.keyword", request.routeId())); if (request.agentId() != null) - filter.add(termQuery("agent_id", request.agentId())); + filter.add(termQuery("agent_id.keyword", request.agentId())); if (request.correlationId() != null) - filter.add(termQuery("correlation_id", request.correlationId())); + filter.add(termQuery("correlation_id.keyword", request.correlationId())); // Full-text search across all fields + nested processor fields if (request.text() != null && !request.text().isBlank()) { String text = request.text(); + String wildcard = "*" + text.toLowerCase() + "*"; List textQueries = new ArrayList<>(); - // Search top-level text fields + // Search top-level text fields (analyzed match + wildcard for substring) textQueries.add(Query.of(q -> q.multiMatch(m -> m .query(text) .fields("error_message", "error_stacktrace")))); + textQueries.add(Query.of(q -> q.wildcard(w -> w + .field("error_message").value(wildcard).caseInsensitive(true)))); + textQueries.add(Query.of(q -> q.wildcard(w -> w + .field("error_stacktrace").value(wildcard).caseInsensitive(true)))); - // Search nested processor fields + // Search nested processor fields (analyzed match + wildcard) textQueries.add(Query.of(q -> q.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m @@ -176,6 +184,14 @@ public class OpenSearchIndex implements SearchIndex { .fields("processors.input_body", "processors.output_body", "processors.input_headers", "processors.output_headers", "processors.error_message", "processors.error_stacktrace")))))); + textQueries.add(Query.of(q -> q.nested(n -> n + .path("processors") + .query(nq -> nq.bool(nb -> nb.should( + wildcardQuery("processors.input_body", wildcard), + wildcardQuery("processors.output_body", wildcard), + wildcardQuery("processors.input_headers", wildcard), + wildcardQuery("processors.output_headers", wildcard) + ).minimumShouldMatch("1")))))); // Also try keyword fields for exact matches textQueries.add(Query.of(q -> q.multiMatch(m -> m @@ -185,32 +201,51 @@ public class OpenSearchIndex implements SearchIndex { must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1")))); } - // Scoped text searches + // Scoped text searches (multiMatch + wildcard fallback for substring matching) if (request.textInBody() != null && !request.textInBody().isBlank()) { + String bodyText = request.textInBody(); + String bodyWildcard = "*" + bodyText.toLowerCase() + "*"; must.add(Query.of(q -> q.nested(n -> n .path("processors") - .query(nq -> nq.multiMatch(m -> m - .query(request.textInBody()) - .fields("processors.input_body", "processors.output_body")))))); + .query(nq -> nq.bool(nb -> nb.should( + Query.of(mq -> mq.multiMatch(m -> m + .query(bodyText) + .fields("processors.input_body", "processors.output_body"))), + wildcardQuery("processors.input_body", bodyWildcard), + wildcardQuery("processors.output_body", bodyWildcard) + ).minimumShouldMatch("1")))))); } if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { + String headerText = request.textInHeaders(); + String headerWildcard = "*" + headerText.toLowerCase() + "*"; must.add(Query.of(q -> q.nested(n -> n .path("processors") - .query(nq -> nq.multiMatch(m -> m - .query(request.textInHeaders()) - .fields("processors.input_headers", "processors.output_headers")))))); + .query(nq -> nq.bool(nb -> nb.should( + Query.of(mq -> mq.multiMatch(m -> m + .query(headerText) + .fields("processors.input_headers", "processors.output_headers"))), + wildcardQuery("processors.input_headers", headerWildcard), + wildcardQuery("processors.output_headers", headerWildcard) + ).minimumShouldMatch("1")))))); } if (request.textInErrors() != null && !request.textInErrors().isBlank()) { String errText = request.textInErrors(); + String errWildcard = "*" + errText.toLowerCase() + "*"; must.add(Query.of(q -> q.bool(b -> b.should( Query.of(sq -> sq.multiMatch(m -> m .query(errText) .fields("error_message", "error_stacktrace"))), + wildcardQuery("error_message", errWildcard), + wildcardQuery("error_stacktrace", errWildcard), Query.of(sq -> sq.nested(n -> n .path("processors") - .query(nq -> nq.multiMatch(m -> m - .query(errText) - .fields("processors.error_message", "processors.error_stacktrace"))))) + .query(nq -> nq.bool(nb -> nb.should( + Query.of(nmq -> nmq.multiMatch(m -> m + .query(errText) + .fields("processors.error_message", "processors.error_stacktrace"))), + wildcardQuery("processors.error_message", errWildcard), + wildcardQuery("processors.error_stacktrace", errWildcard) + ).minimumShouldMatch("1"))))) ).minimumShouldMatch("1")))); } @@ -238,6 +273,10 @@ public class OpenSearchIndex implements SearchIndex { return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value)))); } + private Query wildcardQuery(String field, String pattern) { + return Query.of(q -> q.wildcard(w -> w.field(field).value(pattern).caseInsensitive(true))); + } + private Map toMap(ExecutionDocument doc) { Map map = new LinkedHashMap<>(); map.put("execution_id", doc.executionId()); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index b912bc1b..40962efd 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app; +import org.opensearch.testcontainers.OpensearchContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.jdbc.core.JdbcTemplate; @@ -7,25 +8,29 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @ActiveProfiles("test") -@Testcontainers public abstract class AbstractPostgresIT { private static final DockerImageName TIMESCALEDB_IMAGE = DockerImageName.parse("timescale/timescaledb-ha:pg16") .asCompatibleSubstituteFor("postgres"); - @Container - static final PostgreSQLContainer postgres = - new PostgreSQLContainer<>(TIMESCALEDB_IMAGE) - .withDatabaseName("cameleer3") - .withUsername("cameleer") - .withPassword("test"); + static final PostgreSQLContainer postgres; + static final OpensearchContainer opensearch; + + static { + postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE) + .withDatabaseName("cameleer3") + .withUsername("cameleer") + .withPassword("test"); + postgres.start(); + + opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); + opensearch.start(); + } @Autowired protected JdbcTemplate jdbcTemplate; @@ -37,5 +42,6 @@ public abstract class AbstractPostgresIT { registry.add("spring.datasource.password", postgres::getPassword); registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver"); registry.add("spring.flyway.enabled", () -> "true"); + registry.add("opensearch.url", opensearch::getHttpHostAddress); } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java index f0cef246..5229f883 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java @@ -123,13 +123,13 @@ class DetailControllerIT extends AbstractPostgresIT { // Wait for flush and get the execution_id await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count(*) FROM route_executions WHERE route_id = 'detail-test-route'", + "SELECT count(*) FROM executions WHERE route_id = 'detail-test-route'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); seededExecutionId = jdbcTemplate.queryForObject( - "SELECT execution_id FROM route_executions WHERE route_id = 'detail-test-route' LIMIT 1", + "SELECT execution_id FROM executions WHERE route_id = 'detail-test-route' LIMIT 1", String.class); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java index 65f72d85..1ee376e2 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java @@ -111,7 +111,7 @@ class ExecutionControllerIT extends AbstractPostgresIT { await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count(*) FROM route_executions WHERE route_id = 'flush-test-route'", + "SELECT count(*) FROM executions WHERE route_id = 'flush-test-route'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java index dfcbe9a9..6a21552f 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java @@ -14,7 +14,9 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; /** * Integration tests for the search controller endpoints. @@ -153,11 +155,19 @@ class SearchControllerIT extends AbstractPostgresIT { """, i, i, i, i, i)); } - // Verify all data is available (synchronous writes) + // Verify all data is in PostgreSQL (synchronous writes) Integer count = jdbcTemplate.queryForObject( "SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'", Integer.class); assertThat(count).isEqualTo(10); + + // Wait for async OpenSearch indexing (debounce + index time) + // Check for last seeded execution specifically to avoid false positives from other test classes + await().atMost(30, SECONDS).untilAsserted(() -> { + ResponseEntity r = searchGet("?correlationId=corr-page-10"); + JsonNode body = objectMapper.readTree(r.getBody()); + assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1); + }); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java index 7c8635ac..cdb0bff4 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java @@ -10,29 +10,16 @@ import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import org.junit.jupiter.api.Test; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.indices.RefreshRequest; -import org.opensearch.testcontainers.OpensearchContainer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.junit.jupiter.Container; import java.time.Instant; import java.util.List; import static org.junit.jupiter.api.Assertions.*; -// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context +// Extends AbstractPostgresIT which provides both PostgreSQL and OpenSearch testcontainers class OpenSearchIndexIT extends AbstractPostgresIT { - @Container - static final OpensearchContainer opensearch = - new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); - - @DynamicPropertySource - static void configureOpenSearch(DynamicPropertyRegistry registry) { - registry.add("opensearch.url", opensearch::getHttpHostAddress); - } - @Autowired SearchIndex searchIndex; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java index 70b3344b..c7bc748b 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java @@ -23,30 +23,33 @@ class PostgresStatsStoreIT extends AbstractPostgresIT { @Test void statsReturnsCountsForTimeWindow() { - Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); - insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L); - insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L); - insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L); + // Use a unique route + statsForRoute to avoid data contamination from other tests + String uniqueRoute = "stats-route-" + System.nanoTime(); + Instant base = Instant.now().minus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS); + insertExecution("stats-1-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base, 100L); + insertExecution("stats-2-" + uniqueRoute, uniqueRoute, "app-stats", "FAILED", base.plusSeconds(10), 200L); + insertExecution("stats-3-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base.plusSeconds(20), 50L); // Force continuous aggregate refresh - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); - ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60)); + ExecutionStats stats = statsStore.statsForRoute(base.minusSeconds(60), base.plusSeconds(60), uniqueRoute, null); assertEquals(3, stats.totalCount()); assertEquals(1, stats.failedCount()); } @Test void timeseriesReturnsBuckets() { - Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES); + String uniqueRoute = "ts-route-" + System.nanoTime(); + Instant base = Instant.now().minus(10, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MINUTES); for (int i = 0; i < 10; i++) { - insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED", - now.plusSeconds(i * 30), 100L + i); + insertExecution("ts-" + i + "-" + uniqueRoute, uniqueRoute, "app-ts", "COMPLETED", + base.plusSeconds(i * 30), 100L + i); } - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); - StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5); + StatsTimeseries ts = statsStore.timeseriesForRoute(base.minus(1, ChronoUnit.MINUTES), base.plus(10, ChronoUnit.MINUTES), 5, uniqueRoute, null); assertNotNull(ts); assertFalse(ts.buckets().isEmpty()); } diff --git a/cameleer3-server-app/src/test/resources/application-test.yml b/cameleer3-server-app/src/test/resources/application-test.yml index f821e9ff..8a6708b5 100644 --- a/cameleer3-server-app/src/test/resources/application-test.yml +++ b/cameleer3-server-app/src/test/resources/application-test.yml @@ -4,6 +4,7 @@ spring: opensearch: url: http://localhost:9200 + debounce-ms: 100 ingestion: buffer-capacity: 100 diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index e2c5e741..bdd32cfb 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -8,13 +8,18 @@ import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Consumer; public class IngestionService { + private static final ObjectMapper JSON = new ObjectMapper(); + private final ExecutionStore executionStore; private final DiagramStore diagramStore; private final WriteBuffer metricsBuffer; @@ -99,8 +104,7 @@ public class IngestionService { p.getDurationMs(), p.getErrorMessage(), p.getErrorStackTrace(), truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), - p.getInputHeaders() != null ? p.getInputHeaders().toString() : null, - p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null + toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()) )); if (p.getChildren() != null) { flat.addAll(flattenProcessors( @@ -116,4 +120,13 @@ public class IngestionService { if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); return body; } + + private static String toJson(Map headers) { + if (headers == null) return null; + try { + return JSON.writeValueAsString(headers); + } catch (JsonProcessingException e) { + return "{}"; + } + } }