feat: add is_replay flag to execution pipeline and UI
Detect replayed exchanges via X-Cameleer-Replay header during ingestion, persist the flag through PostgreSQL and OpenSearch, and surface it in the dashboard (amber replay icon) and exchange detail chain view. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -362,6 +362,7 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
}).toList());
|
}).toList());
|
||||||
}
|
}
|
||||||
map.put("has_trace_data", doc.hasTraceData());
|
map.put("has_trace_data", doc.hasTraceData());
|
||||||
|
map.put("is_replay", doc.isReplay());
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -399,7 +400,8 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
null, // diagramContentHash not stored in index
|
null, // diagramContentHash not stored in index
|
||||||
extractHighlight(hit),
|
extractHighlight(hit),
|
||||||
attributes,
|
attributes,
|
||||||
Boolean.TRUE.equals(src.get("has_trace_data"))
|
Boolean.TRUE.equals(src.get("has_trace_data")),
|
||||||
|
Boolean.TRUE.equals(src.get("is_replay"))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,10 +31,10 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
attributes,
|
attributes,
|
||||||
error_type, error_category, root_cause_type, root_cause_message,
|
error_type, error_category, root_cause_type, root_cause_message,
|
||||||
trace_id, span_id,
|
trace_id, span_id,
|
||||||
processors_json, has_trace_data,
|
processors_json, has_trace_data, is_replay,
|
||||||
created_at, updated_at)
|
created_at, updated_at)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
|
||||||
?, ?, ?, ?, ?, ?, ?::jsonb, ?, now(), now())
|
?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now())
|
||||||
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
ON CONFLICT (execution_id, start_time) DO UPDATE SET
|
||||||
status = CASE
|
status = CASE
|
||||||
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
|
||||||
@@ -62,6 +62,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
|
span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
|
||||||
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
|
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
|
||||||
has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
|
has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
|
||||||
|
is_replay = EXCLUDED.is_replay OR executions.is_replay,
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
""",
|
""",
|
||||||
execution.executionId(), execution.routeId(), execution.agentId(),
|
execution.executionId(), execution.routeId(), execution.agentId(),
|
||||||
@@ -78,7 +79,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
execution.errorType(), execution.errorCategory(),
|
execution.errorType(), execution.errorCategory(),
|
||||||
execution.rootCauseType(), execution.rootCauseMessage(),
|
execution.rootCauseType(), execution.rootCauseMessage(),
|
||||||
execution.traceId(), execution.spanId(),
|
execution.traceId(), execution.spanId(),
|
||||||
execution.processorsJson(), execution.hasTraceData());
|
execution.processorsJson(), execution.hasTraceData(), execution.isReplay());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -180,7 +181,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
|||||||
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
|
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
|
||||||
rs.getString("trace_id"), rs.getString("span_id"),
|
rs.getString("trace_id"), rs.getString("span_id"),
|
||||||
rs.getString("processors_json"),
|
rs.getString("processors_json"),
|
||||||
rs.getBoolean("has_trace_data"));
|
rs.getBoolean("has_trace_data"),
|
||||||
|
rs.getBoolean("is_replay"));
|
||||||
|
|
||||||
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
|
||||||
new ProcessorRecord(
|
new ProcessorRecord(
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
-- Flag indicating whether this execution is a replayed exchange
|
||||||
|
ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE;
|
||||||
|
|
||||||
|
-- Backfill: check inputHeaders JSON for X-Cameleer-Replay header
|
||||||
|
UPDATE executions SET is_replay = TRUE
|
||||||
|
WHERE input_headers IS NOT NULL
|
||||||
|
AND input_headers::jsonb ? 'X-Cameleer-Replay';
|
||||||
@@ -79,7 +79,7 @@ public class SearchIndexer implements SearchIndexerStats {
|
|||||||
exec.status(), exec.correlationId(), exec.exchangeId(),
|
exec.status(), exec.correlationId(), exec.exchangeId(),
|
||||||
exec.startTime(), exec.endTime(), exec.durationMs(),
|
exec.startTime(), exec.endTime(), exec.durationMs(),
|
||||||
exec.errorMessage(), exec.errorStacktrace(), processorDocs,
|
exec.errorMessage(), exec.errorStacktrace(), processorDocs,
|
||||||
exec.attributes(), exec.hasTraceData()));
|
exec.attributes(), exec.hasTraceData(), exec.isReplay()));
|
||||||
|
|
||||||
indexedCount.incrementAndGet();
|
indexedCount.incrementAndGet();
|
||||||
lastIndexedAt = Instant.now();
|
lastIndexedAt = Instant.now();
|
||||||
|
|||||||
@@ -102,6 +102,12 @@ public class IngestionService {
|
|||||||
|
|
||||||
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
|
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
|
||||||
|
|
||||||
|
boolean isReplay = false;
|
||||||
|
if (inputSnapshot != null && inputSnapshot.getHeaders() != null) {
|
||||||
|
isReplay = "true".equalsIgnoreCase(
|
||||||
|
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
|
||||||
|
}
|
||||||
|
|
||||||
return new ExecutionRecord(
|
return new ExecutionRecord(
|
||||||
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
|
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
|
||||||
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
|
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
|
||||||
@@ -117,7 +123,8 @@ public class IngestionService {
|
|||||||
exec.getRootCauseType(), exec.getRootCauseMessage(),
|
exec.getRootCauseType(), exec.getRootCauseMessage(),
|
||||||
exec.getTraceId(), exec.getSpanId(),
|
exec.getTraceId(), exec.getSpanId(),
|
||||||
toJsonObject(exec.getProcessors()),
|
toJsonObject(exec.getProcessors()),
|
||||||
hasTraceData
|
hasTraceData,
|
||||||
|
isReplay
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ public record ExecutionSummary(
|
|||||||
String diagramContentHash,
|
String diagramContentHash,
|
||||||
String highlight,
|
String highlight,
|
||||||
Map<String, String> attributes,
|
Map<String, String> attributes,
|
||||||
boolean hasTraceData
|
boolean hasTraceData,
|
||||||
|
boolean isReplay
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,8 @@ public interface ExecutionStore {
|
|||||||
String rootCauseType, String rootCauseMessage,
|
String rootCauseType, String rootCauseMessage,
|
||||||
String traceId, String spanId,
|
String traceId, String spanId,
|
||||||
String processorsJson,
|
String processorsJson,
|
||||||
boolean hasTraceData
|
boolean hasTraceData,
|
||||||
|
boolean isReplay
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
record ProcessorRecord(
|
record ProcessorRecord(
|
||||||
|
|||||||
@@ -10,7 +10,8 @@ public record ExecutionDocument(
|
|||||||
String errorMessage, String errorStacktrace,
|
String errorMessage, String errorStacktrace,
|
||||||
List<ProcessorDoc> processors,
|
List<ProcessorDoc> processors,
|
||||||
String attributes,
|
String attributes,
|
||||||
boolean hasTraceData
|
boolean hasTraceData,
|
||||||
|
boolean isReplay
|
||||||
) {
|
) {
|
||||||
public record ProcessorDoc(
|
public record ProcessorDoc(
|
||||||
String processorId, String processorType, String status,
|
String processorId, String processorType, String status,
|
||||||
|
|||||||
1
ui/src/api/schema.d.ts
vendored
1
ui/src/api/schema.d.ts
vendored
@@ -1504,6 +1504,7 @@ export interface components {
|
|||||||
[key: string]: string;
|
[key: string]: string;
|
||||||
};
|
};
|
||||||
hasTraceData: boolean;
|
hasTraceData: boolean;
|
||||||
|
isReplay: boolean;
|
||||||
};
|
};
|
||||||
SearchResultExecutionSummary: {
|
SearchResultExecutionSummary: {
|
||||||
data: components["schemas"]["ExecutionSummary"][];
|
data: components["schemas"]["ExecutionSummary"][];
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import { useState, useMemo, useCallback } from 'react'
|
import { useState, useMemo, useCallback } from 'react'
|
||||||
import { useParams, useNavigate, useSearchParams } from 'react-router'
|
import { useParams, useNavigate, useSearchParams } from 'react-router'
|
||||||
import { AlertTriangle, X, Search, Footprints } from 'lucide-react'
|
import { AlertTriangle, X, Search, Footprints, RotateCcw } from 'lucide-react'
|
||||||
import {
|
import {
|
||||||
DataTable,
|
DataTable,
|
||||||
StatusDot,
|
StatusDot,
|
||||||
@@ -79,6 +79,7 @@ function buildBaseColumns(): Column<Row>[] {
|
|||||||
<StatusDot variant={statusToVariant(row.status)} />
|
<StatusDot variant={statusToVariant(row.status)} />
|
||||||
<MonoText size="xs">{statusLabel(row.status)}</MonoText>
|
<MonoText size="xs">{statusLabel(row.status)}</MonoText>
|
||||||
{row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />}
|
{row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />}
|
||||||
|
{row.isReplay && <RotateCcw size={11} color="var(--amber)" style={{ marginLeft: 2, flexShrink: 0 }} />}
|
||||||
</span>
|
</span>
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
|
|||||||
{showChain ? chain.map((ce: any, i: number) => {
|
{showChain ? chain.map((ce: any, i: number) => {
|
||||||
const isCurrent = ce.executionId === detail.executionId;
|
const isCurrent = ce.executionId === detail.executionId;
|
||||||
const variant = statusVariant(ce.status);
|
const variant = statusVariant(ce.status);
|
||||||
const isReplay = ce.attributes?._replay != null;
|
const isReplay = !!ce.isReplay;
|
||||||
const statusCls =
|
const statusCls =
|
||||||
variant === 'success' ? styles.chainNodeSuccess
|
variant === 'success' ? styles.chainNodeSuccess
|
||||||
: variant === 'error' ? styles.chainNodeError
|
: variant === 'error' ? styles.chainNodeError
|
||||||
|
|||||||
Reference in New Issue
Block a user