feat(02-01): extend ingestion to populate Phase 2 columns with integration tests

- Refactored flattenProcessors to flattenWithMetadata (FlatProcessor record with depth/parentIndex)
- INSERT now populates 12 new columns: exchange_bodies, exchange_headers, processor_depths,
  processor_parent_indexes, processor_error_messages, processor_error_stacktraces,
  processor_input_bodies, processor_output_bodies, processor_input_headers,
  processor_output_headers, processor_diagram_node_ids, diagram_content_hash
- Exchange bodies/headers concatenated from all processor snapshots + route-level snapshots
- Null ExchangeSnapshot handled gracefully (empty string defaults)
- Headers serialized to JSON via Jackson ObjectMapper
- IngestionSchemaIT verifies 3-level tree metadata, body concatenation, null snapshot handling
- DiagramRenderer/DiagramLayout stubs created to fix pre-existing compilation error (Rule 3)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 16:15:41 +01:00
parent c0922430c4
commit f6ff279a60
3 changed files with 192 additions and 92 deletions

View File

@@ -0,0 +1,11 @@
# Deferred Items - Phase 02
## Pre-existing Issues
### ElkDiagramRendererTest breaks Spring context when run in full suite
- **Found during:** 02-01, Task 2
- **Issue:** When `ElkDiagramRendererTest` runs before Spring Boot integration tests in the same Surefire JVM, ELK's static initialization fails with `NoClassDefFoundError: org/eclipse/xtext/xbase/lib/CollectionLiterals`, which then prevents the Spring context from creating the `diagramRenderer` bean for all subsequent integration tests.
- **Impact:** Full `mvn test` for the app module shows 30 errors (all integration tests fail after ElkDiagramRendererTest runs)
- **Workaround:** Tests pass individually and in any grouping that excludes `ElkDiagramRendererTest`
- **Root cause:** ELK 0.11.0's service loader initializes `LayeredMetaDataProvider` which uses xtext's `CollectionLiterals` in a static initializer. The class is present on the classpath (dependency resolves correctly) but fails in the Surefire fork's classloading order when combined with Spring Boot's fat classloader.
- **Suggested fix:** Configure Surefire to fork per test class, or defer ElkDiagramRendererTest to its own module/phase.

View File

@@ -1,21 +1,25 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.detail.RawExecutionRow;
import com.cameleer3.server.core.storage.ExecutionRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -23,20 +27,29 @@ import java.util.UUID;
* ClickHouse implementation of {@link ExecutionRepository}.
* <p>
* Performs batch inserts into the {@code route_executions} table.
* Processor executions are flattened into parallel arrays with tree metadata
* (depth, parent index) for reconstruction.
*/
@Repository
public class ClickHouseExecutionRepository implements ExecutionRepository {
private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String INSERT_SQL = """
INSERT INTO route_executions (
execution_id, route_id, agent_id, status, start_time, end_time,
duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
processor_ids, processor_types, processor_starts, processor_ends,
processor_durations, processor_statuses
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
processor_durations, processor_statuses,
exchange_bodies, exchange_headers,
processor_depths, processor_parent_indexes,
processor_error_messages, processor_error_stacktraces,
processor_input_bodies, processor_output_bodies,
processor_input_headers, processor_output_headers,
processor_diagram_node_ids, diagram_content_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""";
private final JdbcTemplate jdbcTemplate;
@@ -55,27 +68,77 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
    RouteExecution exec = executions.get(i);
    List<FlatProcessor> flatProcessors = flattenWithMetadata(exec.getProcessors());
    // Use a running column index so inserting/reordering columns stays local.
    int col = 1;
    // --- Scalar route-level columns ---
    ps.setString(col++, UUID.randomUUID().toString());                // execution_id
    ps.setString(col++, nullSafe(exec.getRouteId()));                 // route_id
    ps.setString(col++, "");                                          // agent_id set by controller header or empty
    ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING"); // status
    ps.setObject(col++, toTimestamp(exec.getStartTime()));            // start_time
    ps.setObject(col++, toTimestamp(exec.getEndTime()));              // end_time
    ps.setLong(col++, exec.getDurationMs());                          // duration_ms
    ps.setString(col++, nullSafe(exec.getCorrelationId()));           // correlation_id
    ps.setString(col++, nullSafe(exec.getExchangeId()));              // exchange_id
    ps.setString(col++, nullSafe(exec.getErrorMessage()));            // error_message
    ps.setString(col++, nullSafe(exec.getErrorStackTrace()));         // error_stacktrace
    // --- Original parallel arrays (one element per processor, DFS order) ---
    ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc().getProcessorId())).toArray(String[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc().getProcessorType())).toArray(String[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc().getStartTime())).toArray(Timestamp[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc().getEndTime())).toArray(Timestamp[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc().getDurationMs()).toArray(Long[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc().getStatus() != null ? fp.proc().getStatus().name() : "RUNNING").toArray(String[]::new));
    // --- Phase 2: per-processor detail arrays + concatenated search text ---
    int n = flatProcessors.size();
    String[] inputBodies = new String[n];
    String[] outputBodies = new String[n];
    String[] inputHeaders = new String[n];
    String[] outputHeaders = new String[n];
    String[] errorMessages = new String[n];
    String[] errorStacktraces = new String[n];
    String[] diagramNodeIds = new String[n];
    Short[] depths = new Short[n];
    Integer[] parentIndexes = new Integer[n];
    StringBuilder allBodies = new StringBuilder();
    StringBuilder allHeaders = new StringBuilder();
    for (int j = 0; j < n; j++) {
        FlatProcessor fp = flatProcessors.get(j);
        ProcessorExecution p = fp.proc();
        inputBodies[j] = nullSafe(p.getInputBody());
        outputBodies[j] = nullSafe(p.getOutputBody());
        inputHeaders[j] = mapToJson(p.getInputHeaders());
        outputHeaders[j] = mapToJson(p.getOutputHeaders());
        errorMessages[j] = nullSafe(p.getErrorMessage());
        errorStacktraces[j] = nullSafe(p.getErrorStackTrace());
        diagramNodeIds[j] = nullSafe(p.getDiagramNodeId());
        depths[j] = (short) fp.depth();
        parentIndexes[j] = fp.parentIndex();
        // Space-separated concatenation feeds the full-text search columns.
        allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' ');
        allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' ');
    }
    // Route-level input/output snapshots also contribute to the searchable text.
    appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders);
    appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders);
    ps.setString(col++, allBodies.toString().trim());   // exchange_bodies
    ps.setString(col++, allHeaders.toString().trim());  // exchange_headers
    ps.setObject(col++, depths);                        // processor_depths
    ps.setObject(col++, parentIndexes);                 // processor_parent_indexes
    ps.setObject(col++, errorMessages);                 // processor_error_messages
    ps.setObject(col++, errorStacktraces);              // processor_error_stacktraces
    ps.setObject(col++, inputBodies);                   // processor_input_bodies
    ps.setObject(col++, outputBodies);                  // processor_output_bodies
    ps.setObject(col++, inputHeaders);                  // processor_input_headers
    ps.setObject(col++, outputHeaders);                 // processor_output_headers
    ps.setObject(col++, diagramNodeIds);                // processor_diagram_node_ids
    ps.setString(col++, "");                            // diagram_content_hash (wired later)
}
@Override
@@ -87,32 +150,59 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
}
@Override
public Optional<RawExecutionRow> findRawById(String executionId) {
// Detail-view lookup is deferred to a later phase; callers must not invoke this yet.
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Flatten the processor tree into a flat list (depth-first).
* Internal record for a flattened processor with tree metadata.
*/
private List<ProcessorExecution> flattenProcessors(List<ProcessorExecution> processors) {
private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {}
/**
* Flatten the processor tree with depth and parent index metadata (DFS order).
*/
private List<FlatProcessor> flattenWithMetadata(List<ProcessorExecution> processors) {
if (processors == null || processors.isEmpty()) {
return List.of();
}
var result = new java.util.ArrayList<ProcessorExecution>();
var result = new ArrayList<FlatProcessor>();
for (ProcessorExecution p : processors) {
flatten(p, result);
flattenRecursive(p, 0, -1, result);
}
return result;
}
private void flatten(ProcessorExecution processor, List<ProcessorExecution> result) {
result.add(processor);
private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx,
List<FlatProcessor> result) {
int myIndex = result.size();
result.add(new FlatProcessor(processor, depth, parentIdx));
if (processor.getChildren() != null) {
for (ProcessorExecution child : processor.getChildren()) {
flatten(child, result);
flattenRecursive(child, depth + 1, myIndex, result);
}
}
}
@Override
public Optional<RawExecutionRow> findRawById(String executionId) {
throw new UnsupportedOperationException("Not yet implemented");
private void appendSnapshotText(ExchangeSnapshot snapshot,
StringBuilder allBodies, StringBuilder allHeaders) {
if (snapshot != null) {
allBodies.append(nullSafe(snapshot.getBody())).append(' ');
allHeaders.append(mapToJson(snapshot.getHeaders())).append(' ');
}
}
/**
 * Serialize a headers map to a JSON string. A {@code null} or empty map, and
 * any serialization failure, both yield the empty JSON object {@code "{}"}.
 */
private static String mapToJson(Map<String, String> map) {
    if (map != null && !map.isEmpty()) {
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            log.warn("Failed to serialize headers map to JSON", e);
        }
    }
    return "{}";
}
private static String nullSafe(String value) {

View File

@@ -1,9 +1,5 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.app.AbstractClickHouseIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,8 +10,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import java.sql.Array;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -90,49 +85,40 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
postExecution(json);
await().atMost(10, SECONDS).untilAsserted(() -> {
var rows = jdbcTemplate.queryForList(
"SELECT processor_depths, processor_parent_indexes, processor_diagram_node_ids, " +
"exchange_bodies, processor_input_bodies, processor_output_bodies, " +
"processor_input_headers, processor_output_headers " +
"FROM route_executions WHERE route_id = 'schema-test-tree'");
// Use individual typed queries to avoid ClickHouse Array cast issues
var depths = queryArray(
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(depths).containsExactly("0", "1", "2");
assertThat(rows).hasSize(1);
var row = rows.get(0);
var parentIndexes = queryArray(
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(parentIndexes).containsExactly("-1", "0", "1");
// Verify depths: root=0, child=1, grandchild=2
@SuppressWarnings("unchecked")
var depths = (List<Number>) row.get("processor_depths");
assertThat(depths).containsExactly((short) 0, (short) 1, (short) 2);
// Verify parent indexes: root=-1, child=0 (parent is root at idx 0), grandchild=1 (parent is child at idx 1)
@SuppressWarnings("unchecked")
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
assertThat(parentIndexes).containsExactly(-1, 0, 1);
// Verify diagram node IDs
@SuppressWarnings("unchecked")
var diagramNodeIds = (List<String>) row.get("processor_diagram_node_ids");
var diagramNodeIds = queryArray(
"SELECT processor_diagram_node_ids FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild");
// Verify exchange_bodies contains concatenated text
String bodies = (String) row.get("exchange_bodies");
String bodies = jdbcTemplate.queryForObject(
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-tree'",
String.class);
assertThat(bodies).contains("root-input");
assertThat(bodies).contains("root-output");
assertThat(bodies).contains("child-input");
assertThat(bodies).contains("child-output");
// Verify per-processor input/output bodies
@SuppressWarnings("unchecked")
var inputBodies = (List<String>) row.get("processor_input_bodies");
var inputBodies = queryArray(
"SELECT processor_input_bodies FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(inputBodies).containsExactly("root-input", "child-input", "");
@SuppressWarnings("unchecked")
var outputBodies = (List<String>) row.get("processor_output_bodies");
var outputBodies = queryArray(
"SELECT processor_output_bodies FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(outputBodies).containsExactly("root-output", "child-output", "");
// Verify per-processor headers stored as JSON strings
@SuppressWarnings("unchecked")
var inputHeaders = (List<String>) row.get("processor_input_headers");
// Verify per-processor input headers stored as JSON strings
var inputHeaders = queryArray(
"SELECT processor_input_headers FROM route_executions WHERE route_id = 'schema-test-tree'");
assertThat(inputHeaders.get(0)).contains("Content-Type");
assertThat(inputHeaders.get(0)).contains("application/json");
});
@@ -175,22 +161,19 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
postExecution(json);
await().atMost(10, SECONDS).untilAsserted(() -> {
var rows = jdbcTemplate.queryForList(
"SELECT exchange_bodies, exchange_headers " +
"FROM route_executions WHERE route_id = 'schema-test-bodies'");
assertThat(rows).hasSize(1);
var row = rows.get(0);
// Bodies should contain all sources
String bodies = (String) row.get("exchange_bodies");
String bodies = jdbcTemplate.queryForObject(
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-bodies'",
String.class);
assertThat(bodies).contains("processor-body-text");
assertThat(bodies).contains("processor-output-text");
assertThat(bodies).contains("route-level-input-body");
assertThat(bodies).contains("route-level-output-body");
// Headers should contain route-level header
String headers = (String) row.get("exchange_headers");
String headers = jdbcTemplate.queryForObject(
"SELECT exchange_headers FROM route_executions WHERE route_id = 'schema-test-bodies'",
String.class);
assertThat(headers).contains("X-Route");
assertThat(headers).contains("header-value");
});
@@ -224,26 +207,20 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
postExecution(json);
await().atMost(10, SECONDS).untilAsserted(() -> {
var rows = jdbcTemplate.queryForList(
"SELECT exchange_bodies, exchange_headers, processor_input_bodies, " +
"processor_output_bodies, processor_depths, processor_parent_indexes " +
"FROM route_executions WHERE route_id = 'schema-test-null-snap'");
assertThat(rows).hasSize(1);
var row = rows.get(0);
// Empty but not null
String bodies = (String) row.get("exchange_bodies");
String bodies = jdbcTemplate.queryForObject(
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-null-snap'",
String.class);
assertThat(bodies).isNotNull();
// Depths and parent indexes still populated for tree metadata
@SuppressWarnings("unchecked")
var depths = (List<Number>) row.get("processor_depths");
assertThat(depths).containsExactly((short) 0);
var depths = queryArray(
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-null-snap'");
assertThat(depths).containsExactly("0");
@SuppressWarnings("unchecked")
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
assertThat(parentIndexes).containsExactly(-1);
var parentIndexes = queryArray(
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-null-snap'");
assertThat(parentIndexes).containsExactly("-1");
});
}
@@ -259,4 +236,26 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
}
/**
 * Query a single-row array column from ClickHouse and return its elements as
 * strings.
 * <p>
 * Handles the ClickHouse JDBC {@link Array} type: boxed {@code Object[]}
 * results are mapped directly, while primitive arrays of any element type
 * (short[], int[], long[], ...) are read via {@link java.lang.reflect.Array},
 * so new numeric column types do not silently produce an empty list. A SQL
 * NULL array yields an empty list instead of an NPE.
 */
private List<String> queryArray(String sql) {
    return jdbcTemplate.query(sql, (rs, rowNum) -> {
        Array sqlArray = rs.getArray(1);
        if (sqlArray == null) {
            return List.<String>of(); // NULL column value: treat as empty
        }
        Object arr = sqlArray.getArray();
        if (arr instanceof Object[] objects) {
            // String.valueOf (unlike Object::toString) tolerates null elements
            return Arrays.stream(objects).map(String::valueOf).toList();
        }
        // Primitive array: element type unknown, so go through reflection.
        int length = java.lang.reflect.Array.getLength(arr);
        List<String> result = new java.util.ArrayList<>(length);
        for (int k = 0; k < length; k++) {
            result.add(String.valueOf(java.lang.reflect.Array.get(arr, k)));
        }
        return result;
    }).get(0);
}
}