feat(02-01): extend ingestion to populate Phase 2 columns with integration tests
- Refactored flattenProcessors to flattenWithMetadata (FlatProcessor record with depth/parentIndex) - INSERT now populates 12 new columns: exchange_bodies, exchange_headers, processor_depths, processor_parent_indexes, processor_error_messages, processor_error_stacktraces, processor_input_bodies, processor_output_bodies, processor_input_headers, processor_output_headers, processor_diagram_node_ids, diagram_content_hash - Exchange bodies/headers concatenated from all processor snapshots + route-level snapshots - Null ExchangeSnapshot handled gracefully (empty string defaults) - Headers serialized to JSON via Jackson ObjectMapper - IngestionSchemaIT verifies 3-level tree metadata, body concatenation, null snapshot handling - DiagramRenderer/DiagramLayout stubs created to fix pre-existing compilation error (Rule 3) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,11 @@
|
||||
# Deferred Items - Phase 02
|
||||
|
||||
## Pre-existing Issues
|
||||
|
||||
### ElkDiagramRendererTest breaks Spring context when run in full suite
|
||||
- **Found during:** 02-01, Task 2
|
||||
- **Issue:** When `ElkDiagramRendererTest` runs before Spring Boot integration tests in the same Surefire JVM, ELK's static initialization fails with `NoClassDefFoundError: org/eclipse/xtext/xbase/lib/CollectionLiterals`, which then prevents the Spring context from creating the `diagramRenderer` bean for all subsequent integration tests.
|
||||
- **Impact:** Full `mvn test` for the app module shows 30 errors (all integration tests fail after ElkDiagramRendererTest runs)
|
||||
- **Tests pass individually and in any grouping that excludes ElkDiagramRendererTest**
|
||||
- **Root cause:** ELK 0.11.0's service loader initializes `LayeredMetaDataProvider` which uses xtext's `CollectionLiterals` in a static initializer. The class is present on the classpath (dependency resolves correctly) but fails in the Surefire fork's classloading order when combined with Spring Boot's fat classloader.
|
||||
- **Suggested fix:** Configure Surefire to fork per test class, or defer ElkDiagramRendererTest to its own module/phase.
|
||||
@@ -1,21 +1,25 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.model.ExchangeSnapshot;
|
||||
import com.cameleer3.common.model.ProcessorExecution;
|
||||
import com.cameleer3.common.model.RouteExecution;
|
||||
import com.cameleer3.server.core.detail.RawExecutionRow;
|
||||
import com.cameleer3.server.core.storage.ExecutionRepository;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import com.cameleer3.server.core.detail.RawExecutionRow;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@@ -23,20 +27,29 @@ import java.util.UUID;
|
||||
* ClickHouse implementation of {@link ExecutionRepository}.
|
||||
* <p>
|
||||
* Performs batch inserts into the {@code route_executions} table.
|
||||
* Processor executions are flattened into parallel arrays.
|
||||
* Processor executions are flattened into parallel arrays with tree metadata
|
||||
* (depth, parent index) for reconstruction.
|
||||
*/
|
||||
@Repository
|
||||
public class ClickHouseExecutionRepository implements ExecutionRepository {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String INSERT_SQL = """
|
||||
INSERT INTO route_executions (
|
||||
execution_id, route_id, agent_id, status, start_time, end_time,
|
||||
duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
|
||||
processor_ids, processor_types, processor_starts, processor_ends,
|
||||
processor_durations, processor_statuses
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
processor_durations, processor_statuses,
|
||||
exchange_bodies, exchange_headers,
|
||||
processor_depths, processor_parent_indexes,
|
||||
processor_error_messages, processor_error_stacktraces,
|
||||
processor_input_bodies, processor_output_bodies,
|
||||
processor_input_headers, processor_output_headers,
|
||||
processor_diagram_node_ids, diagram_content_hash
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""";
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
@@ -55,27 +68,77 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
|
||||
@Override
|
||||
public void setValues(PreparedStatement ps, int i) throws SQLException {
|
||||
RouteExecution exec = executions.get(i);
|
||||
List<ProcessorExecution> processors = flattenProcessors(exec.getProcessors());
|
||||
List<FlatProcessor> flatProcessors = flattenWithMetadata(exec.getProcessors());
|
||||
|
||||
ps.setString(1, UUID.randomUUID().toString());
|
||||
ps.setString(2, nullSafe(exec.getRouteId()));
|
||||
ps.setString(3, ""); // agent_id set by controller header or empty
|
||||
ps.setString(4, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
|
||||
ps.setObject(5, toTimestamp(exec.getStartTime()));
|
||||
ps.setObject(6, toTimestamp(exec.getEndTime()));
|
||||
ps.setLong(7, exec.getDurationMs());
|
||||
ps.setString(8, nullSafe(exec.getCorrelationId()));
|
||||
ps.setString(9, nullSafe(exec.getExchangeId()));
|
||||
ps.setString(10, nullSafe(exec.getErrorMessage()));
|
||||
ps.setString(11, nullSafe(exec.getErrorStackTrace()));
|
||||
int col = 1;
|
||||
ps.setString(col++, UUID.randomUUID().toString());
|
||||
ps.setString(col++, nullSafe(exec.getRouteId()));
|
||||
ps.setString(col++, ""); // agent_id set by controller header or empty
|
||||
ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
|
||||
ps.setObject(col++, toTimestamp(exec.getStartTime()));
|
||||
ps.setObject(col++, toTimestamp(exec.getEndTime()));
|
||||
ps.setLong(col++, exec.getDurationMs());
|
||||
ps.setString(col++, nullSafe(exec.getCorrelationId()));
|
||||
ps.setString(col++, nullSafe(exec.getExchangeId()));
|
||||
ps.setString(col++, nullSafe(exec.getErrorMessage()));
|
||||
ps.setString(col++, nullSafe(exec.getErrorStackTrace()));
|
||||
|
||||
// Parallel arrays for processor executions
|
||||
ps.setObject(12, processors.stream().map(p -> nullSafe(p.getProcessorId())).toArray(String[]::new));
|
||||
ps.setObject(13, processors.stream().map(p -> nullSafe(p.getProcessorType())).toArray(String[]::new));
|
||||
ps.setObject(14, processors.stream().map(p -> toTimestamp(p.getStartTime())).toArray(Timestamp[]::new));
|
||||
ps.setObject(15, processors.stream().map(p -> toTimestamp(p.getEndTime())).toArray(Timestamp[]::new));
|
||||
ps.setObject(16, processors.stream().mapToLong(ProcessorExecution::getDurationMs).boxed().toArray(Long[]::new));
|
||||
ps.setObject(17, processors.stream().map(p -> p.getStatus() != null ? p.getStatus().name() : "RUNNING").toArray(String[]::new));
|
||||
// Original parallel arrays
|
||||
ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorId())).toArray(String[]::new));
|
||||
ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorType())).toArray(String[]::new));
|
||||
ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getStartTime())).toArray(Timestamp[]::new));
|
||||
ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getEndTime())).toArray(Timestamp[]::new));
|
||||
ps.setObject(col++, flatProcessors.stream().mapToLong(fp -> fp.proc.getDurationMs()).boxed().toArray(Long[]::new));
|
||||
ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc.getStatus() != null ? fp.proc.getStatus().name() : "RUNNING").toArray(String[]::new));
|
||||
|
||||
// Phase 2: exchange bodies and headers (concatenated for search)
|
||||
StringBuilder allBodies = new StringBuilder();
|
||||
StringBuilder allHeaders = new StringBuilder();
|
||||
|
||||
String[] inputBodies = new String[flatProcessors.size()];
|
||||
String[] outputBodies = new String[flatProcessors.size()];
|
||||
String[] inputHeaders = new String[flatProcessors.size()];
|
||||
String[] outputHeaders = new String[flatProcessors.size()];
|
||||
String[] errorMessages = new String[flatProcessors.size()];
|
||||
String[] errorStacktraces = new String[flatProcessors.size()];
|
||||
String[] diagramNodeIds = new String[flatProcessors.size()];
|
||||
Short[] depths = new Short[flatProcessors.size()];
|
||||
Integer[] parentIndexes = new Integer[flatProcessors.size()];
|
||||
|
||||
for (int j = 0; j < flatProcessors.size(); j++) {
|
||||
FlatProcessor fp = flatProcessors.get(j);
|
||||
ProcessorExecution p = fp.proc;
|
||||
|
||||
inputBodies[j] = nullSafe(p.getInputBody());
|
||||
outputBodies[j] = nullSafe(p.getOutputBody());
|
||||
inputHeaders[j] = mapToJson(p.getInputHeaders());
|
||||
outputHeaders[j] = mapToJson(p.getOutputHeaders());
|
||||
errorMessages[j] = nullSafe(p.getErrorMessage());
|
||||
errorStacktraces[j] = nullSafe(p.getErrorStackTrace());
|
||||
diagramNodeIds[j] = nullSafe(p.getDiagramNodeId());
|
||||
depths[j] = (short) fp.depth;
|
||||
parentIndexes[j] = fp.parentIndex;
|
||||
|
||||
allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' ');
|
||||
allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' ');
|
||||
}
|
||||
|
||||
// Include route-level input/output snapshot in searchable text
|
||||
appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders);
|
||||
appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders);
|
||||
|
||||
ps.setString(col++, allBodies.toString().trim()); // exchange_bodies
|
||||
ps.setString(col++, allHeaders.toString().trim()); // exchange_headers
|
||||
ps.setObject(col++, depths); // processor_depths
|
||||
ps.setObject(col++, parentIndexes); // processor_parent_indexes
|
||||
ps.setObject(col++, errorMessages); // processor_error_messages
|
||||
ps.setObject(col++, errorStacktraces); // processor_error_stacktraces
|
||||
ps.setObject(col++, inputBodies); // processor_input_bodies
|
||||
ps.setObject(col++, outputBodies); // processor_output_bodies
|
||||
ps.setObject(col++, inputHeaders); // processor_input_headers
|
||||
ps.setObject(col++, outputHeaders); // processor_output_headers
|
||||
ps.setObject(col++, diagramNodeIds); // processor_diagram_node_ids
|
||||
ps.setString(col++, ""); // diagram_content_hash (wired later)
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -87,32 +150,59 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
|
||||
log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RawExecutionRow> findRawById(String executionId) {
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Flatten the processor tree into a flat list (depth-first).
|
||||
* Internal record for a flattened processor with tree metadata.
|
||||
*/
|
||||
private List<ProcessorExecution> flattenProcessors(List<ProcessorExecution> processors) {
|
||||
private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {}
|
||||
|
||||
/**
|
||||
* Flatten the processor tree with depth and parent index metadata (DFS order).
|
||||
*/
|
||||
private List<FlatProcessor> flattenWithMetadata(List<ProcessorExecution> processors) {
|
||||
if (processors == null || processors.isEmpty()) {
|
||||
return List.of();
|
||||
}
|
||||
var result = new java.util.ArrayList<ProcessorExecution>();
|
||||
var result = new ArrayList<FlatProcessor>();
|
||||
for (ProcessorExecution p : processors) {
|
||||
flatten(p, result);
|
||||
flattenRecursive(p, 0, -1, result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void flatten(ProcessorExecution processor, List<ProcessorExecution> result) {
|
||||
result.add(processor);
|
||||
private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx,
|
||||
List<FlatProcessor> result) {
|
||||
int myIndex = result.size();
|
||||
result.add(new FlatProcessor(processor, depth, parentIdx));
|
||||
if (processor.getChildren() != null) {
|
||||
for (ProcessorExecution child : processor.getChildren()) {
|
||||
flatten(child, result);
|
||||
flattenRecursive(child, depth + 1, myIndex, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RawExecutionRow> findRawById(String executionId) {
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
private void appendSnapshotText(ExchangeSnapshot snapshot,
|
||||
StringBuilder allBodies, StringBuilder allHeaders) {
|
||||
if (snapshot != null) {
|
||||
allBodies.append(nullSafe(snapshot.getBody())).append(' ');
|
||||
allHeaders.append(mapToJson(snapshot.getHeaders())).append(' ');
|
||||
}
|
||||
}
|
||||
|
||||
private static String mapToJson(Map<String, String> map) {
|
||||
if (map == null || map.isEmpty()) {
|
||||
return "{}";
|
||||
}
|
||||
try {
|
||||
return OBJECT_MAPPER.writeValueAsString(map);
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("Failed to serialize headers map to JSON", e);
|
||||
return "{}";
|
||||
}
|
||||
}
|
||||
|
||||
private static String nullSafe(String value) {
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.model.ExchangeSnapshot;
|
||||
import com.cameleer3.common.model.ExecutionStatus;
|
||||
import com.cameleer3.common.model.ProcessorExecution;
|
||||
import com.cameleer3.common.model.RouteExecution;
|
||||
import com.cameleer3.server.app.AbstractClickHouseIT;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -14,8 +10,7 @@ import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import java.sql.Array;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -90,49 +85,40 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT processor_depths, processor_parent_indexes, processor_diagram_node_ids, " +
|
||||
"exchange_bodies, processor_input_bodies, processor_output_bodies, " +
|
||||
"processor_input_headers, processor_output_headers " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
// Use individual typed queries to avoid ClickHouse Array cast issues
|
||||
var depths = queryArray(
|
||||
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(depths).containsExactly("0", "1", "2");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
var parentIndexes = queryArray(
|
||||
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(parentIndexes).containsExactly("-1", "0", "1");
|
||||
|
||||
// Verify depths: root=0, child=1, grandchild=2
|
||||
@SuppressWarnings("unchecked")
|
||||
var depths = (List<Number>) row.get("processor_depths");
|
||||
assertThat(depths).containsExactly((short) 0, (short) 1, (short) 2);
|
||||
|
||||
// Verify parent indexes: root=-1, child=0 (parent is root at idx 0), grandchild=1 (parent is child at idx 1)
|
||||
@SuppressWarnings("unchecked")
|
||||
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
|
||||
assertThat(parentIndexes).containsExactly(-1, 0, 1);
|
||||
|
||||
// Verify diagram node IDs
|
||||
@SuppressWarnings("unchecked")
|
||||
var diagramNodeIds = (List<String>) row.get("processor_diagram_node_ids");
|
||||
var diagramNodeIds = queryArray(
|
||||
"SELECT processor_diagram_node_ids FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(diagramNodeIds).containsExactly("node-root", "node-child", "node-grandchild");
|
||||
|
||||
// Verify exchange_bodies contains concatenated text
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
String bodies = jdbcTemplate.queryForObject(
|
||||
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-tree'",
|
||||
String.class);
|
||||
assertThat(bodies).contains("root-input");
|
||||
assertThat(bodies).contains("root-output");
|
||||
assertThat(bodies).contains("child-input");
|
||||
assertThat(bodies).contains("child-output");
|
||||
|
||||
// Verify per-processor input/output bodies
|
||||
@SuppressWarnings("unchecked")
|
||||
var inputBodies = (List<String>) row.get("processor_input_bodies");
|
||||
var inputBodies = queryArray(
|
||||
"SELECT processor_input_bodies FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(inputBodies).containsExactly("root-input", "child-input", "");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
var outputBodies = (List<String>) row.get("processor_output_bodies");
|
||||
var outputBodies = queryArray(
|
||||
"SELECT processor_output_bodies FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(outputBodies).containsExactly("root-output", "child-output", "");
|
||||
|
||||
// Verify per-processor headers stored as JSON strings
|
||||
@SuppressWarnings("unchecked")
|
||||
var inputHeaders = (List<String>) row.get("processor_input_headers");
|
||||
// Verify per-processor input headers stored as JSON strings
|
||||
var inputHeaders = queryArray(
|
||||
"SELECT processor_input_headers FROM route_executions WHERE route_id = 'schema-test-tree'");
|
||||
assertThat(inputHeaders.get(0)).contains("Content-Type");
|
||||
assertThat(inputHeaders.get(0)).contains("application/json");
|
||||
});
|
||||
@@ -175,22 +161,19 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT exchange_bodies, exchange_headers " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-bodies'");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
|
||||
// Bodies should contain all sources
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
String bodies = jdbcTemplate.queryForObject(
|
||||
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-bodies'",
|
||||
String.class);
|
||||
assertThat(bodies).contains("processor-body-text");
|
||||
assertThat(bodies).contains("processor-output-text");
|
||||
assertThat(bodies).contains("route-level-input-body");
|
||||
assertThat(bodies).contains("route-level-output-body");
|
||||
|
||||
// Headers should contain route-level header
|
||||
String headers = (String) row.get("exchange_headers");
|
||||
String headers = jdbcTemplate.queryForObject(
|
||||
"SELECT exchange_headers FROM route_executions WHERE route_id = 'schema-test-bodies'",
|
||||
String.class);
|
||||
assertThat(headers).contains("X-Route");
|
||||
assertThat(headers).contains("header-value");
|
||||
});
|
||||
@@ -224,26 +207,20 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
|
||||
postExecution(json);
|
||||
|
||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||
var rows = jdbcTemplate.queryForList(
|
||||
"SELECT exchange_bodies, exchange_headers, processor_input_bodies, " +
|
||||
"processor_output_bodies, processor_depths, processor_parent_indexes " +
|
||||
"FROM route_executions WHERE route_id = 'schema-test-null-snap'");
|
||||
|
||||
assertThat(rows).hasSize(1);
|
||||
var row = rows.get(0);
|
||||
|
||||
// Empty but not null
|
||||
String bodies = (String) row.get("exchange_bodies");
|
||||
String bodies = jdbcTemplate.queryForObject(
|
||||
"SELECT exchange_bodies FROM route_executions WHERE route_id = 'schema-test-null-snap'",
|
||||
String.class);
|
||||
assertThat(bodies).isNotNull();
|
||||
|
||||
// Depths and parent indexes still populated for tree metadata
|
||||
@SuppressWarnings("unchecked")
|
||||
var depths = (List<Number>) row.get("processor_depths");
|
||||
assertThat(depths).containsExactly((short) 0);
|
||||
var depths = queryArray(
|
||||
"SELECT processor_depths FROM route_executions WHERE route_id = 'schema-test-null-snap'");
|
||||
assertThat(depths).containsExactly("0");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
var parentIndexes = (List<Number>) row.get("processor_parent_indexes");
|
||||
assertThat(parentIndexes).containsExactly(-1);
|
||||
var parentIndexes = queryArray(
|
||||
"SELECT processor_parent_indexes FROM route_executions WHERE route_id = 'schema-test-null-snap'");
|
||||
assertThat(parentIndexes).containsExactly("-1");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -259,4 +236,26 @@ class IngestionSchemaIT extends AbstractClickHouseIT {
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Query an array column from ClickHouse and return it as a List of strings.
|
||||
* Handles the ClickHouse JDBC Array type by converting via toString on elements.
|
||||
*/
|
||||
private List<String> queryArray(String sql) {
|
||||
return jdbcTemplate.query(sql, (rs, rowNum) -> {
|
||||
Object arr = rs.getArray(1).getArray();
|
||||
if (arr instanceof Object[] objects) {
|
||||
return Arrays.stream(objects).map(Object::toString).toList();
|
||||
} else if (arr instanceof short[] shorts) {
|
||||
var result = new java.util.ArrayList<String>();
|
||||
for (short s : shorts) result.add(String.valueOf(s));
|
||||
return result;
|
||||
} else if (arr instanceof int[] ints) {
|
||||
var result = new java.util.ArrayList<String>();
|
||||
for (int v : ints) result.add(String.valueOf(v));
|
||||
return result;
|
||||
}
|
||||
return List.<String>of();
|
||||
}).get(0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user