5 Commits

Author SHA1 Message Date
hsiegeln
ebe768711b fix: Cmd-K exchange selection reads exchangeId from URL params
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m1s
CI / docker (push) Successful in 57s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 36s
ExchangesPage ignored the exchangeId URL parameter, so selecting an
exchange from the command palette navigated to the right URL but never
displayed the execution overlay. Now derives selection from URL params
as fallback, and LayoutShell passes selectedExchange in state for
exchange/attribute results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:26:36 +02:00
hsiegeln
af45f93854 fix: add missing isReplay parameter to test constructors
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m1s
CI / docker (push) Successful in 57s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 41s
The ExecutionDocument and ExecutionRecord records gained an isReplay
field but the integration tests were not updated, breaking CI.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:08:12 +02:00
hsiegeln
da1d74309e fix: detect replay via replayExchangeId field, not just header
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 1m4s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
The X-Cameleer-Replay header is only available when inputSnapshot is
captured (DETAILED/DEEP engine level). The agent always sets
replayExchangeId on RouteExecution, so check that first.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:57:59 +02:00
hsiegeln
7a4d7b6915 fix: resolve 8 SonarQube reliability bugs
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 1m2s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
- ElkDiagramRenderer: guard against null containingNode before getElkRoot()
- OpenSearchAdminController: return 503/502 instead of 200 on errors
- DatabaseAdminController: return 503 instead of 200 on connection failure
- SpaForwardController: replace unbound {path} variables with /** wildcards
- WriteBuffer: check offer() return value and log on unexpected rejection
- ApiExceptionHandler: extract getReason() to local var for null safety
- Admin UI pages: handle isError state for disconnected service display

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:54 +02:00
hsiegeln
ab7031e6ed feat: add is_replay flag to execution pipeline and UI
Detect replayed exchanges via X-Cameleer-Replay header during ingestion,
persist the flag through PostgreSQL and OpenSearch, and surface it in
the dashboard (amber replay icon) and exchange detail chain view.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:40 +02:00
24 changed files with 118 additions and 42 deletions

View File

@@ -14,7 +14,8 @@ public class ApiExceptionHandler {
@ExceptionHandler(ResponseStatusException.class)
public ResponseEntity<ErrorResponse> handleResponseStatus(ResponseStatusException ex) {
String reason = ex.getReason();
return ResponseEntity.status(ex.getStatusCode())
.body(new ErrorResponse(ex.getReason() != null ? ex.getReason() : "Unknown error"));
.body(new ErrorResponse(reason != null ? reason : "Unknown error"));
}
}

View File

@@ -59,7 +59,8 @@ public class DatabaseAdminController {
String host = extractHost(dataSource);
return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb));
} catch (Exception e) {
return ResponseEntity.ok(new DatabaseStatusResponse(false, null, null, null, false));
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new DatabaseStatusResponse(false, null, null, null, false));
}
}

View File

@@ -80,7 +80,8 @@ public class OpenSearchAdminController {
health.numberOfNodes(),
opensearchUrl));
} catch (Exception e) {
return ResponseEntity.ok(new OpenSearchStatusResponse(
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new OpenSearchStatusResponse(
false, "UNREACHABLE", null, 0, opensearchUrl));
}
}
@@ -149,7 +150,8 @@ public class OpenSearchAdminController {
pageItems, totalIndices, totalDocs,
humanSize(totalBytes), page, size, totalPages));
} catch (Exception e) {
return ResponseEntity.ok(new IndicesPageResponse(
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new IndicesPageResponse(
List.of(), 0, 0, "0 B", page, size, 0));
}
}
@@ -234,7 +236,8 @@ public class OpenSearchAdminController {
searchLatency, indexingLatency,
heapUsed, heapMax));
} catch (Exception e) {
return ResponseEntity.ok(new PerformanceResponse(0, 0, 0, 0, 0, 0));
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new PerformanceResponse(0, 0, 0, 0, 0, 0));
}
}

View File

@@ -415,12 +415,13 @@ public class ElkDiagramRenderer implements DiagramRenderer {
for (ElkEdge elkEdge : allEdges) {
String sourceId = elkEdge.getSources().isEmpty() ? "" : elkEdge.getSources().get(0).getIdentifier();
String targetId = elkEdge.getTargets().isEmpty() ? "" : elkEdge.getTargets().get(0).getIdentifier();
ElkNode edgeRoot = getElkRoot(elkEdge.getContainingNode());
ElkNode containingNode = elkEdge.getContainingNode();
ElkNode edgeRoot = containingNode != null ? getElkRoot(containingNode) : null;
List<double[]> points = new ArrayList<>();
for (ElkEdgeSection section : elkEdge.getSections()) {
double cx = getAbsoluteX(elkEdge.getContainingNode(), edgeRoot);
double cy = getAbsoluteY(elkEdge.getContainingNode(), edgeRoot);
double cx = containingNode != null ? getAbsoluteX(containingNode, edgeRoot) : 0;
double cy = containingNode != null ? getAbsoluteY(containingNode, edgeRoot) : 0;
points.add(new double[]{section.getStartX() + cx, section.getStartY() + cy});
for (ElkBendPoint bp : section.getBendPoints()) {
points.add(new double[]{bp.getX() + cx, bp.getY() + cy});

View File

@@ -362,6 +362,7 @@ public class OpenSearchIndex implements SearchIndex {
}).toList());
}
map.put("has_trace_data", doc.hasTraceData());
map.put("is_replay", doc.isReplay());
return map;
}
@@ -399,7 +400,8 @@ public class OpenSearchIndex implements SearchIndex {
null, // diagramContentHash not stored in index
extractHighlight(hit),
attributes,
Boolean.TRUE.equals(src.get("has_trace_data"))
Boolean.TRUE.equals(src.get("has_trace_data")),
Boolean.TRUE.equals(src.get("is_replay"))
);
}

View File

@@ -31,10 +31,10 @@ public class PostgresExecutionStore implements ExecutionStore {
attributes,
error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id,
processors_json, has_trace_data,
processors_json, has_trace_data, is_replay,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
?, ?, ?, ?, ?, ?, ?::jsonb, ?, now(), now())
?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -62,6 +62,7 @@ public class PostgresExecutionStore implements ExecutionStore {
span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
is_replay = EXCLUDED.is_replay OR executions.is_replay,
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -78,7 +79,7 @@ public class PostgresExecutionStore implements ExecutionStore {
execution.errorType(), execution.errorCategory(),
execution.rootCauseType(), execution.rootCauseMessage(),
execution.traceId(), execution.spanId(),
execution.processorsJson(), execution.hasTraceData());
execution.processorsJson(), execution.hasTraceData(), execution.isReplay());
}
@Override
@@ -180,7 +181,8 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("trace_id"), rs.getString("span_id"),
rs.getString("processors_json"),
rs.getBoolean("has_trace_data"));
rs.getBoolean("has_trace_data"),
rs.getBoolean("is_replay"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(

View File

@@ -16,9 +16,9 @@ public class SpaForwardController {
@GetMapping(value = {
"/login",
"/executions",
"/executions/{path:[^\\.]*}",
"/executions/**",
"/oidc/callback",
"/admin/{path:[^\\.]*}"
"/admin/**"
})
public String forward() {
return "forward:/index.html";

View File

@@ -0,0 +1,7 @@
-- Flag indicating whether this execution is a replayed exchange
ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE;
-- Backfill: check inputHeaders JSON for X-Cameleer-Replay header
UPDATE executions SET is_replay = TRUE
WHERE input_headers IS NOT NULL
AND input_headers::jsonb ? 'X-Cameleer-Replay';

View File

@@ -36,7 +36,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
"OrderNotFoundException: order-12345 not found", null,
List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
null, null, "request body with customer-99", null, null, null, null)),
null, false);
null, false, false);
searchIndex.index(doc);
refreshOpenSearchIndices();
@@ -62,7 +62,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
now, now.plusMillis(50), 50L, null, null,
List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
null, null, "UniquePayloadIdentifier12345", null, null, null, null)),
null, false);
null, false, false);
searchIndex.index(doc);
refreshOpenSearchIndices();

View File

@@ -27,7 +27,7 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
now, now.plusMillis(100), 100L,
null, null, null,
"REGULAR", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(record);
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
@@ -45,12 +45,12 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
"exec-dup", "route-a", "agent-1", "app-1",
"RUNNING", null, null, now, null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
ExecutionRecord second = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null,
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(first);
executionStore.upsert(second);
@@ -68,7 +68,7 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
"exec-proc", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null,
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(exec);
List<ProcessorRecord> processors = List.of(

View File

@@ -61,6 +61,6 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null, false));
null, null, null, null, null, null, null, false, false));
}
}

View File

@@ -79,7 +79,7 @@ public class SearchIndexer implements SearchIndexerStats {
exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs,
exec.attributes(), exec.hasTraceData()));
exec.attributes(), exec.hasTraceData(), exec.isReplay()));
indexedCount.incrementAndGet();
lastIndexedAt = Instant.now();

View File

@@ -102,6 +102,12 @@ public class IngestionService {
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
boolean isReplay = exec.getReplayExchangeId() != null;
if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) {
isReplay = "true".equalsIgnoreCase(
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -117,7 +123,8 @@ public class IngestionService {
exec.getRootCauseType(), exec.getRootCauseMessage(),
exec.getTraceId(), exec.getSpanId(),
toJsonObject(exec.getProcessors()),
hasTraceData
hasTraceData,
isReplay
);
}

View File

@@ -1,5 +1,8 @@
package com.cameleer3.server.core.ingestion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -16,6 +19,8 @@ import java.util.concurrent.BlockingQueue;
*/
public class WriteBuffer<T> {
private static final Logger log = LoggerFactory.getLogger(WriteBuffer.class);
private final BlockingQueue<T> queue;
private final int capacity;
@@ -45,7 +50,10 @@ public class WriteBuffer<T> {
return false;
}
for (T item : items) {
queue.offer(item);
if (!queue.offer(item)) {
log.warn("WriteBuffer offer rejected despite capacity check — possible concurrent modification");
return false;
}
}
return true;
}

View File

@@ -34,6 +34,7 @@ public record ExecutionSummary(
String diagramContentHash,
String highlight,
Map<String, String> attributes,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {
}

View File

@@ -30,7 +30,8 @@ public interface ExecutionStore {
String rootCauseType, String rootCauseMessage,
String traceId, String spanId,
String processorsJson,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {}
record ProcessorRecord(

View File

@@ -10,7 +10,8 @@ public record ExecutionDocument(
String errorMessage, String errorStacktrace,
List<ProcessorDoc> processors,
String attributes,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {
public record ProcessorDoc(
String processorId, String processorType, String status,

View File

@@ -1504,6 +1504,7 @@ export interface components {
[key: string]: string;
};
hasTraceData: boolean;
isReplay: boolean;
};
SearchResultExecutionSummary: {
data: components["schemas"]["ExecutionSummary"][];

View File

@@ -210,7 +210,21 @@ function LayoutContent() {
const handlePaletteSelect = useCallback((result: any) => {
if (result.path) {
navigate(result.path, { state: result.path ? { sidebarReveal: result.path } : undefined });
const state: Record<string, unknown> = { sidebarReveal: result.path };
// For exchange/attribute results, pass selectedExchange in state
if (result.category === 'exchange' || result.category === 'attribute') {
const parts = result.path.split('/').filter(Boolean);
if (parts.length === 4 && parts[0] === 'exchanges') {
state.selectedExchange = {
executionId: parts[3],
applicationName: parts[1],
routeId: parts[2],
};
}
}
navigate(result.path, { state });
}
setPaletteOpen(false);
}, [navigate, setPaletteOpen]);

View File

@@ -3,7 +3,8 @@ import type { Column } from '@cameleer/design-system';
import { useDatabaseStatus, useConnectionPool, useDatabaseTables, useActiveQueries, useKillQuery } from '../../api/queries/admin/database';
export default function DatabaseAdminPage() {
const { data: status } = useDatabaseStatus();
const { data: status, isError: statusError } = useDatabaseStatus();
const unreachable = statusError || (status && !status.connected);
const { data: pool } = useConnectionPool();
const { data: tables } = useDatabaseTables();
const { data: queries } = useActiveQueries();
@@ -34,7 +35,7 @@ export default function DatabaseAdminPage() {
<h2 style={{ marginBottom: '1rem' }}>Database Administration</h2>
<div style={{ display: 'flex', gap: '1rem', marginBottom: '1.5rem', flexWrap: 'wrap' }}>
<StatCard label="Status" value={status?.connected ? 'Connected' : 'Disconnected'} accent={status?.connected ? 'success' : 'error'} />
<StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Version" value={status?.version ?? '—'} />
<StatCard label="TimescaleDB" value={status?.timescaleDb ? 'Enabled' : 'Disabled'} />
</div>

View File

@@ -4,11 +4,12 @@ import { useOpenSearchStatus, usePipelineStats, useOpenSearchIndices, useOpenSea
import styles from './OpenSearchAdminPage.module.css';
export default function OpenSearchAdminPage() {
const { data: status } = useOpenSearchStatus();
const { data: status, isError: statusError } = useOpenSearchStatus();
const { data: pipeline } = usePipelineStats();
const { data: perf } = useOpenSearchPerformance();
const { data: execIndices } = useOpenSearchIndices(0, 50, '', 'executions');
const { data: logIndices } = useOpenSearchIndices(0, 50, '', 'logs');
const unreachable = statusError || (status && !status.reachable);
const deleteIndex = useDeleteIndex();
const indexColumns: Column<any>[] = [
@@ -22,7 +23,7 @@ export default function OpenSearchAdminPage() {
return (
<div>
<div className={styles.statStrip}>
<StatCard label="Status" value={status?.reachable ? 'Connected' : 'Disconnected'} accent={status?.reachable ? 'success' : 'error'} />
<StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Health" value={status?.clusterHealth ?? '\u2014'} accent={status?.clusterHealth === 'green' ? 'success' : 'warning'} />
<StatCard label="Version" value={status?.version ?? '\u2014'} />
<StatCard label="Nodes" value={status?.nodeCount ?? 0} />

View File

@@ -1,6 +1,6 @@
import { useState, useMemo, useCallback } from 'react'
import { useParams, useNavigate, useSearchParams } from 'react-router'
import { AlertTriangle, X, Search, Footprints } from 'lucide-react'
import { AlertTriangle, X, Search, Footprints, RotateCcw } from 'lucide-react'
import {
DataTable,
StatusDot,
@@ -79,6 +79,7 @@ function buildBaseColumns(): Column<Row>[] {
<StatusDot variant={statusToVariant(row.status)} />
<MonoText size="xs">{statusLabel(row.status)}</MonoText>
{row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />}
{row.isReplay && <RotateCcw size={11} color="var(--amber)" style={{ marginLeft: 2, flexShrink: 0 }} />}
</span>
),
},

View File

@@ -122,7 +122,7 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
{showChain ? chain.map((ce: any, i: number) => {
const isCurrent = ce.executionId === detail.executionId;
const variant = statusVariant(ce.status);
const isReplay = ce.attributes?._replay != null;
const isReplay = !!ce.isReplay;
const statusCls =
variant === 'success' ? styles.chainNodeSuccess
: variant === 'error' ? styles.chainNodeError

View File

@@ -20,17 +20,35 @@ import type { SelectedExchange } from '../Dashboard/Dashboard';
export default function ExchangesPage() {
const navigate = useNavigate();
const location = useLocation();
const { appId: scopedAppId, routeId: scopedRouteId } = useParams<{ appId?: string; routeId?: string }>();
const { appId: scopedAppId, routeId: scopedRouteId, exchangeId: scopedExchangeId } =
useParams<{ appId?: string; routeId?: string; exchangeId?: string }>();
// Restore selection from browser history state (enables Back/Forward)
const stateSelected = (location.state as any)?.selectedExchange as SelectedExchange | undefined;
const [selected, setSelectedInternal] = useState<SelectedExchange | null>(stateSelected ?? null);
// Sync from history state when the user navigates Back/Forward
// Derive selection from URL params when no state-based selection exists (Cmd-K, bookmarks)
const urlDerivedExchange: SelectedExchange | null =
(scopedExchangeId && scopedAppId && scopedRouteId)
? { executionId: scopedExchangeId, applicationName: scopedAppId, routeId: scopedRouteId }
: null;
const [selected, setSelectedInternal] = useState<SelectedExchange | null>(stateSelected ?? urlDerivedExchange);
// Sync selection from history state or URL params on navigation changes
useEffect(() => {
const restored = (location.state as any)?.selectedExchange as SelectedExchange | undefined;
setSelectedInternal(restored ?? null);
}, [location.state]);
if (restored) {
setSelectedInternal(restored);
} else if (scopedExchangeId && scopedAppId && scopedRouteId) {
setSelectedInternal({
executionId: scopedExchangeId,
applicationName: scopedAppId,
routeId: scopedRouteId,
});
} else {
setSelectedInternal(null);
}
}, [location.state, scopedExchangeId, scopedAppId, scopedRouteId]);
const [splitPercent, setSplitPercent] = useState(50);
const containerRef = useRef<HTMLDivElement>(null);
@@ -52,10 +70,15 @@ export default function ExchangesPage() {
});
}, [navigate, location.pathname, location.search, location.state]);
// Clear selection: push a history entry without selection (so Back returns to selected state)
// Clear selection: navigate up to route level when URL has exchangeId
const handleClearSelection = useCallback(() => {
setSelectedInternal(null);
}, []);
if (scopedExchangeId && scopedAppId && scopedRouteId) {
navigate(`/exchanges/${scopedAppId}/${scopedRouteId}`, {
state: { ...location.state, selectedExchange: undefined },
});
}
}, [scopedExchangeId, scopedAppId, scopedRouteId, navigate, location.state]);
const handleSplitterDown = useCallback((e: React.PointerEvent) => {
e.currentTarget.setPointerCapture(e.pointerId);