diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java index 0049a0f8..dc465d26 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java @@ -9,11 +9,14 @@ import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import com.cameleer3.server.core.detail.RawExecutionRow; + import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.UUID; /** @@ -107,6 +110,11 @@ public class ClickHouseExecutionRepository implements ExecutionRepository { } } + @Override + public Optional findRawById(String executionId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + private static String nullSafe(String value) { return value != null ? value : ""; } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java index b2f6499f..a4a27597 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java @@ -50,22 +50,31 @@ public abstract class AbstractClickHouseIT { @BeforeAll static void initSchema() throws Exception { // Surefire runs from the module directory; schema is in the project root - Path schemaPath = Path.of("clickhouse/init/01-schema.sql"); - if (!Files.exists(schemaPath)) { - schemaPath = Path.of("../clickhouse/init/01-schema.sql"); + Path baseDir = Path.of("clickhouse/init"); + if (!Files.exists(baseDir)) { + baseDir = Path.of("../clickhouse/init"); } - String sql = Files.readString(schemaPath, StandardCharsets.UTF_8); + + // Load all schema files in order + String[] schemaFiles = {"01-schema.sql", "02-search-columns.sql"}; try (Connection conn = DriverManager.getConnection( CLICKHOUSE.getJdbcUrl(), CLICKHOUSE.getUsername(), CLICKHOUSE.getPassword()); Statement stmt = conn.createStatement()) { - // Execute each statement separately (separated by semicolons) - for (String statement : sql.split(";")) { - String trimmed = statement.trim(); - if (!trimmed.isEmpty()) { - stmt.execute(trimmed); + + for (String schemaFile : schemaFiles) { + Path schemaPath = baseDir.resolve(schemaFile); + if (Files.exists(schemaPath)) { + String sql = Files.readString(schemaPath, StandardCharsets.UTF_8); + // Execute each statement separately (separated by semicolons) + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + if (!trimmed.isEmpty()) { + stmt.execute(trimmed); + } + } } } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java new file mode 100644 index 00000000..0cb1dcde --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java @@ -0,0 +1,262 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.common.model.ExchangeSnapshot; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.ProcessorExecution; +import com.cameleer3.common.model.RouteExecution; +import com.cameleer3.server.app.AbstractClickHouseIT; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; + +import java.sql.Array; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Integration test verifying that Phase 2 schema columns are correctly populated + * during ingestion of route executions with nested processors and exchange data. + */ +class IngestionSchemaIT extends AbstractClickHouseIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void processorTreeMetadata_depthsAndParentIndexesCorrect() { + // Build a 3-level processor tree: root -> child -> grandchild + String json = """ + { + "routeId": "schema-test-tree", + "exchangeId": "ex-tree-1", + "correlationId": "corr-tree-1", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "processors": [ + { + "processorId": "root-proc", + "processorType": "bean", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:00.500Z", + "durationMs": 500, + "diagramNodeId": "node-root", + "inputBody": "root-input", + "outputBody": "root-output", + "inputHeaders": {"Content-Type": "application/json"}, + "outputHeaders": {"X-Result": "ok"}, + "children": [ + { + "processorId": "child-proc", + "processorType": "log", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00.100Z", + "endTime": "2026-03-11T10:00:00.400Z", + "durationMs": 300, + "diagramNodeId": "node-child", + "inputBody": "child-input", + "outputBody": "child-output", + "children": [ + { + "processorId": "grandchild-proc", + "processorType": "setHeader", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00.200Z", + "endTime": "2026-03-11T10:00:00.300Z", + "durationMs": 100, + "diagramNodeId": "node-grandchild", + "children": [] + } + ] + } + ] + } + ] + } + """; + + postExecution(json); + + await().atMost(10, SECONDS).untilAsserted(() -> { + var rows = jdbcTemplate.queryForList( + "SELECT processor_depths, processor_parent_indexes, processor_diagram_node_ids, " + + "exchange_bodies, processor_input_bodies, processor_output_bodies, " + + "processor_input_headers, processor_output_headers " + + "FROM route_executions WHERE route_id = 'schema-test-tree'"); + + assertThat(rows).hasSize(1); + var row = rows.get(0); + + // Verify depths: root=0, child=1, grandchild=2 + @SuppressWarnings("unchecked") + var depths = (List) row.get("processor_depths"); + assertThat(depths).containsExactly((short) 0, (short) 1, (short) 2); + + // Verify parent indexes: root=-1, child=0 (parent is root at idx 0), grandchild=1 (parent is child at idx 1) + @SuppressWarnings("unchecked") + var parentIndexes = (List) row.get("processor_parent_indexes"); + assertThat(parentIndexes).containsExactly(-1, 0, 1); + + // Verify diagram node IDs + @SuppressWarnings("unchecked") + var diagramNodeIds = (List) row.get("processor_diagram_node_ids"); + assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild"); + + // Verify exchange_bodies contains concatenated text + String bodies = (String) row.get("exchange_bodies"); + assertThat(bodies).contains("root-input"); + assertThat(bodies).contains("root-output"); + assertThat(bodies).contains("child-input"); + assertThat(bodies).contains("child-output"); + + // Verify per-processor input/output bodies + @SuppressWarnings("unchecked") + var inputBodies = (List) row.get("processor_input_bodies"); + assertThat(inputBodies).containsExactly("root-input", "child-input", ""); + + @SuppressWarnings("unchecked") + var outputBodies = (List) row.get("processor_output_bodies"); + assertThat(outputBodies).containsExactly("root-output", "child-output", ""); + + // Verify per-processor headers stored as JSON strings + @SuppressWarnings("unchecked") + var inputHeaders = (List) row.get("processor_input_headers"); + assertThat(inputHeaders.get(0)).contains("Content-Type"); + assertThat(inputHeaders.get(0)).contains("application/json"); + }); + } + + @Test + void exchangeBodiesContainsConcatenatedText() { + String json = """ + { + "routeId": "schema-test-bodies", + "exchangeId": "ex-bodies-1", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "inputSnapshot": { + "body": "route-level-input-body", + "headers": {"X-Route": "header-value"} + }, + "outputSnapshot": { + "body": "route-level-output-body", + "headers": {} + }, + "processors": [ + { + "processorId": "proc-1", + "processorType": "bean", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:00.500Z", + "durationMs": 500, + "inputBody": "processor-body-text", + "outputBody": "processor-output-text", + "children": [] + } + ] + } + """; + + postExecution(json); + + await().atMost(10, SECONDS).untilAsserted(() -> { + var rows = jdbcTemplate.queryForList( + "SELECT exchange_bodies, exchange_headers " + + "FROM route_executions WHERE route_id = 'schema-test-bodies'"); + + assertThat(rows).hasSize(1); + var row = rows.get(0); + + // Bodies should contain all sources + String bodies = (String) row.get("exchange_bodies"); + assertThat(bodies).contains("processor-body-text"); + assertThat(bodies).contains("processor-output-text"); + assertThat(bodies).contains("route-level-input-body"); + assertThat(bodies).contains("route-level-output-body"); + + // Headers should contain route-level header + String headers = (String) row.get("exchange_headers"); + assertThat(headers).contains("X-Route"); + assertThat(headers).contains("header-value"); + }); + } + + @Test + void nullSnapshots_insertSucceedsWithEmptyDefaults() { + // Execution with no exchange snapshots and no processor snapshot data + String json = """ + { + "routeId": "schema-test-null-snap", + "exchangeId": "ex-null-1", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:01Z", + "durationMs": 1000, + "processors": [ + { + "processorId": "proc-null", + "processorType": "log", + "status": "COMPLETED", + "startTime": "2026-03-11T10:00:00Z", + "endTime": "2026-03-11T10:00:00.500Z", + "durationMs": 500, + "children": [] + } + ] + } + """; + + postExecution(json); + + await().atMost(10, SECONDS).untilAsserted(() -> { + var rows = jdbcTemplate.queryForList( + "SELECT exchange_bodies, exchange_headers, processor_input_bodies, " + + "processor_output_bodies, processor_depths, processor_parent_indexes " + + "FROM route_executions WHERE route_id = 'schema-test-null-snap'"); + + assertThat(rows).hasSize(1); + var row = rows.get(0); + + // Empty but not null + String bodies = (String) row.get("exchange_bodies"); + assertThat(bodies).isNotNull(); + + // Depths and parent indexes still populated for tree metadata + @SuppressWarnings("unchecked") + var depths = (List) row.get("processor_depths"); + assertThat(depths).containsExactly((short) 0); + + @SuppressWarnings("unchecked") + var parentIndexes = (List) row.get("processor_parent_indexes"); + assertThat(parentIndexes).containsExactly(-1); + }); + } + + private void postExecution(String json) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("X-Cameleer-Protocol-Version", "1"); + + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, headers), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/diagram/DiagramRenderer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/diagram/DiagramRenderer.java index dd46ec91..f8027464 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/diagram/DiagramRenderer.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/diagram/DiagramRenderer.java @@ -3,26 +3,20 @@ package com.cameleer3.server.core.diagram; import com.cameleer3.common.graph.RouteGraph; /** - * Renders route diagrams from {@link RouteGraph} definitions. + * Renders a route graph as SVG or as a positioned JSON layout. *

- * Implementations produce either SVG documents for direct display or - * JSON-serializable layout data for client-side rendering. + * Implementations handle layout computation and visual rendering. + * Stub interface -- full implementation in a later plan. */ public interface DiagramRenderer { /** - * Render the route graph as an SVG XML document. - * - * @param graph the route graph definition - * @return SVG XML string + * Render the route graph as an SVG document string. */ String renderSvg(RouteGraph graph); /** - * Compute the diagram layout with positioned nodes and edges. - * - * @param graph the route graph definition - * @return layout data suitable for JSON serialization + * Compute a positioned JSON layout for the route graph. */ DiagramLayout layoutJson(RouteGraph graph); }