Compare commits

2 Commits

Author SHA1 Message Date
hsiegeln
7a4d7b6915 fix: resolve 8 SonarQube reliability bugs
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 1m2s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
- ElkDiagramRenderer: guard against null containingNode before getElkRoot()
- OpenSearchAdminController: return 503/502 instead of 200 on errors
- DatabaseAdminController: return 503 instead of 200 on connection failure
- SpaForwardController: replace unbound {path} variables with /** wildcards
- WriteBuffer: check offer() return value and log on unexpected rejection
- ApiExceptionHandler: extract getReason() to local var for null safety
- Admin UI pages: handle isError state for disconnected service display

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:54 +02:00
hsiegeln
ab7031e6ed feat: add is_replay flag to execution pipeline and UI
Detect replayed exchanges via X-Cameleer-Replay header during ingestion,
persist the flag through PostgreSQL and OpenSearch, and surface it in
the dashboard (amber replay icon) and exchange detail chain view.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:40 +02:00
19 changed files with 66 additions and 27 deletions

View File

@@ -14,7 +14,8 @@ public class ApiExceptionHandler {
@ExceptionHandler(ResponseStatusException.class) @ExceptionHandler(ResponseStatusException.class)
public ResponseEntity<ErrorResponse> handleResponseStatus(ResponseStatusException ex) { public ResponseEntity<ErrorResponse> handleResponseStatus(ResponseStatusException ex) {
String reason = ex.getReason();
return ResponseEntity.status(ex.getStatusCode()) return ResponseEntity.status(ex.getStatusCode())
.body(new ErrorResponse(ex.getReason() != null ? ex.getReason() : "Unknown error")); .body(new ErrorResponse(reason != null ? reason : "Unknown error"));
} }
} }

View File

@@ -59,7 +59,8 @@ public class DatabaseAdminController {
String host = extractHost(dataSource); String host = extractHost(dataSource);
return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb)); return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb));
} catch (Exception e) { } catch (Exception e) {
return ResponseEntity.ok(new DatabaseStatusResponse(false, null, null, null, false)); return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new DatabaseStatusResponse(false, null, null, null, false));
} }
} }

View File

@@ -80,7 +80,8 @@ public class OpenSearchAdminController {
health.numberOfNodes(), health.numberOfNodes(),
opensearchUrl)); opensearchUrl));
} catch (Exception e) { } catch (Exception e) {
return ResponseEntity.ok(new OpenSearchStatusResponse( return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new OpenSearchStatusResponse(
false, "UNREACHABLE", null, 0, opensearchUrl)); false, "UNREACHABLE", null, 0, opensearchUrl));
} }
} }
@@ -149,7 +150,8 @@ public class OpenSearchAdminController {
pageItems, totalIndices, totalDocs, pageItems, totalIndices, totalDocs,
humanSize(totalBytes), page, size, totalPages)); humanSize(totalBytes), page, size, totalPages));
} catch (Exception e) { } catch (Exception e) {
return ResponseEntity.ok(new IndicesPageResponse( return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new IndicesPageResponse(
List.of(), 0, 0, "0 B", page, size, 0)); List.of(), 0, 0, "0 B", page, size, 0));
} }
} }
@@ -234,7 +236,8 @@ public class OpenSearchAdminController {
searchLatency, indexingLatency, searchLatency, indexingLatency,
heapUsed, heapMax)); heapUsed, heapMax));
} catch (Exception e) { } catch (Exception e) {
return ResponseEntity.ok(new PerformanceResponse(0, 0, 0, 0, 0, 0)); return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new PerformanceResponse(0, 0, 0, 0, 0, 0));
} }
} }

View File

@@ -415,12 +415,13 @@ public class ElkDiagramRenderer implements DiagramRenderer {
for (ElkEdge elkEdge : allEdges) { for (ElkEdge elkEdge : allEdges) {
String sourceId = elkEdge.getSources().isEmpty() ? "" : elkEdge.getSources().get(0).getIdentifier(); String sourceId = elkEdge.getSources().isEmpty() ? "" : elkEdge.getSources().get(0).getIdentifier();
String targetId = elkEdge.getTargets().isEmpty() ? "" : elkEdge.getTargets().get(0).getIdentifier(); String targetId = elkEdge.getTargets().isEmpty() ? "" : elkEdge.getTargets().get(0).getIdentifier();
ElkNode edgeRoot = getElkRoot(elkEdge.getContainingNode()); ElkNode containingNode = elkEdge.getContainingNode();
ElkNode edgeRoot = containingNode != null ? getElkRoot(containingNode) : null;
List<double[]> points = new ArrayList<>(); List<double[]> points = new ArrayList<>();
for (ElkEdgeSection section : elkEdge.getSections()) { for (ElkEdgeSection section : elkEdge.getSections()) {
double cx = getAbsoluteX(elkEdge.getContainingNode(), edgeRoot); double cx = containingNode != null ? getAbsoluteX(containingNode, edgeRoot) : 0;
double cy = getAbsoluteY(elkEdge.getContainingNode(), edgeRoot); double cy = containingNode != null ? getAbsoluteY(containingNode, edgeRoot) : 0;
points.add(new double[]{section.getStartX() + cx, section.getStartY() + cy}); points.add(new double[]{section.getStartX() + cx, section.getStartY() + cy});
for (ElkBendPoint bp : section.getBendPoints()) { for (ElkBendPoint bp : section.getBendPoints()) {
points.add(new double[]{bp.getX() + cx, bp.getY() + cy}); points.add(new double[]{bp.getX() + cx, bp.getY() + cy});

View File

@@ -362,6 +362,7 @@ public class OpenSearchIndex implements SearchIndex {
}).toList()); }).toList());
} }
map.put("has_trace_data", doc.hasTraceData()); map.put("has_trace_data", doc.hasTraceData());
map.put("is_replay", doc.isReplay());
return map; return map;
} }
@@ -399,7 +400,8 @@ public class OpenSearchIndex implements SearchIndex {
null, // diagramContentHash not stored in index null, // diagramContentHash not stored in index
extractHighlight(hit), extractHighlight(hit),
attributes, attributes,
Boolean.TRUE.equals(src.get("has_trace_data")) Boolean.TRUE.equals(src.get("has_trace_data")),
Boolean.TRUE.equals(src.get("is_replay"))
); );
} }

View File

@@ -31,10 +31,10 @@ public class PostgresExecutionStore implements ExecutionStore {
attributes, attributes,
error_type, error_category, root_cause_type, root_cause_message, error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id, trace_id, span_id,
processors_json, has_trace_data, processors_json, has_trace_data, is_replay,
created_at, updated_at) created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
?, ?, ?, ?, ?, ?, ?::jsonb, ?, now(), now()) ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -62,6 +62,7 @@ public class PostgresExecutionStore implements ExecutionStore {
span_id = COALESCE(EXCLUDED.span_id, executions.span_id), span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json), processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data, has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
is_replay = EXCLUDED.is_replay OR executions.is_replay,
updated_at = now() updated_at = now()
""", """,
execution.executionId(), execution.routeId(), execution.agentId(), execution.executionId(), execution.routeId(), execution.agentId(),
@@ -78,7 +79,7 @@ public class PostgresExecutionStore implements ExecutionStore {
execution.errorType(), execution.errorCategory(), execution.errorType(), execution.errorCategory(),
execution.rootCauseType(), execution.rootCauseMessage(), execution.rootCauseType(), execution.rootCauseMessage(),
execution.traceId(), execution.spanId(), execution.traceId(), execution.spanId(),
execution.processorsJson(), execution.hasTraceData()); execution.processorsJson(), execution.hasTraceData(), execution.isReplay());
} }
@Override @Override
@@ -180,7 +181,8 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("root_cause_type"), rs.getString("root_cause_message"), rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("trace_id"), rs.getString("span_id"), rs.getString("trace_id"), rs.getString("span_id"),
rs.getString("processors_json"), rs.getString("processors_json"),
rs.getBoolean("has_trace_data")); rs.getBoolean("has_trace_data"),
rs.getBoolean("is_replay"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) -> private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord( new ProcessorRecord(

View File

@@ -16,9 +16,9 @@ public class SpaForwardController {
@GetMapping(value = { @GetMapping(value = {
"/login", "/login",
"/executions", "/executions",
"/executions/{path:[^\\.]*}", "/executions/**",
"/oidc/callback", "/oidc/callback",
"/admin/{path:[^\\.]*}" "/admin/**"
}) })
public String forward() { public String forward() {
return "forward:/index.html"; return "forward:/index.html";

View File

@@ -0,0 +1,7 @@
-- Flag indicating whether this execution is a replayed exchange
ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE;
-- Backfill: check inputHeaders JSON for X-Cameleer-Replay header
UPDATE executions SET is_replay = TRUE
WHERE input_headers IS NOT NULL
AND input_headers::jsonb ? 'X-Cameleer-Replay';

View File

@@ -79,7 +79,7 @@ public class SearchIndexer implements SearchIndexerStats {
exec.status(), exec.correlationId(), exec.exchangeId(), exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(), exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs, exec.errorMessage(), exec.errorStacktrace(), processorDocs,
exec.attributes(), exec.hasTraceData())); exec.attributes(), exec.hasTraceData(), exec.isReplay()));
indexedCount.incrementAndGet(); indexedCount.incrementAndGet();
lastIndexedAt = Instant.now(); lastIndexedAt = Instant.now();

View File

@@ -102,6 +102,12 @@ public class IngestionService {
boolean hasTraceData = hasAnyTraceData(exec.getProcessors()); boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
boolean isReplay = false;
if (inputSnapshot != null && inputSnapshot.getHeaders() != null) {
isReplay = "true".equalsIgnoreCase(
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
}
return new ExecutionRecord( return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName, exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -117,7 +123,8 @@ public class IngestionService {
exec.getRootCauseType(), exec.getRootCauseMessage(), exec.getRootCauseType(), exec.getRootCauseMessage(),
exec.getTraceId(), exec.getSpanId(), exec.getTraceId(), exec.getSpanId(),
toJsonObject(exec.getProcessors()), toJsonObject(exec.getProcessors()),
hasTraceData hasTraceData,
isReplay
); );
} }

View File

@@ -1,5 +1,8 @@
package com.cameleer3.server.core.ingestion; package com.cameleer3.server.core.ingestion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@@ -16,6 +19,8 @@ import java.util.concurrent.BlockingQueue;
*/ */
public class WriteBuffer<T> { public class WriteBuffer<T> {
private static final Logger log = LoggerFactory.getLogger(WriteBuffer.class);
private final BlockingQueue<T> queue; private final BlockingQueue<T> queue;
private final int capacity; private final int capacity;
@@ -45,7 +50,10 @@ public class WriteBuffer<T> {
return false; return false;
} }
for (T item : items) { for (T item : items) {
queue.offer(item); if (!queue.offer(item)) {
log.warn("WriteBuffer offer rejected despite capacity check — possible concurrent modification");
return false;
}
} }
return true; return true;
} }

View File

@@ -34,6 +34,7 @@ public record ExecutionSummary(
String diagramContentHash, String diagramContentHash,
String highlight, String highlight,
Map<String, String> attributes, Map<String, String> attributes,
boolean hasTraceData boolean hasTraceData,
boolean isReplay
) { ) {
} }

View File

@@ -30,7 +30,8 @@ public interface ExecutionStore {
String rootCauseType, String rootCauseMessage, String rootCauseType, String rootCauseMessage,
String traceId, String spanId, String traceId, String spanId,
String processorsJson, String processorsJson,
boolean hasTraceData boolean hasTraceData,
boolean isReplay
) {} ) {}
record ProcessorRecord( record ProcessorRecord(

View File

@@ -10,7 +10,8 @@ public record ExecutionDocument(
String errorMessage, String errorStacktrace, String errorMessage, String errorStacktrace,
List<ProcessorDoc> processors, List<ProcessorDoc> processors,
String attributes, String attributes,
boolean hasTraceData boolean hasTraceData,
boolean isReplay
) { ) {
public record ProcessorDoc( public record ProcessorDoc(
String processorId, String processorType, String status, String processorId, String processorType, String status,

View File

@@ -1504,6 +1504,7 @@ export interface components {
[key: string]: string; [key: string]: string;
}; };
hasTraceData: boolean; hasTraceData: boolean;
isReplay: boolean;
}; };
SearchResultExecutionSummary: { SearchResultExecutionSummary: {
data: components["schemas"]["ExecutionSummary"][]; data: components["schemas"]["ExecutionSummary"][];

View File

@@ -3,7 +3,8 @@ import type { Column } from '@cameleer/design-system';
import { useDatabaseStatus, useConnectionPool, useDatabaseTables, useActiveQueries, useKillQuery } from '../../api/queries/admin/database'; import { useDatabaseStatus, useConnectionPool, useDatabaseTables, useActiveQueries, useKillQuery } from '../../api/queries/admin/database';
export default function DatabaseAdminPage() { export default function DatabaseAdminPage() {
const { data: status } = useDatabaseStatus(); const { data: status, isError: statusError } = useDatabaseStatus();
const unreachable = statusError || (status && !status.connected);
const { data: pool } = useConnectionPool(); const { data: pool } = useConnectionPool();
const { data: tables } = useDatabaseTables(); const { data: tables } = useDatabaseTables();
const { data: queries } = useActiveQueries(); const { data: queries } = useActiveQueries();
@@ -34,7 +35,7 @@ export default function DatabaseAdminPage() {
<h2 style={{ marginBottom: '1rem' }}>Database Administration</h2> <h2 style={{ marginBottom: '1rem' }}>Database Administration</h2>
<div style={{ display: 'flex', gap: '1rem', marginBottom: '1.5rem', flexWrap: 'wrap' }}> <div style={{ display: 'flex', gap: '1rem', marginBottom: '1.5rem', flexWrap: 'wrap' }}>
<StatCard label="Status" value={status?.connected ? 'Connected' : 'Disconnected'} accent={status?.connected ? 'success' : 'error'} /> <StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Version" value={status?.version ?? '—'} /> <StatCard label="Version" value={status?.version ?? '—'} />
<StatCard label="TimescaleDB" value={status?.timescaleDb ? 'Enabled' : 'Disabled'} /> <StatCard label="TimescaleDB" value={status?.timescaleDb ? 'Enabled' : 'Disabled'} />
</div> </div>

View File

@@ -4,11 +4,12 @@ import { useOpenSearchStatus, usePipelineStats, useOpenSearchIndices, useOpenSea
import styles from './OpenSearchAdminPage.module.css'; import styles from './OpenSearchAdminPage.module.css';
export default function OpenSearchAdminPage() { export default function OpenSearchAdminPage() {
const { data: status } = useOpenSearchStatus(); const { data: status, isError: statusError } = useOpenSearchStatus();
const { data: pipeline } = usePipelineStats(); const { data: pipeline } = usePipelineStats();
const { data: perf } = useOpenSearchPerformance(); const { data: perf } = useOpenSearchPerformance();
const { data: execIndices } = useOpenSearchIndices(0, 50, '', 'executions'); const { data: execIndices } = useOpenSearchIndices(0, 50, '', 'executions');
const { data: logIndices } = useOpenSearchIndices(0, 50, '', 'logs'); const { data: logIndices } = useOpenSearchIndices(0, 50, '', 'logs');
const unreachable = statusError || (status && !status.reachable);
const deleteIndex = useDeleteIndex(); const deleteIndex = useDeleteIndex();
const indexColumns: Column<any>[] = [ const indexColumns: Column<any>[] = [
@@ -22,7 +23,7 @@ export default function OpenSearchAdminPage() {
return ( return (
<div> <div>
<div className={styles.statStrip}> <div className={styles.statStrip}>
<StatCard label="Status" value={status?.reachable ? 'Connected' : 'Disconnected'} accent={status?.reachable ? 'success' : 'error'} /> <StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Health" value={status?.clusterHealth ?? '\u2014'} accent={status?.clusterHealth === 'green' ? 'success' : 'warning'} /> <StatCard label="Health" value={status?.clusterHealth ?? '\u2014'} accent={status?.clusterHealth === 'green' ? 'success' : 'warning'} />
<StatCard label="Version" value={status?.version ?? '\u2014'} /> <StatCard label="Version" value={status?.version ?? '\u2014'} />
<StatCard label="Nodes" value={status?.nodeCount ?? 0} /> <StatCard label="Nodes" value={status?.nodeCount ?? 0} />

View File

@@ -1,6 +1,6 @@
import { useState, useMemo, useCallback } from 'react' import { useState, useMemo, useCallback } from 'react'
import { useParams, useNavigate, useSearchParams } from 'react-router' import { useParams, useNavigate, useSearchParams } from 'react-router'
import { AlertTriangle, X, Search, Footprints } from 'lucide-react' import { AlertTriangle, X, Search, Footprints, RotateCcw } from 'lucide-react'
import { import {
DataTable, DataTable,
StatusDot, StatusDot,
@@ -79,6 +79,7 @@ function buildBaseColumns(): Column<Row>[] {
<StatusDot variant={statusToVariant(row.status)} /> <StatusDot variant={statusToVariant(row.status)} />
<MonoText size="xs">{statusLabel(row.status)}</MonoText> <MonoText size="xs">{statusLabel(row.status)}</MonoText>
{row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />} {row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />}
{row.isReplay && <RotateCcw size={11} color="var(--amber)" style={{ marginLeft: 2, flexShrink: 0 }} />}
</span> </span>
), ),
}, },

View File

@@ -122,7 +122,7 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
{showChain ? chain.map((ce: any, i: number) => { {showChain ? chain.map((ce: any, i: number) => {
const isCurrent = ce.executionId === detail.executionId; const isCurrent = ce.executionId === detail.executionId;
const variant = statusVariant(ce.status); const variant = statusVariant(ce.status);
const isReplay = ce.attributes?._replay != null; const isReplay = !!ce.isReplay;
const statusCls = const statusCls =
variant === 'success' ? styles.chainNodeSuccess variant === 'success' ? styles.chainNodeSuccess
: variant === 'error' ? styles.chainNodeError : variant === 'error' ? styles.chainNodeError