feat(02-03): detail controller, tree reconstruction, processor snapshot endpoint
- Implement findRawById and findProcessorSnapshot in ClickHouseExecutionRepository
- DetailController with GET /executions/{id} returning nested processor tree
- GET /executions/{id}/processors/{index}/snapshot for per-processor exchange data
- 5 unit tests for tree reconstruction (linear, branching, multiple roots, empty)
- 6 integration tests for detail endpoint, snapshot, and 404 handling
- Added assertj and mockito test dependencies to core module
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,54 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.storage.ClickHouseExecutionRepository;
|
||||
import com.cameleer3.server.core.detail.DetailService;
|
||||
import com.cameleer3.server.core.detail.ExecutionDetail;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Endpoints for retrieving execution details and processor snapshots.
|
||||
* <p>
|
||||
* The detail endpoint returns a nested processor tree reconstructed from
|
||||
* flat parallel arrays stored in ClickHouse. The snapshot endpoint returns
|
||||
* per-processor exchange data (bodies and headers).
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/executions")
|
||||
@Tag(name = "Detail", description = "Execution detail and processor snapshot endpoints")
|
||||
public class DetailController {
|
||||
|
||||
private final DetailService detailService;
|
||||
private final ClickHouseExecutionRepository executionRepository;
|
||||
|
||||
public DetailController(DetailService detailService,
|
||||
ClickHouseExecutionRepository executionRepository) {
|
||||
this.detailService = detailService;
|
||||
this.executionRepository = executionRepository;
|
||||
}
|
||||
|
||||
@GetMapping("/{executionId}")
|
||||
@Operation(summary = "Get execution detail with nested processor tree")
|
||||
public ResponseEntity<ExecutionDetail> getDetail(@PathVariable String executionId) {
|
||||
return detailService.getDetail(executionId)
|
||||
.map(ResponseEntity::ok)
|
||||
.orElse(ResponseEntity.notFound().build());
|
||||
}
|
||||
|
||||
@GetMapping("/{executionId}/processors/{index}/snapshot")
|
||||
@Operation(summary = "Get exchange snapshot for a specific processor")
|
||||
public ResponseEntity<Map<String, String>> getProcessorSnapshot(
|
||||
@PathVariable String executionId,
|
||||
@PathVariable int index) {
|
||||
return executionRepository.findProcessorSnapshot(executionId, index)
|
||||
.map(ResponseEntity::ok)
|
||||
.orElse(ResponseEntity.notFound().build());
|
||||
}
|
||||
}
|
||||
@@ -152,7 +152,201 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
|
||||
|
||||
@Override
|
||||
public Optional<RawExecutionRow> findRawById(String executionId) {
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
String sql = """
|
||||
SELECT execution_id, route_id, agent_id, status, start_time, end_time,
|
||||
duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
|
||||
diagram_content_hash,
|
||||
processor_ids, processor_types, processor_statuses,
|
||||
processor_starts, processor_ends, processor_durations,
|
||||
processor_diagram_node_ids,
|
||||
processor_error_messages, processor_error_stacktraces,
|
||||
processor_depths, processor_parent_indexes
|
||||
FROM route_executions
|
||||
WHERE execution_id = ?
|
||||
LIMIT 1
|
||||
""";
|
||||
|
||||
List<RawExecutionRow> results = jdbcTemplate.query(sql, (rs, rowNum) -> {
|
||||
// Extract parallel arrays from ClickHouse
|
||||
String[] processorIds = toStringArray(rs.getArray("processor_ids"));
|
||||
String[] processorTypes = toStringArray(rs.getArray("processor_types"));
|
||||
String[] processorStatuses = toStringArray(rs.getArray("processor_statuses"));
|
||||
Instant[] processorStarts = toInstantArray(rs.getArray("processor_starts"));
|
||||
Instant[] processorEnds = toInstantArray(rs.getArray("processor_ends"));
|
||||
long[] processorDurations = toLongArray(rs.getArray("processor_durations"));
|
||||
String[] processorDiagramNodeIds = toStringArray(rs.getArray("processor_diagram_node_ids"));
|
||||
String[] processorErrorMessages = toStringArray(rs.getArray("processor_error_messages"));
|
||||
String[] processorErrorStacktraces = toStringArray(rs.getArray("processor_error_stacktraces"));
|
||||
int[] processorDepths = toIntArrayFromShort(rs.getArray("processor_depths"));
|
||||
int[] processorParentIndexes = toIntArray(rs.getArray("processor_parent_indexes"));
|
||||
|
||||
Timestamp endTs = rs.getTimestamp("end_time");
|
||||
return new RawExecutionRow(
|
||||
rs.getString("execution_id"),
|
||||
rs.getString("route_id"),
|
||||
rs.getString("agent_id"),
|
||||
rs.getString("status"),
|
||||
rs.getTimestamp("start_time").toInstant(),
|
||||
endTs != null ? endTs.toInstant() : null,
|
||||
rs.getLong("duration_ms"),
|
||||
rs.getString("correlation_id"),
|
||||
rs.getString("exchange_id"),
|
||||
rs.getString("error_message"),
|
||||
rs.getString("error_stacktrace"),
|
||||
rs.getString("diagram_content_hash"),
|
||||
processorIds, processorTypes, processorStatuses,
|
||||
processorStarts, processorEnds, processorDurations,
|
||||
processorDiagramNodeIds,
|
||||
processorErrorMessages, processorErrorStacktraces,
|
||||
processorDepths, processorParentIndexes
|
||||
);
|
||||
}, executionId);
|
||||
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Find exchange snapshot data for a specific processor by index.
|
||||
*
|
||||
* @param executionId the execution ID
|
||||
* @param processorIndex 0-based processor index
|
||||
* @return map with inputBody, outputBody, inputHeaders, outputHeaders or empty if not found
|
||||
*/
|
||||
public Optional<java.util.Map<String, String>> findProcessorSnapshot(String executionId, int processorIndex) {
|
||||
// ClickHouse arrays are 1-indexed in SQL
|
||||
int chIndex = processorIndex + 1;
|
||||
String sql = """
|
||||
SELECT
|
||||
processor_input_bodies[?] AS input_body,
|
||||
processor_output_bodies[?] AS output_body,
|
||||
processor_input_headers[?] AS input_headers,
|
||||
processor_output_headers[?] AS output_headers,
|
||||
length(processor_ids) AS proc_count
|
||||
FROM route_executions
|
||||
WHERE execution_id = ?
|
||||
LIMIT 1
|
||||
""";
|
||||
|
||||
List<java.util.Map<String, String>> results = jdbcTemplate.query(sql, (rs, rowNum) -> {
|
||||
int procCount = rs.getInt("proc_count");
|
||||
if (processorIndex < 0 || processorIndex >= procCount) {
|
||||
return null;
|
||||
}
|
||||
var snapshot = new java.util.LinkedHashMap<String, String>();
|
||||
snapshot.put("inputBody", rs.getString("input_body"));
|
||||
snapshot.put("outputBody", rs.getString("output_body"));
|
||||
snapshot.put("inputHeaders", rs.getString("input_headers"));
|
||||
snapshot.put("outputHeaders", rs.getString("output_headers"));
|
||||
return snapshot;
|
||||
}, chIndex, chIndex, chIndex, chIndex, executionId);
|
||||
|
||||
if (results.isEmpty() || results.get(0) == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
// --- Array extraction helpers ---
|
||||
|
||||
private static String[] toStringArray(java.sql.Array sqlArray) throws SQLException {
|
||||
if (sqlArray == null) return new String[0];
|
||||
Object arr = sqlArray.getArray();
|
||||
if (arr instanceof String[] sa) return sa;
|
||||
if (arr instanceof Object[] oa) {
|
||||
String[] result = new String[oa.length];
|
||||
for (int i = 0; i < oa.length; i++) {
|
||||
result[i] = oa[i] != null ? oa[i].toString() : "";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return new String[0];
|
||||
}
|
||||
|
||||
private static Instant[] toInstantArray(java.sql.Array sqlArray) throws SQLException {
|
||||
if (sqlArray == null) return new Instant[0];
|
||||
Object arr = sqlArray.getArray();
|
||||
if (arr instanceof Timestamp[] ts) {
|
||||
Instant[] result = new Instant[ts.length];
|
||||
for (int i = 0; i < ts.length; i++) {
|
||||
result[i] = ts[i] != null ? ts[i].toInstant() : Instant.EPOCH;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (arr instanceof Object[] oa) {
|
||||
Instant[] result = new Instant[oa.length];
|
||||
for (int i = 0; i < oa.length; i++) {
|
||||
if (oa[i] instanceof Timestamp ts) {
|
||||
result[i] = ts.toInstant();
|
||||
} else {
|
||||
result[i] = Instant.EPOCH;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return new Instant[0];
|
||||
}
|
||||
|
||||
private static long[] toLongArray(java.sql.Array sqlArray) throws SQLException {
|
||||
if (sqlArray == null) return new long[0];
|
||||
Object arr = sqlArray.getArray();
|
||||
if (arr instanceof long[] la) return la;
|
||||
if (arr instanceof Long[] la) {
|
||||
long[] result = new long[la.length];
|
||||
for (int i = 0; i < la.length; i++) {
|
||||
result[i] = la[i] != null ? la[i] : 0;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (arr instanceof Object[] oa) {
|
||||
long[] result = new long[oa.length];
|
||||
for (int i = 0; i < oa.length; i++) {
|
||||
result[i] = oa[i] instanceof Number n ? n.longValue() : 0;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return new long[0];
|
||||
}
|
||||
|
||||
private static int[] toIntArray(java.sql.Array sqlArray) throws SQLException {
|
||||
if (sqlArray == null) return new int[0];
|
||||
Object arr = sqlArray.getArray();
|
||||
if (arr instanceof int[] ia) return ia;
|
||||
if (arr instanceof Integer[] ia) {
|
||||
int[] result = new int[ia.length];
|
||||
for (int i = 0; i < ia.length; i++) {
|
||||
result[i] = ia[i] != null ? ia[i] : 0;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (arr instanceof Object[] oa) {
|
||||
int[] result = new int[oa.length];
|
||||
for (int i = 0; i < oa.length; i++) {
|
||||
result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return new int[0];
|
||||
}
|
||||
|
||||
private static int[] toIntArrayFromShort(java.sql.Array sqlArray) throws SQLException {
|
||||
if (sqlArray == null) return new int[0];
|
||||
Object arr = sqlArray.getArray();
|
||||
if (arr instanceof short[] sa) {
|
||||
int[] result = new int[sa.length];
|
||||
for (int i = 0; i < sa.length; i++) {
|
||||
result[i] = sa[i];
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (arr instanceof int[] ia) return ia;
|
||||
if (arr instanceof Object[] oa) {
|
||||
int[] result = new int[oa.length];
|
||||
for (int i = 0; i < oa.length; i++) {
|
||||
result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return new int[0];
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user