feat(02-03): detail controller, tree reconstruction, processor snapshot endpoint

- Implement findRawById and findProcessorSnapshot in ClickHouseExecutionRepository
- DetailController with GET /executions/{id} returning nested processor tree
- GET /executions/{id}/processors/{index}/snapshot for per-processor exchange data
- 5 unit tests for tree reconstruction (linear, branching, multiple roots, empty)
- 6 integration tests for detail endpoint, snapshot, and 404 handling
- Added assertj and mockito test dependencies to core module

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 16:29:53 +01:00
parent 82a190c8e2
commit 0615a9851d
5 changed files with 626 additions and 1 deletions

View File

@@ -0,0 +1,54 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.storage.ClickHouseExecutionRepository;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.detail.ExecutionDetail;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* Endpoints for retrieving execution details and processor snapshots.
* <p>
* The detail endpoint returns a nested processor tree reconstructed from
* flat parallel arrays stored in ClickHouse. The snapshot endpoint returns
* per-processor exchange data (bodies and headers).
*/
@RestController
@RequestMapping("/api/v1/executions")
@Tag(name = "Detail", description = "Execution detail and processor snapshot endpoints")
public class DetailController {
private final DetailService detailService;
private final ClickHouseExecutionRepository executionRepository;
public DetailController(DetailService detailService,
ClickHouseExecutionRepository executionRepository) {
this.detailService = detailService;
this.executionRepository = executionRepository;
}
@GetMapping("/{executionId}")
@Operation(summary = "Get execution detail with nested processor tree")
public ResponseEntity<ExecutionDetail> getDetail(@PathVariable String executionId) {
return detailService.getDetail(executionId)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
@GetMapping("/{executionId}/processors/{index}/snapshot")
@Operation(summary = "Get exchange snapshot for a specific processor")
public ResponseEntity<Map<String, String>> getProcessorSnapshot(
@PathVariable String executionId,
@PathVariable int index) {
return executionRepository.findProcessorSnapshot(executionId, index)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
}

View File

@@ -152,7 +152,201 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
@Override
public Optional<RawExecutionRow> findRawById(String executionId) {
throw new UnsupportedOperationException("Not yet implemented");
String sql = """
SELECT execution_id, route_id, agent_id, status, start_time, end_time,
duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
diagram_content_hash,
processor_ids, processor_types, processor_statuses,
processor_starts, processor_ends, processor_durations,
processor_diagram_node_ids,
processor_error_messages, processor_error_stacktraces,
processor_depths, processor_parent_indexes
FROM route_executions
WHERE execution_id = ?
LIMIT 1
""";
List<RawExecutionRow> results = jdbcTemplate.query(sql, (rs, rowNum) -> {
// Extract parallel arrays from ClickHouse
String[] processorIds = toStringArray(rs.getArray("processor_ids"));
String[] processorTypes = toStringArray(rs.getArray("processor_types"));
String[] processorStatuses = toStringArray(rs.getArray("processor_statuses"));
Instant[] processorStarts = toInstantArray(rs.getArray("processor_starts"));
Instant[] processorEnds = toInstantArray(rs.getArray("processor_ends"));
long[] processorDurations = toLongArray(rs.getArray("processor_durations"));
String[] processorDiagramNodeIds = toStringArray(rs.getArray("processor_diagram_node_ids"));
String[] processorErrorMessages = toStringArray(rs.getArray("processor_error_messages"));
String[] processorErrorStacktraces = toStringArray(rs.getArray("processor_error_stacktraces"));
int[] processorDepths = toIntArrayFromShort(rs.getArray("processor_depths"));
int[] processorParentIndexes = toIntArray(rs.getArray("processor_parent_indexes"));
Timestamp endTs = rs.getTimestamp("end_time");
return new RawExecutionRow(
rs.getString("execution_id"),
rs.getString("route_id"),
rs.getString("agent_id"),
rs.getString("status"),
rs.getTimestamp("start_time").toInstant(),
endTs != null ? endTs.toInstant() : null,
rs.getLong("duration_ms"),
rs.getString("correlation_id"),
rs.getString("exchange_id"),
rs.getString("error_message"),
rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"),
processorIds, processorTypes, processorStatuses,
processorStarts, processorEnds, processorDurations,
processorDiagramNodeIds,
processorErrorMessages, processorErrorStacktraces,
processorDepths, processorParentIndexes
);
}, executionId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
/**
* Find exchange snapshot data for a specific processor by index.
*
* @param executionId the execution ID
* @param processorIndex 0-based processor index
* @return map with inputBody, outputBody, inputHeaders, outputHeaders or empty if not found
*/
public Optional<java.util.Map<String, String>> findProcessorSnapshot(String executionId, int processorIndex) {
// ClickHouse arrays are 1-indexed in SQL
int chIndex = processorIndex + 1;
String sql = """
SELECT
processor_input_bodies[?] AS input_body,
processor_output_bodies[?] AS output_body,
processor_input_headers[?] AS input_headers,
processor_output_headers[?] AS output_headers,
length(processor_ids) AS proc_count
FROM route_executions
WHERE execution_id = ?
LIMIT 1
""";
List<java.util.Map<String, String>> results = jdbcTemplate.query(sql, (rs, rowNum) -> {
int procCount = rs.getInt("proc_count");
if (processorIndex < 0 || processorIndex >= procCount) {
return null;
}
var snapshot = new java.util.LinkedHashMap<String, String>();
snapshot.put("inputBody", rs.getString("input_body"));
snapshot.put("outputBody", rs.getString("output_body"));
snapshot.put("inputHeaders", rs.getString("input_headers"));
snapshot.put("outputHeaders", rs.getString("output_headers"));
return snapshot;
}, chIndex, chIndex, chIndex, chIndex, executionId);
if (results.isEmpty() || results.get(0) == null) {
return Optional.empty();
}
return Optional.of(results.get(0));
}
// --- Array extraction helpers ---
private static String[] toStringArray(java.sql.Array sqlArray) throws SQLException {
if (sqlArray == null) return new String[0];
Object arr = sqlArray.getArray();
if (arr instanceof String[] sa) return sa;
if (arr instanceof Object[] oa) {
String[] result = new String[oa.length];
for (int i = 0; i < oa.length; i++) {
result[i] = oa[i] != null ? oa[i].toString() : "";
}
return result;
}
return new String[0];
}
private static Instant[] toInstantArray(java.sql.Array sqlArray) throws SQLException {
if (sqlArray == null) return new Instant[0];
Object arr = sqlArray.getArray();
if (arr instanceof Timestamp[] ts) {
Instant[] result = new Instant[ts.length];
for (int i = 0; i < ts.length; i++) {
result[i] = ts[i] != null ? ts[i].toInstant() : Instant.EPOCH;
}
return result;
}
if (arr instanceof Object[] oa) {
Instant[] result = new Instant[oa.length];
for (int i = 0; i < oa.length; i++) {
if (oa[i] instanceof Timestamp ts) {
result[i] = ts.toInstant();
} else {
result[i] = Instant.EPOCH;
}
}
return result;
}
return new Instant[0];
}
private static long[] toLongArray(java.sql.Array sqlArray) throws SQLException {
if (sqlArray == null) return new long[0];
Object arr = sqlArray.getArray();
if (arr instanceof long[] la) return la;
if (arr instanceof Long[] la) {
long[] result = new long[la.length];
for (int i = 0; i < la.length; i++) {
result[i] = la[i] != null ? la[i] : 0;
}
return result;
}
if (arr instanceof Object[] oa) {
long[] result = new long[oa.length];
for (int i = 0; i < oa.length; i++) {
result[i] = oa[i] instanceof Number n ? n.longValue() : 0;
}
return result;
}
return new long[0];
}
private static int[] toIntArray(java.sql.Array sqlArray) throws SQLException {
if (sqlArray == null) return new int[0];
Object arr = sqlArray.getArray();
if (arr instanceof int[] ia) return ia;
if (arr instanceof Integer[] ia) {
int[] result = new int[ia.length];
for (int i = 0; i < ia.length; i++) {
result[i] = ia[i] != null ? ia[i] : 0;
}
return result;
}
if (arr instanceof Object[] oa) {
int[] result = new int[oa.length];
for (int i = 0; i < oa.length; i++) {
result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
}
return result;
}
return new int[0];
}
private static int[] toIntArrayFromShort(java.sql.Array sqlArray) throws SQLException {
if (sqlArray == null) return new int[0];
Object arr = sqlArray.getArray();
if (arr instanceof short[] sa) {
int[] result = new int[sa.length];
for (int i = 0; i < sa.length; i++) {
result[i] = sa[i];
}
return result;
}
if (arr instanceof int[] ia) return ia;
if (arr instanceof Object[] oa) {
int[] result = new int[oa.length];
for (int i = 0; i < oa.length; i++) {
result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
}
return result;
}
return new int[0];
}
/**

View File

@@ -0,0 +1,227 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.AbstractClickHouseIT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Integration tests for the detail and processor snapshot endpoints.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class DetailControllerIT extends AbstractClickHouseIT {
@Autowired
private TestRestTemplate restTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
private String seededExecutionId;
/**
* Seed a route execution with a 3-level processor tree:
* root -> [child1, child2], child2 -> [grandchild]
*/
@BeforeAll
void seedTestData() {
String json = """
{
"routeId": "detail-test-route",
"exchangeId": "detail-ex-1",
"correlationId": "detail-corr-1",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00Z",
"endTime": "2026-03-10T10:00:01Z",
"durationMs": 1000,
"errorMessage": "",
"errorStackTrace": "",
"processors": [
{
"processorId": "root-proc",
"processorType": "split",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00Z",
"endTime": "2026-03-10T10:00:01Z",
"durationMs": 1000,
"diagramNodeId": "node-root",
"inputBody": "root-input-body",
"outputBody": "root-output-body",
"inputHeaders": {"Content-Type": "application/json"},
"outputHeaders": {"X-Result": "ok"},
"children": [
{
"processorId": "child1-proc",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00.100Z",
"endTime": "2026-03-10T10:00:00.200Z",
"durationMs": 100,
"diagramNodeId": "node-child1",
"inputBody": "child1-input",
"outputBody": "child1-output",
"inputHeaders": {},
"outputHeaders": {}
},
{
"processorId": "child2-proc",
"processorType": "bean",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00.200Z",
"endTime": "2026-03-10T10:00:00.800Z",
"durationMs": 600,
"diagramNodeId": "node-child2",
"inputBody": "child2-input",
"outputBody": "child2-output",
"inputHeaders": {},
"outputHeaders": {},
"children": [
{
"processorId": "grandchild-proc",
"processorType": "to",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00.300Z",
"endTime": "2026-03-10T10:00:00.700Z",
"durationMs": 400,
"diagramNodeId": "node-gc",
"inputBody": "gc-input",
"outputBody": "gc-output",
"inputHeaders": {"X-GC": "true"},
"outputHeaders": {}
}
]
}
]
}
]
}
""";
ingest(json);
// Wait for flush and get the execution_id
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions WHERE route_id = 'detail-test-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
seededExecutionId = jdbcTemplate.queryForObject(
"SELECT execution_id FROM route_executions WHERE route_id = 'detail-test-route' LIMIT 1",
String.class);
}
@Test
void getDetail_returnsNestedProcessorTree() throws Exception {
ResponseEntity<String> response = detailGet("/" + seededExecutionId);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("executionId").asText()).isEqualTo(seededExecutionId);
assertThat(body.get("routeId").asText()).isEqualTo("detail-test-route");
assertThat(body.get("status").asText()).isEqualTo("COMPLETED");
assertThat(body.get("durationMs").asLong()).isEqualTo(1000);
// Check nested tree: 1 root
JsonNode processors = body.get("processors");
assertThat(processors).hasSize(1);
// Root has 2 children
JsonNode root = processors.get(0);
assertThat(root.get("processorId").asText()).isEqualTo("root-proc");
assertThat(root.get("processorType").asText()).isEqualTo("split");
assertThat(root.get("children")).hasSize(2);
// Child1 has no children
JsonNode child1 = root.get("children").get(0);
assertThat(child1.get("processorId").asText()).isEqualTo("child1-proc");
assertThat(child1.get("children")).isEmpty();
// Child2 has 1 grandchild
JsonNode child2 = root.get("children").get(1);
assertThat(child2.get("processorId").asText()).isEqualTo("child2-proc");
assertThat(child2.get("children")).hasSize(1);
JsonNode grandchild = child2.get("children").get(0);
assertThat(grandchild.get("processorId").asText()).isEqualTo("grandchild-proc");
assertThat(grandchild.get("children")).isEmpty();
}
@Test
void getDetail_includesDiagramContentHash() throws Exception {
ResponseEntity<String> response = detailGet("/" + seededExecutionId);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
// diagramContentHash should be present (may be empty string)
assertThat(body.has("diagramContentHash")).isTrue();
}
@Test
void getDetail_nonexistentId_returns404() {
ResponseEntity<String> response = detailGet("/nonexistent-execution-id");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
void getProcessorSnapshot_returnsExchangeData() throws Exception {
// Processor index 0 is root-proc
ResponseEntity<String> response = detailGet(
"/" + seededExecutionId + "/processors/0/snapshot");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("inputBody").asText()).isEqualTo("root-input-body");
assertThat(body.get("outputBody").asText()).isEqualTo("root-output-body");
assertThat(body.get("inputHeaders").asText()).contains("Content-Type");
assertThat(body.get("outputHeaders").asText()).contains("X-Result");
}
@Test
void getProcessorSnapshot_outOfBoundsIndex_returns404() {
ResponseEntity<String> response = detailGet(
"/" + seededExecutionId + "/processors/999/snapshot");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
void getProcessorSnapshot_nonexistentExecution_returns404() {
ResponseEntity<String> response = detailGet(
"/nonexistent-id/processors/0/snapshot");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
// --- Helper methods ---
private void ingest(String json) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Cameleer-Protocol-Version", "1");
restTemplate.postForEntity("/api/v1/data/executions",
new HttpEntity<>(json, headers), String.class);
}
private ResponseEntity<String> detailGet(String path) {
HttpHeaders headers = new HttpHeaders();
headers.set("X-Cameleer-Protocol-Version", "1");
return restTemplate.exchange(
"/api/v1/executions" + path,
HttpMethod.GET,
new HttpEntity<>(headers),
String.class);
}
}

View File

@@ -32,5 +32,15 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,140 @@
package com.cameleer3.server.core.detail;
import com.cameleer3.server.core.storage.ExecutionRepository;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Unit tests for {@link DetailService#reconstructTree} logic.
* <p>
* Verifies correct parent-child wiring from flat parallel arrays.
*/
class TreeReconstructionTest {
private final DetailService detailService = new DetailService(mock(ExecutionRepository.class));
private static final Instant NOW = Instant.parse("2026-03-10T10:00:00Z");
@Test
void linearChain_rootChildGrandchild() {
// [root, child, grandchild], depths=[0,1,2], parents=[-1,0,1]
List<ProcessorNode> roots = detailService.reconstructTree(
new String[]{"root", "child", "grandchild"},
new String[]{"log", "bean", "to"},
new String[]{"COMPLETED", "COMPLETED", "COMPLETED"},
new Instant[]{NOW, NOW, NOW},
new Instant[]{NOW, NOW, NOW},
new long[]{10, 20, 30},
new String[]{"n1", "n2", "n3"},
new String[]{"", "", ""},
new String[]{"", "", ""},
new int[]{0, 1, 2},
new int[]{-1, 0, 1}
);
assertThat(roots).hasSize(1);
ProcessorNode root = roots.get(0);
assertThat(root.getProcessorId()).isEqualTo("root");
assertThat(root.getChildren()).hasSize(1);
ProcessorNode child = root.getChildren().get(0);
assertThat(child.getProcessorId()).isEqualTo("child");
assertThat(child.getChildren()).hasSize(1);
ProcessorNode grandchild = child.getChildren().get(0);
assertThat(grandchild.getProcessorId()).isEqualTo("grandchild");
assertThat(grandchild.getChildren()).isEmpty();
}
@Test
void multipleRoots_noNesting() {
// [A, B, C], depths=[0,0,0], parents=[-1,-1,-1]
List<ProcessorNode> roots = detailService.reconstructTree(
new String[]{"A", "B", "C"},
new String[]{"log", "log", "log"},
new String[]{"COMPLETED", "COMPLETED", "COMPLETED"},
new Instant[]{NOW, NOW, NOW},
new Instant[]{NOW, NOW, NOW},
new long[]{10, 20, 30},
new String[]{"n1", "n2", "n3"},
new String[]{"", "", ""},
new String[]{"", "", ""},
new int[]{0, 0, 0},
new int[]{-1, -1, -1}
);
assertThat(roots).hasSize(3);
assertThat(roots.get(0).getProcessorId()).isEqualTo("A");
assertThat(roots.get(1).getProcessorId()).isEqualTo("B");
assertThat(roots.get(2).getProcessorId()).isEqualTo("C");
roots.forEach(r -> assertThat(r.getChildren()).isEmpty());
}
@Test
void branchingTree_parentWithTwoChildren_secondChildHasGrandchild() {
// [parent, child1, child2, grandchild], depths=[0,1,1,2], parents=[-1,0,0,2]
List<ProcessorNode> roots = detailService.reconstructTree(
new String[]{"parent", "child1", "child2", "grandchild"},
new String[]{"split", "log", "bean", "to"},
new String[]{"COMPLETED", "COMPLETED", "COMPLETED", "COMPLETED"},
new Instant[]{NOW, NOW, NOW, NOW},
new Instant[]{NOW, NOW, NOW, NOW},
new long[]{100, 20, 30, 5},
new String[]{"n1", "n2", "n3", "n4"},
new String[]{"", "", "", ""},
new String[]{"", "", "", ""},
new int[]{0, 1, 1, 2},
new int[]{-1, 0, 0, 2}
);
assertThat(roots).hasSize(1);
ProcessorNode parent = roots.get(0);
assertThat(parent.getProcessorId()).isEqualTo("parent");
assertThat(parent.getChildren()).hasSize(2);
ProcessorNode child1 = parent.getChildren().get(0);
assertThat(child1.getProcessorId()).isEqualTo("child1");
assertThat(child1.getChildren()).isEmpty();
ProcessorNode child2 = parent.getChildren().get(1);
assertThat(child2.getProcessorId()).isEqualTo("child2");
assertThat(child2.getChildren()).hasSize(1);
ProcessorNode grandchild = child2.getChildren().get(0);
assertThat(grandchild.getProcessorId()).isEqualTo("grandchild");
assertThat(grandchild.getChildren()).isEmpty();
}
@Test
void emptyArrays_producesEmptyList() {
List<ProcessorNode> roots = detailService.reconstructTree(
new String[]{},
new String[]{},
new String[]{},
new Instant[]{},
new Instant[]{},
new long[]{},
new String[]{},
new String[]{},
new String[]{},
new int[]{},
new int[]{}
);
assertThat(roots).isEmpty();
}
@Test
void nullArrays_producesEmptyList() {
List<ProcessorNode> roots = detailService.reconstructTree(
null, null, null, null, null, null, null, null, null, null, null
);
assertThat(roots).isEmpty();
}
}