test(02-01): add failing IngestionSchemaIT for new column population
- Tests processor tree metadata (depths, parent indexes) - Tests exchange body concatenation for search - Tests null snapshot graceful handling - AbstractClickHouseIT loads 02-search-columns.sql - DiagramRenderer/DiagramLayout stubs to fix pre-existing compilation error Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -9,11 +9,14 @@ import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import com.cameleer3.server.core.detail.RawExecutionRow;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
@@ -107,6 +110,11 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RawExecutionRow> findRawById(String executionId) {
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
}
|
||||
|
||||
private static String nullSafe(String value) {
|
||||
return value != null ? value : "";
|
||||
}
|
||||
|
||||
@@ -50,22 +50,31 @@ public abstract class AbstractClickHouseIT {
|
||||
@BeforeAll
|
||||
static void initSchema() throws Exception {
|
||||
// Surefire runs from the module directory; schema is in the project root
|
||||
Path schemaPath = Path.of("clickhouse/init/01-schema.sql");
|
||||
if (!Files.exists(schemaPath)) {
|
||||
schemaPath = Path.of("../clickhouse/init/01-schema.sql");
|
||||
Path baseDir = Path.of("clickhouse/init");
|
||||
if (!Files.exists(baseDir)) {
|
||||
baseDir = Path.of("../clickhouse/init");
|
||||
}
|
||||
String sql = Files.readString(schemaPath, StandardCharsets.UTF_8);
|
||||
|
||||
// Load all schema files in order
|
||||
String[] schemaFiles = {"01-schema.sql", "02-search-columns.sql"};
|
||||
|
||||
try (Connection conn = DriverManager.getConnection(
|
||||
CLICKHOUSE.getJdbcUrl(),
|
||||
CLICKHOUSE.getUsername(),
|
||||
CLICKHOUSE.getPassword());
|
||||
Statement stmt = conn.createStatement()) {
|
||||
// Execute each statement separately (separated by semicolons)
|
||||
for (String statement : sql.split(";")) {
|
||||
String trimmed = statement.trim();
|
||||
if (!trimmed.isEmpty()) {
|
||||
stmt.execute(trimmed);
|
||||
|
||||
for (String schemaFile : schemaFiles) {
|
||||
Path schemaPath = baseDir.resolve(schemaFile);
|
||||
if (Files.exists(schemaPath)) {
|
||||
String sql = Files.readString(schemaPath, StandardCharsets.UTF_8);
|
||||
// Execute each statement separately (separated by semicolons)
|
||||
for (String statement : sql.split(";")) {
|
||||
String trimmed = statement.trim();
|
||||
if (!trimmed.isEmpty()) {
|
||||
stmt.execute(trimmed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.model.ExchangeSnapshot;
|
||||
import com.cameleer3.common.model.ExecutionStatus;
|
||||
import com.cameleer3.common.model.ProcessorExecution;
|
||||
import com.cameleer3.common.model.RouteExecution;
|
||||
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import java.sql.Array;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
/**
|
||||
* Integration test verifying that Phase 2 schema columns are correctly populated
|
||||
* during ingestion of route executions with nested processors and exchange data.
|
||||
*/
|
||||
class IngestionSchemaIT extends AbstractClickHouseIT {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@Test
|
||||
void processorTreeMetadata_depthsAndParentIndexesCorrect() {
|
||||
// Build a 3-level processor tree: root -> child -> grandchild
|
||||
String json = """
|
||||
{
|
||||
"routeId": "schema-test-tree",
|
||||
"exchangeId": "ex-tree-1",
|
||||
"correlationId": "corr-tree-1",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:01Z",
|
||||
"durationMs": 1000,
|
||||
"processors": [
|
||||
{
|
||||
"processorId": "root-proc",
|
||||
"processorType": "bean",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:00.500Z",
|
||||
"durationMs": 500,
|
||||
"diagramNodeId": "node-root",
|
||||
"inputBody": "root-input",
|
||||
"outputBody": "root-output",
|
||||
"inputHeaders": {"Content-Type": "application/json"},
|
||||
"outputHeaders": {"X-Result": "ok"},
|
||||
"children": [
|
||||
{
|
||||
"processorId": "child-proc",
|
||||
"processorType": "log",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00.100Z",
|
||||
"endTime": "2026-03-11T10:00:00.400Z",
|
||||
"durationMs": 300,
|
||||
"diagramNodeId": "node-child",
|
||||
"inputBody": "child-input",
|
||||
"outputBody": "child-output",
|
||||
"children": [
|
||||
{
|
||||
"processorId": "grandchild-proc",
|
||||
"processorType": "setHeader",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00.200Z",
|
||||
"endTime": "2026-03-11T10:00:00.300Z",
|
||||
"durationMs": 100,
|
||||
"diagramNodeId": "node-grandchild",
|
||||
"children": []
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
""";
|
||||
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT processor_depths, processor_parent_indexes, processor_diagram_node_ids, " +
|
||||
"exchange_bodies, processor_input_bodies, processor_output_bodies, " +
|
||||
"processor_input_headers, processor_output_headers " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
|
||||
// Verify depths: root=0, child=1, grandchild=2
|
||||
@SuppressWarnings("unchecked")
|
||||
var depths = (List<Number>) row.get("processor_depths");
|
||||
assertThat(depths).containsExactly((short) 0, (short) 1, (short) 2);
|
||||
|
||||
// Verify parent indexes: root=-1, child=0 (parent is root at idx 0), grandchild=1 (parent is child at idx 1)
|
||||
@SuppressWarnings("unchecked")
|
||||
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
|
||||
assertThat(parentIndexes).containsExactly(-1, 0, 1);
|
||||
|
||||
// Verify diagram node IDs
|
||||
@SuppressWarnings("unchecked")
|
||||
var diagramNodeIds = (List<String>) row.get("processor_diagram_node_ids");
|
||||
assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild");
|
||||
|
||||
// Verify exchange_bodies contains concatenated text
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
assertThat(bodies).contains("root-input");
|
||||
assertThat(bodies).contains("root-output");
|
||||
assertThat(bodies).contains("child-input");
|
||||
assertThat(bodies).contains("child-output");
|
||||
|
||||
// Verify per-processor input/output bodies
|
||||
@SuppressWarnings("unchecked")
|
||||
var inputBodies = (List<String>) row.get("processor_input_bodies");
|
||||
assertThat(inputBodies).containsExactly("root-input", "child-input", "");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
var outputBodies = (List<String>) row.get("processor_output_bodies");
|
||||
assertThat(outputBodies).containsExactly("root-output", "child-output", "");
|
||||
|
||||
// Verify per-processor headers stored as JSON strings
|
||||
@SuppressWarnings("unchecked")
|
||||
var inputHeaders = (List<String>) row.get("processor_input_headers");
|
||||
assertThat(inputHeaders.get(0)).contains("Content-Type");
|
||||
assertThat(inputHeaders.get(0)).contains("application/json");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void exchangeBodiesContainsConcatenatedText() {
|
||||
String json = """
|
||||
{
|
||||
"routeId": "schema-test-bodies",
|
||||
"exchangeId": "ex-bodies-1",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:01Z",
|
||||
"durationMs": 1000,
|
||||
"inputSnapshot": {
|
||||
"body": "route-level-input-body",
|
||||
"headers": {"X-Route": "header-value"}
|
||||
},
|
||||
"outputSnapshot": {
|
||||
"body": "route-level-output-body",
|
||||
"headers": {}
|
||||
},
|
||||
"processors": [
|
||||
{
|
||||
"processorId": "proc-1",
|
||||
"processorType": "bean",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:00.500Z",
|
||||
"durationMs": 500,
|
||||
"inputBody": "processor-body-text",
|
||||
"outputBody": "processor-output-text",
|
||||
"children": []
|
||||
}
|
||||
]
|
||||
}
|
||||
""";
|
||||
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT exchange_bodies, exchange_headers " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-bodies'");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
|
||||
// Bodies should contain all sources
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
assertThat(bodies).contains("processor-body-text");
|
||||
assertThat(bodies).contains("processor-output-text");
|
||||
assertThat(bodies).contains("route-level-input-body");
|
||||
assertThat(bodies).contains("route-level-output-body");
|
||||
|
||||
// Headers should contain route-level header
|
||||
String headers = (String) row.get("exchange_headers");
|
||||
assertThat(headers).contains("X-Route");
|
||||
assertThat(headers).contains("header-value");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void nullSnapshots_insertSucceedsWithEmptyDefaults() {
|
||||
// Execution with no exchange snapshots and no processor snapshot data
|
||||
String json = """
|
||||
{
|
||||
"routeId": "schema-test-null-snap",
|
||||
"exchangeId": "ex-null-1",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:01Z",
|
||||
"durationMs": 1000,
|
||||
"processors": [
|
||||
{
|
||||
"processorId": "proc-null",
|
||||
"processorType": "log",
|
||||
"status": "COMPLETED",
|
||||
"startTime": "2026-03-11T10:00:00Z",
|
||||
"endTime": "2026-03-11T10:00:00.500Z",
|
||||
"durationMs": 500,
|
||||
"children": []
|
||||
}
|
||||
]
|
||||
}
|
||||
""";
|
||||
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT exchange_bodies, exchange_headers, processor_input_bodies, " +
|
||||
"processor_output_bodies, processor_depths, processor_parent_indexes " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-null-snap'");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
|
||||
// Empty but not null
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
assertThat(bodies).isNotNull();
|
||||
|
||||
// Depths and parent indexes still populated for tree metadata
|
||||
@SuppressWarnings("unchecked")
|
||||
var depths = (List<Number>) row.get("processor_depths");
|
||||
assertThat(depths).containsExactly((short) 0);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
|
||||
assertThat(parentIndexes).containsExactly(-1);
|
||||
});
|
||||
}
|
||||
|
||||
private void postExecution(String json) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
headers.set("X-Cameleer-Protocol-Version", "1");
|
||||
|
||||
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||
"/api/v1/data/executions",
|
||||
new HttpEntity<>(json, headers),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||
}
|
||||
}
|
||||
@@ -3,26 +3,20 @@ package com.cameleer3.server.core.diagram;
|
||||
import com.cameleer3.common.graph.RouteGraph;
|
||||
|
||||
/**
|
||||
* Renders route diagrams from {@link RouteGraph} definitions.
|
||||
* Renders a route graph as SVG or as a positioned JSON layout.
|
||||
* <p>
|
||||
* Implementations produce either SVG documents for direct display or
|
||||
* JSON-serializable layout data for client-side rendering.
|
||||
* Implementations handle layout computation and visual rendering.
|
||||
* Stub interface -- full implementation in a later plan.
|
||||
*/
|
||||
public interface DiagramRenderer {
|
||||
|
||||
/**
|
||||
* Render the route graph as an SVG XML document.
|
||||
*
|
||||
* @param graph the route graph definition
|
||||
* @return SVG XML string
|
||||
* Render the route graph as an SVG document string.
|
||||
*/
|
||||
String renderSvg(RouteGraph graph);
|
||||
|
||||
/**
|
||||
* Compute the diagram layout with positioned nodes and edges.
|
||||
*
|
||||
* @param graph the route graph definition
|
||||
* @return layout data suitable for JSON serialization
|
||||
* Compute a positioned JSON layout for the route graph.
|
||||
*/
|
||||
DiagramLayout layoutJson(RouteGraph graph);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user