feat(02-01): add schema extension and core search/detail domain types

- ClickHouse schema migration SQL with 12 new columns and skip indexes
- SearchRequest, SearchResult, ExecutionSummary records in core search package
- SearchEngine interface for swappable search backend (ClickHouse/OpenSearch)
- SearchService orchestration layer
- ProcessorNode, ExecutionDetail, RawExecutionRow, DetailService in core detail package
- DetailService reconstructs nested processor tree from flat parallel arrays
- ExecutionRepository extended with findRawById query method

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 16:05:14 +01:00
parent 6df74505be
commit 044259535a
11 changed files with 462 additions and 1 deletions

View File

@@ -0,0 +1,104 @@
package com.cameleer3.server.core.detail;
import com.cameleer3.server.core.storage.ExecutionRepository;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* Provides execution detail with reconstructed processor tree.
* <p>
* This is a plain class (no Spring annotations) -- it lives in the core module
* and is wired as a bean by the app module configuration.
*/
public class DetailService {
private final ExecutionRepository repository;
public DetailService(ExecutionRepository repository) {
this.repository = repository;
}
/**
* Get the full detail of a route execution, including the nested processor tree.
*
* @param executionId the execution ID to look up
* @return the execution detail, or empty if not found
*/
public Optional<ExecutionDetail> getDetail(String executionId) {
return repository.findRawById(executionId)
.map(this::toDetail);
}
private ExecutionDetail toDetail(RawExecutionRow row) {
List<ProcessorNode> roots = reconstructTree(
row.processorIds(),
row.processorTypes(),
row.processorStatuses(),
row.processorStarts(),
row.processorEnds(),
row.processorDurations(),
row.processorDiagramNodeIds(),
row.processorErrorMessages(),
row.processorErrorStacktraces(),
row.processorDepths(),
row.processorParentIndexes()
);
return new ExecutionDetail(
row.executionId(),
row.routeId(),
row.agentId(),
row.status(),
row.startTime(),
row.endTime(),
row.durationMs(),
row.correlationId(),
row.exchangeId(),
row.errorMessage(),
row.errorStackTrace(),
row.diagramContentHash(),
roots
);
}
/**
* Reconstruct the nested processor tree from flat parallel arrays.
* <p>
* Uses parentIndexes to wire children: parentIndex == -1 means the node is a root.
* Otherwise, parentIndex is the array index of the parent node.
*/
List<ProcessorNode> reconstructTree(
String[] ids, String[] types, String[] statuses,
java.time.Instant[] starts, java.time.Instant[] ends, long[] durations,
String[] diagramNodeIds, String[] errorMessages, String[] errorStacktraces,
int[] depths, int[] parentIndexes) {
if (ids == null || ids.length == 0) {
return List.of();
}
int len = ids.length;
ProcessorNode[] nodes = new ProcessorNode[len];
for (int i = 0; i < len; i++) {
nodes[i] = new ProcessorNode(
ids[i], types[i], statuses[i],
starts[i], ends[i], durations[i],
diagramNodeIds[i], errorMessages[i], errorStacktraces[i]
);
}
List<ProcessorNode> roots = new ArrayList<>();
for (int i = 0; i < len; i++) {
if (parentIndexes[i] == -1) {
roots.add(nodes[i]);
} else {
nodes[parentIndexes[i]].addChild(nodes[i]);
}
}
return roots;
}
}

View File

@@ -0,0 +1,41 @@
package com.cameleer3.server.core.detail;
import java.time.Instant;
import java.util.List;
/**
* Full detail of a route execution, including the nested processor tree.
* <p>
* This is the rich detail model returned by the detail endpoint. The processor
* tree is reconstructed from flat parallel arrays stored in ClickHouse.
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
* @param agentId agent instance that reported the execution
* @param status execution status (COMPLETED, FAILED, RUNNING)
* @param startTime execution start time
* @param endTime execution end time (may be null for RUNNING)
* @param durationMs execution duration in milliseconds
* @param correlationId correlation ID for cross-instance tracing
* @param exchangeId Camel exchange ID
* @param errorMessage error message (empty string if no error)
* @param errorStackTrace error stack trace (empty string if no error)
* @param diagramContentHash content hash linking to the active route diagram version
* @param processors nested processor execution tree (root nodes)
*/
public record ExecutionDetail(
String executionId,
String routeId,
String agentId,
String status,
Instant startTime,
Instant endTime,
long durationMs,
String correlationId,
String exchangeId,
String errorMessage,
String errorStackTrace,
String diagramContentHash,
List<ProcessorNode> processors
) {
}

View File

@@ -0,0 +1,55 @@
package com.cameleer3.server.core.detail;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* Nested tree node representing a single processor execution within a route.
* <p>
* The tree structure is reconstructed from flat parallel arrays stored in ClickHouse.
* Each node may have children (e.g., processors inside a split or try-catch block).
*/
public final class ProcessorNode {
private final String processorId;
private final String processorType;
private final String status;
private final Instant startTime;
private final Instant endTime;
private final long durationMs;
private final String diagramNodeId;
private final String errorMessage;
private final String errorStackTrace;
private final List<ProcessorNode> children;
public ProcessorNode(String processorId, String processorType, String status,
Instant startTime, Instant endTime, long durationMs,
String diagramNodeId, String errorMessage, String errorStackTrace) {
this.processorId = processorId;
this.processorType = processorType;
this.status = status;
this.startTime = startTime;
this.endTime = endTime;
this.durationMs = durationMs;
this.diagramNodeId = diagramNodeId;
this.errorMessage = errorMessage;
this.errorStackTrace = errorStackTrace;
this.children = new ArrayList<>();
}
public void addChild(ProcessorNode child) {
children.add(child);
}
public String getProcessorId() { return processorId; }
public String getProcessorType() { return processorType; }
public String getStatus() { return status; }
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public long getDurationMs() { return durationMs; }
public String getDiagramNodeId() { return diagramNodeId; }
public String getErrorMessage() { return errorMessage; }
public String getErrorStackTrace() { return errorStackTrace; }
public List<ProcessorNode> getChildren() { return List.copyOf(children); }
}

View File

@@ -0,0 +1,59 @@
package com.cameleer3.server.core.detail;
import java.time.Instant;
/**
* Raw execution data from ClickHouse, including all parallel arrays needed
* for tree reconstruction. This is the intermediate representation between
* the database and the {@link ExecutionDetail} domain object.
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
* @param agentId agent instance
* @param status execution status
* @param startTime execution start time
* @param endTime execution end time
* @param durationMs execution duration in milliseconds
* @param correlationId correlation ID
* @param exchangeId Camel exchange ID
* @param errorMessage execution-level error message
* @param errorStackTrace execution-level error stack trace
* @param diagramContentHash content hash for diagram linking
* @param processorIds processor IDs (parallel array)
* @param processorTypes processor types (parallel array)
* @param processorStatuses processor statuses (parallel array)
* @param processorStarts processor start times (parallel array)
* @param processorEnds processor end times (parallel array)
* @param processorDurations processor durations in ms (parallel array)
* @param processorDiagramNodeIds processor diagram node IDs (parallel array)
* @param processorErrorMessages processor error messages (parallel array)
* @param processorErrorStacktraces processor error stack traces (parallel array)
* @param processorDepths processor tree depths (parallel array)
* @param processorParentIndexes processor parent indexes, -1 for roots (parallel array)
*/
public record RawExecutionRow(
String executionId,
String routeId,
String agentId,
String status,
Instant startTime,
Instant endTime,
long durationMs,
String correlationId,
String exchangeId,
String errorMessage,
String errorStackTrace,
String diagramContentHash,
String[] processorIds,
String[] processorTypes,
String[] processorStatuses,
Instant[] processorStarts,
Instant[] processorEnds,
long[] processorDurations,
String[] processorDiagramNodeIds,
String[] processorErrorMessages,
String[] processorErrorStacktraces,
int[] processorDepths,
int[] processorParentIndexes
) {
}

View File

@@ -0,0 +1,34 @@
package com.cameleer3.server.core.search;
import java.time.Instant;
/**
* Lightweight summary of a route execution for search result listings.
* <p>
* Contains only the fields needed for the list view -- not the full processor
* arrays or exchange snapshot data.
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
* @param agentId agent instance that reported the execution
* @param status execution status (COMPLETED, FAILED, RUNNING)
* @param startTime execution start time
* @param endTime execution end time (may be null for RUNNING)
* @param durationMs execution duration in milliseconds
* @param correlationId correlation ID for cross-instance tracing
* @param errorMessage error message (empty string if no error)
* @param diagramContentHash content hash linking to the active route diagram version
*/
public record ExecutionSummary(
String executionId,
String routeId,
String agentId,
String status,
Instant startTime,
Instant endTime,
long durationMs,
String correlationId,
String errorMessage,
String diagramContentHash
) {
}

View File

@@ -0,0 +1,27 @@
package com.cameleer3.server.core.search;
/**
* Swappable search backend abstraction.
* <p>
* The current implementation uses ClickHouse for search. This interface allows
* replacing the search backend (e.g., with OpenSearch) without changing the
* service layer or controllers.
*/
public interface SearchEngine {
/**
* Search for route executions matching the given criteria.
*
* @param request search filters and pagination
* @return paginated search results with total count
*/
SearchResult<ExecutionSummary> search(SearchRequest request);
/**
* Count route executions matching the given criteria (without fetching data).
*
* @param request search filters
* @return total number of matching executions
*/
long count(SearchRequest request);
}

View File

@@ -0,0 +1,47 @@
package com.cameleer3.server.core.search;
import java.time.Instant;
/**
* Immutable search criteria for querying route executions.
* <p>
* All filter fields are nullable/optional. When null, the filter is not applied.
* The compact constructor validates and normalizes pagination parameters.
*
* @param status execution status filter (COMPLETED, FAILED, RUNNING)
* @param timeFrom inclusive start of time range
* @param timeTo exclusive end of time range
* @param durationMin minimum duration in milliseconds (inclusive)
* @param durationMax maximum duration in milliseconds (inclusive)
* @param correlationId exact correlation ID match
* @param text global full-text search across all text fields
* @param textInBody full-text search scoped to exchange bodies
* @param textInHeaders full-text search scoped to exchange headers
* @param textInErrors full-text search scoped to error messages and stack traces
* @param offset pagination offset (0-based)
* @param limit page size (default 50, max 500)
*/
public record SearchRequest(
String status,
Instant timeFrom,
Instant timeTo,
Long durationMin,
Long durationMax,
String correlationId,
String text,
String textInBody,
String textInHeaders,
String textInErrors,
int offset,
int limit
) {
private static final int DEFAULT_LIMIT = 50;
private static final int MAX_LIMIT = 500;
public SearchRequest {
if (limit <= 0) limit = DEFAULT_LIMIT;
if (limit > MAX_LIMIT) limit = MAX_LIMIT;
if (offset < 0) offset = 0;
}
}

View File

@@ -0,0 +1,27 @@
package com.cameleer3.server.core.search;
import java.util.List;
/**
* Paginated result envelope for search queries.
*
* @param data the result items for the current page
* @param total total number of matching items across all pages
* @param offset the offset used for this page
* @param limit the limit used for this page
* @param <T> the type of result items
*/
public record SearchResult<T>(
List<T> data,
long total,
int offset,
int limit
) {
/**
* Create an empty result with the given pagination parameters.
*/
public static <T> SearchResult<T> empty(int offset, int limit) {
return new SearchResult<>(List.of(), 0, offset, limit);
}
}

View File

@@ -0,0 +1,31 @@
package com.cameleer3.server.core.search;
/**
* Orchestrates search operations, delegating to a {@link SearchEngine} backend.
* <p>
* This is a plain class (no Spring annotations) -- it lives in the core module
* and is wired as a bean by the app module configuration. The thin orchestration
* layer allows adding cross-cutting concerns (logging, caching, metrics) later.
*/
public class SearchService {
private final SearchEngine engine;
public SearchService(SearchEngine engine) {
this.engine = engine;
}
/**
* Search for route executions matching the given criteria.
*/
public SearchResult<ExecutionSummary> search(SearchRequest request) {
return engine.search(request);
}
/**
* Count route executions matching the given criteria.
*/
public long count(SearchRequest request) {
return engine.count(request);
}
}

View File

@@ -1,11 +1,13 @@
package com.cameleer3.server.core.storage; package com.cameleer3.server.core.storage;
import com.cameleer3.common.model.RouteExecution; import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.detail.RawExecutionRow;
import java.util.List; import java.util.List;
import java.util.Optional;
/** /**
* Repository for route execution batch inserts into ClickHouse. * Repository for route execution storage and retrieval.
*/ */
public interface ExecutionRepository { public interface ExecutionRepository {
@@ -14,4 +16,13 @@ public interface ExecutionRepository {
* Implementations must perform a single batch insert for efficiency. * Implementations must perform a single batch insert for efficiency.
*/ */
void insertBatch(List<RouteExecution> executions); void insertBatch(List<RouteExecution> executions);
/**
* Find a raw execution row by execution ID, including all parallel arrays
* needed for processor tree reconstruction.
*
* @param executionId the execution ID to look up
* @return the raw execution row, or empty if not found
*/
Optional<RawExecutionRow> findRawById(String executionId);
} }

View File

@@ -0,0 +1,25 @@
-- Phase 2: Schema extension for search, detail, and diagram linking columns.
-- Adds exchange snapshot data, processor tree metadata, and diagram content hash.
ALTER TABLE route_executions
ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
-- Skip indexes for full-text search on new text columns
ALTER TABLE route_executions
ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
-- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05)
ALTER TABLE route_executions
ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;