diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java new file mode 100644 index 00000000..27dc39a8 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java @@ -0,0 +1,104 @@ +package com.cameleer3.server.core.detail; + +import com.cameleer3.server.core.storage.ExecutionRepository; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Provides execution detail with reconstructed processor tree. + *

+ * This is a plain class (no Spring annotations) -- it lives in the core module + * and is wired as a bean by the app module configuration. + */ +public class DetailService { + + private final ExecutionRepository repository; + + public DetailService(ExecutionRepository repository) { + this.repository = repository; + } + + /** + * Get the full detail of a route execution, including the nested processor tree. + * + * @param executionId the execution ID to look up + * @return the execution detail, or empty if not found + */ + public Optional getDetail(String executionId) { + return repository.findRawById(executionId) + .map(this::toDetail); + } + + private ExecutionDetail toDetail(RawExecutionRow row) { + List roots = reconstructTree( + row.processorIds(), + row.processorTypes(), + row.processorStatuses(), + row.processorStarts(), + row.processorEnds(), + row.processorDurations(), + row.processorDiagramNodeIds(), + row.processorErrorMessages(), + row.processorErrorStacktraces(), + row.processorDepths(), + row.processorParentIndexes() + ); + + return new ExecutionDetail( + row.executionId(), + row.routeId(), + row.agentId(), + row.status(), + row.startTime(), + row.endTime(), + row.durationMs(), + row.correlationId(), + row.exchangeId(), + row.errorMessage(), + row.errorStackTrace(), + row.diagramContentHash(), + roots + ); + } + + /** + * Reconstruct the nested processor tree from flat parallel arrays. + *

+ * Uses parentIndexes to wire children: parentIndex == -1 means the node is a root. + * Otherwise, parentIndex is the array index of the parent node. + */ + List reconstructTree( + String[] ids, String[] types, String[] statuses, + java.time.Instant[] starts, java.time.Instant[] ends, long[] durations, + String[] diagramNodeIds, String[] errorMessages, String[] errorStacktraces, + int[] depths, int[] parentIndexes) { + + if (ids == null || ids.length == 0) { + return List.of(); + } + + int len = ids.length; + ProcessorNode[] nodes = new ProcessorNode[len]; + + for (int i = 0; i < len; i++) { + nodes[i] = new ProcessorNode( + ids[i], types[i], statuses[i], + starts[i], ends[i], durations[i], + diagramNodeIds[i], errorMessages[i], errorStacktraces[i] + ); + } + + List roots = new ArrayList<>(); + for (int i = 0; i < len; i++) { + if (parentIndexes[i] == -1) { + roots.add(nodes[i]); + } else { + nodes[parentIndexes[i]].addChild(nodes[i]); + } + } + + return roots; + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java new file mode 100644 index 00000000..e739dd81 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java @@ -0,0 +1,41 @@ +package com.cameleer3.server.core.detail; + +import java.time.Instant; +import java.util.List; + +/** + * Full detail of a route execution, including the nested processor tree. + *

+ * This is the rich detail model returned by the detail endpoint. The processor + * tree is reconstructed from flat parallel arrays stored in ClickHouse. + * + * @param executionId unique execution identifier + * @param routeId Camel route ID + * @param agentId agent instance that reported the execution + * @param status execution status (COMPLETED, FAILED, RUNNING) + * @param startTime execution start time + * @param endTime execution end time (may be null for RUNNING) + * @param durationMs execution duration in milliseconds + * @param correlationId correlation ID for cross-instance tracing + * @param exchangeId Camel exchange ID + * @param errorMessage error message (empty string if no error) + * @param errorStackTrace error stack trace (empty string if no error) + * @param diagramContentHash content hash linking to the active route diagram version + * @param processors nested processor execution tree (root nodes) + */ +public record ExecutionDetail( + String executionId, + String routeId, + String agentId, + String status, + Instant startTime, + Instant endTime, + long durationMs, + String correlationId, + String exchangeId, + String errorMessage, + String errorStackTrace, + String diagramContentHash, + List processors +) { +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java new file mode 100644 index 00000000..10d1e88e --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java @@ -0,0 +1,55 @@ +package com.cameleer3.server.core.detail; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +/** + * Nested tree node representing a single processor execution within a route. + *

+ * The tree structure is reconstructed from flat parallel arrays stored in ClickHouse. + * Each node may have children (e.g., processors inside a split or try-catch block). + */ +public final class ProcessorNode { + + private final String processorId; + private final String processorType; + private final String status; + private final Instant startTime; + private final Instant endTime; + private final long durationMs; + private final String diagramNodeId; + private final String errorMessage; + private final String errorStackTrace; + private final List children; + + public ProcessorNode(String processorId, String processorType, String status, + Instant startTime, Instant endTime, long durationMs, + String diagramNodeId, String errorMessage, String errorStackTrace) { + this.processorId = processorId; + this.processorType = processorType; + this.status = status; + this.startTime = startTime; + this.endTime = endTime; + this.durationMs = durationMs; + this.diagramNodeId = diagramNodeId; + this.errorMessage = errorMessage; + this.errorStackTrace = errorStackTrace; + this.children = new ArrayList<>(); + } + + public void addChild(ProcessorNode child) { + children.add(child); + } + + public String getProcessorId() { return processorId; } + public String getProcessorType() { return processorType; } + public String getStatus() { return status; } + public Instant getStartTime() { return startTime; } + public Instant getEndTime() { return endTime; } + public long getDurationMs() { return durationMs; } + public String getDiagramNodeId() { return diagramNodeId; } + public String getErrorMessage() { return errorMessage; } + public String getErrorStackTrace() { return errorStackTrace; } + public List getChildren() { return List.copyOf(children); } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java new file mode 100644 index 00000000..2297e4b6 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java @@ -0,0 +1,59 @@ +package com.cameleer3.server.core.detail; + +import java.time.Instant; + +/** + * Raw execution data from ClickHouse, including all parallel arrays needed + * for tree reconstruction. This is the intermediate representation between + * the database and the {@link ExecutionDetail} domain object. + * + * @param executionId unique execution identifier + * @param routeId Camel route ID + * @param agentId agent instance + * @param status execution status + * @param startTime execution start time + * @param endTime execution end time + * @param durationMs execution duration in milliseconds + * @param correlationId correlation ID + * @param exchangeId Camel exchange ID + * @param errorMessage execution-level error message + * @param errorStackTrace execution-level error stack trace + * @param diagramContentHash content hash for diagram linking + * @param processorIds processor IDs (parallel array) + * @param processorTypes processor types (parallel array) + * @param processorStatuses processor statuses (parallel array) + * @param processorStarts processor start times (parallel array) + * @param processorEnds processor end times (parallel array) + * @param processorDurations processor durations in ms (parallel array) + * @param processorDiagramNodeIds processor diagram node IDs (parallel array) + * @param processorErrorMessages processor error messages (parallel array) + * @param processorErrorStacktraces processor error stack traces (parallel array) + * @param processorDepths processor tree depths (parallel array) + * @param processorParentIndexes processor parent indexes, -1 for roots (parallel array) + */ +public record RawExecutionRow( + String executionId, + String routeId, + String agentId, + String status, + Instant startTime, + Instant endTime, + long durationMs, + String correlationId, + String exchangeId, + String errorMessage, + String errorStackTrace, + String diagramContentHash, + String[] processorIds, + String[] processorTypes, + String[] processorStatuses, + Instant[] processorStarts, + Instant[] processorEnds, + long[] processorDurations, + String[] processorDiagramNodeIds, + String[] processorErrorMessages, + String[] processorErrorStacktraces, + int[] processorDepths, + int[] processorParentIndexes +) { +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java new file mode 100644 index 00000000..885e9764 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java @@ -0,0 +1,34 @@ +package com.cameleer3.server.core.search; + +import java.time.Instant; + +/** + * Lightweight summary of a route execution for search result listings. + *

+ * Contains only the fields needed for the list view -- not the full processor + * arrays or exchange snapshot data. + * + * @param executionId unique execution identifier + * @param routeId Camel route ID + * @param agentId agent instance that reported the execution + * @param status execution status (COMPLETED, FAILED, RUNNING) + * @param startTime execution start time + * @param endTime execution end time (may be null for RUNNING) + * @param durationMs execution duration in milliseconds + * @param correlationId correlation ID for cross-instance tracing + * @param errorMessage error message (empty string if no error) + * @param diagramContentHash content hash linking to the active route diagram version + */ +public record ExecutionSummary( + String executionId, + String routeId, + String agentId, + String status, + Instant startTime, + Instant endTime, + long durationMs, + String correlationId, + String errorMessage, + String diagramContentHash +) { +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java new file mode 100644 index 00000000..a5d1dd72 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java @@ -0,0 +1,27 @@ +package com.cameleer3.server.core.search; + +/** + * Swappable search backend abstraction. + *

+ * The current implementation uses ClickHouse for search. This interface allows + * replacing the search backend (e.g., with OpenSearch) without changing the + * service layer or controllers. + */ +public interface SearchEngine { + + /** + * Search for route executions matching the given criteria. + * + * @param request search filters and pagination + * @return paginated search results with total count + */ + SearchResult search(SearchRequest request); + + /** + * Count route executions matching the given criteria (without fetching data). + * + * @param request search filters + * @return total number of matching executions + */ + long count(SearchRequest request); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java new file mode 100644 index 00000000..7aa217c6 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java @@ -0,0 +1,47 @@ +package com.cameleer3.server.core.search; + +import java.time.Instant; + +/** + * Immutable search criteria for querying route executions. + *

+ * All filter fields are nullable/optional. When null, the filter is not applied. + * The compact constructor validates and normalizes pagination parameters. + * + * @param status execution status filter (COMPLETED, FAILED, RUNNING) + * @param timeFrom inclusive start of time range + * @param timeTo exclusive end of time range + * @param durationMin minimum duration in milliseconds (inclusive) + * @param durationMax maximum duration in milliseconds (inclusive) + * @param correlationId exact correlation ID match + * @param text global full-text search across all text fields + * @param textInBody full-text search scoped to exchange bodies + * @param textInHeaders full-text search scoped to exchange headers + * @param textInErrors full-text search scoped to error messages and stack traces + * @param offset pagination offset (0-based) + * @param limit page size (default 50, max 500) + */ +public record SearchRequest( + String status, + Instant timeFrom, + Instant timeTo, + Long durationMin, + Long durationMax, + String correlationId, + String text, + String textInBody, + String textInHeaders, + String textInErrors, + int offset, + int limit +) { + + private static final int DEFAULT_LIMIT = 50; + private static final int MAX_LIMIT = 500; + + public SearchRequest { + if (limit <= 0) limit = DEFAULT_LIMIT; + if (limit > MAX_LIMIT) limit = MAX_LIMIT; + if (offset < 0) offset = 0; + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchResult.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchResult.java new file mode 100644 index 00000000..dcd56461 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchResult.java @@ -0,0 +1,27 @@ +package com.cameleer3.server.core.search; + +import java.util.List; + +/** + * Paginated result envelope for search queries. + * + * @param data the result items for the current page + * @param total total number of matching items across all pages + * @param offset the offset used for this page + * @param limit the limit used for this page + * @param the type of result items + */ +public record SearchResult( + List data, + long total, + int offset, + int limit +) { + + /** + * Create an empty result with the given pagination parameters. + */ + public static SearchResult empty(int offset, int limit) { + return new SearchResult<>(List.of(), 0, offset, limit); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java new file mode 100644 index 00000000..a0cfc5e1 --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java @@ -0,0 +1,31 @@ +package com.cameleer3.server.core.search; + +/** + * Orchestrates search operations, delegating to a {@link SearchEngine} backend. + *

+ * This is a plain class (no Spring annotations) -- it lives in the core module + * and is wired as a bean by the app module configuration. The thin orchestration + * layer allows adding cross-cutting concerns (logging, caching, metrics) later. + */ +public class SearchService { + + private final SearchEngine engine; + + public SearchService(SearchEngine engine) { + this.engine = engine; + } + + /** + * Search for route executions matching the given criteria. + */ + public SearchResult search(SearchRequest request) { + return engine.search(request); + } + + /** + * Count route executions matching the given criteria. + */ + public long count(SearchRequest request) { + return engine.count(request); + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java index 249bef3a..286ab076 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java @@ -1,11 +1,13 @@ package com.cameleer3.server.core.storage; import com.cameleer3.common.model.RouteExecution; +import com.cameleer3.server.core.detail.RawExecutionRow; import java.util.List; +import java.util.Optional; /** - * Repository for route execution batch inserts into ClickHouse. + * Repository for route execution storage and retrieval. */ public interface ExecutionRepository { @@ -14,4 +16,13 @@ public interface ExecutionRepository { * Implementations must perform a single batch insert for efficiency. */ void insertBatch(List executions); + + /** + * Find a raw execution row by execution ID, including all parallel arrays + * needed for processor tree reconstruction. + * + * @param executionId the execution ID to look up + * @return the raw execution row, or empty if not found + */ + Optional findRawById(String executionId); } diff --git a/clickhouse/init/02-search-columns.sql b/clickhouse/init/02-search-columns.sql new file mode 100644 index 00000000..2b11b435 --- /dev/null +++ b/clickhouse/init/02-search-columns.sql @@ -0,0 +1,25 @@ +-- Phase 2: Schema extension for search, detail, and diagram linking columns. +-- Adds exchange snapshot data, processor tree metadata, and diagram content hash. + +ALTER TABLE route_executions + ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '', + ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '', + ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [], + ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT ''; + +-- Skip indexes for full-text search on new text columns +ALTER TABLE route_executions + ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4, + ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; + +-- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05) +ALTER TABLE route_executions + ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;