diff --git a/.planning/phases/02-transaction-search-diagrams/deferred-items.md b/.planning/phases/02-transaction-search-diagrams/deferred-items.md new file mode 100644 index 00000000..adc9b747 --- /dev/null +++ b/.planning/phases/02-transaction-search-diagrams/deferred-items.md @@ -0,0 +1,11 @@ +# Deferred Items - Phase 02 + +## Pre-existing Issues + +### ElkDiagramRendererTest breaks Spring context when run in full suite +- **Found during:** 02-01, Task 2 +- **Issue:** When `ElkDiagramRendererTest` runs before Spring Boot integration tests in the same Surefire JVM, ELK's static initialization fails with `NoClassDefFoundError: org/eclipse/xtext/xbase/lib/CollectionLiterals`, which then prevents the Spring context from creating the `diagramRenderer` bean for all subsequent integration tests. +- **Impact:** Full `mvn test` for the app module shows 30 errors (all integration tests fail after ElkDiagramRendererTest runs) +- **Workaround:** Tests pass individually and in any grouping that excludes ElkDiagramRendererTest +- **Root cause:** ELK 0.11.0's service loader initializes `LayeredMetaDataProvider` which uses xtext's `CollectionLiterals` in a static initializer. The class is present on the classpath (dependency resolves correctly) but fails in the Surefire fork's classloading order when combined with Spring Boot's fat classloader. +- **Suggested fix:** Configure Surefire to fork per test class, or defer ElkDiagramRendererTest to its own module/phase. 
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java index dc465d26..dc113a3b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java @@ -1,21 +1,25 @@ package com.cameleer3.server.app.storage; +import com.cameleer3.common.model.ExchangeSnapshot; import com.cameleer3.common.model.ProcessorExecution; import com.cameleer3.common.model.RouteExecution; +import com.cameleer3.server.core.detail.RawExecutionRow; import com.cameleer3.server.core.storage.ExecutionRepository; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; -import com.cameleer3.server.core.detail.RawExecutionRow; - import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -23,20 +27,29 @@ import java.util.UUID; * ClickHouse implementation of {@link ExecutionRepository}. *

* Performs batch inserts into the {@code route_executions} table. - * Processor executions are flattened into parallel arrays. + * Processor executions are flattened into parallel arrays with tree metadata + * (depth, parent index) for reconstruction. */ @Repository public class ClickHouseExecutionRepository implements ExecutionRepository { private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String INSERT_SQL = """ INSERT INTO route_executions ( execution_id, route_id, agent_id, status, start_time, end_time, duration_ms, correlation_id, exchange_id, error_message, error_stacktrace, processor_ids, processor_types, processor_starts, processor_ends, - processor_durations, processor_statuses - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + processor_durations, processor_statuses, + exchange_bodies, exchange_headers, + processor_depths, processor_parent_indexes, + processor_error_messages, processor_error_stacktraces, + processor_input_bodies, processor_output_bodies, + processor_input_headers, processor_output_headers, + processor_diagram_node_ids, diagram_content_hash + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """; private final JdbcTemplate jdbcTemplate; @@ -55,27 +68,77 @@ public class ClickHouseExecutionRepository implements ExecutionRepository { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { RouteExecution exec = executions.get(i); - List processors = flattenProcessors(exec.getProcessors()); + List flatProcessors = flattenWithMetadata(exec.getProcessors()); - ps.setString(1, UUID.randomUUID().toString()); - ps.setString(2, nullSafe(exec.getRouteId())); - ps.setString(3, ""); // agent_id set by controller header or empty - ps.setString(4, exec.getStatus() != null ? 
exec.getStatus().name() : "RUNNING"); - ps.setObject(5, toTimestamp(exec.getStartTime())); - ps.setObject(6, toTimestamp(exec.getEndTime())); - ps.setLong(7, exec.getDurationMs()); - ps.setString(8, nullSafe(exec.getCorrelationId())); - ps.setString(9, nullSafe(exec.getExchangeId())); - ps.setString(10, nullSafe(exec.getErrorMessage())); - ps.setString(11, nullSafe(exec.getErrorStackTrace())); + int col = 1; + ps.setString(col++, UUID.randomUUID().toString()); + ps.setString(col++, nullSafe(exec.getRouteId())); + ps.setString(col++, ""); // agent_id set by controller header or empty + ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING"); + ps.setObject(col++, toTimestamp(exec.getStartTime())); + ps.setObject(col++, toTimestamp(exec.getEndTime())); + ps.setLong(col++, exec.getDurationMs()); + ps.setString(col++, nullSafe(exec.getCorrelationId())); + ps.setString(col++, nullSafe(exec.getExchangeId())); + ps.setString(col++, nullSafe(exec.getErrorMessage())); + ps.setString(col++, nullSafe(exec.getErrorStackTrace())); - // Parallel arrays for processor executions - ps.setObject(12, processors.stream().map(p -> nullSafe(p.getProcessorId())).toArray(String[]::new)); - ps.setObject(13, processors.stream().map(p -> nullSafe(p.getProcessorType())).toArray(String[]::new)); - ps.setObject(14, processors.stream().map(p -> toTimestamp(p.getStartTime())).toArray(Timestamp[]::new)); - ps.setObject(15, processors.stream().map(p -> toTimestamp(p.getEndTime())).toArray(Timestamp[]::new)); - ps.setObject(16, processors.stream().mapToLong(ProcessorExecution::getDurationMs).boxed().toArray(Long[]::new)); - ps.setObject(17, processors.stream().map(p -> p.getStatus() != null ? 
p.getStatus().name() : "RUNNING").toArray(String[]::new)); + // Original parallel arrays + ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorId())).toArray(String[]::new)); + ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorType())).toArray(String[]::new)); + ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getStartTime())).toArray(Timestamp[]::new)); + ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getEndTime())).toArray(Timestamp[]::new)); + ps.setObject(col++, flatProcessors.stream().mapToLong(fp -> fp.proc.getDurationMs()).boxed().toArray(Long[]::new)); + ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc.getStatus() != null ? fp.proc.getStatus().name() : "RUNNING").toArray(String[]::new)); + + // Phase 2: exchange bodies and headers (concatenated for search) + StringBuilder allBodies = new StringBuilder(); + StringBuilder allHeaders = new StringBuilder(); + + String[] inputBodies = new String[flatProcessors.size()]; + String[] outputBodies = new String[flatProcessors.size()]; + String[] inputHeaders = new String[flatProcessors.size()]; + String[] outputHeaders = new String[flatProcessors.size()]; + String[] errorMessages = new String[flatProcessors.size()]; + String[] errorStacktraces = new String[flatProcessors.size()]; + String[] diagramNodeIds = new String[flatProcessors.size()]; + Short[] depths = new Short[flatProcessors.size()]; + Integer[] parentIndexes = new Integer[flatProcessors.size()]; + + for (int j = 0; j < flatProcessors.size(); j++) { + FlatProcessor fp = flatProcessors.get(j); + ProcessorExecution p = fp.proc; + + inputBodies[j] = nullSafe(p.getInputBody()); + outputBodies[j] = nullSafe(p.getOutputBody()); + inputHeaders[j] = mapToJson(p.getInputHeaders()); + outputHeaders[j] = mapToJson(p.getOutputHeaders()); + errorMessages[j] = nullSafe(p.getErrorMessage()); + errorStacktraces[j] = 
nullSafe(p.getErrorStackTrace()); + diagramNodeIds[j] = nullSafe(p.getDiagramNodeId()); + depths[j] = (short) fp.depth; + parentIndexes[j] = fp.parentIndex; + + allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' '); + allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' '); + } + + // Include route-level input/output snapshot in searchable text + appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders); + appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders); + + ps.setString(col++, allBodies.toString().trim()); // exchange_bodies + ps.setString(col++, allHeaders.toString().trim()); // exchange_headers + ps.setObject(col++, depths); // processor_depths + ps.setObject(col++, parentIndexes); // processor_parent_indexes + ps.setObject(col++, errorMessages); // processor_error_messages + ps.setObject(col++, errorStacktraces); // processor_error_stacktraces + ps.setObject(col++, inputBodies); // processor_input_bodies + ps.setObject(col++, outputBodies); // processor_output_bodies + ps.setObject(col++, inputHeaders); // processor_input_headers + ps.setObject(col++, outputHeaders); // processor_output_headers + ps.setObject(col++, diagramNodeIds); // processor_diagram_node_ids + ps.setString(col++, ""); // diagram_content_hash (wired later) } @Override @@ -87,32 +150,59 @@ public class ClickHouseExecutionRepository implements ExecutionRepository { log.debug("Inserted batch of {} route executions into ClickHouse", executions.size()); } + @Override + public Optional findRawById(String executionId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + /** - * Flatten the processor tree into a flat list (depth-first). + * Internal record for a flattened processor with tree metadata. 
*/ - private List flattenProcessors(List processors) { + private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {} + + /** + * Flatten the processor tree with depth and parent index metadata (DFS order). + */ + private List flattenWithMetadata(List processors) { if (processors == null || processors.isEmpty()) { return List.of(); } - var result = new java.util.ArrayList(); + var result = new ArrayList(); for (ProcessorExecution p : processors) { - flatten(p, result); + flattenRecursive(p, 0, -1, result); } return result; } - private void flatten(ProcessorExecution processor, List result) { - result.add(processor); + private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx, + List result) { + int myIndex = result.size(); + result.add(new FlatProcessor(processor, depth, parentIdx)); if (processor.getChildren() != null) { for (ProcessorExecution child : processor.getChildren()) { - flatten(child, result); + flattenRecursive(child, depth + 1, myIndex, result); } } } - @Override - public Optional findRawById(String executionId) { - throw new UnsupportedOperationException("Not yet implemented"); + private void appendSnapshotText(ExchangeSnapshot snapshot, + StringBuilder allBodies, StringBuilder allHeaders) { + if (snapshot != null) { + allBodies.append(nullSafe(snapshot.getBody())).append(' '); + allHeaders.append(mapToJson(snapshot.getHeaders())).append(' '); + } + } + + private static String mapToJson(Map map) { + if (map == null || map.isEmpty()) { + return "{}"; + } + try { + return OBJECT_MAPPER.writeValueAsString(map); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize headers map to JSON", e); + return "{}"; + } } private static String nullSafe(String value) { diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java index 0cb1dcde..e8e756ad 100644 --- 
a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java @@ -1,9 +1,5 @@ package com.cameleer3.server.app.storage; -import com.cameleer3.common.model.ExchangeSnapshot; -import com.cameleer3.common.model.ExecutionStatus; -import com.cameleer3.common.model.ProcessorExecution; -import com.cameleer3.common.model.RouteExecution; import com.cameleer3.server.app.AbstractClickHouseIT; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -14,8 +10,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import java.sql.Array; -import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -90,49 +85,40 @@ class IngestionSchemaIT extends AbstractClickHouseIT { postExecution(json); await().atMost(10, SECONDS).untilAsserted(() -> { - var rows = jdbcTemplate.queryForList( - "SELECT processor_depths, processor_parent_indexes, processor_diagram_node_ids, " + - "exchange_bodies, processor_input_bodies, processor_output_bodies, " + - "processor_input_headers, processor_output_headers " + - "FROM route_executions WHERE route_id = 'schema-test-tree'"); + // Use individual typed queries to avoid ClickHouse Array cast issues + var depths = queryArray( + "SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-tree'"); + assertThat(depths).containsExactly("0", "1", "2"); - assertThat(rows).hasSize(1); - var row = rows.get(0); + var parentIndexes = queryArray( + "SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-tree'"); + assertThat(parentIndexes).containsExactly("-1", "0", "1"); - // Verify depths: root=0, child=1, grandchild=2 - @SuppressWarnings("unchecked") - var depths = (List) row.get("processor_depths"); - 
assertThat(depths).containsExactly((short) 0, (short) 1, (short) 2); - - // Verify parent indexes: root=-1, child=0 (parent is root at idx 0), grandchild=1 (parent is child at idx 1) - @SuppressWarnings("unchecked") - var parentIndexes = (List) row.get("processor_parent_indexes"); - assertThat(parentIndexes).containsExactly(-1, 0, 1); - - // Verify diagram node IDs - @SuppressWarnings("unchecked") - var diagramNodeIds = (List) row.get("processor_diagram_node_ids"); + var diagramNodeIds = queryArray( + "SELECT processor_diagram_node_ids FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild"); // Verify exchange_bodies contains concatenated text - String bodies = (String) row.get("exchange_bodies"); + String bodies = jdbcTemplate.queryForObject( + "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-tree'", + String.class); assertThat(bodies).contains("root-input"); assertThat(bodies).contains("root-output"); assertThat(bodies).contains("child-input"); assertThat(bodies).contains("child-output"); // Verify per-processor input/output bodies - @SuppressWarnings("unchecked") - var inputBodies = (List) row.get("processor_input_bodies"); + var inputBodies = queryArray( + "SELECT processor_input_bodies FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(inputBodies).containsExactly("root-input", "child-input", ""); - @SuppressWarnings("unchecked") - var outputBodies = (List) row.get("processor_output_bodies"); + var outputBodies = queryArray( + "SELECT processor_output_bodies FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(outputBodies).containsExactly("root-output", "child-output", ""); - // Verify per-processor headers stored as JSON strings - @SuppressWarnings("unchecked") - var inputHeaders = (List) row.get("processor_input_headers"); + // Verify per-processor input headers stored as JSON strings + var 
inputHeaders = queryArray( + "SELECT processor_input_headers FROM route_executions WHERE route_id = 'schema-test-tree'"); assertThat(inputHeaders.get(0)).contains("Content-Type"); assertThat(inputHeaders.get(0)).contains("application/json"); }); @@ -175,22 +161,19 @@ class IngestionSchemaIT extends AbstractClickHouseIT { postExecution(json); await().atMost(10, SECONDS).untilAsserted(() -> { - var rows = jdbcTemplate.queryForList( - "SELECT exchange_bodies, exchange_headers " + - "FROM route_executions WHERE route_id = 'schema-test-bodies'"); - - assertThat(rows).hasSize(1); - var row = rows.get(0); - // Bodies should contain all sources - String bodies = (String) row.get("exchange_bodies"); + String bodies = jdbcTemplate.queryForObject( + "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-bodies'", + String.class); assertThat(bodies).contains("processor-body-text"); assertThat(bodies).contains("processor-output-text"); assertThat(bodies).contains("route-level-input-body"); assertThat(bodies).contains("route-level-output-body"); // Headers should contain route-level header - String headers = (String) row.get("exchange_headers"); + String headers = jdbcTemplate.queryForObject( + "SELECT exchange_headers FROM route_executions WHERE route_id = 'schema-test-bodies'", + String.class); assertThat(headers).contains("X-Route"); assertThat(headers).contains("header-value"); }); @@ -224,26 +207,20 @@ class IngestionSchemaIT extends AbstractClickHouseIT { postExecution(json); await().atMost(10, SECONDS).untilAsserted(() -> { - var rows = jdbcTemplate.queryForList( - "SELECT exchange_bodies, exchange_headers, processor_input_bodies, " + - "processor_output_bodies, processor_depths, processor_parent_indexes " + - "FROM route_executions WHERE route_id = 'schema-test-null-snap'"); - - assertThat(rows).hasSize(1); - var row = rows.get(0); - // Empty but not null - String bodies = (String) row.get("exchange_bodies"); + String bodies = 
jdbcTemplate.queryForObject( + "SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-null-snap'", + String.class); assertThat(bodies).isNotNull(); // Depths and parent indexes still populated for tree metadata - @SuppressWarnings("unchecked") - var depths = (List) row.get("processor_depths"); - assertThat(depths).containsExactly((short) 0); + var depths = queryArray( + "SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-null-snap'"); + assertThat(depths).containsExactly("0"); - @SuppressWarnings("unchecked") - var parentIndexes = (List) row.get("processor_parent_indexes"); - assertThat(parentIndexes).containsExactly(-1); + var parentIndexes = queryArray( + "SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-null-snap'"); + assertThat(parentIndexes).containsExactly("-1"); }); } @@ -259,4 +236,26 @@ class IngestionSchemaIT extends AbstractClickHouseIT { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); } + + /** + * Query an array column from ClickHouse and return it as a List of strings. + * Handles the ClickHouse JDBC Array type by converting via toString on elements. + */ + private List queryArray(String sql) { + return jdbcTemplate.query(sql, (rs, rowNum) -> { + Object arr = rs.getArray(1).getArray(); + if (arr instanceof Object[] objects) { + return Arrays.stream(objects).map(Object::toString).toList(); + } else if (arr instanceof short[] shorts) { + var result = new java.util.ArrayList(); + for (short s : shorts) result.add(String.valueOf(s)); + return result; + } else if (arr instanceof int[] ints) { + var result = new java.util.ArrayList(); + for (int v : ints) result.add(String.valueOf(v)); + return result; + } + return List.of(); + }).get(0); + } }