diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 6a1394a3..2421d7da 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -362,6 +362,7 @@ public class OpenSearchIndex implements SearchIndex { }).toList()); } map.put("has_trace_data", doc.hasTraceData()); + map.put("is_replay", doc.isReplay()); return map; } @@ -399,7 +400,8 @@ public class OpenSearchIndex implements SearchIndex { null, // diagramContentHash not stored in index extractHighlight(hit), attributes, - Boolean.TRUE.equals(src.get("has_trace_data")) + Boolean.TRUE.equals(src.get("has_trace_data")), + Boolean.TRUE.equals(src.get("is_replay")) ); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java index 7758c21d..c1e1c8a0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java @@ -31,10 +31,10 @@ public class PostgresExecutionStore implements ExecutionStore { attributes, error_type, error_category, root_cause_type, root_cause_message, trace_id, span_id, - processors_json, has_trace_data, + processors_json, has_trace_data, is_replay, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, - ?, ?, ?, ?, ?, ?, ?::jsonb, ?, now(), now()) + ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now()) ON CONFLICT (execution_id, start_time) DO UPDATE SET status = CASE WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') @@ -62,6 +62,7 @@ public class PostgresExecutionStore implements ExecutionStore { span_id = COALESCE(EXCLUDED.span_id, executions.span_id), processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json), has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data, + is_replay = EXCLUDED.is_replay OR executions.is_replay, updated_at = now() """, execution.executionId(), execution.routeId(), execution.agentId(), @@ -78,7 +79,7 @@ public class PostgresExecutionStore implements ExecutionStore { execution.errorType(), execution.errorCategory(), execution.rootCauseType(), execution.rootCauseMessage(), execution.traceId(), execution.spanId(), - execution.processorsJson(), execution.hasTraceData()); + execution.processorsJson(), execution.hasTraceData(), execution.isReplay()); } @Override @@ -180,7 +181,8 @@ public class PostgresExecutionStore implements ExecutionStore { rs.getString("root_cause_type"), rs.getString("root_cause_message"), rs.getString("trace_id"), rs.getString("span_id"), rs.getString("processors_json"), - rs.getBoolean("has_trace_data")); + rs.getBoolean("has_trace_data"), + rs.getBoolean("is_replay")); private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord( diff --git a/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql b/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql new file mode 100644 index 00000000..cd6f5be3 --- /dev/null +++ b/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql @@ -0,0 +1,7 @@ +-- Flag indicating whether this execution is a replayed exchange +ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE; + +-- Backfill: check inputHeaders JSON for X-Cameleer-Replay header +UPDATE executions SET is_replay = TRUE +WHERE input_headers IS NOT NULL + AND input_headers::jsonb ? 'X-Cameleer-Replay'; diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java index ad6a606f..e9a73982 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java @@ -79,7 +79,7 @@ public class SearchIndexer implements SearchIndexerStats { exec.status(), exec.correlationId(), exec.exchangeId(), exec.startTime(), exec.endTime(), exec.durationMs(), exec.errorMessage(), exec.errorStacktrace(), processorDocs, - exec.attributes(), exec.hasTraceData())); + exec.attributes(), exec.hasTraceData(), exec.isReplay())); indexedCount.incrementAndGet(); lastIndexedAt = Instant.now(); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index 92766a0e..8809a8c3 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -102,6 +102,12 @@ public class IngestionService { boolean hasTraceData = hasAnyTraceData(exec.getProcessors()); + boolean isReplay = false; + if (inputSnapshot != null && inputSnapshot.getHeaders() != null) { + isReplay = "true".equalsIgnoreCase( + String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay"))); + } + return new ExecutionRecord( exec.getExchangeId(), exec.getRouteId(), agentId, applicationName, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", @@ -117,7 +123,8 @@ public class IngestionService { exec.getRootCauseType(), exec.getRootCauseMessage(), exec.getTraceId(), exec.getSpanId(), toJsonObject(exec.getProcessors()), - hasTraceData + hasTraceData, + isReplay ); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java index a9704ee9..f0f95666 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/ExecutionSummary.java @@ -34,6 +34,7 @@ public record ExecutionSummary( String diagramContentHash, String highlight, Map attributes, - boolean hasTraceData + boolean hasTraceData, + boolean isReplay ) { } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java index cd38d376..dcaf4858 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java @@ -30,7 +30,8 @@ public interface ExecutionStore { String rootCauseType, String rootCauseMessage, String traceId, String spanId, String processorsJson, - boolean hasTraceData + boolean hasTraceData, + boolean isReplay ) {} record ProcessorRecord( diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java index a2ce52c7..f937c157 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java @@ -10,7 +10,8 @@ public record ExecutionDocument( String errorMessage, String errorStacktrace, List processors, String attributes, - boolean hasTraceData + boolean hasTraceData, + boolean isReplay ) { public record ProcessorDoc( String processorId, String processorType, String status, diff --git a/ui/src/api/schema.d.ts b/ui/src/api/schema.d.ts index b9a3ac93..1c7876b7 100644 --- a/ui/src/api/schema.d.ts +++ b/ui/src/api/schema.d.ts @@ -1504,6 +1504,7 @@ export interface components { [key: string]: string; }; hasTraceData: boolean; + isReplay: boolean; }; SearchResultExecutionSummary: { data: components["schemas"]["ExecutionSummary"][]; diff --git a/ui/src/pages/Dashboard/Dashboard.tsx b/ui/src/pages/Dashboard/Dashboard.tsx index b76d7fb4..540f0058 100644 --- a/ui/src/pages/Dashboard/Dashboard.tsx +++ b/ui/src/pages/Dashboard/Dashboard.tsx @@ -1,6 +1,6 @@ import { useState, useMemo, useCallback } from 'react' import { useParams, useNavigate, useSearchParams } from 'react-router' -import { AlertTriangle, X, Search, Footprints } from 'lucide-react' +import { AlertTriangle, X, Search, Footprints, RotateCcw } from 'lucide-react' import { DataTable, StatusDot, @@ -79,6 +79,7 @@ function buildBaseColumns(): Column[] { {statusLabel(row.status)} {row.hasTraceData && } + {row.isReplay && } ), }, diff --git a/ui/src/pages/Exchanges/ExchangeHeader.tsx b/ui/src/pages/Exchanges/ExchangeHeader.tsx index 2ef8d2d2..bf409507 100644 --- a/ui/src/pages/Exchanges/ExchangeHeader.tsx +++ b/ui/src/pages/Exchanges/ExchangeHeader.tsx @@ -122,7 +122,7 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }: {showChain ? chain.map((ce: any, i: number) => { const isCurrent = ce.executionId === detail.executionId; const variant = statusVariant(ce.status); - const isReplay = ce.attributes?._replay != null; + const isReplay = !!ce.isReplay; const statusCls = variant === 'success' ? styles.chainNodeSuccess : variant === 'error' ? styles.chainNodeError