fix: drop stale instance_id filter from search and scope route stats by app
The exchange search silently filtered by the in-memory agent registry's current instance IDs on top of application_id. Historical exchanges written by previous agent instances (or any instance not currently registered, e.g. after a server restart before agents heartbeat back) were hidden from results even though they matched the application filter. Fix: drop the applicationId -> instanceIds resolution in SearchController. Rely on application_id = ? in ClickHouseSearchIndex; keep explicit instanceIds filtering only when a client passes them. Related cleanup: the agentIds parameter on StatsStore.statsForRoute / timeseriesForRoute was silently discarded inside ClickHouseStatsStore, so per-route stats aggregated across any apps sharing a routeId. Replace with String applicationId and add application_id to the stats_1m_route filters so per-route stats are correctly scoped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -43,7 +43,7 @@ paths:
|
||||
|
||||
## search/ — Execution search and stats
|
||||
|
||||
- `SearchService` — search, count, stats, statsForApp, timeseries, timeseriesForApp, timeseriesForRoute, timeseriesGroupedByApp, timeseriesGroupedByRoute, slaCompliance, slaCountsByApp, slaCountsByRoute, topErrors, activeErrorTypes, punchcard, distinctAttributeKeys
|
||||
- `SearchService` — search, count, stats, statsForApp, statsForRoute, timeseries, timeseriesForApp, timeseriesForRoute, timeseriesGroupedByApp, timeseriesGroupedByRoute, slaCompliance, slaCountsByApp, slaCountsByRoute, topErrors, activeErrorTypes, punchcard, distinctAttributeKeys. `statsForRoute`/`timeseriesForRoute` take `(routeId, applicationId)` — app filter is applied to `stats_1m_route`.
|
||||
- `SearchRequest` / `SearchResult` — search DTOs
|
||||
- `ExecutionStats`, `ExecutionSummary` — stats aggregation records
|
||||
- `StatsTimeseries`, `TopError` — timeseries and error DTOs
|
||||
|
||||
@@ -2,8 +2,6 @@ package com.cameleer.server.app.controller;
|
||||
|
||||
import com.cameleer.server.core.admin.AppSettings;
|
||||
import com.cameleer.server.core.admin.AppSettingsRepository;
|
||||
import com.cameleer.server.core.agent.AgentInfo;
|
||||
import com.cameleer.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer.server.core.search.ExecutionStats;
|
||||
import com.cameleer.server.core.search.ExecutionSummary;
|
||||
import com.cameleer.server.core.search.SearchRequest;
|
||||
@@ -38,13 +36,11 @@ import java.util.Map;
|
||||
public class SearchController {
|
||||
|
||||
private final SearchService searchService;
|
||||
private final AgentRegistryService registryService;
|
||||
private final AppSettingsRepository appSettingsRepository;
|
||||
|
||||
public SearchController(SearchService searchService, AgentRegistryService registryService,
|
||||
public SearchController(SearchService searchService,
|
||||
AppSettingsRepository appSettingsRepository) {
|
||||
this.searchService = searchService;
|
||||
this.registryService = registryService;
|
||||
this.appSettingsRepository = appSettingsRepository;
|
||||
}
|
||||
|
||||
@@ -66,15 +62,13 @@ public class SearchController {
|
||||
@RequestParam(required = false) String sortField,
|
||||
@RequestParam(required = false) String sortDir) {
|
||||
|
||||
List<String> agentIds = resolveApplicationToAgentIds(application);
|
||||
|
||||
SearchRequest request = new SearchRequest(
|
||||
status, timeFrom, timeTo,
|
||||
null, null,
|
||||
correlationId,
|
||||
text, null, null, null,
|
||||
routeId, instanceId, processorType,
|
||||
application, agentIds,
|
||||
application, null,
|
||||
offset, limit,
|
||||
sortField, sortDir,
|
||||
environment
|
||||
@@ -87,13 +81,7 @@ public class SearchController {
|
||||
@Operation(summary = "Advanced search with all filters")
|
||||
public ResponseEntity<SearchResult<ExecutionSummary>> searchPost(
|
||||
@RequestBody SearchRequest request) {
|
||||
// Resolve application to agentIds if application is specified but agentIds is not
|
||||
SearchRequest resolved = request;
|
||||
if (request.applicationId() != null && !request.applicationId().isBlank()
|
||||
&& (request.instanceIds() == null || request.instanceIds().isEmpty())) {
|
||||
resolved = request.withInstanceIds(resolveApplicationToAgentIds(request.applicationId()));
|
||||
}
|
||||
return ResponseEntity.ok(searchService.search(resolved));
|
||||
return ResponseEntity.ok(searchService.search(request));
|
||||
}
|
||||
|
||||
@GetMapping("/stats")
|
||||
@@ -111,8 +99,7 @@ public class SearchController {
|
||||
} else if (routeId == null) {
|
||||
stats = searchService.statsForApp(from, end, application, environment);
|
||||
} else {
|
||||
List<String> agentIds = resolveApplicationToAgentIds(application);
|
||||
stats = searchService.stats(from, end, routeId, agentIds, environment);
|
||||
stats = searchService.statsForRoute(from, end, routeId, application, environment);
|
||||
}
|
||||
|
||||
// Enrich with SLA compliance
|
||||
@@ -139,11 +126,7 @@ public class SearchController {
|
||||
if (routeId == null) {
|
||||
return ResponseEntity.ok(searchService.timeseriesForApp(from, end, buckets, application, environment));
|
||||
}
|
||||
List<String> agentIds = resolveApplicationToAgentIds(application);
|
||||
if (routeId == null && agentIds.isEmpty()) {
|
||||
return ResponseEntity.ok(searchService.timeseries(from, end, buckets, environment));
|
||||
}
|
||||
return ResponseEntity.ok(searchService.timeseries(from, end, buckets, routeId, agentIds, environment));
|
||||
return ResponseEntity.ok(searchService.timeseriesForRoute(from, end, buckets, routeId, application, environment));
|
||||
}
|
||||
|
||||
@GetMapping("/stats/timeseries/by-app")
|
||||
@@ -197,17 +180,4 @@ public class SearchController {
|
||||
Instant end = to != null ? to : Instant.now();
|
||||
return ResponseEntity.ok(searchService.topErrors(from, end, application, routeId, limit, environment));
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve an application name to agent IDs.
|
||||
* Returns empty list if application is null/blank (no filtering).
|
||||
*/
|
||||
private List<String> resolveApplicationToAgentIds(String application) {
|
||||
if (application == null || application.isBlank()) {
|
||||
return List.of();
|
||||
}
|
||||
return registryService.findByApplication(application).stream()
|
||||
.map(AgentInfo::instanceId)
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,9 +53,13 @@ public class ClickHouseStatsStore implements StatsStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds, String environment) {
|
||||
return queryStats("stats_1m_route", from, to, List.of(
|
||||
new Filter("route_id", routeId)), true, environment);
|
||||
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment) {
|
||||
List<Filter> filters = new ArrayList<>();
|
||||
filters.add(new Filter("route_id", routeId));
|
||||
if (applicationId != null && !applicationId.isBlank()) {
|
||||
filters.add(new Filter("application_id", applicationId));
|
||||
}
|
||||
return queryStats("stats_1m_route", from, to, filters, true, environment);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -78,9 +82,13 @@ public class ClickHouseStatsStore implements StatsStore {
|
||||
|
||||
@Override
|
||||
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
|
||||
String routeId, List<String> agentIds, String environment) {
|
||||
return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
|
||||
new Filter("route_id", routeId)), true, environment);
|
||||
String routeId, String applicationId, String environment) {
|
||||
List<Filter> filters = new ArrayList<>();
|
||||
filters.add(new Filter("route_id", routeId));
|
||||
if (applicationId != null && !applicationId.isBlank()) {
|
||||
filters.add(new Filter("application_id", applicationId));
|
||||
}
|
||||
return queryTimeseries("stats_1m_route", from, to, bucketCount, filters, true, environment);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -182,7 +182,7 @@ class ClickHouseStatsStoreIT {
|
||||
Instant from = BASE.minusSeconds(60);
|
||||
Instant to = BASE.plusSeconds(300);
|
||||
|
||||
ExecutionStats routeA = store.statsForRoute(from, to, "route-a", List.of(), null);
|
||||
ExecutionStats routeA = store.statsForRoute(from, to, "route-a", null, null);
|
||||
assertThat(routeA.totalCount()).isEqualTo(6);
|
||||
}
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@ import java.util.List;
|
||||
* @param routeId exact match on route_id
|
||||
* @param instanceId exact match on instance_id
|
||||
* @param processorType matches processor_types array via has()
|
||||
* @param applicationId application ID filter (resolved to instanceIds server-side)
|
||||
* @param instanceIds list of instance IDs (resolved from application, used for IN clause)
|
||||
* @param applicationId exact match on application_id
|
||||
* @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents)
|
||||
* @param offset pagination offset (0-based)
|
||||
* @param limit page size (default 50, max 500)
|
||||
* @param sortField column to sort by (default: startTime)
|
||||
|
||||
@@ -45,12 +45,12 @@ public class SearchService {
|
||||
return statsStore.statsForApp(from, to, applicationId, environment);
|
||||
}
|
||||
|
||||
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
|
||||
return statsStore.statsForRoute(from, to, routeId, agentIds, null);
|
||||
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId) {
|
||||
return statsStore.statsForRoute(from, to, routeId, applicationId, null);
|
||||
}
|
||||
|
||||
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds, String environment) {
|
||||
return statsStore.statsForRoute(from, to, routeId, agentIds, environment);
|
||||
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment) {
|
||||
return statsStore.statsForRoute(from, to, routeId, applicationId, environment);
|
||||
}
|
||||
|
||||
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
|
||||
@@ -69,14 +69,14 @@ public class SearchService {
|
||||
return statsStore.timeseriesForApp(from, to, bucketCount, applicationId, environment);
|
||||
}
|
||||
|
||||
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
|
||||
String routeId, List<String> agentIds) {
|
||||
return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds, null);
|
||||
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
|
||||
String routeId, String applicationId) {
|
||||
return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, applicationId, null);
|
||||
}
|
||||
|
||||
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
|
||||
String routeId, List<String> agentIds, String environment) {
|
||||
return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds, environment);
|
||||
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
|
||||
String routeId, String applicationId, String environment) {
|
||||
return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, applicationId, environment);
|
||||
}
|
||||
|
||||
// ── Dashboard-specific queries ────────────────────────────────────────
|
||||
|
||||
@@ -16,8 +16,8 @@ public interface StatsStore {
|
||||
// Per-app stats (stats_1m_app)
|
||||
ExecutionStats statsForApp(Instant from, Instant to, String applicationId, String environment);
|
||||
|
||||
// Per-route stats (stats_1m_route), optionally scoped to specific agents
|
||||
ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds, String environment);
|
||||
// Per-route stats (stats_1m_route), optionally scoped to an application
|
||||
ExecutionStats statsForRoute(Instant from, Instant to, String routeId, String applicationId, String environment);
|
||||
|
||||
// Per-processor stats (stats_1m_processor)
|
||||
ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);
|
||||
@@ -28,9 +28,9 @@ public interface StatsStore {
|
||||
// Per-app timeseries
|
||||
StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId, String environment);
|
||||
|
||||
// Per-route timeseries, optionally scoped to specific agents
|
||||
// Per-route timeseries, optionally scoped to an application
|
||||
StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
|
||||
String routeId, List<String> agentIds, String environment);
|
||||
String routeId, String applicationId, String environment);
|
||||
|
||||
// Per-processor timeseries
|
||||
StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
|
||||
|
||||
Reference in New Issue
Block a user