fix: commands respect selected environment
Backend: AgentRegistryService gains findByApplicationAndEnvironment() and environment-aware addGroupCommandWithReplies() overload. AgentCommandController and ApplicationConfigController accept optional environment query parameter. When set, commands only target agents in that environment. Backward compatible — null means all environments. Frontend: All command mutations (config update, route control, traced processors, tap config, route recording) now pass selectedEnv to the backend via query parameter. Prevents cross-environment command leakage — e.g., updating config for prod no longer pushes to dev agents. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
@@ -114,13 +115,14 @@ public class AgentCommandController {
|
||||
@ApiResponse(responseCode = "200", description = "Commands dispatched and responses collected")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
public ResponseEntity<CommandGroupResponse> sendGroupCommand(@PathVariable String group,
|
||||
@RequestParam(required = false) String environment,
|
||||
@RequestBody CommandRequest request,
|
||||
HttpServletRequest httpRequest) throws JsonProcessingException {
|
||||
CommandType type = mapCommandType(request.type());
|
||||
String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}";
|
||||
|
||||
Map<String, CompletableFuture<CommandReply>> futures =
|
||||
registryService.addGroupCommandWithReplies(group, type, payloadJson);
|
||||
registryService.addGroupCommandWithReplies(group, environment, type, payloadJson);
|
||||
|
||||
if (futures.isEmpty()) {
|
||||
auditService.log("broadcast_group_command", AuditCategory.AGENT, group,
|
||||
@@ -171,12 +173,18 @@ public class AgentCommandController {
|
||||
description = "Sends a command to all agents currently in LIVE state")
|
||||
@ApiResponse(responseCode = "202", description = "Commands accepted")
|
||||
@ApiResponse(responseCode = "400", description = "Invalid command payload")
|
||||
public ResponseEntity<CommandBroadcastResponse> broadcastCommand(@RequestBody CommandRequest request,
|
||||
public ResponseEntity<CommandBroadcastResponse> broadcastCommand(@RequestParam(required = false) String environment,
|
||||
@RequestBody CommandRequest request,
|
||||
HttpServletRequest httpRequest) throws JsonProcessingException {
|
||||
CommandType type = mapCommandType(request.type());
|
||||
String payloadJson = request.payload() != null ? objectMapper.writeValueAsString(request.payload()) : "{}";
|
||||
|
||||
List<AgentInfo> liveAgents = registryService.findByState(AgentState.LIVE);
|
||||
if (environment != null) {
|
||||
liveAgents = liveAgents.stream()
|
||||
.filter(a -> environment.equals(a.environmentId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
List<String> commandIds = new ArrayList<>();
|
||||
for (AgentInfo agent : liveAgents) {
|
||||
|
||||
@@ -91,6 +91,7 @@ public class ApplicationConfigController {
|
||||
description = "Saves config and pushes CONFIG_UPDATE to all LIVE agents of this application")
|
||||
@ApiResponse(responseCode = "200", description = "Config saved and pushed")
|
||||
public ResponseEntity<ConfigUpdateResponse> updateConfig(@PathVariable String application,
|
||||
@RequestParam(required = false) String environment,
|
||||
@RequestBody ApplicationConfig config,
|
||||
Authentication auth,
|
||||
HttpServletRequest httpRequest) {
|
||||
@@ -99,7 +100,7 @@ public class ApplicationConfigController {
|
||||
config.setApplication(application);
|
||||
ApplicationConfig saved = configRepository.save(application, config, updatedBy);
|
||||
|
||||
CommandGroupResponse pushResult = pushConfigToAgents(application, saved);
|
||||
CommandGroupResponse pushResult = pushConfigToAgents(application, environment, saved);
|
||||
log.info("Config v{} saved for '{}', pushed to {} agent(s), {} responded",
|
||||
saved.getVersion(), application, pushResult.total(), pushResult.responded());
|
||||
|
||||
@@ -126,13 +127,16 @@ public class ApplicationConfigController {
|
||||
@ApiResponse(responseCode = "504", description = "Agent did not respond in time")
|
||||
public ResponseEntity<TestExpressionResponse> testExpression(
|
||||
@PathVariable String application,
|
||||
@RequestParam(required = false) String environment,
|
||||
@RequestBody TestExpressionRequest request) {
|
||||
// Find a LIVE agent for this application
|
||||
AgentInfo agent = registryService.findAll().stream()
|
||||
// Find a LIVE agent for this application, optionally filtered by environment
|
||||
var candidates = registryService.findAll().stream()
|
||||
.filter(a -> application.equals(a.applicationId()))
|
||||
.filter(a -> a.state() == AgentState.LIVE)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
.filter(a -> a.state() == AgentState.LIVE);
|
||||
if (environment != null) {
|
||||
candidates = candidates.filter(a -> environment.equals(a.environmentId()));
|
||||
}
|
||||
AgentInfo agent = candidates.findFirst().orElse(null);
|
||||
|
||||
if (agent == null) {
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND)
|
||||
@@ -176,7 +180,7 @@ public class ApplicationConfigController {
|
||||
}
|
||||
}
|
||||
|
||||
private CommandGroupResponse pushConfigToAgents(String application, ApplicationConfig config) {
|
||||
private CommandGroupResponse pushConfigToAgents(String application, String environment, ApplicationConfig config) {
|
||||
String payloadJson;
|
||||
try {
|
||||
payloadJson = objectMapper.writeValueAsString(config);
|
||||
@@ -186,7 +190,7 @@ public class ApplicationConfigController {
|
||||
}
|
||||
|
||||
Map<String, CompletableFuture<CommandReply>> futures =
|
||||
registryService.addGroupCommandWithReplies(application, CommandType.CONFIG_UPDATE, payloadJson);
|
||||
registryService.addGroupCommandWithReplies(application, environment, CommandType.CONFIG_UPDATE, payloadJson);
|
||||
|
||||
if (futures.isEmpty()) {
|
||||
return new CommandGroupResponse(true, 0, 0, List.of(), List.of());
|
||||
|
||||
@@ -214,6 +214,16 @@ public class AgentRegistryService {
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all agents belonging to the given application and environment.
|
||||
*/
|
||||
public List<AgentInfo> findByApplicationAndEnvironment(String application, String environment) {
|
||||
return agents.values().stream()
|
||||
.filter(a -> application.equals(a.applicationId()))
|
||||
.filter(a -> environment.equals(a.environmentId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a command to an agent's pending queue.
|
||||
* Notifies the event listener if one is set.
|
||||
@@ -336,8 +346,21 @@ public class AgentRegistryService {
|
||||
*/
|
||||
public Map<String, CompletableFuture<CommandReply>> addGroupCommandWithReplies(
|
||||
String group, CommandType type, String payload) {
|
||||
return addGroupCommandWithReplies(group, null, type, payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a command to all LIVE agents in a group, optionally filtered by environment.
|
||||
* When environment is null, targets all agents for the application.
|
||||
* Returns a map of agentId -> CompletableFuture<CommandReply>.
|
||||
*/
|
||||
public Map<String, CompletableFuture<CommandReply>> addGroupCommandWithReplies(
|
||||
String group, String environment, CommandType type, String payload) {
|
||||
Map<String, CompletableFuture<CommandReply>> results = new LinkedHashMap<>();
|
||||
List<AgentInfo> liveAgents = findByApplication(group).stream()
|
||||
List<AgentInfo> candidates = environment != null
|
||||
? findByApplicationAndEnvironment(group, environment)
|
||||
: findByApplication(group);
|
||||
List<AgentInfo> liveAgents = candidates.stream()
|
||||
.filter(a -> a.state() == AgentState.LIVE)
|
||||
.toList();
|
||||
|
||||
|
||||
@@ -73,8 +73,9 @@ export interface ConfigUpdateResponse {
|
||||
export function useUpdateApplicationConfig() {
|
||||
const queryClient = useQueryClient()
|
||||
return useMutation({
|
||||
mutationFn: async (config: ApplicationConfig) => {
|
||||
const res = await authFetch(`/config/${config.application}`, {
|
||||
mutationFn: async ({ config, environment }: { config: ApplicationConfig; environment?: string }) => {
|
||||
const envParam = environment ? `?environment=${encodeURIComponent(environment)}` : ''
|
||||
const res = await authFetch(`/config/${config.application}${envParam}`, {
|
||||
method: 'PUT',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(config),
|
||||
@@ -119,12 +120,14 @@ interface SendGroupCommandParams {
|
||||
group: string
|
||||
type: string
|
||||
payload: Record<string, unknown>
|
||||
environment?: string
|
||||
}
|
||||
|
||||
export function useSendGroupCommand() {
|
||||
return useMutation({
|
||||
mutationFn: async ({ group, type, payload }: SendGroupCommandParams) => {
|
||||
const res = await authFetch(`/agents/groups/${encodeURIComponent(group)}/commands`, {
|
||||
mutationFn: async ({ group, type, payload, environment }: SendGroupCommandParams) => {
|
||||
const envParam = environment ? `?environment=${encodeURIComponent(environment)}` : ''
|
||||
const res = await authFetch(`/agents/groups/${encodeURIComponent(group)}/commands${envParam}`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ type, payload }),
|
||||
@@ -174,12 +177,14 @@ export function useTestExpression() {
|
||||
|
||||
export function useSendRouteCommand() {
|
||||
return useMutation({
|
||||
mutationFn: async ({ application, action, routeId }: {
|
||||
mutationFn: async ({ application, action, routeId, environment }: {
|
||||
application: string
|
||||
action: 'start' | 'stop' | 'suspend' | 'resume'
|
||||
routeId: string
|
||||
environment?: string
|
||||
}) => {
|
||||
const res = await authFetch(`/agents/groups/${encodeURIComponent(application)}/commands`, {
|
||||
const envParam = environment ? `?environment=${encodeURIComponent(environment)}` : ''
|
||||
const res = await authFetch(`/agents/groups/${encodeURIComponent(application)}/commands${envParam}`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ type: 'route-control', payload: { routeId, action, nonce: crypto.randomUUID() } }),
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
import type { Column } from '@cameleer/design-system';
|
||||
import { useApplicationConfig, useUpdateApplicationConfig } from '../../api/queries/commands';
|
||||
import type { ApplicationConfig, TapDefinition, ConfigUpdateResponse } from '../../api/queries/commands';
|
||||
import { useEnvironmentStore } from '../../api/environment-store';
|
||||
import { useCatalog } from '../../api/queries/catalog';
|
||||
import type { CatalogApp, CatalogRoute } from '../../api/queries/catalog';
|
||||
import { applyTracedProcessorUpdate, applyRouteRecordingUpdate } from '../../utils/config-draft-utils';
|
||||
@@ -75,6 +76,7 @@ export default function AppConfigDetailPage() {
|
||||
const { appId } = useParams<{ appId: string }>();
|
||||
const navigate = useNavigate();
|
||||
const { toast } = useToast();
|
||||
const selectedEnv = useEnvironmentStore((s) => s.environment);
|
||||
const { data: config, isLoading } = useApplicationConfig(appId);
|
||||
const updateConfig = useUpdateApplicationConfig();
|
||||
const { data: catalog } = useCatalog();
|
||||
@@ -147,7 +149,7 @@ export default function AppConfigDetailPage() {
|
||||
tracedProcessors: tracedDraft,
|
||||
routeRecording: routeRecordingDraft,
|
||||
} as ApplicationConfig;
|
||||
updateConfig.mutate(updated, {
|
||||
updateConfig.mutate({ config: updated, environment: selectedEnv }, {
|
||||
onSuccess: (saved: ConfigUpdateResponse) => {
|
||||
setEditing(false);
|
||||
if (saved.pushResult.success) {
|
||||
|
||||
@@ -115,7 +115,7 @@ export default function AgentHealth() {
|
||||
const saveConfigEdit = useCallback(() => {
|
||||
if (!appConfig) return;
|
||||
const updated = { ...appConfig, ...configDraft };
|
||||
updateConfig.mutate(updated, {
|
||||
updateConfig.mutate({ config: updated, environment: selectedEnv }, {
|
||||
onSuccess: (saved: ConfigUpdateResponse) => {
|
||||
setConfigEditing(false);
|
||||
setConfigDraft({});
|
||||
|
||||
@@ -238,19 +238,22 @@ function CreateAppView({ environments, selectedEnv }: { environments: Environmen
|
||||
// 4. Save agent config (will be pushed to agent on first connect)
|
||||
setStep('Saving monitoring config...');
|
||||
await updateAgentConfig.mutateAsync({
|
||||
application: slug.trim(),
|
||||
version: 0,
|
||||
engineLevel,
|
||||
payloadCaptureMode: payloadCapture,
|
||||
applicationLogLevel: appLogLevel,
|
||||
agentLogLevel,
|
||||
metricsEnabled,
|
||||
samplingRate: parseFloat(samplingRate) || 1.0,
|
||||
compressSuccess,
|
||||
tracedProcessors: {},
|
||||
taps: [],
|
||||
tapVersion: 0,
|
||||
routeRecording: {},
|
||||
config: {
|
||||
application: slug.trim(),
|
||||
version: 0,
|
||||
engineLevel,
|
||||
payloadCaptureMode: payloadCapture,
|
||||
applicationLogLevel: appLogLevel,
|
||||
agentLogLevel,
|
||||
metricsEnabled,
|
||||
samplingRate: parseFloat(samplingRate) || 1.0,
|
||||
compressSuccess,
|
||||
tracedProcessors: {},
|
||||
taps: [],
|
||||
tapVersion: 0,
|
||||
routeRecording: {},
|
||||
},
|
||||
environment: selectedEnv,
|
||||
});
|
||||
|
||||
// 5. Deploy (if requested)
|
||||
@@ -814,13 +817,16 @@ function ConfigSubTab({ app, environment }: { app: App; environment?: Environmen
|
||||
if (agentConfig) {
|
||||
try {
|
||||
await updateAgentConfig.mutateAsync({
|
||||
...agentConfig,
|
||||
engineLevel, payloadCaptureMode: payloadCapture,
|
||||
applicationLogLevel: appLogLevel, agentLogLevel,
|
||||
metricsEnabled, samplingRate: parseFloat(samplingRate) || 1.0,
|
||||
compressSuccess,
|
||||
tracedProcessors: tracedDraft,
|
||||
routeRecording: routeRecordingDraft,
|
||||
config: {
|
||||
...agentConfig,
|
||||
engineLevel, payloadCaptureMode: payloadCapture,
|
||||
applicationLogLevel: appLogLevel, agentLogLevel,
|
||||
metricsEnabled, samplingRate: parseFloat(samplingRate) || 1.0,
|
||||
compressSuccess,
|
||||
tracedProcessors: tracedDraft,
|
||||
routeRecording: routeRecordingDraft,
|
||||
},
|
||||
environment: environment?.slug,
|
||||
});
|
||||
} catch { toast({ title: 'Failed to save agent config', variant: 'error', duration: 86_400_000 }); return; }
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import { useCatalog } from '../../api/queries/catalog';
|
||||
import { useAgents } from '../../api/queries/agents';
|
||||
import { useApplicationConfig, useUpdateApplicationConfig } from '../../api/queries/commands';
|
||||
import type { TapDefinition, ConfigUpdateResponse } from '../../api/queries/commands';
|
||||
import { useEnvironmentStore } from '../../api/environment-store';
|
||||
import { useCanControl } from '../../auth/auth-store';
|
||||
import { useTracingStore } from '../../stores/tracing-store';
|
||||
import type { NodeAction, NodeConfig } from '../../components/ProcessDiagram/types';
|
||||
@@ -23,6 +24,7 @@ import type { SelectedExchange } from '../Dashboard/Dashboard';
|
||||
export default function ExchangesPage() {
|
||||
const navigate = useNavigate();
|
||||
const location = useLocation();
|
||||
const selectedEnv = useEnvironmentStore((s) => s.environment);
|
||||
const { appId: scopedAppId, routeId: scopedRouteId, exchangeId: scopedExchangeId } =
|
||||
useParams<{ appId?: string; routeId?: string; exchangeId?: string }>();
|
||||
|
||||
@@ -143,6 +145,7 @@ interface DiagramPanelProps {
|
||||
}
|
||||
|
||||
function DiagramPanel({ appId, routeId, exchangeId, onCorrelatedSelect, onClearSelection }: DiagramPanelProps) {
|
||||
const selectedEnv = useEnvironmentStore((s) => s.environment);
|
||||
const { timeRange } = useGlobalFilters();
|
||||
const timeFrom = timeRange.start.toISOString();
|
||||
const timeTo = timeRange.end.toISOString();
|
||||
@@ -240,7 +243,7 @@ function DiagramPanel({ appId, routeId, exchangeId, onCorrelatedSelect, onClearS
|
||||
|
||||
const handleTapSave = useCallback((updatedConfig: typeof appConfig) => {
|
||||
if (!updatedConfig) return;
|
||||
updateConfig.mutate(updatedConfig, {
|
||||
updateConfig.mutate({ config: updatedConfig, environment: selectedEnv }, {
|
||||
onSuccess: (saved: ConfigUpdateResponse) => {
|
||||
if (saved.pushResult.success) {
|
||||
toast({ title: 'Tap configuration saved', description: `Pushed to ${saved.pushResult.total}/${saved.pushResult.total} agents (v${saved.config.version})`, variant: 'success' });
|
||||
@@ -258,7 +261,7 @@ function DiagramPanel({ appId, routeId, exchangeId, onCorrelatedSelect, onClearS
|
||||
const handleTapDelete = useCallback((tap: TapDefinition) => {
|
||||
if (!appConfig) return;
|
||||
const taps = appConfig.taps.filter(t => t.tapId !== tap.tapId);
|
||||
updateConfig.mutate({ ...appConfig, taps }, {
|
||||
updateConfig.mutate({ config: { ...appConfig, taps }, environment: selectedEnv }, {
|
||||
onSuccess: (saved: ConfigUpdateResponse) => {
|
||||
if (saved.pushResult.success) {
|
||||
toast({ title: 'Tap deleted', description: `${tap.attributeName} removed — pushed to ${saved.pushResult.total}/${saved.pushResult.total} agents (v${saved.config.version})`, variant: 'success' });
|
||||
@@ -287,10 +290,10 @@ function DiagramPanel({ appId, routeId, exchangeId, onCorrelatedSelect, onClearS
|
||||
const enabled = nodeId in newMap;
|
||||
const tracedProcessors: Record<string, string> = {};
|
||||
for (const [k, v] of Object.entries(newMap)) tracedProcessors[k] = v;
|
||||
updateConfig.mutate({
|
||||
updateConfig.mutate({ config: {
|
||||
...appConfig,
|
||||
tracedProcessors,
|
||||
}, {
|
||||
}, environment: selectedEnv }, {
|
||||
onSuccess: (saved: ConfigUpdateResponse) => {
|
||||
if (saved.pushResult.success) {
|
||||
toast({ title: `Tracing ${enabled ? 'enabled' : 'disabled'}`, description: `${nodeId} — pushed to ${saved.pushResult.total}/${saved.pushResult.total} agents (v${saved.config.version})`, variant: 'success' });
|
||||
|
||||
@@ -3,6 +3,7 @@ import { Play, Square, Pause, PlayCircle, RotateCcw, Loader2 } from 'lucide-reac
|
||||
import { useToast, ConfirmDialog } from '@cameleer/design-system';
|
||||
import { useSendRouteCommand, useReplayExchange } from '../../api/queries/commands';
|
||||
import type { CommandGroupResponse } from '../../api/queries/commands';
|
||||
import { useEnvironmentStore } from '../../api/environment-store';
|
||||
import styles from './RouteControlBar.module.css';
|
||||
|
||||
interface RouteControlBarProps {
|
||||
@@ -34,6 +35,7 @@ const ACTION_DISABLED: Record<string, Set<RouteAction>> = {
|
||||
|
||||
export function RouteControlBar({ application, routeId, routeState, hasRouteControl, hasReplay, agentId, exchangeId, inputHeaders, inputBody }: RouteControlBarProps) {
|
||||
const { toast } = useToast();
|
||||
const selectedEnv = useEnvironmentStore((s) => s.environment);
|
||||
const sendRouteCommand = useSendRouteCommand();
|
||||
const replayExchange = useReplayExchange();
|
||||
const [sendingAction, setSendingAction] = useState<string | null>(null);
|
||||
@@ -54,7 +56,7 @@ export function RouteControlBar({ application, routeId, routeState, hasRouteCont
|
||||
function handleRouteAction(action: RouteAction) {
|
||||
setSendingAction(action);
|
||||
sendRouteCommand.mutate(
|
||||
{ application, action, routeId },
|
||||
{ application, action, routeId, environment: selectedEnv },
|
||||
{
|
||||
onSuccess: (result: CommandGroupResponse) => {
|
||||
if (result.success) {
|
||||
|
||||
@@ -344,7 +344,7 @@ export default function RouteDetail() {
|
||||
function toggleRecording() {
|
||||
if (!config.data) return;
|
||||
const routeRecording = { ...config.data.routeRecording, [routeId!]: !isRecording };
|
||||
updateConfig.mutate({ ...config.data, routeRecording });
|
||||
updateConfig.mutate({ config: { ...config.data, routeRecording }, environment: selectedEnv });
|
||||
}
|
||||
|
||||
// ── Derived data ───────────────────────────────────────────────────────────
|
||||
@@ -545,14 +545,14 @@ export default function RouteDetail() {
|
||||
const taps = editingTap
|
||||
? config.data.taps.map(t => t.tapId === editingTap.tapId ? tap : t)
|
||||
: [...(config.data.taps || []), tap];
|
||||
updateConfig.mutate({ ...config.data, taps });
|
||||
updateConfig.mutate({ config: { ...config.data, taps }, environment: selectedEnv });
|
||||
setTapModalOpen(false);
|
||||
}
|
||||
|
||||
function deleteTap(tap: TapDefinition) {
|
||||
if (!config.data) return;
|
||||
const taps = config.data.taps.filter(t => t.tapId !== tap.tapId);
|
||||
updateConfig.mutate({ ...config.data, taps });
|
||||
updateConfig.mutate({ config: { ...config.data, taps }, environment: selectedEnv });
|
||||
setDeletingTap(null);
|
||||
}
|
||||
|
||||
@@ -561,7 +561,7 @@ export default function RouteDetail() {
|
||||
const taps = config.data.taps.map(t =>
|
||||
t.tapId === tap.tapId ? { ...t, enabled: !t.enabled } : t,
|
||||
);
|
||||
updateConfig.mutate({ ...config.data, taps });
|
||||
updateConfig.mutate({ config: { ...config.data, taps }, environment: selectedEnv });
|
||||
}
|
||||
|
||||
function runTestExpression() {
|
||||
|
||||
Reference in New Issue
Block a user