fix: update remaining ITs for synchronous ingestion and PostgreSQL storage

- SearchControllerIT: remove @TestInstance(PER_CLASS), use @BeforeEach with
  static guard, fix table name (route_executions -> executions), remove
  Awaitility polling
- OpenSearchIndexIT: replace Thread.sleep with explicit index refresh via
  OpenSearchClient
- DiagramLinkingIT: fix table name, remove Awaitility awaits (writes are
  synchronous)
- IngestionSchemaIT: rewrite queries for PostgreSQL relational model
  (processor_executions table instead of ClickHouse array columns)
- PostgresStatsStoreIT: use explicit time bounds in
  refresh_continuous_aggregate calls
- IngestionService: populate diagramContentHash during execution ingestion
  by looking up the latest diagram for the route+agent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-16 22:03:29 +01:00
parent d23b899f00
commit 26f5a2ce3b
6 changed files with 102 additions and 136 deletions

View File

@@ -4,9 +4,8 @@ import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper; import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity; import org.springframework.http.HttpEntity;
@@ -15,15 +14,12 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/** /**
* Integration tests for the search controller endpoints. * Integration tests for the search controller endpoints.
* Tests all filter types independently and in combination. * Tests all filter types independently and in combination.
*/ */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SearchControllerIT extends AbstractPostgresIT { class SearchControllerIT extends AbstractPostgresIT {
@Autowired @Autowired
@@ -34,15 +30,18 @@ class SearchControllerIT extends AbstractPostgresIT {
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
private String jwt; private static String jwt;
private String viewerJwt; private static String viewerJwt;
private static boolean seeded;
/** /**
* Seed test data: Insert executions with varying statuses, times, durations, * Seed test data: Insert executions with varying statuses, times, durations,
* correlationIds, error messages, and exchange snapshot data. * correlationIds, error messages, and exchange snapshot data.
*/ */
@BeforeAll @BeforeEach
void seedTestData() { void seedTestData() {
if (seeded) return;
seeded = true;
jwt = securityHelper.registerTestAgent("test-agent-search-it"); jwt = securityHelper.registerTestAgent("test-agent-search-it");
viewerJwt = securityHelper.viewerToken(); viewerJwt = securityHelper.viewerToken();
@@ -154,13 +153,11 @@ class SearchControllerIT extends AbstractPostgresIT {
""", i, i, i, i, i)); """, i, i, i, i, i));
} }
// Wait for all data to flush // Verify all data is available (synchronous writes)
await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject(
Integer count = jdbcTemplate.queryForObject( "SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'",
"SELECT count(*) FROM route_executions WHERE route_id LIKE 'search-route-%'", Integer.class);
Integer.class); assertThat(count).isEqualTo(10);
assertThat(count).isEqualTo(10);
});
} }
@Test @Test

View File

@@ -8,6 +8,8 @@ import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument; import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.RefreshRequest;
import org.opensearch.testcontainers.OpensearchContainer; import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertyRegistry;
@@ -34,6 +36,9 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
@Autowired @Autowired
SearchIndex searchIndex; SearchIndex searchIndex;
@Autowired
OpenSearchClient openSearchClient;
@Test @Test
void indexAndSearchByText() throws Exception { void indexAndSearchByText() throws Exception {
Instant now = Instant.now(); Instant now = Instant.now();
@@ -46,7 +51,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
null, null, "request body with customer-99", null, null, null))); null, null, "request body with customer-99", null, null, null)));
searchIndex.index(doc); searchIndex.index(doc);
Thread.sleep(1500); // Allow OpenSearch refresh refreshOpenSearchIndices();
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60), null, now.minusSeconds(60), now.plusSeconds(60),
@@ -71,7 +76,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
null, null, "UniquePayloadIdentifier12345", null, null, null))); null, null, "UniquePayloadIdentifier12345", null, null, null)));
searchIndex.index(doc); searchIndex.index(doc);
Thread.sleep(1500); refreshOpenSearchIndices();
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60), null, now.minusSeconds(60), now.plusSeconds(60),
@@ -83,4 +88,8 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
assertTrue(result.total() > 0); assertTrue(result.total() > 0);
} }
private void refreshOpenSearchIndices() throws Exception {
openSearchClient.indices().refresh(RefreshRequest.of(r -> r.index("executions-*")));
}
} }

View File

@@ -11,9 +11,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/** /**
* Integration test proving that diagram_content_hash is populated during * Integration test proving that diagram_content_hash is populated during
@@ -59,12 +57,10 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class); String.class);
assertThat(diagramResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(diagramResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
await().atMost(10, SECONDS).untilAsserted(() -> { String diagramHash = jdbcTemplate.queryForObject(
String hash = jdbcTemplate.queryForObject( "SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1",
"SELECT content_hash FROM route_diagrams WHERE route_id = 'diagram-link-route' LIMIT 1", String.class);
String.class); assertThat(diagramHash).isNotNull().isNotEmpty();
assertThat(hash).isNotNull().isNotEmpty();
});
String executionJson = """ String executionJson = """
{ {
@@ -95,16 +91,14 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class); String.class);
assertThat(execResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(execResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
await().atMost(10, SECONDS).ignoreExceptions().untilAsserted(() -> { String hash = jdbcTemplate.queryForObject(
String hash = jdbcTemplate.queryForObject( "SELECT diagram_content_hash FROM executions WHERE route_id = 'diagram-link-route'",
"SELECT diagram_content_hash FROM route_executions WHERE route_id = 'diagram-link-route'", String.class);
String.class); assertThat(hash)
assertThat(hash) .isNotNull()
.isNotNull() .isNotEmpty()
.isNotEmpty() .hasSize(64)
.hasSize(64) .matches("[a-f0-9]{64}");
.matches("[a-f0-9]{64}");
});
} }
@Test @Test
@@ -138,13 +132,11 @@ class DiagramLinkingIT extends AbstractPostgresIT {
String.class); String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
await().atMost(10, SECONDS).ignoreExceptions().untilAsserted(() -> { String hash = jdbcTemplate.queryForObject(
String hash = jdbcTemplate.queryForObject( "SELECT diagram_content_hash FROM executions WHERE route_id = 'no-diagram-route'",
"SELECT diagram_content_hash FROM route_executions WHERE route_id = 'no-diagram-route'", String.class);
String.class); assertThat(hash)
assertThat(hash) .isNotNull()
.isNotNull() .isEmpty();
.isEmpty();
});
} }
} }

View File

@@ -11,15 +11,13 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/** /**
* Integration test verifying that Phase 2 schema columns are correctly populated * Integration test verifying that processor execution data is correctly populated
* during ingestion of route executions with nested processors and exchange data. * during ingestion of route executions with nested processors and exchange data.
*/ */
class IngestionSchemaIT extends AbstractPostgresIT { class IngestionSchemaIT extends AbstractPostgresIT {
@@ -39,7 +37,7 @@ class IngestionSchemaIT extends AbstractPostgresIT {
} }
@Test @Test
void processorTreeMetadata_depthsAndParentIndexesCorrect() { void processorTreeMetadata_depthsAndParentIdsCorrect() {
String json = """ String json = """
{ {
"routeId": "schema-test-tree", "routeId": "schema-test-tree",
@@ -94,44 +92,46 @@ class IngestionSchemaIT extends AbstractPostgresIT {
postExecution(json); postExecution(json);
await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { // Verify execution row exists
var depths = queryArray( Integer execCount = jdbcTemplate.queryForObject(
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-tree'"); "SELECT count(*) FROM executions WHERE execution_id = 'ex-tree-1'",
assertThat(depths).containsExactly("0", "1", "2"); Integer.class);
assertThat(execCount).isEqualTo(1);
var parentIndexes = queryArray( // Verify processors were flattened into processor_executions
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-tree'"); List<Map<String, Object>> processors = jdbcTemplate.queryForList(
assertThat(parentIndexes).containsExactly("-1", "0", "1"); "SELECT processor_id, processor_type, depth, parent_processor_id, " +
"diagram_node_id, input_body, output_body, input_headers " +
"FROM processor_executions WHERE execution_id = 'ex-tree-1' " +
"ORDER BY depth, processor_id");
assertThat(processors).hasSize(3);
var diagramNodeIds = queryArray( // Root processor: depth=0, no parent
"SELECT processor_diagram_node_ids FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(processors.get(0).get("processor_id")).isEqualTo("root-proc");
assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild"); assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0);
assertThat(processors.get(0).get("parent_processor_id")).isNull();
assertThat(processors.get(0).get("diagram_node_id")).isEqualTo("node-root");
assertThat(processors.get(0).get("input_body")).isEqualTo("root-input");
assertThat(processors.get(0).get("output_body")).isEqualTo("root-output");
assertThat(processors.get(0).get("input_headers").toString()).contains("Content-Type");
String bodies = jdbcTemplate.queryForObject( // Child processor: depth=1, parent=root-proc
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-tree'", assertThat(processors.get(1).get("processor_id")).isEqualTo("child-proc");
String.class); assertThat(((Number) processors.get(1).get("depth")).intValue()).isEqualTo(1);
assertThat(bodies).contains("root-input"); assertThat(processors.get(1).get("parent_processor_id")).isEqualTo("root-proc");
assertThat(bodies).contains("root-output"); assertThat(processors.get(1).get("diagram_node_id")).isEqualTo("node-child");
assertThat(bodies).contains("child-input"); assertThat(processors.get(1).get("input_body")).isEqualTo("child-input");
assertThat(bodies).contains("child-output"); assertThat(processors.get(1).get("output_body")).isEqualTo("child-output");
var inputBodies = queryArray( // Grandchild processor: depth=2, parent=child-proc
"SELECT processor_input_bodies FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(processors.get(2).get("processor_id")).isEqualTo("grandchild-proc");
assertThat(inputBodies).containsExactly("root-input", "child-input", ""); assertThat(((Number) processors.get(2).get("depth")).intValue()).isEqualTo(2);
assertThat(processors.get(2).get("parent_processor_id")).isEqualTo("child-proc");
var outputBodies = queryArray( assertThat(processors.get(2).get("diagram_node_id")).isEqualTo("node-grandchild");
"SELECT processor_output_bodies FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(outputBodies).containsExactly("root-output", "child-output", "");
var inputHeaders = queryArray(
"SELECT processor_input_headers FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(inputHeaders.get(0)).contains("Content-Type");
assertThat(inputHeaders.get(0)).contains("application/json");
});
} }
@Test @Test
void exchangeBodiesContainsConcatenatedText() { void exchangeBodiesStored() {
String json = """ String json = """
{ {
"routeId": "schema-test-bodies", "routeId": "schema-test-bodies",
@@ -140,14 +140,6 @@ class IngestionSchemaIT extends AbstractPostgresIT {
"startTime": "2026-03-11T10:00:00Z", "startTime": "2026-03-11T10:00:00Z",
"endTime": "2026-03-11T10:00:01Z", "endTime": "2026-03-11T10:00:01Z",
"durationMs": 1000, "durationMs": 1000,
"inputSnapshot": {
"body": "route-level-input-body",
"headers": {"X-Route": "header-value"}
},
"outputSnapshot": {
"body": "route-level-output-body",
"headers": {}
},
"processors": [ "processors": [
{ {
"processorId": "proc-1", "processorId": "proc-1",
@@ -166,21 +158,13 @@ class IngestionSchemaIT extends AbstractPostgresIT {
postExecution(json); postExecution(json);
await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { // Verify processor body data
String bodies = jdbcTemplate.queryForObject( List<Map<String, Object>> processors = jdbcTemplate.queryForList(
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-bodies'", "SELECT input_body, output_body FROM processor_executions " +
String.class); "WHERE execution_id = 'ex-bodies-1'");
assertThat(bodies).contains("processor-body-text"); assertThat(processors).hasSize(1);
assertThat(bodies).contains("processor-output-text"); assertThat(processors.get(0).get("input_body")).isEqualTo("processor-body-text");
assertThat(bodies).contains("route-level-input-body"); assertThat(processors.get(0).get("output_body")).isEqualTo("processor-output-text");
assertThat(bodies).contains("route-level-output-body");
String headers = jdbcTemplate.queryForObject(
"SELECT exchange_headers FROM route_executions WHERE route_id = 'schema-test-bodies'",
String.class);
assertThat(headers).contains("X-Route");
assertThat(headers).contains("header-value");
});
} }
@Test @Test
@@ -209,20 +193,19 @@ class IngestionSchemaIT extends AbstractPostgresIT {
postExecution(json); postExecution(json);
await().atMost(30, SECONDS).ignoreExceptions().untilAsserted(() -> { // Verify execution exists
String bodies = jdbcTemplate.queryForObject( Integer count = jdbcTemplate.queryForObject(
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-null-snap'", "SELECT count(*) FROM executions WHERE execution_id = 'ex-null-1'",
String.class); Integer.class);
assertThat(bodies).isNotNull(); assertThat(count).isEqualTo(1);
var depths = queryArray( // Verify processor with null bodies inserted successfully
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-null-snap'"); List<Map<String, Object>> processors = jdbcTemplate.queryForList(
assertThat(depths).containsExactly("0"); "SELECT depth, parent_processor_id, input_body, output_body " +
"FROM processor_executions WHERE execution_id = 'ex-null-1'");
var parentIndexes = queryArray( assertThat(processors).hasSize(1);
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-null-snap'"); assertThat(((Number) processors.get(0).get("depth")).intValue()).isEqualTo(0);
assertThat(parentIndexes).containsExactly("-1"); assertThat(processors.get(0).get("parent_processor_id")).isNull();
});
} }
private void postExecution(String json) { private void postExecution(String json) {
@@ -233,22 +216,4 @@ class IngestionSchemaIT extends AbstractPostgresIT {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
} }
private List<String> queryArray(String sql) {
return jdbcTemplate.query(sql, (rs, rowNum) -> {
Object arr = rs.getArray(1).getArray();
if (arr instanceof Object[] objects) {
return Arrays.stream(objects).map(Object::toString).toList();
} else if (arr instanceof short[] shorts) {
var result = new java.util.ArrayList<String>();
for (short s : shorts) result.add(String.valueOf(s));
return result;
} else if (arr instanceof int[] ints) {
var result = new java.util.ArrayList<String>();
for (int v : ints) result.add(String.valueOf(v));
return result;
}
return List.<String>of();
}).get(0);
}
} }

View File

@@ -29,7 +29,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L); insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
// Force continuous aggregate refresh // Force continuous aggregate refresh
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60)); ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
assertEquals(3, stats.totalCount()); assertEquals(3, stats.totalCount());
@@ -44,7 +44,7 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
now.plusSeconds(i * 30), 100L + i); now.plusSeconds(i * 30), 100L + i);
} }
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5); StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
assertNotNull(ts); assertNotNull(ts);

View File

@@ -69,6 +69,9 @@ public class IngestionService {
private ExecutionRecord toExecutionRecord(String agentId, String groupName, private ExecutionRecord toExecutionRecord(String agentId, String groupName,
RouteExecution exec) { RouteExecution exec) {
String diagramHash = diagramStore
.findContentHashForRoute(exec.getRouteId(), agentId)
.orElse("");
return new ExecutionRecord( return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, groupName, exec.getExchangeId(), exec.getRouteId(), agentId, groupName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -76,7 +79,7 @@ public class IngestionService {
exec.getStartTime(), exec.getEndTime(), exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(), exec.getDurationMs(),
exec.getErrorMessage(), exec.getErrorStackTrace(), exec.getErrorMessage(), exec.getErrorStackTrace(),
null // diagramContentHash set separately diagramHash
); );
} }