fix: resolve all integration test failures after storage layer refactor

- Use singleton container pattern for PostgreSQL + OpenSearch testcontainers
  (fixes container lifecycle issues with @TestInstance(PER_CLASS))
- Fix table name route_executions → executions in DetailControllerIT and
  ExecutionControllerIT
- Serialize processor headers as JSON (ObjectMapper) instead of Map.toString()
  for JSONB column compatibility
- Add nested mapping for processors field in OpenSearch index template
- Use .keyword sub-field for term queries on dynamically mapped text fields
- Add wildcard fallback queries for all text searches (substring matching)
- Isolate stats tests with unique route names to prevent data contamination
- Wait for OpenSearch indexing in SearchControllerIT with targeted Awaitility
- Reduce OpenSearch debounce to 100ms in test profile

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-17 00:02:19 +01:00
parent 26f5a2ce3b
commit 796be06a09
9 changed files with 117 additions and 58 deletions

View File

@@ -52,7 +52,10 @@ public class OpenSearchIndex implements SearchIndex {
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")))));
.numberOfReplicas("1"))
.mappings(m -> m
.properties("processors", p -> p
.nested(n -> n))))));
log.info("OpenSearch index template created");
}
} catch (IOException e) {
@@ -148,27 +151,32 @@ public class OpenSearchIndex implements SearchIndex {
})));
}
// Keyword filters
// Keyword filters (use .keyword sub-field for exact matching on dynamically mapped text fields)
if (request.status() != null)
filter.add(termQuery("status", request.status()));
filter.add(termQuery("status.keyword", request.status()));
if (request.routeId() != null)
filter.add(termQuery("route_id", request.routeId()));
filter.add(termQuery("route_id.keyword", request.routeId()));
if (request.agentId() != null)
filter.add(termQuery("agent_id", request.agentId()));
filter.add(termQuery("agent_id.keyword", request.agentId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id", request.correlationId()));
filter.add(termQuery("correlation_id.keyword", request.correlationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
String text = request.text();
String wildcard = "*" + text.toLowerCase() + "*";
List<Query> textQueries = new ArrayList<>();
// Search top-level text fields
// Search top-level text fields (analyzed match + wildcard for substring)
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace"))));
textQueries.add(Query.of(q -> q.wildcard(w -> w
.field("error_message").value(wildcard).caseInsensitive(true))));
textQueries.add(Query.of(q -> q.wildcard(w -> w
.field("error_stacktrace").value(wildcard).caseInsensitive(true))));
// Search nested processor fields
// Search nested processor fields (analyzed match + wildcard)
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
@@ -176,6 +184,14 @@ public class OpenSearchIndex implements SearchIndex {
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace"))))));
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.bool(nb -> nb.should(
wildcardQuery("processors.input_body", wildcard),
wildcardQuery("processors.output_body", wildcard),
wildcardQuery("processors.input_headers", wildcard),
wildcardQuery("processors.output_headers", wildcard)
).minimumShouldMatch("1"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
@@ -185,32 +201,51 @@ public class OpenSearchIndex implements SearchIndex {
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
// Scoped text searches
// Scoped text searches (multiMatch + wildcard fallback for substring matching)
if (request.textInBody() != null && !request.textInBody().isBlank()) {
String bodyText = request.textInBody();
String bodyWildcard = "*" + bodyText.toLowerCase() + "*";
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
.fields("processors.input_body", "processors.output_body"))))));
.query(nq -> nq.bool(nb -> nb.should(
Query.of(mq -> mq.multiMatch(m -> m
.query(bodyText)
.fields("processors.input_body", "processors.output_body"))),
wildcardQuery("processors.input_body", bodyWildcard),
wildcardQuery("processors.output_body", bodyWildcard)
).minimumShouldMatch("1"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
String headerText = request.textInHeaders();
String headerWildcard = "*" + headerText.toLowerCase() + "*";
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
.fields("processors.input_headers", "processors.output_headers"))))));
.query(nq -> nq.bool(nb -> nb.should(
Query.of(mq -> mq.multiMatch(m -> m
.query(headerText)
.fields("processors.input_headers", "processors.output_headers"))),
wildcardQuery("processors.input_headers", headerWildcard),
wildcardQuery("processors.output_headers", headerWildcard)
).minimumShouldMatch("1"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
String errWildcard = "*" + errText.toLowerCase() + "*";
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace"))),
wildcardQuery("error_message", errWildcard),
wildcardQuery("error_stacktrace", errWildcard),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace")))))
.query(nq -> nq.bool(nb -> nb.should(
Query.of(nmq -> nmq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace"))),
wildcardQuery("processors.error_message", errWildcard),
wildcardQuery("processors.error_stacktrace", errWildcard)
).minimumShouldMatch("1")))))
).minimumShouldMatch("1"))));
}
@@ -238,6 +273,10 @@ public class OpenSearchIndex implements SearchIndex {
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Query wildcardQuery(String field, String pattern) {
return Query.of(q -> q.wildcard(w -> w.field(field).value(pattern).caseInsensitive(true)));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -7,25 +8,29 @@ import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
@Testcontainers
public abstract class AbstractPostgresIT {
private static final DockerImageName TIMESCALEDB_IMAGE =
DockerImageName.parse("timescale/timescaledb-ha:pg16")
.asCompatibleSubstituteFor("postgres");
@Container
static final PostgreSQLContainer<?> postgres =
new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
.withDatabaseName("cameleer3")
.withUsername("cameleer")
.withPassword("test");
static final PostgreSQLContainer<?> postgres;
static final OpensearchContainer<?> opensearch;
static {
postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
.withDatabaseName("cameleer3")
.withUsername("cameleer")
.withPassword("test");
postgres.start();
opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
opensearch.start();
}
@Autowired
protected JdbcTemplate jdbcTemplate;
@@ -37,5 +42,6 @@ public abstract class AbstractPostgresIT {
registry.add("spring.datasource.password", postgres::getPassword);
registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
registry.add("spring.flyway.enabled", () -> "true");
registry.add("opensearch.url", opensearch::getHttpHostAddress);
}
}

View File

@@ -123,13 +123,13 @@ class DetailControllerIT extends AbstractPostgresIT {
// Wait for flush and get the execution_id
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM route_executions WHERE route_id = 'detail-test-route'",
"SELECT count(*) FROM executions WHERE route_id = 'detail-test-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
seededExecutionId = jdbcTemplate.queryForObject(
"SELECT execution_id FROM route_executions WHERE route_id = 'detail-test-route' LIMIT 1",
"SELECT execution_id FROM executions WHERE route_id = 'detail-test-route' LIMIT 1",
String.class);
}

View File

@@ -111,7 +111,7 @@ class ExecutionControllerIT extends AbstractPostgresIT {
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM route_executions WHERE route_id = 'flush-test-route'",
"SELECT count(*) FROM executions WHERE route_id = 'flush-test-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});

View File

@@ -14,7 +14,9 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Integration tests for the search controller endpoints.
@@ -153,11 +155,19 @@ class SearchControllerIT extends AbstractPostgresIT {
""", i, i, i, i, i));
}
// Verify all data is available (synchronous writes)
// Verify all data is in PostgreSQL (synchronous writes)
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'",
Integer.class);
assertThat(count).isEqualTo(10);
// Wait for async OpenSearch indexing (debounce + index time)
// Check for last seeded execution specifically to avoid false positives from other test classes
await().atMost(30, SECONDS).untilAsserted(() -> {
ResponseEntity<String> r = searchGet("?correlationId=corr-page-10");
JsonNode body = objectMapper.readTree(r.getBody());
assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1);
});
}
@Test

View File

@@ -10,29 +10,16 @@ import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.RefreshRequest;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import java.time.Instant;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
// Extends AbstractPostgresIT which provides both PostgreSQL and OpenSearch testcontainers
class OpenSearchIndexIT extends AbstractPostgresIT {
@Container
static final OpensearchContainer<?> opensearch =
new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
@DynamicPropertySource
static void configureOpenSearch(DynamicPropertyRegistry registry) {
registry.add("opensearch.url", opensearch::getHttpHostAddress);
}
@Autowired
SearchIndex searchIndex;

View File

@@ -23,30 +23,33 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
@Test
void statsReturnsCountsForTimeWindow() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
// Use a unique route + statsForRoute to avoid data contamination from other tests
String uniqueRoute = "stats-route-" + System.nanoTime();
Instant base = Instant.now().minus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
insertExecution("stats-1-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base, 100L);
insertExecution("stats-2-" + uniqueRoute, uniqueRoute, "app-stats", "FAILED", base.plusSeconds(10), 200L);
insertExecution("stats-3-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base.plusSeconds(20), 50L);
// Force continuous aggregate refresh
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
ExecutionStats stats = statsStore.statsForRoute(base.minusSeconds(60), base.plusSeconds(60), uniqueRoute, null);
assertEquals(3, stats.totalCount());
assertEquals(1, stats.failedCount());
}
@Test
void timeseriesReturnsBuckets() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
String uniqueRoute = "ts-route-" + System.nanoTime();
Instant base = Instant.now().minus(10, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MINUTES);
for (int i = 0; i < 10; i++) {
insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
now.plusSeconds(i * 30), 100L + i);
insertExecution("ts-" + i + "-" + uniqueRoute, uniqueRoute, "app-ts", "COMPLETED",
base.plusSeconds(i * 30), 100L + i);
}
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
StatsTimeseries ts = statsStore.timeseriesForRoute(base.minus(1, ChronoUnit.MINUTES), base.plus(10, ChronoUnit.MINUTES), 5, uniqueRoute, null);
assertNotNull(ts);
assertFalse(ts.buckets().isEmpty());
}

View File

@@ -4,6 +4,7 @@ spring:
opensearch:
url: http://localhost:9200
debounce-ms: 100
ingestion:
buffer-capacity: 100

View File

@@ -8,13 +8,18 @@ import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public class IngestionService {
private static final ObjectMapper JSON = new ObjectMapper();
private final ExecutionStore executionStore;
private final DiagramStore diagramStore;
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
@@ -99,8 +104,7 @@ public class IngestionService {
p.getDurationMs(),
p.getErrorMessage(), p.getErrorStackTrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders())
));
if (p.getChildren() != null) {
flat.addAll(flattenProcessors(
@@ -116,4 +120,13 @@ public class IngestionService {
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
return body;
}
private static String toJson(Map<String, String> headers) {
if (headers == null) return null;
try {
return JSON.writeValueAsString(headers);
} catch (JsonProcessingException e) {
return "{}";
}
}
}