feat(02-01): extend ingestion to populate Phase 2 columns with integration tests

- Refactored flattenProcessors to flattenWithMetadata (FlatProcessor record with depth/parentIndex)
- INSERT now populates 12 new columns: exchange_bodies, exchange_headers, processor_depths,
  processor_parent_indexes, processor_error_messages, processor_error_stacktraces,
  processor_input_bodies, processor_output_bodies, processor_input_headers,
  processor_output_headers, processor_diagram_node_ids, diagram_content_hash
- Exchange bodies/headers concatenated from all processor snapshots + route-level snapshots
- Null ExchangeSnapshot handled gracefully (empty string defaults)
- Headers serialized to JSON via Jackson ObjectMapper
- IngestionSchemaIT verifies 3-level tree metadata, body concatenation, null snapshot handling
- DiagramRenderer/DiagramLayout stubs created to fix pre-existing compilation error (Rule 3)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 16:15:41 +01:00
parent c0922430c4
commit f6ff279a60
3 changed files with 192 additions and 92 deletions

View File

@@ -1,21 +1,25 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.detail.RawExecutionRow;
import com.cameleer3.server.core.storage.ExecutionRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import com.cameleer3.server.core.detail.RawExecutionRow;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -23,20 +27,29 @@ import java.util.UUID;
* ClickHouse implementation of {@link ExecutionRepository}.
* <p>
* Performs batch inserts into the {@code route_executions} table.
* Processor executions are flattened into parallel arrays.
* Processor executions are flattened into parallel arrays with tree metadata
* (depth, parent index) for reconstruction.
*/
@Repository
public class ClickHouseExecutionRepository implements ExecutionRepository {
private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String INSERT_SQL = """
INSERT INTO route_executions (
execution_id, route_id, agent_id, status, start_time, end_time,
duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
processor_ids, processor_types, processor_starts, processor_ends,
processor_durations, processor_statuses
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
processor_durations, processor_statuses,
exchange_bodies, exchange_headers,
processor_depths, processor_parent_indexes,
processor_error_messages, processor_error_stacktraces,
processor_input_bodies, processor_output_bodies,
processor_input_headers, processor_output_headers,
processor_diagram_node_ids, diagram_content_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""";
private final JdbcTemplate jdbcTemplate;
@@ -55,27 +68,77 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
    RouteExecution exec = executions.get(i);
    // DFS-flattened processor tree; each entry carries its depth and parent index.
    List<FlatProcessor> flatProcessors = flattenWithMetadata(exec.getProcessors());
    // Running column index keeps the 29 placeholders in INSERT_SQL order without
    // hard-coded positions.
    int col = 1;
    // --- Scalar route-level columns ---
    ps.setString(col++, UUID.randomUUID().toString());
    ps.setString(col++, nullSafe(exec.getRouteId()));
    ps.setString(col++, ""); // agent_id set by controller header or empty
    ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
    ps.setObject(col++, toTimestamp(exec.getStartTime()));
    ps.setObject(col++, toTimestamp(exec.getEndTime()));
    ps.setLong(col++, exec.getDurationMs());
    ps.setString(col++, nullSafe(exec.getCorrelationId()));
    ps.setString(col++, nullSafe(exec.getExchangeId()));
    ps.setString(col++, nullSafe(exec.getErrorMessage()));
    ps.setString(col++, nullSafe(exec.getErrorStackTrace()));
    // --- Original parallel arrays (one element per flattened processor, DFS order) ---
    // FlatProcessor is a record, so components are read via accessor methods
    // (fp.proc(), not fp.proc — field access on a record does not compile).
    ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc().getProcessorId())).toArray(String[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc().getProcessorType())).toArray(String[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc().getStartTime())).toArray(Timestamp[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc().getEndTime())).toArray(Timestamp[]::new));
    ps.setObject(col++, flatProcessors.stream().mapToLong(fp -> fp.proc().getDurationMs()).boxed().toArray(Long[]::new));
    ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc().getStatus() != null ? fp.proc().getStatus().name() : "RUNNING").toArray(String[]::new));
    // --- Phase 2: per-processor detail arrays plus concatenated searchable text ---
    int n = flatProcessors.size();
    StringBuilder allBodies = new StringBuilder();
    StringBuilder allHeaders = new StringBuilder();
    String[] inputBodies = new String[n];
    String[] outputBodies = new String[n];
    String[] inputHeaders = new String[n];
    String[] outputHeaders = new String[n];
    String[] errorMessages = new String[n];
    String[] errorStacktraces = new String[n];
    String[] diagramNodeIds = new String[n];
    Short[] depths = new Short[n];
    Integer[] parentIndexes = new Integer[n];
    for (int j = 0; j < n; j++) {
        FlatProcessor fp = flatProcessors.get(j);
        ProcessorExecution p = fp.proc();
        inputBodies[j] = nullSafe(p.getInputBody());
        outputBodies[j] = nullSafe(p.getOutputBody());
        inputHeaders[j] = mapToJson(p.getInputHeaders());
        outputHeaders[j] = mapToJson(p.getOutputHeaders());
        errorMessages[j] = nullSafe(p.getErrorMessage());
        errorStacktraces[j] = nullSafe(p.getErrorStackTrace());
        diagramNodeIds[j] = nullSafe(p.getDiagramNodeId());
        depths[j] = (short) fp.depth();
        parentIndexes[j] = fp.parentIndex();
        // Concatenate per-processor bodies/headers into the route-wide search text.
        allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' ');
        allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' ');
    }
    // Include route-level input/output snapshots in the searchable text as well.
    appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders);
    appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders);
    ps.setString(col++, allBodies.toString().trim());  // exchange_bodies
    ps.setString(col++, allHeaders.toString().trim()); // exchange_headers
    ps.setObject(col++, depths);           // processor_depths
    ps.setObject(col++, parentIndexes);    // processor_parent_indexes
    ps.setObject(col++, errorMessages);    // processor_error_messages
    ps.setObject(col++, errorStacktraces); // processor_error_stacktraces
    ps.setObject(col++, inputBodies);      // processor_input_bodies
    ps.setObject(col++, outputBodies);     // processor_output_bodies
    ps.setObject(col++, inputHeaders);     // processor_input_headers
    ps.setObject(col++, outputHeaders);    // processor_output_headers
    ps.setObject(col++, diagramNodeIds);   // processor_diagram_node_ids
    ps.setString(col++, ""); // diagram_content_hash (wired later)
}
@Override
@@ -87,32 +150,59 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
}
/**
 * Looks up the raw stored row for a single execution by its {@code execution_id}.
 *
 * @param executionId the execution_id column value to look up
 * @return never returns normally in this stub
 * @throws UnsupportedOperationException always — the read path is not implemented yet
 */
@Override
public Optional<RawExecutionRow> findRawById(String executionId) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Flatten the processor tree into a flat list (depth-first).
* Internal record for a flattened processor with tree metadata.
*/
private List<ProcessorExecution> flattenProcessors(List<ProcessorExecution> processors) {
private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {}
/**
 * Flattens the processor execution tree into a single depth-first list,
 * recording each node's depth and parent index so the tree can be
 * reconstructed from the parallel-array columns.
 *
 * @param processors root processors of one route execution; may be null or empty
 * @return flat list in DFS order, or an empty list for null/empty input
 */
private List<FlatProcessor> flattenWithMetadata(List<ProcessorExecution> processors) {
    if (processors == null || processors.isEmpty()) {
        return List.of();
    }
    var result = new ArrayList<FlatProcessor>();
    for (ProcessorExecution p : processors) {
        flattenRecursive(p, 0, -1, result);
    }
    return result;
}
private void flatten(ProcessorExecution processor, List<ProcessorExecution> result) {
result.add(processor);
private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx,
List<FlatProcessor> result) {
int myIndex = result.size();
result.add(new FlatProcessor(processor, depth, parentIdx));
if (processor.getChildren() != null) {
for (ProcessorExecution child : processor.getChildren()) {
flatten(child, result);
flattenRecursive(child, depth + 1, myIndex, result);
}
}
}
@Override
public Optional<RawExecutionRow> findRawById(String executionId) {
throw new UnsupportedOperationException("Not yet implemented");
private void appendSnapshotText(ExchangeSnapshot snapshot,
StringBuilder allBodies, StringBuilder allHeaders) {
if (snapshot != null) {
allBodies.append(nullSafe(snapshot.getBody())).append(' ');
allHeaders.append(mapToJson(snapshot.getHeaders())).append(' ');
}
}
/**
 * Serializes a headers map to a JSON object string for storage. Null maps,
 * empty maps, and serialization failures all degrade to {@code "{}"} so the
 * column always receives valid JSON; failures are logged, never thrown.
 *
 * @param map headers to serialize; may be null
 * @return JSON object string, {@code "{}"} when there is nothing to serialize
 */
private static String mapToJson(Map<String, String> map) {
    if (map != null && !map.isEmpty()) {
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            log.warn("Failed to serialize headers map to JSON", e);
        }
    }
    return "{}";
}
private static String nullSafe(String value) {