12 Commits

Author SHA1 Message Date
hsiegeln
ebe768711b fix: Cmd-K exchange selection reads exchangeId from URL params
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m1s
CI / docker (push) Successful in 57s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 36s
ExchangesPage ignored the exchangeId URL parameter, so selecting an
exchange from the command palette navigated to the right URL but never
displayed the execution overlay. Now derives selection from URL params
as fallback, and LayoutShell passes selectedExchange in state for
exchange/attribute results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:26:36 +02:00
hsiegeln
af45f93854 fix: add missing isReplay parameter to test constructors
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m1s
CI / docker (push) Successful in 57s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 41s
The ExecutionDocument and ExecutionRecord records gained an isReplay
field but the integration tests were not updated, breaking CI.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:08:12 +02:00
hsiegeln
da1d74309e fix: detect replay via replayExchangeId field, not just header
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 1m4s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
The X-Cameleer-Replay header is only available when inputSnapshot is
captured (DETAILED/DEEP engine level). The agent always sets
replayExchangeId on RouteExecution, so check that first.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:57:59 +02:00
hsiegeln
7a4d7b6915 fix: resolve 8 SonarQube reliability bugs
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 1m2s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
- ElkDiagramRenderer: guard against null containingNode before getElkRoot()
- OpenSearchAdminController: return 503/502 instead of 200 on errors
- DatabaseAdminController: return 503 instead of 200 on connection failure
- SpaForwardController: replace unbound {path} variables with /** wildcards
- WriteBuffer: check offer() return value and log on unexpected rejection
- ApiExceptionHandler: extract getReason() to local var for null safety
- Admin UI pages: handle isError state for disconnected service display

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:54 +02:00
hsiegeln
ab7031e6ed feat: add is_replay flag to execution pipeline and UI
Detect replayed exchanges via X-Cameleer-Replay header during ingestion,
persist the flag through PostgreSQL and OpenSearch, and surface it in
the dashboard (amber replay icon) and exchange detail chain view.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 14:39:40 +02:00
hsiegeln
cf3cec0164 feat: show replay marker on correlated chain entries
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m46s
CI / docker (push) Successful in 1m52s
CI / deploy (push) Successful in 51s
CI / deploy-feature (push) Has been skipped
SonarQube / sonarqube (push) Failing after 1m16s
Exchanges with a _replay attribute now display a small amber
RotateCcw icon between the status dot and route name in the
correlation chain. Tooltip also indicates (replay).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 23:26:55 +02:00
hsiegeln
79762c3f0d fix: audit replay with actual outcome, not premature SUCCESS
All checks were successful
CI / build (push) Successful in 2m8s
CI / cleanup-branch (push) Has been skipped
CI / docker (push) Successful in 1m7s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 42s
Replay audit log now records the agent's reply status (SUCCESS/FAILURE),
message, and error details. Timeout and internal errors are also logged
as FAILURE with the cause.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 23:14:36 +02:00
hsiegeln
715cbc1894 feat: synchronous replay endpoint with agent response status
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m8s
CI / docker (push) Successful in 56s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 38s
Add dedicated POST /agents/{id}/replay endpoint that uses
addCommandWithReply to wait for the agent ACK (30s timeout).
Returns the actual replay result (status, message, data) instead
of just a delivery confirmation.

Frontend toast now reflects the agent's response: "Replay completed"
on success, agent error message on failure, timeout message if the
agent doesn't respond.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 22:48:02 +02:00
hsiegeln
dd398178f0 docs: add route-control command to HOWTO and CLAUDE.md
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m35s
CI / docker (push) Successful in 13s
CI / deploy (push) Successful in 49s
CI / deploy-feature (push) Has been skipped
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 21:44:12 +02:00
hsiegeln
8b0d473fcd feat: add route control bar and fix replay protocol compliance
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m4s
CI / docker (push) Successful in 1m0s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Has been cancelled
Add ROUTE_CONTROL command type and route-control mapping in
AgentCommandController. New RouteControlBar component in the exchange
header shows Start/Stop/Suspend/Resume actions (grouped pill bar) and
a Replay button, gated by agent capabilities and OPERATOR/ADMIN role.

Fix useReplayExchange hook to match protocol section 16: payload now
uses { routeId, exchange: { body, headers }, originalExchangeId, nonce }
instead of the flat { headers, body } format.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 21:42:06 +02:00
hsiegeln
30e9b55379 fix: detail panel respects iteration filtering
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m35s
CI / docker (push) Successful in 1m12s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 48s
- findProcessorInTree now skips non-selected iteration wrappers so
  the returned ProcessorNode has data from the correct iteration
- Gate selectedProcessor on overlay presence so processors not
  executed in the current iteration don't show in the detail panel
- Header shows "Exchange Details" or "Processor Details" contextually

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 20:25:28 +02:00
hsiegeln
3091754b0f fix: dim compound containers when no descendants executed in overlay
CompoundNode (circuit breaker, choice, etc.) now renders at 0.35
opacity when the overlay is active but neither the compound itself
nor any of its diagram descendants appear in the execution overlay.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 20:13:40 +02:00
38 changed files with 572 additions and 65 deletions

View File

@@ -36,7 +36,7 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar
- Spring Boot 3.4.3 parent POM
- Depends on `com.cameleer3:cameleer3-common` from Gitea Maven registry
- Jackson `JavaTimeModule` for `Instant` deserialization
- Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands
- Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands (config-update, deep-trace, replay, route-control)
- Maintains agent instance registry with states: LIVE → STALE → DEAD
- Storage: PostgreSQL (TimescaleDB) for structured data, OpenSearch for full-text search and application log storage
- Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration

View File

@@ -325,6 +325,12 @@ curl -s -X POST http://localhost:8081/api/v1/agents/groups/order-service-prod/co
-H "Authorization: Bearer $TOKEN" \
-d '{"type":"deep-trace","payload":{"routeId":"route-1","durationSeconds":60}}'
# Send route control command to agent group (start/stop/suspend/resume)
curl -s -X POST http://localhost:8081/api/v1/agents/groups/order-service-prod/commands \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{"type":"route-control","payload":{"routeId":"route-1","action":"stop","nonce":"unique-uuid"}}'
# Broadcast command to all live agents
curl -s -X POST http://localhost:8081/api/v1/agents/commands \
-H "Content-Type: application/json" \
@@ -338,7 +344,7 @@ curl -s -X POST http://localhost:8081/api/v1/agents/agent-1/commands/{commandId}
**Agent lifecycle:** LIVE (heartbeat within 90s) → STALE (missed 3 heartbeats) → DEAD (5min after STALE). DEAD agents kept indefinitely.
**SSE events:** `config-update`, `deep-trace`, `replay` commands pushed in real time. Server sends ping keepalive every 15s.
**SSE events:** `config-update`, `deep-trace`, `replay`, `route-control` commands pushed in real time. Server sends ping keepalive every 15s.
**Command expiry:** Unacknowledged commands expire after 60 seconds.

View File

@@ -5,6 +5,8 @@ import com.cameleer3.server.app.dto.CommandAckRequest;
import com.cameleer3.server.app.dto.CommandBroadcastResponse;
import com.cameleer3.server.app.dto.CommandRequest;
import com.cameleer3.server.app.dto.CommandSingleResponse;
import com.cameleer3.server.app.dto.ReplayRequest;
import com.cameleer3.server.app.dto.ReplayResponse;
import com.cameleer3.server.core.admin.AuditCategory;
import com.cameleer3.server.core.admin.AuditResult;
import com.cameleer3.server.core.admin.AuditService;
@@ -13,6 +15,7 @@ import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
import com.cameleer3.server.core.agent.CommandReply;
import com.cameleer3.server.core.agent.CommandType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +35,14 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Command push endpoints for sending commands to agents via SSE.
@@ -184,6 +194,75 @@ public class AgentCommandController {
return ResponseEntity.ok().build();
}
@PostMapping("/{id}/replay")
@Operation(summary = "Replay an exchange on a specific agent (synchronous)",
description = "Sends a replay command and waits for the agent to complete the replay. "
+ "Returns the replay result including status, replayExchangeId, and duration.")
@ApiResponse(responseCode = "200", description = "Replay completed (check status for success/failure)")
@ApiResponse(responseCode = "404", description = "Agent not found or not connected")
@ApiResponse(responseCode = "504", description = "Agent did not respond in time")
public ResponseEntity<ReplayResponse> replayExchange(@PathVariable String id,
@RequestBody ReplayRequest request,
HttpServletRequest httpRequest) {
AgentInfo agent = registryService.findById(id);
if (agent == null) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id);
}
// Build protocol-compliant replay payload
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("routeId", request.routeId());
Map<String, Object> exchange = new LinkedHashMap<>();
exchange.put("body", request.body() != null ? request.body() : "");
exchange.put("headers", request.headers() != null ? request.headers() : Map.of());
payload.put("exchange", exchange);
if (request.originalExchangeId() != null) {
payload.put("originalExchangeId", request.originalExchangeId());
}
payload.put("nonce", UUID.randomUUID().toString());
String payloadJson;
try {
payloadJson = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
log.error("Failed to serialize replay payload", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ReplayResponse("FAILURE", "Failed to serialize request", null));
}
CompletableFuture<CommandReply> future = registryService.addCommandWithReply(
id, CommandType.REPLAY, payloadJson);
Map<String, Object> auditDetails = new LinkedHashMap<>();
auditDetails.put("routeId", request.routeId());
if (request.originalExchangeId() != null) {
auditDetails.put("originalExchangeId", request.originalExchangeId());
}
try {
CommandReply reply = future.orTimeout(30, TimeUnit.SECONDS).join();
auditDetails.put("replyStatus", reply.status());
auditDetails.put("replyMessage", reply.message() != null ? reply.message() : "");
auditService.log("replay_exchange", AuditCategory.AGENT, id, auditDetails,
"SUCCESS".equals(reply.status()) ? AuditResult.SUCCESS : AuditResult.FAILURE, httpRequest);
return ResponseEntity.ok(new ReplayResponse(reply.status(), reply.message(), reply.data()));
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
auditDetails.put("error", "timeout");
auditService.log("replay_exchange", AuditCategory.AGENT, id, auditDetails,
AuditResult.FAILURE, httpRequest);
return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
.body(new ReplayResponse("FAILURE", "Agent did not respond within 30 seconds", null));
}
auditDetails.put("error", e.getCause().getMessage());
auditService.log("replay_exchange", AuditCategory.AGENT, id, auditDetails,
AuditResult.FAILURE, httpRequest);
log.error("Error awaiting replay reply from agent {}", id, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ReplayResponse("FAILURE", "Internal error: " + e.getCause().getMessage(), null));
}
}
private CommandType mapCommandType(String typeStr) {
return switch (typeStr) {
case "config-update" -> CommandType.CONFIG_UPDATE;
@@ -191,8 +270,9 @@ public class AgentCommandController {
case "replay" -> CommandType.REPLAY;
case "set-traced-processors" -> CommandType.SET_TRACED_PROCESSORS;
case "test-expression" -> CommandType.TEST_EXPRESSION;
case "route-control" -> CommandType.ROUTE_CONTROL;
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors, test-expression");
"Invalid command type: " + typeStr + ". Valid: config-update, deep-trace, replay, set-traced-processors, test-expression, route-control");
};
}
}

View File

@@ -14,7 +14,8 @@ public class ApiExceptionHandler {
@ExceptionHandler(ResponseStatusException.class)
public ResponseEntity<ErrorResponse> handleResponseStatus(ResponseStatusException ex) {
String reason = ex.getReason();
return ResponseEntity.status(ex.getStatusCode())
.body(new ErrorResponse(ex.getReason() != null ? ex.getReason() : "Unknown error"));
.body(new ErrorResponse(reason != null ? reason : "Unknown error"));
}
}

View File

@@ -59,7 +59,8 @@ public class DatabaseAdminController {
String host = extractHost(dataSource);
return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb));
} catch (Exception e) {
return ResponseEntity.ok(new DatabaseStatusResponse(false, null, null, null, false));
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new DatabaseStatusResponse(false, null, null, null, false));
}
}

View File

@@ -80,7 +80,8 @@ public class OpenSearchAdminController {
health.numberOfNodes(),
opensearchUrl));
} catch (Exception e) {
return ResponseEntity.ok(new OpenSearchStatusResponse(
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new OpenSearchStatusResponse(
false, "UNREACHABLE", null, 0, opensearchUrl));
}
}
@@ -149,7 +150,8 @@ public class OpenSearchAdminController {
pageItems, totalIndices, totalDocs,
humanSize(totalBytes), page, size, totalPages));
} catch (Exception e) {
return ResponseEntity.ok(new IndicesPageResponse(
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new IndicesPageResponse(
List.of(), 0, 0, "0 B", page, size, 0));
}
}
@@ -234,7 +236,8 @@ public class OpenSearchAdminController {
searchLatency, indexingLatency,
heapUsed, heapMax));
} catch (Exception e) {
return ResponseEntity.ok(new PerformanceResponse(0, 0, 0, 0, 0, 0));
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new PerformanceResponse(0, 0, 0, 0, 0, 0));
}
}

View File

@@ -415,12 +415,13 @@ public class ElkDiagramRenderer implements DiagramRenderer {
for (ElkEdge elkEdge : allEdges) {
String sourceId = elkEdge.getSources().isEmpty() ? "" : elkEdge.getSources().get(0).getIdentifier();
String targetId = elkEdge.getTargets().isEmpty() ? "" : elkEdge.getTargets().get(0).getIdentifier();
ElkNode edgeRoot = getElkRoot(elkEdge.getContainingNode());
ElkNode containingNode = elkEdge.getContainingNode();
ElkNode edgeRoot = containingNode != null ? getElkRoot(containingNode) : null;
List<double[]> points = new ArrayList<>();
for (ElkEdgeSection section : elkEdge.getSections()) {
double cx = getAbsoluteX(elkEdge.getContainingNode(), edgeRoot);
double cy = getAbsoluteY(elkEdge.getContainingNode(), edgeRoot);
double cx = containingNode != null ? getAbsoluteX(containingNode, edgeRoot) : 0;
double cy = containingNode != null ? getAbsoluteY(containingNode, edgeRoot) : 0;
points.add(new double[]{section.getStartX() + cx, section.getStartY() + cy});
for (ElkBendPoint bp : section.getBendPoints()) {
points.add(new double[]{bp.getX() + cx, bp.getY() + cy});

View File

@@ -0,0 +1,18 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
@Schema(description = "Request to replay an exchange on an agent")
public record ReplayRequest(
@NotNull @Schema(description = "Camel route ID to replay on")
String routeId,
@Schema(description = "Message body for the replayed exchange")
String body,
@Schema(description = "Message headers for the replayed exchange")
Map<String, String> headers,
@Schema(description = "Exchange ID of the original execution being replayed (for audit trail)")
String originalExchangeId
) {}

View File

@@ -0,0 +1,13 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "Result of a replay command")
public record ReplayResponse(
@Schema(description = "Replay outcome: SUCCESS or FAILURE")
String status,
@Schema(description = "Human-readable result message")
String message,
@Schema(description = "Structured result data from the agent (JSON)")
String data
) {}

View File

@@ -362,6 +362,7 @@ public class OpenSearchIndex implements SearchIndex {
}).toList());
}
map.put("has_trace_data", doc.hasTraceData());
map.put("is_replay", doc.isReplay());
return map;
}
@@ -399,7 +400,8 @@ public class OpenSearchIndex implements SearchIndex {
null, // diagramContentHash not stored in index
extractHighlight(hit),
attributes,
Boolean.TRUE.equals(src.get("has_trace_data"))
Boolean.TRUE.equals(src.get("has_trace_data")),
Boolean.TRUE.equals(src.get("is_replay"))
);
}

View File

@@ -72,6 +72,7 @@ public class SecurityConfig {
.requestMatchers(HttpMethod.POST, "/api/v1/agents/*/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/groups/*/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/commands").hasAnyRole("OPERATOR", "ADMIN")
.requestMatchers(HttpMethod.POST, "/api/v1/agents/*/replay").hasAnyRole("OPERATOR", "ADMIN")
// Search endpoints
.requestMatchers(HttpMethod.GET, "/api/v1/search/**").hasAnyRole("VIEWER", "OPERATOR", "ADMIN", "AGENT")

View File

@@ -31,10 +31,10 @@ public class PostgresExecutionStore implements ExecutionStore {
attributes,
error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id,
processors_json, has_trace_data,
processors_json, has_trace_data, is_replay,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
?, ?, ?, ?, ?, ?, ?::jsonb, ?, now(), now())
?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
@@ -62,6 +62,7 @@ public class PostgresExecutionStore implements ExecutionStore {
span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
is_replay = EXCLUDED.is_replay OR executions.is_replay,
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
@@ -78,7 +79,7 @@ public class PostgresExecutionStore implements ExecutionStore {
execution.errorType(), execution.errorCategory(),
execution.rootCauseType(), execution.rootCauseMessage(),
execution.traceId(), execution.spanId(),
execution.processorsJson(), execution.hasTraceData());
execution.processorsJson(), execution.hasTraceData(), execution.isReplay());
}
@Override
@@ -180,7 +181,8 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("trace_id"), rs.getString("span_id"),
rs.getString("processors_json"),
rs.getBoolean("has_trace_data"));
rs.getBoolean("has_trace_data"),
rs.getBoolean("is_replay"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(

View File

@@ -16,9 +16,9 @@ public class SpaForwardController {
@GetMapping(value = {
"/login",
"/executions",
"/executions/{path:[^\\.]*}",
"/executions/**",
"/oidc/callback",
"/admin/{path:[^\\.]*}"
"/admin/**"
})
public String forward() {
return "forward:/index.html";

View File

@@ -0,0 +1,7 @@
-- Flag indicating whether this execution is a replayed exchange
ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE;
-- Backfill: check inputHeaders JSON for X-Cameleer-Replay header
UPDATE executions SET is_replay = TRUE
WHERE input_headers IS NOT NULL
AND input_headers::jsonb ? 'X-Cameleer-Replay';

View File

@@ -36,7 +36,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
"OrderNotFoundException: order-12345 not found", null,
List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
null, null, "request body with customer-99", null, null, null, null)),
null, false);
null, false, false);
searchIndex.index(doc);
refreshOpenSearchIndices();
@@ -62,7 +62,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
now, now.plusMillis(50), 50L, null, null,
List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
null, null, "UniquePayloadIdentifier12345", null, null, null, null)),
null, false);
null, false, false);
searchIndex.index(doc);
refreshOpenSearchIndices();

View File

@@ -27,7 +27,7 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
now, now.plusMillis(100), 100L,
null, null, null,
"REGULAR", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(record);
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
@@ -45,12 +45,12 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
"exec-dup", "route-a", "agent-1", "app-1",
"RUNNING", null, null, now, null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
ExecutionRecord second = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null,
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(first);
executionStore.upsert(second);
@@ -68,7 +68,7 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
"exec-proc", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null,
"COMPLETE", null, null, null, null, null,
null, null, null, null, null, null, null, false);
null, null, null, null, null, null, null, false, false);
executionStore.upsert(exec);
List<ProcessorRecord> processors = List.of(

View File

@@ -61,6 +61,6 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null, false));
null, null, null, null, null, null, null, false, false));
}
}

View File

@@ -8,5 +8,6 @@ public enum CommandType {
DEEP_TRACE,
REPLAY,
SET_TRACED_PROCESSORS,
TEST_EXPRESSION
TEST_EXPRESSION,
ROUTE_CONTROL
}

View File

@@ -79,7 +79,7 @@ public class SearchIndexer implements SearchIndexerStats {
exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs,
exec.attributes(), exec.hasTraceData()));
exec.attributes(), exec.hasTraceData(), exec.isReplay()));
indexedCount.incrementAndGet();
lastIndexedAt = Instant.now();

View File

@@ -102,6 +102,12 @@ public class IngestionService {
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
boolean isReplay = exec.getReplayExchangeId() != null;
if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) {
isReplay = "true".equalsIgnoreCase(
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
@@ -117,7 +123,8 @@ public class IngestionService {
exec.getRootCauseType(), exec.getRootCauseMessage(),
exec.getTraceId(), exec.getSpanId(),
toJsonObject(exec.getProcessors()),
hasTraceData
hasTraceData,
isReplay
);
}

View File

@@ -1,5 +1,8 @@
package com.cameleer3.server.core.ingestion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -16,6 +19,8 @@ import java.util.concurrent.BlockingQueue;
*/
public class WriteBuffer<T> {
private static final Logger log = LoggerFactory.getLogger(WriteBuffer.class);
private final BlockingQueue<T> queue;
private final int capacity;
@@ -45,7 +50,10 @@ public class WriteBuffer<T> {
return false;
}
for (T item : items) {
queue.offer(item);
if (!queue.offer(item)) {
log.warn("WriteBuffer offer rejected despite capacity check — possible concurrent modification");
return false;
}
}
return true;
}

View File

@@ -34,6 +34,7 @@ public record ExecutionSummary(
String diagramContentHash,
String highlight,
Map<String, String> attributes,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {
}

View File

@@ -30,7 +30,8 @@ public interface ExecutionStore {
String rootCauseType, String rootCauseMessage,
String traceId, String spanId,
String processorsJson,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {}
record ProcessorRecord(

View File

@@ -10,7 +10,8 @@ public record ExecutionDocument(
String errorMessage, String errorStacktrace,
List<ProcessorDoc> processors,
String attributes,
boolean hasTraceData
boolean hasTraceData,
boolean isReplay
) {
public record ProcessorDoc(
String processorId, String processorType, String status,

View File

@@ -154,25 +154,59 @@ export function useTestExpression() {
})
}
// ── Route Control ────────────────────────────────────────────────────────
export function useSendRouteCommand() {
return useMutation({
mutationFn: async ({ application, action, routeId }: {
application: string
action: 'start' | 'stop' | 'suspend' | 'resume'
routeId: string
}) => {
const { data, error } = await api.POST('/agents/groups/{group}/commands', {
params: { path: { group: application } },
body: { type: 'route-control', payload: { routeId, action, nonce: crypto.randomUUID() } } as any,
})
if (error) throw new Error('Failed to send route command')
return data!
},
})
}
// ── Replay Exchange ───────────────────────────────────────────────────────
export interface ReplayResult {
status: string
message: string
data?: string
}
export function useReplayExchange() {
return useMutation({
mutationFn: async ({
agentId,
routeId,
headers,
body,
originalExchangeId,
}: {
agentId: string
headers: Record<string, string>
routeId: string
headers?: Record<string, string>
body: string
}) => {
const { data, error } = await api.POST('/agents/{id}/commands', {
params: { path: { id: agentId } },
body: { type: 'replay', payload: { headers, body } } as any,
originalExchangeId?: string
}): Promise<ReplayResult> => {
const res = await authFetch(`/api/v1/agents/${encodeURIComponent(agentId)}/replay`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ routeId, body, headers: headers ?? {}, originalExchangeId }),
})
if (error) throw new Error('Failed to send replay command')
return data!
if (!res.ok) {
if (res.status === 404) throw new Error('Agent not found')
if (res.status === 504) throw new Error('Replay timed out — agent did not respond')
throw new Error('Failed to send replay command')
}
return res.json() as Promise<ReplayResult>
},
})
}

View File

@@ -1504,6 +1504,7 @@ export interface components {
[key: string]: string;
};
hasTraceData: boolean;
isReplay: boolean;
};
SearchResultExecutionSummary: {
data: components["schemas"]["ExecutionSummary"][];

View File

@@ -90,7 +90,7 @@ export function DetailPanel({
<div className={styles.detailPanel}>
{/* Header bar */}
<div className={styles.processorHeader}>
<span className={styles.processorName}>Details</span>
<span className={styles.processorName}>{selectedProcessor ? 'Processor Details' : 'Exchange Details'}</span>
</div>
{/* Tab bar */}

View File

@@ -20,15 +20,46 @@ interface ExecutionDiagramProps {
className?: string;
}
const ITERATION_WRAPPER_TYPES = new Set([
'loopIteration', 'splitIteration', 'multicastBranch',
]);
function wrapperIndex(proc: ProcessorNode): number | undefined {
return proc.loopIndex ?? proc.splitIndex ?? proc.multicastIndex ?? undefined;
}
/**
* Find a processor in the tree, respecting iteration filtering.
* Only recurses into the selected iteration wrapper so the returned
* ProcessorNode has data from the correct iteration.
*/
function findProcessorInTree(
nodes: ProcessorNode[] | undefined,
processorId: string | null,
iterationState?: Map<string, import('./types').IterationInfo>,
parentId?: string,
): ProcessorNode | null {
if (!nodes || !processorId) return null;
for (const n of nodes) {
if (!n.processorId) continue;
// Iteration wrapper: only recurse into the selected iteration
if (ITERATION_WRAPPER_TYPES.has(n.processorType)) {
if (parentId && iterationState?.has(parentId)) {
const info = iterationState.get(parentId)!;
const idx = wrapperIndex(n);
if (idx != null && idx !== info.current) continue;
}
if (n.children) {
const found = findProcessorInTree(n.children, processorId, iterationState, n.processorId);
if (found) return found;
}
continue;
}
if (n.processorId === processorId) return n;
if (n.children) {
const found = findProcessorInTree(n.children, processorId);
const found = findProcessorInTree(n.children, processorId, iterationState, n.processorId);
if (found) return found;
}
}
@@ -204,7 +235,11 @@ export function ExecutionDiagram({
{/* Detail panel */}
<div className={styles.detailArea} style={{ height: `${100 - splitPercent}%` }}>
<DetailPanel
selectedProcessor={findProcessorInTree(detail.processors, selectedProcessorId || null)}
selectedProcessor={
selectedProcessorId && overlay.has(selectedProcessorId)
? findProcessorInTree(detail.processors, selectedProcessorId, iterationState)
: null
}
executionDetail={detail}
executionId={executionId}
onSelectProcessor={setSelectedProcessorId}

View File

@@ -210,7 +210,21 @@ function LayoutContent() {
const handlePaletteSelect = useCallback((result: any) => {
if (result.path) {
navigate(result.path, { state: result.path ? { sidebarReveal: result.path } : undefined });
const state: Record<string, unknown> = { sidebarReveal: result.path };
// For exchange/attribute results, pass selectedExchange in state
if (result.category === 'exchange' || result.category === 'attribute') {
const parts = result.path.split('/').filter(Boolean);
if (parts.length === 4 && parts[0] === 'exchanges') {
state.selectedExchange = {
executionId: parts[3],
applicationName: parts[1],
routeId: parts[2],
};
}
}
navigate(result.path, { state });
}
setPaletteOpen(false);
}, [navigate, setPaletteOpen]);

View File

@@ -71,6 +71,10 @@ export function CompoundNode({
const isGated = ownState?.filterMatched === false || ownState?.duplicateMessage === true;
const effectiveColor = isGated ? 'var(--amber)' : color;
// Dim compound when overlay is active but neither the compound nor any
// descendant was executed in the current iteration.
const isSkipped = overlayActive && !ownState && !hasExecutedDescendant(node, executionOverlay);
// _TRY_BODY / _CB_MAIN: transparent wrapper — no header, no border, just layout
if (node.type === '_TRY_BODY' || node.type === '_CB_MAIN') {
return (
@@ -85,7 +89,7 @@ export function CompoundNode({
if (node.type === '_CB_FALLBACK') {
const fallbackColor = '#7C3AED'; // EIP purple
return (
<g data-node-id={node.id} transform={`translate(${x}, ${y})`}>
<g data-node-id={node.id} transform={`translate(${x}, ${y})`} opacity={isSkipped ? 0.35 : undefined}>
<rect x={0} y={0} width={w} height={h} rx={CORNER_RADIUS}
fill={fallbackColor} fillOpacity={0.06} />
<rect x={0} y={0} width={w} height={h} rx={CORNER_RADIUS}
@@ -106,7 +110,7 @@ export function CompoundNode({
: (node.label ? `finally: ${node.label}` : 'finally');
return (
<g data-node-id={node.id} transform={`translate(${x}, ${y})`}>
<g data-node-id={node.id} transform={`translate(${x}, ${y})`} opacity={isSkipped ? 0.35 : undefined}>
{/* Tinted background */}
<rect x={0} y={0} width={w} height={h} rx={CORNER_RADIUS}
fill={color} fillOpacity={0.06} />
@@ -126,7 +130,7 @@ export function CompoundNode({
// Default compound rendering (DO_TRY, EIP_CHOICE, EIP_FILTER, EIP_IDEMPOTENT_CONSUMER, etc.)
const containerFill = isGated ? 'var(--amber-bg)' : 'white';
return (
<g data-node-id={node.id} transform={`translate(${x}, ${y})`}>
<g data-node-id={node.id} transform={`translate(${x}, ${y})`} opacity={isSkipped ? 0.35 : undefined}>
{/* Container body */}
<rect
x={0}
@@ -268,3 +272,15 @@ function collectIds(nodes: DiagramNodeType[], set: Set<string>) {
if (n.children) collectIds(n.children, set);
}
}
function hasExecutedDescendant(
node: DiagramNodeType,
overlay?: Map<string, NodeExecutionState>,
): boolean {
if (!overlay || !node.children) return false;
for (const child of node.children) {
if (child.id && overlay.has(child.id)) return true;
if (child.children && hasExecutedDescendant(child, overlay)) return true;
}
return false;
}

View File

@@ -3,7 +3,8 @@ import type { Column } from '@cameleer/design-system';
import { useDatabaseStatus, useConnectionPool, useDatabaseTables, useActiveQueries, useKillQuery } from '../../api/queries/admin/database';
export default function DatabaseAdminPage() {
const { data: status } = useDatabaseStatus();
const { data: status, isError: statusError } = useDatabaseStatus();
const unreachable = statusError || (status && !status.connected);
const { data: pool } = useConnectionPool();
const { data: tables } = useDatabaseTables();
const { data: queries } = useActiveQueries();
@@ -34,7 +35,7 @@ export default function DatabaseAdminPage() {
<h2 style={{ marginBottom: '1rem' }}>Database Administration</h2>
<div style={{ display: 'flex', gap: '1rem', marginBottom: '1.5rem', flexWrap: 'wrap' }}>
<StatCard label="Status" value={status?.connected ? 'Connected' : 'Disconnected'} accent={status?.connected ? 'success' : 'error'} />
<StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Version" value={status?.version ?? '—'} />
<StatCard label="TimescaleDB" value={status?.timescaleDb ? 'Enabled' : 'Disabled'} />
</div>

View File

@@ -4,11 +4,12 @@ import { useOpenSearchStatus, usePipelineStats, useOpenSearchIndices, useOpenSea
import styles from './OpenSearchAdminPage.module.css';
export default function OpenSearchAdminPage() {
const { data: status } = useOpenSearchStatus();
const { data: status, isError: statusError } = useOpenSearchStatus();
const { data: pipeline } = usePipelineStats();
const { data: perf } = useOpenSearchPerformance();
const { data: execIndices } = useOpenSearchIndices(0, 50, '', 'executions');
const { data: logIndices } = useOpenSearchIndices(0, 50, '', 'logs');
const unreachable = statusError || (status && !status.reachable);
const deleteIndex = useDeleteIndex();
const indexColumns: Column<any>[] = [
@@ -22,7 +23,7 @@ export default function OpenSearchAdminPage() {
return (
<div>
<div className={styles.statStrip}>
<StatCard label="Status" value={status?.reachable ? 'Connected' : 'Disconnected'} accent={status?.reachable ? 'success' : 'error'} />
<StatCard label="Status" value={unreachable ? 'Disconnected' : status ? 'Connected' : '\u2014'} accent={unreachable ? 'error' : status ? 'success' : undefined} />
<StatCard label="Health" value={status?.clusterHealth ?? '\u2014'} accent={status?.clusterHealth === 'green' ? 'success' : 'warning'} />
<StatCard label="Version" value={status?.version ?? '\u2014'} />
<StatCard label="Nodes" value={status?.nodeCount ?? 0} />

View File

@@ -1,6 +1,6 @@
import { useState, useMemo, useCallback } from 'react'
import { useParams, useNavigate, useSearchParams } from 'react-router'
import { AlertTriangle, X, Search, Footprints } from 'lucide-react'
import { AlertTriangle, X, Search, Footprints, RotateCcw } from 'lucide-react'
import {
DataTable,
StatusDot,
@@ -79,6 +79,7 @@ function buildBaseColumns(): Column<Row>[] {
<StatusDot variant={statusToVariant(row.status)} />
<MonoText size="xs">{statusLabel(row.status)}</MonoText>
{row.hasTraceData && <Footprints size={11} color="#3D7C47" style={{ marginLeft: 2, flexShrink: 0 }} />}
{row.isReplay && <RotateCcw size={11} color="var(--amber)" style={{ marginLeft: 2, flexShrink: 0 }} />}
</span>
),
},

View File

@@ -185,6 +185,11 @@
font-weight: 500;
}
.replayIcon {
color: var(--amber);
flex-shrink: 0;
}
.chainDuration {
color: var(--text-muted);
font-size: 9px;

View File

@@ -1,11 +1,13 @@
import { useMemo } from 'react';
import { useNavigate } from 'react-router';
import { GitBranch, Server } from 'lucide-react';
import { GitBranch, Server, RotateCcw } from 'lucide-react';
import { StatusDot, MonoText, Badge } from '@cameleer/design-system';
import { useCorrelationChain } from '../../api/queries/correlation';
import { useAgents } from '../../api/queries/agents';
import { useAuthStore } from '../../auth/auth-store';
import type { ExecutionDetail } from '../../components/ExecutionDiagram/types';
import { attributeBadgeColor } from '../../utils/attribute-color';
import { RouteControlBar } from './RouteControlBar';
import styles from './ExchangeHeader.module.css';
interface ExchangeHeaderProps {
@@ -47,14 +49,22 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
const showChain = chain && chain.length > 1;
const attrs = Object.entries(detail.attributes ?? {});
// Look up agent state for icon coloring
// Look up agent state for icon coloring + route control capability
const { data: agents } = useAgents(undefined, detail.applicationName);
const agentState = useMemo(() => {
if (!agents || !detail.agentId) return undefined;
const agent = (agents as any[]).find((a: any) => a.id === detail.agentId);
return agent?.state?.toLowerCase() as 'live' | 'stale' | 'dead' | undefined;
const { agentState, hasRouteControl, hasReplay } = useMemo(() => {
if (!agents) return { agentState: undefined, hasRouteControl: false, hasReplay: false };
const agentList = agents as any[];
const agent = detail.agentId ? agentList.find((a: any) => a.id === detail.agentId) : undefined;
return {
agentState: agent?.state?.toLowerCase() as 'live' | 'stale' | 'dead' | undefined,
hasRouteControl: agentList.some((a: any) => a.capabilities?.routeControl === true),
hasReplay: agentList.some((a: any) => a.capabilities?.replay === true),
};
}, [agents, detail.agentId]);
const roles = useAuthStore((s) => s.roles);
const canControl = roles.some(r => r === 'OPERATOR' || r === 'ADMIN');
return (
<div className={styles.header}>
{/* Exchange info — always shown */}
@@ -92,12 +102,27 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
<span className={styles.duration}>{formatDuration(detail.durationMs)}</span>
</div>
{/* Route control / replay — only if agent supports it AND user has operator+ role */}
{canControl && (hasRouteControl || hasReplay) && (
<RouteControlBar
application={detail.applicationName}
routeId={detail.routeId}
hasRouteControl={hasRouteControl}
hasReplay={hasReplay}
agentId={detail.agentId}
exchangeId={detail.exchangeId}
inputHeaders={detail.inputHeaders}
inputBody={detail.inputBody}
/>
)}
{/* Correlation chain */}
<div className={styles.chain}>
<span className={styles.chainLabel}>Correlated</span>
{showChain ? chain.map((ce: any, i: number) => {
const isCurrent = ce.executionId === detail.executionId;
const variant = statusVariant(ce.status);
const isReplay = !!ce.isReplay;
const statusCls =
variant === 'success' ? styles.chainNodeSuccess
: variant === 'error' ? styles.chainNodeError
@@ -113,9 +138,10 @@ export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }:
onCorrelatedSelect(ce.executionId, ce.applicationName ?? detail.applicationName, ce.routeId);
}
}}
title={`${ce.executionId}\n${ce.routeId} \u2014 ${formatDuration(ce.durationMs)}`}
title={`${ce.executionId}\n${ce.routeId} \u2014 ${formatDuration(ce.durationMs)}${isReplay ? '\n(replay)' : ''}`}
>
<StatusDot variant={variant} />
{isReplay && <RotateCcw size={9} className={styles.replayIcon} />}
<span className={styles.chainRoute}>{ce.routeId}</span>
<span className={styles.chainDuration}>{formatDuration(ce.durationMs)}</span>
</button>

View File

@@ -20,17 +20,35 @@ import type { SelectedExchange } from '../Dashboard/Dashboard';
export default function ExchangesPage() {
const navigate = useNavigate();
const location = useLocation();
const { appId: scopedAppId, routeId: scopedRouteId } = useParams<{ appId?: string; routeId?: string }>();
const { appId: scopedAppId, routeId: scopedRouteId, exchangeId: scopedExchangeId } =
useParams<{ appId?: string; routeId?: string; exchangeId?: string }>();
// Restore selection from browser history state (enables Back/Forward)
const stateSelected = (location.state as any)?.selectedExchange as SelectedExchange | undefined;
const [selected, setSelectedInternal] = useState<SelectedExchange | null>(stateSelected ?? null);
// Sync from history state when the user navigates Back/Forward
// Derive selection from URL params when no state-based selection exists (Cmd-K, bookmarks)
const urlDerivedExchange: SelectedExchange | null =
(scopedExchangeId && scopedAppId && scopedRouteId)
? { executionId: scopedExchangeId, applicationName: scopedAppId, routeId: scopedRouteId }
: null;
const [selected, setSelectedInternal] = useState<SelectedExchange | null>(stateSelected ?? urlDerivedExchange);
// Sync selection from history state or URL params on navigation changes
useEffect(() => {
const restored = (location.state as any)?.selectedExchange as SelectedExchange | undefined;
setSelectedInternal(restored ?? null);
}, [location.state]);
if (restored) {
setSelectedInternal(restored);
} else if (scopedExchangeId && scopedAppId && scopedRouteId) {
setSelectedInternal({
executionId: scopedExchangeId,
applicationName: scopedAppId,
routeId: scopedRouteId,
});
} else {
setSelectedInternal(null);
}
}, [location.state, scopedExchangeId, scopedAppId, scopedRouteId]);
const [splitPercent, setSplitPercent] = useState(50);
const containerRef = useRef<HTMLDivElement>(null);
@@ -52,10 +70,15 @@ export default function ExchangesPage() {
});
}, [navigate, location.pathname, location.search, location.state]);
// Clear selection: push a history entry without selection (so Back returns to selected state)
// Clear selection: navigate up to route level when URL has exchangeId
const handleClearSelection = useCallback(() => {
setSelectedInternal(null);
}, []);
if (scopedExchangeId && scopedAppId && scopedRouteId) {
navigate(`/exchanges/${scopedAppId}/${scopedRouteId}`, {
state: { ...location.state, selectedExchange: undefined },
});
}
}, [scopedExchangeId, scopedAppId, scopedRouteId, navigate, location.state]);
const handleSplitterDown = useCallback((e: React.PointerEvent) => {
e.currentTarget.setPointerCapture(e.pointerId);

View File

@@ -0,0 +1,81 @@
.bar {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.375rem 0.75rem;
border-bottom: 1px solid var(--border-subtle);
}
.label {
font-size: 9px;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.8px;
color: var(--text-muted);
margin-right: 0.25rem;
flex-shrink: 0;
}
.group {
display: inline-flex;
border: 1px solid var(--border-subtle);
border-radius: var(--radius-sm);
background: var(--bg-surface);
overflow: hidden;
}
.group.sending {
opacity: 0.5;
pointer-events: none;
}
.segment {
display: inline-flex;
align-items: center;
gap: 4px;
padding: 3px 10px;
border: none;
background: none;
font: inherit;
font-size: 11px;
font-weight: 500;
color: var(--text-secondary);
cursor: pointer;
white-space: nowrap;
transition: background 0.12s, color 0.12s;
}
.segment:hover:not(:disabled) {
background: var(--bg-hover);
color: var(--text-primary);
}
.segment:disabled {
cursor: not-allowed;
}
.divider {
width: 1px;
height: 14px;
background: var(--border);
flex-shrink: 0;
}
/* Icon semantic colors */
.success svg { color: var(--success); }
.danger svg { color: var(--error); }
.warning svg { color: var(--amber); }
/* Preserve icon color on hover */
.success:hover:not(:disabled) svg { color: var(--success); }
.danger:hover:not(:disabled) svg { color: var(--error); }
.warning:hover:not(:disabled) svg { color: var(--amber); }
@keyframes spin {
to { transform: rotate(360deg); }
}
.spinner {
animation: spin 0.8s linear infinite;
color: var(--text-muted);
}

View File

@@ -0,0 +1,115 @@
import { useState } from 'react';
import { Play, Square, Pause, PlayCircle, RotateCcw, Loader2 } from 'lucide-react';
import { useToast } from '@cameleer/design-system';
import { useSendRouteCommand, useReplayExchange } from '../../api/queries/commands';
import styles from './RouteControlBar.module.css';
interface RouteControlBarProps {
application: string;
routeId: string;
hasRouteControl: boolean;
hasReplay: boolean;
agentId?: string;
exchangeId?: string;
inputHeaders?: string;
inputBody?: string;
}
type RouteAction = 'start' | 'stop' | 'suspend' | 'resume';
const ROUTE_ACTIONS: { action: RouteAction; label: string; icon: typeof Play; colorClass: string }[] = [
{ action: 'start', label: 'Start', icon: Play, colorClass: styles.success },
{ action: 'stop', label: 'Stop', icon: Square, colorClass: styles.danger },
{ action: 'suspend', label: 'Suspend', icon: Pause, colorClass: styles.warning },
{ action: 'resume', label: 'Resume', icon: PlayCircle, colorClass: styles.success },
];
export function RouteControlBar({ application, routeId, hasRouteControl, hasReplay, agentId, exchangeId, inputHeaders, inputBody }: RouteControlBarProps) {
const { toast } = useToast();
const sendRouteCommand = useSendRouteCommand();
const replayExchange = useReplayExchange();
const [sendingAction, setSendingAction] = useState<string | null>(null);
const busy = sendingAction !== null;
function handleRouteAction(action: RouteAction) {
setSendingAction(action);
sendRouteCommand.mutate(
{ application, action, routeId },
{
onSuccess: () => {
toast({ title: `Route ${action} sent`, description: `${routeId} on ${application}`, variant: 'success' });
setSendingAction(null);
},
onError: (err) => {
toast({ title: `Route ${action} failed`, description: err.message, variant: 'error' });
setSendingAction(null);
},
},
);
}
function handleReplay() {
if (!agentId) return;
let headers: Record<string, string> = {};
try { headers = inputHeaders ? JSON.parse(inputHeaders) : {}; } catch { /* empty */ }
setSendingAction('replay');
replayExchange.mutate(
{ agentId, routeId, headers, body: inputBody ?? '', originalExchangeId: exchangeId },
{
onSuccess: (result) => {
if (result.status === 'SUCCESS') {
toast({ title: 'Replay completed', description: result.message ?? `${routeId} on ${agentId}`, variant: 'success' });
} else {
toast({ title: 'Replay failed', description: result.message ?? 'Agent reported failure', variant: 'error' });
}
setSendingAction(null);
},
onError: (err) => {
toast({ title: 'Replay failed', description: err.message, variant: 'error' });
setSendingAction(null);
},
},
);
}
return (
<div className={styles.bar}>
<span className={styles.label}>Route</span>
{hasRouteControl && (
<div className={`${styles.group} ${busy ? styles.sending : ''}`}>
{ROUTE_ACTIONS.map(({ action, label, icon: Icon, colorClass }) => (
<button
key={action}
className={`${styles.segment} ${colorClass}`}
disabled={busy}
onClick={() => handleRouteAction(action)}
title={`${label} route ${routeId}`}
>
{sendingAction === action
? <Loader2 size={12} className={styles.spinner} />
: <Icon size={12} />}
{label}
</button>
))}
</div>
)}
{hasRouteControl && hasReplay && <span className={styles.divider} />}
{hasReplay && (
<div className={`${styles.group} ${busy ? styles.sending : ''}`}>
<button
className={`${styles.segment} ${styles.success}`}
disabled={busy || !agentId}
onClick={handleReplay}
title={`Replay exchange on ${agentId ?? 'agent'}`}
>
{sendingAction === 'replay'
? <Loader2 size={12} className={styles.spinner} />
: <RotateCcw size={12} />}
Replay
</button>
</div>
)}
</div>
);
}