From e2d9428dff7cb43681ea1026fd4ed5bc707118a4 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Thu, 16 Apr 2026 19:49:55 +0200 Subject: [PATCH] fix: drop stale instance_id filter from search and scope route stats by app The exchange search silently filtered by the in-memory agent registry's current instance IDs on top of application_id. Historical exchanges written by previous agent instances (or any instance not currently registered, e.g. after a server restart before agents heartbeat back) were hidden from results even though they matched the application filter. Fix: drop the applicationId -> instanceIds resolution in SearchController. Rely on application_id = ? in ClickHouseSearchIndex; keep explicit instanceIds filtering only when a client passes them. Related cleanup: the agentIds parameter on StatsStore.statsForRoute / timeseriesForRoute was silently discarded inside ClickHouseStatsStore, so per-route stats aggregated across any apps sharing a routeId. Replace with String applicationId and add application_id to the stats_1m_route filters so per-route stats are correctly scoped. Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude/rules/core-classes.md | 2 +- .../app/controller/SearchController.java | 40 +++---------------- .../app/storage/ClickHouseStatsStore.java | 20 +++++++--- .../app/storage/ClickHouseStatsStoreIT.java | 2 +- .../server/core/search/SearchRequest.java | 4 +- .../server/core/search/SearchService.java | 20 +++++----- .../server/core/storage/StatsStore.java | 8 ++-- 7 files changed, 37 insertions(+), 59 deletions(-) diff --git a/.claude/rules/core-classes.md b/.claude/rules/core-classes.md index 73272582..03e101d2 100644 --- a/.claude/rules/core-classes.md +++ b/.claude/rules/core-classes.md @@ -43,7 +43,7 @@ paths: ## search/ — Execution search and stats -- `SearchService` — search, count, stats, statsForApp, timeseries, timeseriesForApp, timeseriesForRoute, timeseriesGroupedByApp, timeseriesGroupedByRoute, slaCompliance, slaCountsByApp, slaCountsByRoute, topErrors, activeErrorTypes, punchcard, distinctAttributeKeys +- `SearchService` — search, count, stats, statsForApp, statsForRoute, timeseries, timeseriesForApp, timeseriesForRoute, timeseriesGroupedByApp, timeseriesGroupedByRoute, slaCompliance, slaCountsByApp, slaCountsByRoute, topErrors, activeErrorTypes, punchcard, distinctAttributeKeys. `statsForRoute`/`timeseriesForRoute` take `(routeId, applicationId)` — app filter is applied to `stats_1m_route`. - `SearchRequest` / `SearchResult` — search DTOs - `ExecutionStats`, `ExecutionSummary` — stats aggregation records - `StatsTimeseries`, `TopError` — timeseries and error DTOs diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/SearchController.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/SearchController.java index 3f235eaa..ef63fd0c 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/SearchController.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/SearchController.java @@ -2,8 +2,6 @@ package com.cameleer.server.app.controller; import com.cameleer.server.core.admin.AppSettings; import com.cameleer.server.core.admin.AppSettingsRepository; -import com.cameleer.server.core.agent.AgentInfo; -import com.cameleer.server.core.agent.AgentRegistryService; import com.cameleer.server.core.search.ExecutionStats; import com.cameleer.server.core.search.ExecutionSummary; import com.cameleer.server.core.search.SearchRequest; @@ -38,13 +36,11 @@ import java.util.Map; public class SearchController { private final SearchService searchService; - private final AgentRegistryService registryService; private final AppSettingsRepository appSettingsRepository; - public SearchController(SearchService searchService, AgentRegistryService registryService, + public SearchController(SearchService searchService, AppSettingsRepository appSettingsRepository) { this.searchService = searchService; - this.registryService = registryService; this.appSettingsRepository = appSettingsRepository; } @@ -66,15 +62,13 @@ public class SearchController { @RequestParam(required = false) String sortField, @RequestParam(required = false) String sortDir) { - List agentIds = resolveApplicationToAgentIds(application); - SearchRequest request = new SearchRequest( status, timeFrom, timeTo, null, null, correlationId, text, null, null, null, routeId, instanceId, processorType, - application, agentIds, + application, null, offset, limit, sortField, sortDir, environment @@ -87,13 +81,7 @@ public class SearchController { @Operation(summary = "Advanced search with all filters") public ResponseEntity> searchPost( @RequestBody SearchRequest request) { - // Resolve application to agentIds if application is specified but agentIds is not - SearchRequest resolved = request; - if (request.applicationId() != null && !request.applicationId().isBlank() - && (request.instanceIds() == null || request.instanceIds().isEmpty())) { - resolved = request.withInstanceIds(resolveApplicationToAgentIds(request.applicationId())); - } - return ResponseEntity.ok(searchService.search(resolved)); + return ResponseEntity.ok(searchService.search(request)); } @GetMapping("/stats") @@ -111,8 +99,7 @@ public class SearchController { } else if (routeId == null) { stats = searchService.statsForApp(from, end, application, environment); } else { - List agentIds = resolveApplicationToAgentIds(application); - stats = searchService.stats(from, end, routeId, agentIds, environment); + stats = searchService.statsForRoute(from, end, routeId, application, environment); } // Enrich with SLA compliance @@ -139,11 +126,7 @@ public class SearchController { if (routeId == null) { return ResponseEntity.ok(searchService.timeseriesForApp(from, end, buckets, application, environment)); } - List agentIds = resolveApplicationToAgentIds(application); - if (routeId == null && agentIds.isEmpty()) { - return ResponseEntity.ok(searchService.timeseries(from, end, buckets, environment)); - } - return ResponseEntity.ok(searchService.timeseries(from, end, buckets, routeId, agentIds, environment)); + return ResponseEntity.ok(searchService.timeseriesForRoute(from, end, buckets, routeId, application, environment)); } @GetMapping("/stats/timeseries/by-app") @@ -197,17 +180,4 @@ public class SearchController { Instant end = to != null ? to : Instant.now(); return ResponseEntity.ok(searchService.topErrors(from, end, application, routeId, limit, environment)); } - - /** - * Resolve an application name to agent IDs. - * Returns empty list if application is null/blank (no filtering). - */ - private List resolveApplicationToAgentIds(String application) { - if (application == null || application.isBlank()) { - return List.of(); - } - return registryService.findByApplication(application).stream() - .map(AgentInfo::instanceId) - .toList(); - } } diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java index 9a5d27de..2ad8b994 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseStatsStore.java @@ -53,9 +53,13 @@ public class ClickHouseStatsStore implements StatsStore { } @Override - public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds, String environment) { - return queryStats("stats_1m_route", from, to, List.of( - new Filter("route_id", routeId)), true, environment); + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment) { + List filters = new ArrayList<>(); + filters.add(new Filter("route_id", routeId)); + if (applicationId != null && !applicationId.isBlank()) { + filters.add(new Filter("application_id", applicationId)); + } + return queryStats("stats_1m_route", from, to, filters, true, environment); } @Override @@ -78,9 +82,13 @@ public class ClickHouseStatsStore implements StatsStore { @Override public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, - String routeId, List agentIds, String environment) { - return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of( - new Filter("route_id", routeId)), true, environment); + String routeId, String applicationId, String environment) { + List filters = new ArrayList<>(); + filters.add(new Filter("route_id", routeId)); + if (applicationId != null && !applicationId.isBlank()) { + filters.add(new Filter("application_id", applicationId)); + } + return queryTimeseries("stats_1m_route", from, to, bucketCount, filters, true, environment); } @Override diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java index a817ed50..1804c949 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseStatsStoreIT.java @@ -182,7 +182,7 @@ class ClickHouseStatsStoreIT { Instant from = BASE.minusSeconds(60); Instant to = BASE.plusSeconds(300); - ExecutionStats routeA = store.statsForRoute(from, to, "route-a", List.of(), null); + ExecutionStats routeA = store.statsForRoute(from, to, "route-a", null, null); assertThat(routeA.totalCount()).isEqualTo(6); } diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java index 3f1b3c47..52044916 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java @@ -22,8 +22,8 @@ import java.util.List; * @param routeId exact match on route_id * @param instanceId exact match on instance_id * @param processorType matches processor_types array via has() - * @param applicationId application ID filter (resolved to instanceIds server-side) - * @param instanceIds list of instance IDs (resolved from application, used for IN clause) + * @param applicationId exact match on application_id + * @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents) * @param offset pagination offset (0-based) * @param limit page size (default 50, max 500) * @param sortField column to sort by (default: startTime) diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java index e22e685e..4f6109f7 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java @@ -45,12 +45,12 @@ public class SearchService { return statsStore.statsForApp(from, to, applicationId, environment); } - public ExecutionStats stats(Instant from, Instant to, String routeId, List agentIds) { - return statsStore.statsForRoute(from, to, routeId, agentIds, null); + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId) { + return statsStore.statsForRoute(from, to, routeId, applicationId, null); } - public ExecutionStats stats(Instant from, Instant to, String routeId, List agentIds, String environment) { - return statsStore.statsForRoute(from, to, routeId, agentIds, environment); + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment) { + return statsStore.statsForRoute(from, to, routeId, applicationId, environment); } public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { @@ -69,14 +69,14 @@ public class SearchService { return statsStore.timeseriesForApp(from, to, bucketCount, applicationId, environment); } - public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount, - String routeId, List agentIds) { - return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds, null); + public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, + String routeId, String applicationId) { + return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, applicationId, null); } - public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount, - String routeId, List agentIds, String environment) { - return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds, environment); + public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, + String routeId, String applicationId, String environment) { + return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, applicationId, environment); } // ── Dashboard-specific queries ──────────────────────────────────────── diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java index 742ab35a..4651c3ae 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java @@ -16,8 +16,8 @@ public interface StatsStore { // Per-app stats (stats_1m_app) ExecutionStats statsForApp(Instant from, Instant to, String applicationId, String environment); - // Per-route stats (stats_1m_route), optionally scoped to specific agents - ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds, String environment); + // Per-route stats (stats_1m_route), optionally scoped to an application + ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment); // Per-processor stats (stats_1m_processor) ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType); @@ -28,9 +28,9 @@ public interface StatsStore { // Per-app timeseries StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId, String environment); - // Per-route timeseries, optionally scoped to specific agents + // Per-route timeseries, optionally scoped to an application StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, - String routeId, List agentIds, String environment); + String routeId, String applicationId, String environment); // Per-processor timeseries StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,