fix: scope correlation-chain query to the exchange's own env

Correlated exchanges always share the env of the one being viewed —
using the globally-selected env from the picker was wrong if the user
switched envs after opening a detail view (or arrived via permalink).

Thread `environment` through:
- `ExecutionStore.ExecutionRecord` gains `environment` field; the
  ClickHouse `executions` table already stores this, just not read back.
- `ClickHouseExecutionStore.findById` SELECT adds the column; mapper
  populates it.
- `ExecutionDetail` gains `environment`; `DetailService` passes through.
- `IngestionService.toExecutionRecord` passes null — this legacy PG
  ingestion path isn't active when ClickHouse is enabled, and the
  read-side is what drives the correlation UI.
- UI `ExchangeHeader` reads `detail.environment ?? storeEnv` and
  extends the TS type locally (schema.d.ts catches up on next regen).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-17 10:19:42 +02:00
parent f04e77788e
commit d02fa73080
7 changed files with 16 additions and 4 deletions

View File

@@ -204,7 +204,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
@Override @Override
public Optional<ExecutionRecord> findById(String executionId) { public Optional<ExecutionRecord> findById(String executionId) {
List<ExecutionRecord> results = jdbc.query(""" List<ExecutionRecord> results = jdbc.query("""
SELECT execution_id, route_id, instance_id, application_id, status, SELECT execution_id, route_id, instance_id, application_id, environment, status,
correlation_id, exchange_id, start_time, end_time, duration_ms, correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, diagram_content_hash, engine_level, error_message, error_stacktrace, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, input_body, output_body, input_headers, output_headers,
@@ -304,6 +304,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
emptyToNull(rs.getString("route_id")), emptyToNull(rs.getString("route_id")),
emptyToNull(rs.getString("instance_id")), emptyToNull(rs.getString("instance_id")),
emptyToNull(rs.getString("application_id")), emptyToNull(rs.getString("application_id")),
emptyToNull(rs.getString("environment")),
emptyToNull(rs.getString("status")), emptyToNull(rs.getString("status")),
emptyToNull(rs.getString("correlation_id")), emptyToNull(rs.getString("correlation_id")),
emptyToNull(rs.getString("exchange_id")), emptyToNull(rs.getString("exchange_id")),

View File

@@ -36,7 +36,7 @@ public class DetailService {
} }
return new ExecutionDetail( return new ExecutionDetail(
exec.executionId(), exec.routeId(), exec.instanceId(), exec.executionId(), exec.routeId(), exec.instanceId(),
exec.applicationId(), exec.applicationId(), exec.environment(),
exec.status(), exec.startTime(), exec.endTime(), exec.status(), exec.startTime(), exec.endTime(),
exec.durationMs() != null ? exec.durationMs() : 0L, exec.durationMs() != null ? exec.durationMs() : 0L,
exec.correlationId(), exec.exchangeId(), exec.correlationId(), exec.exchangeId(),

View File

@@ -13,6 +13,7 @@ import java.util.Map;
* @param executionId unique execution identifier * @param executionId unique execution identifier
* @param routeId Camel route ID * @param routeId Camel route ID
* @param instanceId agent instance that reported the execution * @param instanceId agent instance that reported the execution
* @param environment environment slug this exchange was recorded in
* @param status execution status (COMPLETED, FAILED, RUNNING) * @param status execution status (COMPLETED, FAILED, RUNNING)
* @param startTime execution start time * @param startTime execution start time
* @param endTime execution end time (may be null for RUNNING) * @param endTime execution end time (may be null for RUNNING)
@@ -33,6 +34,7 @@ public record ExecutionDetail(
String routeId, String routeId,
String instanceId, String instanceId,
String applicationId, String applicationId,
String environment,
String status, String status,
Instant startTime, Instant startTime,
Instant endTime, Instant endTime,

View File

@@ -114,6 +114,7 @@ public class IngestionService {
return new ExecutionRecord( return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId, exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId,
null, // environment: legacy PG path; ClickHouse path uses MergedExecution with env resolved from registry
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
exec.getCorrelationId(), exec.getExchangeId(), exec.getCorrelationId(), exec.getExchangeId(),
exec.getStartTime(), exec.getEndTime(), exec.getStartTime(), exec.getEndTime(),

View File

@@ -20,6 +20,7 @@ public interface ExecutionStore {
record ExecutionRecord( record ExecutionRecord(
String executionId, String routeId, String instanceId, String applicationId, String executionId, String routeId, String instanceId, String applicationId,
String environment,
String status, String correlationId, String exchangeId, String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs, Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String diagramContentHash, String errorMessage, String errorStacktrace, String diagramContentHash,

View File

@@ -1,6 +1,9 @@
import type { components } from '../../api/schema'; import type { components } from '../../api/schema';
export type ExecutionDetail = components['schemas']['ExecutionDetail']; export type ExecutionDetail = components['schemas']['ExecutionDetail'] & {
/** Environment slug this exchange was recorded in. Added to the backend record; schema.d.ts will pick it up on next openapi regen. */
environment?: string;
};
export type ProcessorNode = components['schemas']['ProcessorNode']; export type ProcessorNode = components['schemas']['ProcessorNode'];
export interface NodeExecutionState { export interface NodeExecutionState {

View File

@@ -33,7 +33,11 @@ function statusVariant(s: string): StatusVariant {
export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }: ExchangeHeaderProps) { export function ExchangeHeader({ detail, onCorrelatedSelect, onClearSelection }: ExchangeHeaderProps) {
const navigate = useNavigate(); const navigate = useNavigate();
const { timeRange } = useGlobalFilters(); const { timeRange } = useGlobalFilters();
const environment = useEnvironmentStore((s) => s.environment); const storeEnv = useEnvironmentStore((s) => s.environment);
// Prefer the exchange's own env over the selected env — correlated exchanges
// always live in the same env as the one being viewed, and the user may have
// switched env-picker after opening this detail.
const environment = detail.environment ?? storeEnv;
const { data: chainResult } = useCorrelationChain(detail.correlationId ?? null, environment); const { data: chainResult } = useCorrelationChain(detail.correlationId ?? null, environment);
const chain = chainResult?.data; const chain = chainResult?.data;
const showChain = chain && chain.length > 1; const showChain = chain && chain.length > 1;