feat: rename agent identity fields for protocol v2 + add SHUTDOWN lifecycle state
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m7s
CI / docker (push) Successful in 45s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 22s

Align all internal naming with the agent team's protocol v2 identity rename:
- agentId → instanceId (unique per-JVM identifier)
- applicationName → applicationId (shared app identifier)
- AgentInfo: id → instanceId, name → displayName, application → applicationId

Add SHUTDOWN lifecycle state for graceful agent shutdowns:
- New POST /data/events endpoint receives agent lifecycle events
- AGENT_STOPPED event transitions agent to SHUTDOWN (skips STALE/DEAD)
- New POST /{id}/deregister endpoint removes agent from registry
- Server now distinguishes graceful shutdown from crash (heartbeat timeout)

Includes ClickHouse V9 and PostgreSQL V14 migrations for column renames.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 12:22:42 +02:00
parent ad8dd73596
commit 909d713837
85 changed files with 645 additions and 494 deletions

View File

@@ -39,7 +39,7 @@ public class AgentLifecycleMonitor {
// Snapshot states before lifecycle check
Map<String, AgentState> statesBefore = new HashMap<>();
for (AgentInfo agent : registryService.findAll()) {
statesBefore.put(agent.id(), agent.state());
statesBefore.put(agent.instanceId(), agent.state());
}
registryService.checkLifecycle();
@@ -47,12 +47,12 @@ public class AgentLifecycleMonitor {
// Detect transitions and record events
for (AgentInfo agent : registryService.findAll()) {
AgentState before = statesBefore.get(agent.id());
AgentState before = statesBefore.get(agent.instanceId());
if (before != null && before != agent.state()) {
String eventType = mapTransitionEvent(before, agent.state());
if (eventType != null) {
agentEventService.recordEvent(agent.id(), agent.application(), eventType,
agent.name() + " " + before + " -> " + agent.state());
agentEventService.recordEvent(agent.instanceId(), agent.applicationId(), eventType,
agent.displayName() + " " + before + " -> " + agent.state());
}
}
}

View File

@@ -120,12 +120,12 @@ public class AgentCommandController {
List<AgentInfo> agents = registryService.findAll().stream()
.filter(a -> a.state() == AgentState.LIVE)
.filter(a -> group.equals(a.application()))
.filter(a -> group.equals(a.applicationId()))
.toList();
List<String> commandIds = new ArrayList<>();
for (AgentInfo agent : agents) {
AgentCommand command = registryService.addCommand(agent.id(), type, payloadJson);
AgentCommand command = registryService.addCommand(agent.instanceId(), type, payloadJson);
commandIds.add(command.id());
}
@@ -151,7 +151,7 @@ public class AgentCommandController {
List<String> commandIds = new ArrayList<>();
for (AgentInfo agent : liveAgents) {
AgentCommand command = registryService.addCommand(agent.id(), type, payloadJson);
AgentCommand command = registryService.addCommand(agent.instanceId(), type, payloadJson);
commandIds.add(command.id());
}
@@ -185,7 +185,7 @@ public class AgentCommandController {
// Record command result in agent event log
if (body != null && body.status() != null) {
AgentInfo agent = registryService.findById(id);
String application = agent != null ? agent.application() : "unknown";
String application = agent != null ? agent.applicationId() : "unknown";
agentEventService.recordEvent(id, application, "COMMAND_" + body.status(),
"Command " + commandId + ": " + body.message());
log.debug("Command {} ack from agent {}: {} - {}", commandId, id, body.status(), body.message());

View File

@@ -103,34 +103,34 @@ public class AgentRegistrationController {
return ResponseEntity.status(401).build();
}
if (request.agentId() == null || request.agentId().isBlank()
|| request.name() == null || request.name().isBlank()) {
if (request.instanceId() == null || request.instanceId().isBlank()
|| request.displayName() == null || request.displayName().isBlank()) {
return ResponseEntity.badRequest().build();
}
String application = request.application() != null ? request.application() : "default";
String application = request.applicationId() != null ? request.applicationId() : "default";
List<String> routeIds = request.routeIds() != null ? request.routeIds() : List.of();
var capabilities = request.capabilities() != null ? request.capabilities() : Collections.<String, Object>emptyMap();
AgentInfo agent = registryService.register(
request.agentId(), request.name(), application, request.version(), routeIds, capabilities);
log.info("Agent registered: {} (name={}, application={})", request.agentId(), request.name(), application);
request.instanceId(), request.displayName(), application, request.version(), routeIds, capabilities);
log.info("Agent registered: {} (name={}, application={})", request.instanceId(), request.displayName(), application);
agentEventService.recordEvent(request.agentId(), application, "REGISTERED",
"Agent registered: " + request.name());
agentEventService.recordEvent(request.instanceId(), application, "REGISTERED",
"Agent registered: " + request.displayName());
auditService.log(request.agentId(), "agent_register", AuditCategory.AGENT, request.agentId(),
Map.of("application", application, "name", request.name()),
auditService.log(request.instanceId(), "agent_register", AuditCategory.AGENT, request.instanceId(),
Map.of("application", application, "name", request.displayName()),
AuditResult.SUCCESS, httpRequest);
// Issue JWT tokens with AGENT role
List<String> roles = List.of("AGENT");
String accessToken = jwtService.createAccessToken(request.agentId(), application, roles);
String refreshToken = jwtService.createRefreshToken(request.agentId(), application, roles);
String accessToken = jwtService.createAccessToken(request.instanceId(), application, roles);
String refreshToken = jwtService.createRefreshToken(request.instanceId(), application, roles);
return ResponseEntity.ok(new AgentRegistrationResponse(
agent.id(),
"/api/v1/agents/" + agent.id() + "/events",
agent.instanceId(),
"/api/v1/agents/" + agent.instanceId() + "/events",
config.getHeartbeatIntervalMs(),
ed25519SigningService.getPublicKeyBase64(),
accessToken,
@@ -177,8 +177,8 @@ public class AgentRegistrationController {
// Preserve roles from refresh token
List<String> roles = result.roles().isEmpty()
? List.of("AGENT") : result.roles();
String newAccessToken = jwtService.createAccessToken(agentId, agent.application(), roles);
String newRefreshToken = jwtService.createRefreshToken(agentId, agent.application(), roles);
String newAccessToken = jwtService.createAccessToken(agentId, agent.applicationId(), roles);
String newRefreshToken = jwtService.createRefreshToken(agentId, agent.applicationId(), roles);
auditService.log(agentId, "agent_token_refresh", AuditCategory.AUTH, agentId,
null, AuditResult.SUCCESS, httpRequest);
@@ -199,6 +199,23 @@ public class AgentRegistrationController {
return ResponseEntity.ok().build();
}
@PostMapping("/{id}/deregister")
@Operation(summary = "Deregister agent",
    description = "Removes the agent from the registry. Called by agents during graceful shutdown.")
@ApiResponse(responseCode = "200", description = "Agent deregistered")
@ApiResponse(responseCode = "404", description = "Agent not registered")
public ResponseEntity<Void> deregister(@PathVariable String id, HttpServletRequest httpRequest) {
    // Look the agent up first: we need its application id before it is removed,
    // and an unknown id is a 404 rather than a silent no-op.
    final AgentInfo existing = registryService.findById(id);
    if (existing == null) {
        return ResponseEntity.notFound().build();
    }
    final String appId = existing.applicationId();
    registryService.deregister(id);
    // Record the lifecycle event and the audit trail entry for the removal.
    agentEventService.recordEvent(id, appId, "DEREGISTERED", "Agent deregistered");
    auditService.log(id, "agent_deregister", AuditCategory.AGENT, id, null, AuditResult.SUCCESS, httpRequest);
    return ResponseEntity.ok().build();
}
@GetMapping
@Operation(summary = "List all agents",
description = "Returns all registered agents with runtime metrics, optionally filtered by status and/or application")
@@ -224,7 +241,7 @@ public class AgentRegistrationController {
// Apply application filter if specified
if (application != null && !application.isBlank()) {
agents = agents.stream()
.filter(a -> application.equals(a.application()))
.filter(a -> application.equals(a.applicationId()))
.toList();
}
@@ -235,10 +252,10 @@ public class AgentRegistrationController {
List<AgentInstanceResponse> response = finalAgents.stream()
.map(a -> {
AgentInstanceResponse dto = AgentInstanceResponse.from(a);
double[] m = agentMetrics.get(a.application());
double[] m = agentMetrics.get(a.applicationId());
if (m != null) {
long appAgentCount = finalAgents.stream()
.filter(ag -> ag.application().equals(a.application())).count();
.filter(ag -> ag.applicationId().equals(a.applicationId())).count();
double agentTps = appAgentCount > 0 ? m[0] / appAgentCount : 0;
double errorRate = m[1];
int activeRoutes = (int) m[2];
@@ -258,19 +275,19 @@ public class AgentRegistrationController {
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
jdbc.query(
"SELECT application_name, " +
"SELECT application_id, " +
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"COUNT(DISTINCT route_id) AS active_routes " +
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
" GROUP BY application_id",
rs -> {
long total = rs.getLong("total");
long failed = rs.getLong("failed");
double tps = total / 60.0;
double errorRate = total > 0 ? (double) failed / total : 0.0;
int activeRoutes = rs.getInt("active_routes");
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
result.put(rs.getString("application_id"), new double[]{tps, errorRate, activeRoutes});
});
} catch (Exception e) {
log.debug("Could not query agent metrics: {}", e.getMessage());

View File

@@ -48,7 +48,7 @@ public class AppSettingsController {
@GetMapping("/{appId}")
@Operation(summary = "Get settings for a specific application (returns defaults if not configured)")
public ResponseEntity<AppSettings> getByAppId(@PathVariable String appId) {
AppSettings settings = repository.findByAppId(appId).orElse(AppSettings.defaults(appId));
AppSettings settings = repository.findByApplicationId(appId).orElse(AppSettings.defaults(appId));
return ResponseEntity.ok(settings);
}

View File

@@ -125,7 +125,7 @@ public class ApplicationConfigController {
@RequestBody TestExpressionRequest request) {
// Find a LIVE agent for this application
AgentInfo agent = registryService.findAll().stream()
.filter(a -> application.equals(a.application()))
.filter(a -> application.equals(a.applicationId()))
.filter(a -> a.state() == AgentState.LIVE)
.findFirst()
.orElse(null);
@@ -152,7 +152,7 @@ public class ApplicationConfigController {
// Send command and await reply
CompletableFuture<CommandReply> future = registryService.addCommandWithReply(
agent.id(), CommandType.TEST_EXPRESSION, payloadJson);
agent.instanceId(), CommandType.TEST_EXPRESSION, payloadJson);
try {
CommandReply reply = future.orTimeout(5, TimeUnit.SECONDS).join();
@@ -166,7 +166,7 @@ public class ApplicationConfigController {
return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
.body(new TestExpressionResponse(null, "Agent did not respond within 5 seconds"));
}
log.error("Error awaiting test-expression reply from agent {}", agent.id(), e);
log.error("Error awaiting test-expression reply from agent {}", agent.instanceId(), e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new TestExpressionResponse(null, "Internal error: " + e.getCause().getMessage()));
}
@@ -183,11 +183,11 @@ public class ApplicationConfigController {
List<AgentInfo> agents = registryService.findAll().stream()
.filter(a -> a.state() == AgentState.LIVE)
.filter(a -> application.equals(a.application()))
.filter(a -> application.equals(a.applicationId()))
.toList();
for (AgentInfo agent : agents) {
registryService.addCommand(agent.id(), CommandType.CONFIG_UPDATE, payloadJson);
registryService.addCommand(agent.instanceId(), CommandType.CONFIG_UPDATE, payloadJson);
}
return agents.size();
}

View File

@@ -132,7 +132,7 @@ public class DatabaseAdminController {
Long totalRows = jdbc.queryForObject(
"SELECT count(*) FROM agent_metrics", Long.class);
List<String> agentIds = jdbc.queryForList(
"SELECT DISTINCT agent_id FROM agent_metrics ORDER BY agent_id", String.class);
"SELECT DISTINCT instance_id FROM agent_metrics ORDER BY instance_id", String.class);
Instant latestCollected = jdbc.queryForObject(
"SELECT max(collected_at) FROM agent_metrics", Instant.class);
List<String> metricNames = jdbc.queryForList(

View File

@@ -53,12 +53,12 @@ public class DiagramController {
description = "Accepts a single RouteGraph or an array of RouteGraphs")
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
public ResponseEntity<Void> ingestDiagrams(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
String applicationName = resolveApplicationName(agentId);
String instanceId = extractAgentId();
String applicationId = resolveApplicationId(instanceId);
List<RouteGraph> graphs = parsePayload(body);
for (RouteGraph graph : graphs) {
ingestionService.ingestDiagram(new TaggedDiagram(agentId, applicationName, graph));
ingestionService.ingestDiagram(new TaggedDiagram(instanceId, applicationId, graph));
}
return ResponseEntity.accepted().build();
@@ -69,9 +69,9 @@ public class DiagramController {
return auth != null ? auth.getName() : "";
}
private String resolveApplicationName(String agentId) {
AgentInfo agent = registryService.findById(agentId);
return agent != null ? agent.application() : "";
private String resolveApplicationId(String instanceId) {
AgentInfo agent = registryService.findById(instanceId);
return agent != null ? agent.applicationId() : "";
}
private List<RouteGraph> parsePayload(String body) throws JsonProcessingException {

View File

@@ -100,7 +100,7 @@ public class DiagramRenderController {
@RequestParam String routeId,
@RequestParam(defaultValue = "LR") String direction) {
List<String> agentIds = registryService.findByApplication(application).stream()
.map(AgentInfo::id)
.map(AgentInfo::instanceId)
.toList();
if (agentIds.isEmpty()) {

View File

@@ -0,0 +1,88 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.AgentEvent;
import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * Ingestion endpoint for agent lifecycle events.
 * <p>
 * Agents emit events (AGENT_STARTED, AGENT_STOPPED, etc.) which are
 * stored in the event log. AGENT_STOPPED triggers a graceful shutdown
 * transition in the registry.
 */
@RestController
@RequestMapping("/api/v1/data")
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class EventIngestionController {

    private static final Logger log = LoggerFactory.getLogger(EventIngestionController.class);

    private final AgentEventService agentEventService;
    private final AgentRegistryService registryService;
    private final ObjectMapper objectMapper;

    public EventIngestionController(AgentEventService agentEventService,
                                    AgentRegistryService registryService,
                                    ObjectMapper objectMapper) {
        this.agentEventService = agentEventService;
        this.registryService = registryService;
        this.objectMapper = objectMapper;
    }

    /**
     * Accepts a single {@link AgentEvent} or a JSON array of events from the
     * authenticated agent and records each one in the event log. An
     * AGENT_STOPPED event additionally transitions the agent to graceful
     * shutdown in the registry.
     *
     * @param body raw JSON payload: one event object or an array of them
     * @return 202 Accepted on success, 400 Bad Request when the payload is unparseable
     */
    @PostMapping("/events")
    @Operation(summary = "Ingest agent events")
    public ResponseEntity<Void> ingestEvents(@RequestBody String body) {
        final String instanceId = extractInstanceId();
        final List<AgentEvent> events = parseEvents(body);
        if (events == null) {
            return ResponseEntity.badRequest().build();
        }
        // Resolve the caller's application id; agents not (or no longer) in the
        // registry are recorded with an empty application id.
        final AgentInfo agent = registryService.findById(instanceId);
        final String applicationId = (agent == null) ? "" : agent.applicationId();
        for (AgentEvent event : events) {
            final String details = (event.getDetails() == null) ? null : event.getDetails().toString();
            agentEventService.recordEvent(instanceId, applicationId, event.getEventType(), details);
            if ("AGENT_STOPPED".equals(event.getEventType())) {
                log.info("Agent {} reported graceful shutdown", instanceId);
                registryService.shutdown(instanceId);
            }
        }
        return ResponseEntity.accepted().build();
    }

    /**
     * Parses the payload as either a bare event object or a JSON array of events.
     * Returns {@code null} when the payload cannot be parsed.
     */
    private List<AgentEvent> parseEvents(String body) {
        try {
            final String trimmed = body.strip();
            return trimmed.startsWith("[")
                    ? objectMapper.readValue(trimmed, new TypeReference<List<AgentEvent>>() {})
                    : List.of(objectMapper.readValue(trimmed, AgentEvent.class));
        } catch (Exception e) {
            log.warn("Failed to parse event payload: {}", e.getMessage());
            return null;
        }
    }

    /** The authenticated principal's name — by convention the agent's instance id. */
    private String extractInstanceId() {
        final Authentication auth = SecurityContextHolder.getContext().getAuthentication();
        return auth != null ? auth.getName() : "";
    }
}

View File

@@ -58,12 +58,12 @@ public class ExecutionController {
description = "Accepts a single RouteExecution or an array of RouteExecutions")
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
public ResponseEntity<Void> ingestExecutions(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
String applicationName = resolveApplicationName(agentId);
String instanceId = extractAgentId();
String applicationId = resolveApplicationId(instanceId);
List<RouteExecution> executions = parsePayload(body);
for (RouteExecution execution : executions) {
ingestionService.ingestExecution(agentId, applicationName, execution);
ingestionService.ingestExecution(instanceId, applicationId, execution);
}
return ResponseEntity.accepted().build();
@@ -74,9 +74,9 @@ public class ExecutionController {
return auth != null ? auth.getName() : "";
}
private String resolveApplicationName(String agentId) {
AgentInfo agent = registryService.findById(agentId);
return agent != null ? agent.application() : "";
private String resolveApplicationId(String instanceId) {
AgentInfo agent = registryService.findById(instanceId);
return agent != null ? agent.applicationId() : "";
}
private List<RouteExecution> parsePayload(String body) throws JsonProcessingException {

View File

@@ -38,12 +38,12 @@ public class LogIngestionController {
description = "Accepts a batch of log entries from an agent. Entries are indexed in OpenSearch.")
@ApiResponse(responseCode = "202", description = "Logs accepted for indexing")
public ResponseEntity<Void> ingestLogs(@RequestBody LogBatch batch) {
String agentId = extractAgentId();
String application = resolveApplicationName(agentId);
String instanceId = extractAgentId();
String applicationId = resolveApplicationId(instanceId);
if (batch.getEntries() != null && !batch.getEntries().isEmpty()) {
log.debug("Received {} log entries from agent={}, app={}", batch.getEntries().size(), agentId, application);
logIndex.indexBatch(agentId, application, batch.getEntries());
log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId);
logIndex.indexBatch(instanceId, applicationId, batch.getEntries());
}
return ResponseEntity.accepted().build();
@@ -54,8 +54,8 @@ public class LogIngestionController {
return auth != null ? auth.getName() : "";
}
private String resolveApplicationName(String agentId) {
AgentInfo agent = registryService.findById(agentId);
return agent != null ? agent.application() : "";
private String resolveApplicationId(String instanceId) {
AgentInfo agent = registryService.findById(instanceId);
return agent != null ? agent.applicationId() : "";
}
}

View File

@@ -30,7 +30,7 @@ public class LogQueryController {
description = "Returns log entries for a given application, optionally filtered by agent, level, time range, and text query")
public ResponseEntity<List<LogEntryResponse>> searchLogs(
@RequestParam String application,
@RequestParam(required = false) String agentId,
@RequestParam(name = "agentId", required = false) String instanceId,
@RequestParam(required = false) String level,
@RequestParam(required = false) String query,
@RequestParam(required = false) String exchangeId,
@@ -44,7 +44,7 @@ public class LogQueryController {
Instant toInstant = to != null ? Instant.parse(to) : null;
List<LogEntryResult> results = logIndex.search(
application, agentId, level, query, exchangeId, fromInstant, toInstant, limit);
application, instanceId, level, query, exchangeId, fromInstant, toInstant, limit);
List<LogEntryResponse> entries = results.stream()
.map(r -> new LogEntryResponse(r.timestamp(), r.level(), r.loggerName(),

View File

@@ -58,7 +58,7 @@ public class RouteCatalogController {
// Group agents by application name
Map<String, List<AgentInfo>> agentsByApp = allAgents.stream()
.collect(Collectors.groupingBy(AgentInfo::application, LinkedHashMap::new, Collectors.toList()));
.collect(Collectors.groupingBy(AgentInfo::applicationId, LinkedHashMap::new, Collectors.toList()));
// Collect all distinct routes per app
Map<String, Set<String>> routesByApp = new LinkedHashMap<>();
@@ -84,11 +84,11 @@ public class RouteCatalogController {
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
"SELECT application_id, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
" GROUP BY application_name, route_id",
" GROUP BY application_id, route_id",
rs -> {
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
String key = rs.getString("application_id") + "/" + rs.getString("route_id");
routeExchangeCounts.put(key, rs.getLong("cnt"));
Timestamp ts = rs.getTimestamp("last_seen");
if (ts != null) routeLastSeen.put(key, ts.toInstant());
@@ -101,9 +101,9 @@ public class RouteCatalogController {
Map<String, Double> agentTps = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, countMerge(total_count) AS cnt " +
"SELECT application_id, countMerge(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
" GROUP BY application_id",
rs -> {
// This gives per-app TPS; we'll distribute among agents below
});
@@ -119,7 +119,7 @@ public class RouteCatalogController {
// Routes
Set<String> routeIds = routesByApp.getOrDefault(appId, Set.of());
List<String> agentIds = agents.stream().map(AgentInfo::id).toList();
List<String> agentIds = agents.stream().map(AgentInfo::instanceId).toList();
List<RouteSummary> routeSummaries = routeIds.stream()
.map(routeId -> {
String key = appId + "/" + routeId;
@@ -132,7 +132,7 @@ public class RouteCatalogController {
// Agent summaries
List<AgentSummary> agentSummaries = agents.stream()
.map(a -> new AgentSummary(a.id(), a.name(), a.state().name().toLowerCase(), 0.0))
.map(a -> new AgentSummary(a.instanceId(), a.displayName(), a.state().name().toLowerCase(), 0.0))
.toList();
// Health = worst state among agents

View File

@@ -55,7 +55,7 @@ public class RouteMetricsController {
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
var sql = new StringBuilder(
"SELECT application_name, route_id, " +
"SELECT application_id, route_id, " +
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
@@ -63,16 +63,16 @@ public class RouteMetricsController {
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));
if (appId != null) {
sql.append(" AND application_name = " + lit(appId));
sql.append(" AND application_id = " + lit(appId));
}
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
sql.append(" GROUP BY application_id, route_id ORDER BY application_id, route_id");
// Key struct for sparkline lookup
record RouteKey(String appId, String routeId) {}
List<RouteKey> routeKeys = new ArrayList<>();
List<RouteMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
String applicationName = rs.getString("application_name");
String applicationId = rs.getString("application_id");
String routeId = rs.getString("route_id");
long total = rs.getLong("total");
long failed = rs.getLong("failed");
@@ -83,8 +83,8 @@ public class RouteMetricsController {
double errorRate = total > 0 ? (double) failed / total : 0.0;
double tps = windowSeconds > 0 ? (double) total / windowSeconds : 0.0;
routeKeys.add(new RouteKey(applicationName, routeId));
return new RouteMetrics(routeId, applicationName, total, successRate,
routeKeys.add(new RouteKey(applicationId, routeId));
return new RouteMetrics(routeId, applicationId, total, successRate,
avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
});
@@ -99,7 +99,7 @@ public class RouteMetricsController {
String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
"COALESCE(countMerge(total_count), 0) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
" AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
" AND application_id = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
" GROUP BY period ORDER BY period";
List<Double> sparkline = jdbc.query(sparkSql,
(rs, rowNum) -> rs.getDouble("cnt"));
@@ -116,7 +116,7 @@ public class RouteMetricsController {
if (!metrics.isEmpty()) {
// Determine SLA threshold (per-app or default)
String effectiveAppId = appId != null ? appId : (metrics.isEmpty() ? null : metrics.get(0).appId());
int threshold = appSettingsRepository.findByAppId(effectiveAppId != null ? effectiveAppId : "")
int threshold = appSettingsRepository.findByApplicationId(effectiveAppId != null ? effectiveAppId : "")
.map(AppSettings::slaThresholdMs).orElse(300);
Map<String, long[]> slaCounts = statsStore.slaCountsByRoute(fromInstant, toInstant,
@@ -151,7 +151,7 @@ public class RouteMetricsController {
// Literal SQL for AggregatingMergeTree -Merge combinators
var sql = new StringBuilder(
"SELECT processor_id, processor_type, route_id, application_name, " +
"SELECT processor_id, processor_type, route_id, application_id, " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
@@ -161,9 +161,9 @@ public class RouteMetricsController {
" AND route_id = " + lit(routeId));
if (appId != null) {
sql.append(" AND application_name = " + lit(appId));
sql.append(" AND application_id = " + lit(appId));
}
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
sql.append(" GROUP BY processor_id, processor_type, route_id, application_id");
sql.append(" ORDER BY countMerge(total_count) DESC");
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
@@ -174,7 +174,7 @@ public class RouteMetricsController {
rs.getString("processor_id"),
rs.getString("processor_type"),
rs.getString("route_id"),
rs.getString("application_name"),
rs.getString("application_id"),
totalCount,
failedCount,
rs.getDouble("avg_duration_ms"),

View File

@@ -57,7 +57,7 @@ public class SearchController {
@RequestParam(required = false) String correlationId,
@RequestParam(required = false) String text,
@RequestParam(required = false) String routeId,
@RequestParam(required = false) String agentId,
@RequestParam(name = "agentId", required = false) String instanceId,
@RequestParam(required = false) String processorType,
@RequestParam(required = false) String application,
@RequestParam(defaultValue = "0") int offset,
@@ -72,7 +72,7 @@ public class SearchController {
null, null,
correlationId,
text, null, null, null,
routeId, agentId, processorType,
routeId, instanceId, processorType,
application, agentIds,
offset, limit,
sortField, sortDir
@@ -87,9 +87,9 @@ public class SearchController {
@RequestBody SearchRequest request) {
// Resolve application to agentIds if application is specified but agentIds is not
SearchRequest resolved = request;
if (request.application() != null && !request.application().isBlank()
&& (request.agentIds() == null || request.agentIds().isEmpty())) {
resolved = request.withAgentIds(resolveApplicationToAgentIds(request.application()));
if (request.applicationId() != null && !request.applicationId().isBlank()
&& (request.instanceIds() == null || request.instanceIds().isEmpty())) {
resolved = request.withInstanceIds(resolveApplicationToAgentIds(request.applicationId()));
}
return ResponseEntity.ok(searchService.search(resolved));
}
@@ -114,7 +114,7 @@ public class SearchController {
// Enrich with SLA compliance
int threshold = appSettingsRepository
.findByAppId(application != null ? application : "")
.findByApplicationId(application != null ? application : "")
.map(AppSettings::slaThresholdMs).orElse(300);
double sla = searchService.slaCompliance(from, end, threshold, application, routeId);
return ResponseEntity.ok(stats.withSlaCompliance(sla));
@@ -193,7 +193,7 @@ public class SearchController {
return null;
}
return registryService.findByApplication(application).stream()
.map(AgentInfo::id)
.map(AgentInfo::instanceId)
.toList();
}
}

View File

@@ -9,15 +9,15 @@ import java.time.Instant;
@Schema(description = "Agent lifecycle event")
public record AgentEventResponse(
@NotNull long id,
@NotNull String agentId,
@NotNull String appId,
@NotNull String instanceId,
@NotNull String applicationId,
@NotNull String eventType,
String detail,
@NotNull Instant timestamp
) {
public static AgentEventResponse from(AgentEventRecord record) {
return new AgentEventResponse(
record.id(), record.agentId(), record.appId(),
record.id(), record.instanceId(), record.applicationId(),
record.eventType(), record.detail(), record.timestamp()
);
}

View File

@@ -11,9 +11,9 @@ import java.util.Map;
@Schema(description = "Agent instance summary with runtime metrics")
public record AgentInstanceResponse(
@NotNull String id,
@NotNull String name,
@NotNull String application,
@NotNull String instanceId,
@NotNull String displayName,
@NotNull String applicationId,
@NotNull String status,
@NotNull List<String> routeIds,
@NotNull Instant registeredAt,
@@ -29,7 +29,7 @@ public record AgentInstanceResponse(
public static AgentInstanceResponse from(AgentInfo info) {
long uptime = Duration.between(info.registeredAt(), Instant.now()).toSeconds();
return new AgentInstanceResponse(
info.id(), info.name(), info.application(),
info.instanceId(), info.displayName(), info.applicationId(),
info.state().name(), info.routeIds(),
info.registeredAt(), info.lastHeartbeat(),
info.version(), info.capabilities(),
@@ -41,7 +41,7 @@ public record AgentInstanceResponse(
public AgentInstanceResponse withMetrics(double tps, double errorRate, int activeRoutes) {
return new AgentInstanceResponse(
id, name, application, status, routeIds, registeredAt, lastHeartbeat,
instanceId, displayName, applicationId, status, routeIds, registeredAt, lastHeartbeat,
version, capabilities,
tps, errorRate, activeRoutes, totalRoutes, uptimeSeconds
);

View File

@@ -8,9 +8,9 @@ import java.util.Map;
@Schema(description = "Agent registration payload")
public record AgentRegistrationRequest(
@NotNull String agentId,
@NotNull String name,
@Schema(defaultValue = "default") String application,
@NotNull String instanceId,
@NotNull String displayName,
@Schema(defaultValue = "default") String applicationId,
String version,
List<String> routeIds,
Map<String, Object> capabilities

View File

@@ -5,7 +5,7 @@ import jakarta.validation.constraints.NotNull;
@Schema(description = "Agent registration result with JWT tokens and SSE endpoint")
public record AgentRegistrationResponse(
@NotNull String agentId,
@NotNull String instanceId,
@NotNull String sseEndpoint,
long heartbeatIntervalMs,
@NotNull String serverPublicKey,

View File

@@ -63,7 +63,7 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
batch.tenantId(),
batch.executionId(),
batch.routeId(),
batch.applicationName(),
batch.applicationId(),
batch.execStartTime(),
batch.processors());
}
@@ -112,7 +112,7 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
batch.tenantId(),
batch.executionId(),
batch.routeId(),
batch.applicationName(),
batch.applicationId(),
batch.execStartTime(),
batch.processors());
}

View File

@@ -35,20 +35,20 @@ public class ClickHouseLogStore implements LogIndex {
}
@Override
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
public void indexBatch(String instanceId, String applicationId, List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return;
}
String sql = "INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, " +
String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
ps.setTimestamp(1, Timestamp.from(ts));
ps.setString(2, application);
ps.setString(3, agentId);
ps.setString(2, applicationId);
ps.setString(3, instanceId);
ps.setString(4, entry.getLevel() != null ? entry.getLevel() : "");
ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : "");
ps.setString(6, entry.getMessage() != null ? entry.getMessage() : "");
@@ -64,22 +64,22 @@ public class ClickHouseLogStore implements LogIndex {
ps.setObject(10, mdc);
});
log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application);
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
}
@Override
public List<LogEntryResult> search(String application, String agentId, String level,
public List<LogEntryResult> search(String applicationId, String instanceId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
StringBuilder sql = new StringBuilder(
"SELECT timestamp, level, logger_name, message, thread_name, stack_trace " +
"FROM logs WHERE tenant_id = 'default' AND application = ?");
List<Object> params = new ArrayList<>();
params.add(application);
params.add(applicationId);
if (agentId != null && !agentId.isEmpty()) {
sql.append(" AND agent_id = ?");
params.add(agentId);
if (instanceId != null && !instanceId.isEmpty()) {
sql.append(" AND instance_id = ?");
params.add(instanceId);
}
if (level != null && !level.isEmpty()) {

View File

@@ -37,11 +37,11 @@ public class ClickHouseSearchIndex implements SearchIndex {
"startTime", "start_time",
"durationMs", "duration_ms",
"status", "status",
"agentId", "agent_id",
"instanceId", "instance_id",
"routeId", "route_id",
"correlationId", "correlation_id",
"executionId", "execution_id",
"applicationName", "application_name"
"applicationId", "application_id"
);
private final JdbcTemplate jdbc;
@@ -78,7 +78,7 @@ public class ClickHouseSearchIndex implements SearchIndex {
String sortColumn = SORT_FIELD_MAP.getOrDefault(request.sortField(), "start_time");
String sortDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";
String dataSql = "SELECT execution_id, route_id, agent_id, application_name, "
String dataSql = "SELECT execution_id, route_id, instance_id, application_id, "
+ "status, start_time, end_time, duration_ms, correlation_id, "
+ "error_message, error_stacktrace, diagram_content_hash, attributes, "
+ "has_trace_data, is_replay, "
@@ -148,9 +148,9 @@ public class ClickHouseSearchIndex implements SearchIndex {
params.add(request.routeId());
}
if (request.agentId() != null) {
conditions.add("agent_id = ?");
params.add(request.agentId());
if (request.instanceId() != null) {
conditions.add("instance_id = ?");
params.add(request.instanceId());
}
if (request.correlationId() != null) {
@@ -158,15 +158,15 @@ public class ClickHouseSearchIndex implements SearchIndex {
params.add(request.correlationId());
}
if (request.application() != null && !request.application().isBlank()) {
conditions.add("application_name = ?");
params.add(request.application());
if (request.applicationId() != null && !request.applicationId().isBlank()) {
conditions.add("application_id = ?");
params.add(request.applicationId());
}
if (request.agentIds() != null && !request.agentIds().isEmpty()) {
String placeholders = String.join(", ", Collections.nCopies(request.agentIds().size(), "?"));
conditions.add("agent_id IN (" + placeholders + ")");
params.addAll(request.agentIds());
if (request.instanceIds() != null && !request.instanceIds().isEmpty()) {
String placeholders = String.join(", ", Collections.nCopies(request.instanceIds().size(), "?"));
conditions.add("instance_id IN (" + placeholders + ")");
params.addAll(request.instanceIds());
}
if (request.durationMin() != null) {
@@ -227,8 +227,8 @@ public class ClickHouseSearchIndex implements SearchIndex {
private ExecutionSummary mapRow(ResultSet rs, String searchTerm) throws SQLException {
String executionId = rs.getString("execution_id");
String routeId = rs.getString("route_id");
String agentId = rs.getString("agent_id");
String applicationName = rs.getString("application_name");
String instanceId = rs.getString("instance_id");
String applicationId = rs.getString("application_id");
String status = rs.getString("status");
Timestamp startTs = rs.getTimestamp("start_time");
@@ -261,7 +261,7 @@ public class ClickHouseSearchIndex implements SearchIndex {
}
return new ExecutionSummary(
executionId, routeId, agentId, applicationName, status,
executionId, routeId, instanceId, applicationId, status,
startTime, endTime, durationMs,
correlationId, errorMessage, diagramContentHash,
highlight, attributes, hasTraceData, isReplay

View File

@@ -197,12 +197,12 @@ public class OpenSearchIndex implements SearchIndex {
}
if (request.routeId() != null)
filter.add(termQuery("route_id.keyword", request.routeId()));
if (request.agentId() != null)
filter.add(termQuery("agent_id.keyword", request.agentId()));
if (request.instanceId() != null)
filter.add(termQuery("instance_id.keyword", request.instanceId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id.keyword", request.correlationId()));
if (request.application() != null && !request.application().isBlank())
filter.add(termQuery("application_name.keyword", request.application()));
if (request.applicationId() != null && !request.applicationId().isBlank())
filter.add(termQuery("application_id.keyword", request.applicationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
@@ -243,7 +243,7 @@ public class OpenSearchIndex implements SearchIndex {
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));
.fields("execution_id", "route_id", "instance_id", "correlation_id", "exchange_id"))));
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
@@ -328,8 +328,8 @@ public class OpenSearchIndex implements SearchIndex {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());
map.put("route_id", doc.routeId());
map.put("agent_id", doc.agentId());
map.put("application_name", doc.applicationName());
map.put("instance_id", doc.instanceId());
map.put("application_id", doc.applicationId());
map.put("status", doc.status());
map.put("correlation_id", doc.correlationId());
map.put("exchange_id", doc.exchangeId());
@@ -391,8 +391,8 @@ public class OpenSearchIndex implements SearchIndex {
return new ExecutionSummary(
(String) src.get("execution_id"),
(String) src.get("route_id"),
(String) src.get("agent_id"),
(String) src.get("application_name"),
(String) src.get("instance_id"),
(String) src.get("application_id"),
(String) src.get("status"),
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,

View File

@@ -78,8 +78,8 @@ public class OpenSearchLogIndex implements LogIndex {
.properties("message", Property.of(p -> p.text(tx -> tx)))
.properties("threadName", Property.of(p -> p.keyword(k -> k)))
.properties("stackTrace", Property.of(p -> p.text(tx -> tx)))
.properties("agentId", Property.of(p -> p.keyword(k -> k)))
.properties("application", Property.of(p -> p.keyword(k -> k)))
.properties("instanceId", Property.of(p -> p.keyword(k -> k)))
.properties("applicationId", Property.of(p -> p.keyword(k -> k)))
.properties("exchangeId", Property.of(p -> p.keyword(k -> k)))))));
log.info("OpenSearch log index template '{}' created", templateName);
}
@@ -104,14 +104,14 @@ public class OpenSearchLogIndex implements LogIndex {
}
@Override
public List<LogEntryResult> search(String application, String agentId, String level,
public List<LogEntryResult> search(String applicationId, String instanceId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
try {
BoolQuery.Builder bool = new BoolQuery.Builder();
bool.must(Query.of(q -> q.term(t -> t.field("application").value(FieldValue.of(application)))));
if (agentId != null && !agentId.isEmpty()) {
bool.must(Query.of(q -> q.term(t -> t.field("agentId").value(FieldValue.of(agentId)))));
bool.must(Query.of(q -> q.term(t -> t.field("applicationId").value(FieldValue.of(applicationId)))));
if (instanceId != null && !instanceId.isEmpty()) {
bool.must(Query.of(q -> q.term(t -> t.field("instanceId").value(FieldValue.of(instanceId)))));
}
if (exchangeId != null && !exchangeId.isEmpty()) {
// Match on top-level field (new records) or MDC nested field (old records)
@@ -156,7 +156,7 @@ public class OpenSearchLogIndex implements LogIndex {
}
return results;
} catch (IOException e) {
log.error("Failed to search log entries for application={}", application, e);
log.error("Failed to search log entries for application={}", applicationId, e);
return List.of();
}
}
@@ -167,7 +167,7 @@ public class OpenSearchLogIndex implements LogIndex {
}
@Override
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
public void indexBatch(String instanceId, String applicationId, List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return;
}
@@ -179,7 +179,7 @@ public class OpenSearchLogIndex implements LogIndex {
String indexName = indexPrefix + DAY_FMT.format(
entry.getTimestamp() != null ? entry.getTimestamp() : java.time.Instant.now());
Map<String, Object> doc = toMap(entry, agentId, application);
Map<String, Object> doc = toMap(entry, instanceId, applicationId);
bulkBuilder.operations(op -> op
.index(idx -> idx
@@ -201,14 +201,14 @@ public class OpenSearchLogIndex implements LogIndex {
}
log.error("Bulk log indexing had {} error(s) out of {} entries", errorCount, entries.size());
} else {
log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application);
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
}
} catch (IOException e) {
log.error("Failed to bulk index {} log entries for agent={}", entries.size(), agentId, e);
log.error("Failed to bulk index {} log entries for instance={}", entries.size(), instanceId, e);
}
}
private Map<String, Object> toMap(LogEntry entry, String agentId, String application) {
private Map<String, Object> toMap(LogEntry entry, String instanceId, String applicationId) {
Map<String, Object> doc = new LinkedHashMap<>();
doc.put("@timestamp", entry.getTimestamp() != null ? entry.getTimestamp().toString() : null);
doc.put("level", entry.getLevel());
@@ -217,8 +217,8 @@ public class OpenSearchLogIndex implements LogIndex {
doc.put("threadName", entry.getThreadName());
doc.put("stackTrace", entry.getStackTrace());
doc.put("mdc", entry.getMdc());
doc.put("agentId", agentId);
doc.put("application", application);
doc.put("instanceId", instanceId);
doc.put("applicationId", applicationId);
if (entry.getMdc() != null) {
String exId = entry.getMdc().get("camel.exchangeId");
if (exId != null) doc.put("exchangeId", exId);

View File

@@ -20,10 +20,10 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
private static final String TENANT = "default";
private static final String INSERT_SQL =
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)";
"INSERT INTO agent_events (tenant_id, instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?, ?)";
private static final String SELECT_BASE =
"SELECT 0 AS id, agent_id, app_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
"SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
private final JdbcTemplate jdbc;
@@ -32,23 +32,23 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
}
@Override
public void insert(String agentId, String appId, String eventType, String detail) {
jdbc.update(INSERT_SQL, TENANT, agentId, appId, eventType, detail);
public void insert(String instanceId, String applicationId, String eventType, String detail) {
jdbc.update(INSERT_SQL, TENANT, instanceId, applicationId, eventType, detail);
}
@Override
public List<AgentEventRecord> query(String appId, String agentId, Instant from, Instant to, int limit) {
public List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit) {
var sql = new StringBuilder(SELECT_BASE);
var params = new ArrayList<Object>();
params.add(TENANT);
if (appId != null) {
sql.append(" AND app_id = ?");
params.add(appId);
if (applicationId != null) {
sql.append(" AND application_id = ?");
params.add(applicationId);
}
if (agentId != null) {
sql.append(" AND agent_id = ?");
params.add(agentId);
if (instanceId != null) {
sql.append(" AND instance_id = ?");
params.add(instanceId);
}
if (from != null) {
sql.append(" AND timestamp >= ?");
@@ -63,8 +63,8 @@ public class ClickHouseAgentEventRepository implements AgentEventRepository {
return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord(
rs.getLong("id"),
rs.getString("agent_id"),
rs.getString("app_id"),
rs.getString("instance_id"),
rs.getString("application_id"),
rs.getString("event_type"),
rs.getString("detail"),
rs.getTimestamp("timestamp").toInstant()

View File

@@ -42,7 +42,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
private static final String INSERT_SQL = """
INSERT INTO route_diagrams
(tenant_id, content_hash, route_id, agent_id, application_name, definition, created_at)
(tenant_id, content_hash, route_id, instance_id, application_id, definition, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""";
@@ -54,13 +54,13 @@ public class ClickHouseDiagramStore implements DiagramStore {
private static final String SELECT_HASH_FOR_ROUTE = """
SELECT content_hash FROM route_diagrams
WHERE tenant_id = ? AND route_id = ? AND agent_id = ?
WHERE tenant_id = ? AND route_id = ? AND instance_id = ?
ORDER BY created_at DESC LIMIT 1
""";
private static final String SELECT_DEFINITIONS_FOR_APP = """
SELECT DISTINCT route_id, definition FROM route_diagrams
WHERE tenant_id = ? AND application_name = ?
WHERE tenant_id = ? AND application_id = ?
""";
private final JdbcTemplate jdbc;
@@ -76,8 +76,8 @@ public class ClickHouseDiagramStore implements DiagramStore {
public void store(TaggedDiagram diagram) {
try {
RouteGraph graph = diagram.graph();
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
String applicationName = diagram.applicationName() != null ? diagram.applicationName() : "";
String agentId = diagram.instanceId() != null ? diagram.instanceId() : "";
String applicationId = diagram.applicationId() != null ? diagram.applicationId() : "";
String json = objectMapper.writeValueAsString(graph);
String contentHash = sha256Hex(json);
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
@@ -87,7 +87,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
contentHash,
routeId,
agentId,
applicationName,
applicationId,
json,
Timestamp.from(Instant.now()));
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
@@ -128,7 +128,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
}
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
String sql = "SELECT content_hash FROM route_diagrams " +
"WHERE tenant_id = ? AND route_id = ? AND agent_id IN (" + placeholders + ") " +
"WHERE tenant_id = ? AND route_id = ? AND instance_id IN (" + placeholders + ") " +
"ORDER BY created_at DESC LIMIT 1";
var params = new ArrayList<Object>();
params.add(TENANT);
@@ -142,10 +142,10 @@ public class ClickHouseDiagramStore implements DiagramStore {
}
@Override
public Map<String, String> findProcessorRouteMapping(String applicationName) {
public Map<String, String> findProcessorRouteMapping(String applicationId) {
Map<String, String> mapping = new HashMap<>();
List<Map<String, Object>> rows = jdbc.queryForList(
SELECT_DEFINITIONS_FOR_APP, TENANT, applicationName);
SELECT_DEFINITIONS_FOR_APP, TENANT, applicationId);
for (Map<String, Object> row : rows) {
String routeId = (String) row.get("route_id");
String json = (String) row.get("definition");
@@ -156,7 +156,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
RouteGraph graph = objectMapper.readValue(json, RouteGraph.class);
collectNodeIds(graph.getRoot(), routeId, mapping);
} catch (JsonProcessingException e) {
log.warn("Failed to deserialize RouteGraph for route={} app={}", routeId, applicationName, e);
log.warn("Failed to deserialize RouteGraph for route={} app={}", routeId, applicationId, e);
}
}
return mapping;

View File

@@ -34,7 +34,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
jdbc.batchUpdate("""
INSERT INTO executions (
tenant_id, _version, execution_id, route_id, agent_id, application_name,
tenant_id, _version, execution_id, route_id, instance_id, application_id,
status, correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message, diagram_content_hash, engine_level,
@@ -49,8 +49,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
e.version(),
nullToEmpty(e.executionId()),
nullToEmpty(e.routeId()),
nullToEmpty(e.agentId()),
nullToEmpty(e.applicationName()),
nullToEmpty(e.instanceId()),
nullToEmpty(e.applicationId()),
nullToEmpty(e.status()),
nullToEmpty(e.correlationId()),
nullToEmpty(e.exchangeId()),
@@ -80,14 +80,14 @@ public class ClickHouseExecutionStore implements ExecutionStore {
}
public void insertProcessorBatch(String tenantId, String executionId, String routeId,
String applicationName, Instant execStartTime,
String applicationId, Instant execStartTime,
List<FlatProcessorRecord> processors) {
if (processors.isEmpty()) return;
jdbc.batchUpdate("""
INSERT INTO processor_executions (
tenant_id, execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
processor_id, processor_type, start_time, route_id, application_id,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
@@ -107,7 +107,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
nullToEmpty(p.getProcessorType()),
Timestamp.from(p.getStartTime() != null ? p.getStartTime() : execStartTime),
nullToEmpty(routeId),
nullToEmpty(applicationName),
nullToEmpty(applicationId),
p.getIteration(),
p.getIterationSize(),
p.getStatus() != null ? p.getStatus().name() : "",
@@ -137,7 +137,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
@Override
public Optional<ExecutionRecord> findById(String executionId) {
List<ExecutionRecord> results = jdbc.query("""
SELECT execution_id, route_id, agent_id, application_name, status,
SELECT execution_id, route_id, instance_id, application_id, status,
correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes,
@@ -156,7 +156,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
public List<ProcessorRecord> findProcessors(String executionId) {
return jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
processor_id, processor_type, start_time, route_id, application_id,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
@@ -175,7 +175,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
public Optional<ProcessorRecord> findProcessorById(String executionId, String processorId) {
List<ProcessorRecord> results = jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
processor_id, processor_type, start_time, route_id, application_id,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
@@ -195,7 +195,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
public Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
List<ProcessorRecord> results = jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
processor_id, processor_type, start_time, route_id, application_id,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
@@ -220,7 +220,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
@Override
public void upsertProcessors(String executionId, Instant startTime,
String applicationName, String routeId,
String applicationId, String routeId,
List<ProcessorRecord> processors) {
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
@@ -231,8 +231,8 @@ public class ClickHouseExecutionStore implements ExecutionStore {
return new ExecutionRecord(
emptyToNull(rs.getString("execution_id")),
emptyToNull(rs.getString("route_id")),
emptyToNull(rs.getString("agent_id")),
emptyToNull(rs.getString("application_name")),
emptyToNull(rs.getString("instance_id")),
emptyToNull(rs.getString("application_id")),
emptyToNull(rs.getString("status")),
emptyToNull(rs.getString("correlation_id")),
emptyToNull(rs.getString("exchange_id")),
@@ -265,7 +265,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
emptyToNull(rs.getString("execution_id")),
emptyToNull(rs.getString("processor_id")),
emptyToNull(rs.getString("processor_type")),
emptyToNull(rs.getString("application_name")),
emptyToNull(rs.getString("application_id")),
emptyToNull(rs.getString("route_id")),
0, // depth not stored in ClickHouse
emptyToNull(rs.getString("parent_processor_id")),

View File

@@ -17,7 +17,7 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
@Override
public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
String agentId, List<String> metricNames,
String instanceId, List<String> metricNames,
Instant from, Instant to, int buckets) {
long intervalSeconds = Math.max(60,
@@ -43,7 +43,7 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
metric_name,
avg(metric_value) AS avg_value
FROM agent_metrics
WHERE agent_id = ?
WHERE instance_id = ?
AND collected_at >= ?
AND collected_at < ?
AND metric_name IN (%s)
@@ -57,7 +57,7 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
double value = rs.getDouble("avg_value");
result.computeIfAbsent(metricName, k -> new ArrayList<>())
.add(new MetricTimeSeries.Bucket(bucket, value));
}, agentId,
}, instanceId,
java.sql.Timestamp.from(from),
java.sql.Timestamp.from(to));

View File

@@ -22,11 +22,11 @@ public class ClickHouseMetricsStore implements MetricsStore {
if (snapshots.isEmpty()) return;
jdbc.batchUpdate("""
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags, collected_at)
VALUES (?, ?, ?, ?, ?)
""",
snapshots.stream().map(s -> new Object[]{
s.agentId(),
s.instanceId(),
s.metricName(),
s.metricValue(),
tagsToClickHouseMap(s.tags()),

View File

@@ -47,9 +47,9 @@ public class ClickHouseStatsStore implements StatsStore {
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
public ExecutionStats statsForApp(Instant from, Instant to, String applicationId) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("application_name", applicationName)), true);
new Filter("application_id", applicationId)), true);
}
@Override
@@ -71,9 +71,9 @@ public class ClickHouseStatsStore implements StatsStore {
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("application_name", applicationName)), true);
new Filter("application_id", applicationId)), true);
}
@Override
@@ -93,22 +93,22 @@ public class ClickHouseStatsStore implements StatsStore {
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
return queryGroupedTimeseries("stats_1m_app", "application_name", from, to,
return queryGroupedTimeseries("stats_1m_app", "application_id", from, to,
bucketCount, List.of());
}
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
int bucketCount, String applicationName) {
int bucketCount, String applicationId) {
return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
bucketCount, List.of(new Filter("application_name", applicationName)));
bucketCount, List.of(new Filter("application_id", applicationId)));
}
// ── SLA compliance (raw table — prepared statements OK) ──────────────
@Override
public double slaCompliance(Instant from, Instant to, int thresholdMs,
String applicationName, String routeId) {
String applicationId, String routeId) {
String sql = "SELECT " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
@@ -120,9 +120,9 @@ public class ClickHouseStatsStore implements StatsStore {
params.add(TENANT);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
if (applicationId != null) {
sql += " AND application_id = ?";
params.add(applicationId);
}
if (routeId != null) {
sql += " AND route_id = ?";
@@ -138,16 +138,16 @@ public class ClickHouseStatsStore implements StatsStore {
@Override
public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
String sql = "SELECT application_name, " +
String sql = "SELECT application_id, " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"GROUP BY application_name";
"GROUP BY application_id";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("application_name"),
result.put(rs.getString("application_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to));
return result;
@@ -155,35 +155,35 @@ public class ClickHouseStatsStore implements StatsStore {
@Override
public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
String applicationName, int thresholdMs) {
String applicationId, int thresholdMs) {
String sql = "SELECT route_id, " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"AND application_name = ? GROUP BY route_id";
"AND application_id = ? GROUP BY route_id";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("route_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName);
}, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationId);
return result;
}
// ── Top errors (raw table — prepared statements OK) ──────────────────
@Override
public List<TopError> topErrors(Instant from, Instant to, String applicationName,
public List<TopError> topErrors(Instant from, Instant to, String applicationId,
String routeId, int limit) {
StringBuilder where = new StringBuilder(
"status = 'FAILED' AND start_time >= ? AND start_time < ?");
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
where.append(" AND application_name = ?");
params.add(applicationName);
if (applicationId != null) {
where.append(" AND application_id = ?");
params.add(applicationId);
}
String table;
@@ -247,7 +247,7 @@ public class ClickHouseStatsStore implements StatsStore {
}
@Override
public int activeErrorTypes(Instant from, Instant to, String applicationName) {
public int activeErrorTypes(Instant from, Instant to, String applicationId) {
String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?";
@@ -256,9 +256,9 @@ public class ClickHouseStatsStore implements StatsStore {
params.add(TENANT);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
if (applicationId != null) {
sql += " AND application_id = ?";
params.add(applicationId);
}
Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
@@ -268,8 +268,8 @@ public class ClickHouseStatsStore implements StatsStore {
// ── Punchcard (AggregatingMergeTree — literal SQL) ───────────────────
@Override
public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
String view = applicationName != null ? "stats_1m_app" : "stats_1m_all";
public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationId) {
String view = applicationId != null ? "stats_1m_app" : "stats_1m_all";
String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " +
"toHour(bucket) AS hour, " +
"countMerge(total_count) AS total_count, " +
@@ -278,8 +278,8 @@ public class ClickHouseStatsStore implements StatsStore {
" WHERE tenant_id = " + lit(TENANT) +
" AND bucket >= " + lit(from) +
" AND bucket < " + lit(to);
if (applicationName != null) {
sql += " AND application_name = " + lit(applicationName);
if (applicationId != null) {
sql += " AND application_id = " + lit(applicationId);
}
sql += " GROUP BY weekday, hour ORDER BY weekday, hour";

View File

@@ -22,24 +22,24 @@ public class PostgresAgentEventRepository implements AgentEventRepository {
}
@Override
public void insert(String agentId, String appId, String eventType, String detail) {
public void insert(String instanceId, String applicationId, String eventType, String detail) {
jdbc.update(
"INSERT INTO agent_events (agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?)",
agentId, appId, eventType, detail);
"INSERT INTO agent_events (instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?)",
instanceId, applicationId, eventType, detail);
}
@Override
public List<AgentEventRecord> query(String appId, String agentId, Instant from, Instant to, int limit) {
var sql = new StringBuilder("SELECT id, agent_id, app_id, event_type, detail, timestamp FROM agent_events WHERE 1=1");
public List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit) {
var sql = new StringBuilder("SELECT id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE 1=1");
var params = new ArrayList<Object>();
if (appId != null) {
sql.append(" AND app_id = ?");
params.add(appId);
if (applicationId != null) {
sql.append(" AND application_id = ?");
params.add(applicationId);
}
if (agentId != null) {
sql.append(" AND agent_id = ?");
params.add(agentId);
if (instanceId != null) {
sql.append(" AND instance_id = ?");
params.add(instanceId);
}
if (from != null) {
sql.append(" AND timestamp >= ?");
@@ -54,8 +54,8 @@ public class PostgresAgentEventRepository implements AgentEventRepository {
return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord(
rs.getLong("id"),
rs.getString("agent_id"),
rs.getString("app_id"),
rs.getString("instance_id"),
rs.getString("application_id"),
rs.getString("event_type"),
rs.getString("detail"),
rs.getTimestamp("timestamp").toInstant()

View File

@@ -15,7 +15,7 @@ public class PostgresAppSettingsRepository implements AppSettingsRepository {
private final JdbcTemplate jdbc;
private static final RowMapper<AppSettings> ROW_MAPPER = (rs, rowNum) -> new AppSettings(
rs.getString("app_id"),
rs.getString("application_id"),
rs.getInt("sla_threshold_ms"),
rs.getDouble("health_error_warn"),
rs.getDouble("health_error_crit"),
@@ -29,24 +29,24 @@ public class PostgresAppSettingsRepository implements AppSettingsRepository {
}
@Override
public Optional<AppSettings> findByAppId(String appId) {
public Optional<AppSettings> findByApplicationId(String applicationId) {
List<AppSettings> results = jdbc.query(
"SELECT * FROM app_settings WHERE app_id = ?", ROW_MAPPER, appId);
"SELECT * FROM app_settings WHERE application_id = ?", ROW_MAPPER, applicationId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<AppSettings> findAll() {
return jdbc.query("SELECT * FROM app_settings ORDER BY app_id", ROW_MAPPER);
return jdbc.query("SELECT * FROM app_settings ORDER BY application_id", ROW_MAPPER);
}
@Override
public AppSettings save(AppSettings settings) {
jdbc.update("""
INSERT INTO app_settings (app_id, sla_threshold_ms, health_error_warn,
INSERT INTO app_settings (application_id, sla_threshold_ms, health_error_warn,
health_error_crit, health_sla_warn, health_sla_crit, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, now(), now())
ON CONFLICT (app_id) DO UPDATE SET
ON CONFLICT (application_id) DO UPDATE SET
sla_threshold_ms = EXCLUDED.sla_threshold_ms,
health_error_warn = EXCLUDED.health_error_warn,
health_error_crit = EXCLUDED.health_error_crit,
@@ -54,14 +54,14 @@ public class PostgresAppSettingsRepository implements AppSettingsRepository {
health_sla_crit = EXCLUDED.health_sla_crit,
updated_at = now()
""",
settings.appId(), settings.slaThresholdMs(),
settings.applicationId(), settings.slaThresholdMs(),
settings.healthErrorWarn(), settings.healthErrorCrit(),
settings.healthSlaWarn(), settings.healthSlaCrit());
return findByAppId(settings.appId()).orElseThrow();
return findByApplicationId(settings.applicationId()).orElseThrow();
}
@Override
public void delete(String appId) {
jdbc.update("DELETE FROM app_settings WHERE app_id = ?", appId);
jdbc.update("DELETE FROM app_settings WHERE application_id = ?", appId);
}
}

View File

@@ -36,7 +36,7 @@ public class PostgresDiagramStore implements DiagramStore {
private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);
private static final String INSERT_SQL = """
INSERT INTO route_diagrams (content_hash, route_id, agent_id, application_name, definition)
INSERT INTO route_diagrams (content_hash, route_id, instance_id, application_id, definition)
VALUES (?, ?, ?, ?, ?::jsonb)
ON CONFLICT (content_hash) DO NOTHING
""";
@@ -47,7 +47,7 @@ public class PostgresDiagramStore implements DiagramStore {
private static final String SELECT_HASH_FOR_ROUTE = """
SELECT content_hash FROM route_diagrams
WHERE route_id = ? AND agent_id = ?
WHERE route_id = ? AND instance_id = ?
ORDER BY created_at DESC LIMIT 1
""";
@@ -64,13 +64,13 @@ public class PostgresDiagramStore implements DiagramStore {
public void store(TaggedDiagram diagram) {
try {
RouteGraph graph = diagram.graph();
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
String applicationName = diagram.applicationName() != null ? diagram.applicationName() : "";
String agentId = diagram.instanceId() != null ? diagram.instanceId() : "";
String applicationId = diagram.applicationId() != null ? diagram.applicationId() : "";
String json = objectMapper.writeValueAsString(graph);
String contentHash = sha256Hex(json);
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationName, json);
jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationId, json);
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
@@ -108,7 +108,7 @@ public class PostgresDiagramStore implements DiagramStore {
}
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
String sql = "SELECT content_hash FROM route_diagrams " +
"WHERE route_id = ? AND agent_id IN (" + placeholders + ") " +
"WHERE route_id = ? AND instance_id IN (" + placeholders + ") " +
"ORDER BY created_at DESC LIMIT 1";
var params = new ArrayList<Object>();
params.add(routeId);
@@ -121,17 +121,17 @@ public class PostgresDiagramStore implements DiagramStore {
}
@Override
public Map<String, String> findProcessorRouteMapping(String applicationName) {
public Map<String, String> findProcessorRouteMapping(String applicationId) {
Map<String, String> mapping = new HashMap<>();
jdbcTemplate.query("""
SELECT DISTINCT rd.route_id, node_elem->>'id' AS processor_id
FROM route_diagrams rd,
jsonb_array_elements(rd.definition::jsonb->'nodes') AS node_elem
WHERE rd.application_name = ?
WHERE rd.application_id = ?
AND node_elem->>'id' IS NOT NULL
""",
rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); },
applicationName);
applicationId);
return mapping;
}

View File

@@ -22,7 +22,7 @@ public class PostgresExecutionStore implements ExecutionStore {
@Override
public void upsert(ExecutionRecord execution) {
jdbc.update("""
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
INSERT INTO executions (execution_id, route_id, instance_id, application_id,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
engine_level, input_body, output_body, input_headers, output_headers,
@@ -63,8 +63,8 @@ public class PostgresExecutionStore implements ExecutionStore {
is_replay = EXCLUDED.is_replay OR executions.is_replay,
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
execution.applicationName(), execution.status(), execution.correlationId(),
execution.executionId(), execution.routeId(), execution.instanceId(),
execution.applicationId(), execution.status(), execution.correlationId(),
execution.exchangeId(),
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
@@ -82,11 +82,11 @@ public class PostgresExecutionStore implements ExecutionStore {
@Override
public void upsertProcessors(String executionId, Instant startTime,
String applicationName, String routeId,
String applicationId, String routeId,
List<ProcessorRecord> processors) {
jdbc.batchUpdate("""
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
application_name, route_id, depth, parent_processor_id,
application_id, route_id, depth, parent_processor_id,
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers, attributes,
loop_index, loop_size, split_index, split_size, multicast_index,
@@ -122,7 +122,7 @@ public class PostgresExecutionStore implements ExecutionStore {
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
p.applicationName(), p.routeId(),
p.applicationId(), p.routeId(),
p.depth(), p.parentProcessorId(), p.status(),
Timestamp.from(p.startTime()),
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
@@ -164,7 +164,7 @@ public class PostgresExecutionStore implements ExecutionStore {
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
new ExecutionRecord(
rs.getString("execution_id"), rs.getString("route_id"),
rs.getString("agent_id"), rs.getString("application_name"),
rs.getString("instance_id"), rs.getString("application_id"),
rs.getString("status"), rs.getString("correlation_id"),
rs.getString("exchange_id"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
@@ -186,7 +186,7 @@ public class PostgresExecutionStore implements ExecutionStore {
new ProcessorRecord(
rs.getString("execution_id"), rs.getString("processor_id"),
rs.getString("processor_type"),
rs.getString("application_name"), rs.getString("route_id"),
rs.getString("application_id"), rs.getString("route_id"),
rs.getInt("depth"), rs.getString("parent_processor_id"),
rs.getString("status"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),

View File

@@ -34,7 +34,7 @@ public class PostgresMetricsQueryStore implements MetricsQueryStore {
metric_name,
AVG(metric_value) AS avg_value
FROM agent_metrics
WHERE agent_id = ?
WHERE instance_id = ?
AND collected_at >= ? AND collected_at < ?
AND metric_name = ANY(?)
GROUP BY bucket, metric_name

View File

@@ -21,12 +21,12 @@ public class PostgresMetricsStore implements MetricsStore {
@Override
public void insertBatch(List<MetricsSnapshot> snapshots) {
jdbc.batchUpdate("""
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags,
collected_at, server_received_at)
VALUES (?, ?, ?, ?::jsonb, ?, now())
""",
snapshots.stream().map(s -> new Object[]{
s.agentId(), s.metricName(), s.metricValue(),
s.instanceId(), s.metricName(), s.metricValue(),
tagsToJson(s.tags()),
Timestamp.from(s.collectedAt())
}).toList());

View File

@@ -34,15 +34,15 @@ public class PostgresStatsStore implements StatsStore {
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
public ExecutionStats statsForApp(Instant from, Instant to, String applicationId) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("application_name", applicationName)));
new Filter("application_id", applicationId)));
}
@Override
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
// Note: agentIds is accepted for interface compatibility but not filterable
// on the continuous aggregate (it groups by route_id, not agent_id).
// on the continuous aggregate (it groups by route_id, not instance_id).
// All agents for the same route contribute to the same aggregate.
return queryStats("stats_1m_route", from, to, List.of(
new Filter("route_id", routeId)));
@@ -61,9 +61,9 @@ public class PostgresStatsStore implements StatsStore {
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("application_name", applicationName)), true);
new Filter("application_id", applicationId)), true);
}
@Override
@@ -194,15 +194,15 @@ public class PostgresStatsStore implements StatsStore {
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
return queryGroupedTimeseries("stats_1m_app", "application_name", from, to,
return queryGroupedTimeseries("stats_1m_app", "application_id", from, to,
bucketCount, List.of());
}
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
int bucketCount, String applicationName) {
int bucketCount, String applicationId) {
return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
bucketCount, List.of(new Filter("application_name", applicationName)));
bucketCount, List.of(new Filter("application_id", applicationId)));
}
private Map<String, StatsTimeseries> queryGroupedTimeseries(
@@ -251,7 +251,7 @@ public class PostgresStatsStore implements StatsStore {
@Override
public double slaCompliance(Instant from, Instant to, int thresholdMs,
String applicationName, String routeId) {
String applicationId, String routeId) {
String sql = "SELECT " +
"COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
@@ -261,9 +261,9 @@ public class PostgresStatsStore implements StatsStore {
params.add(thresholdMs);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
if (applicationId != null) {
sql += " AND application_id = ?";
params.add(applicationId);
}
if (routeId != null) {
sql += " AND route_id = ?";
@@ -279,15 +279,15 @@ public class PostgresStatsStore implements StatsStore {
@Override
public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
String sql = "SELECT application_name, " +
String sql = "SELECT application_id, " +
"COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
"FROM executions WHERE start_time >= ? AND start_time < ? " +
"GROUP BY application_name";
"GROUP BY application_id";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("application_name"),
result.put(rs.getString("application_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, defaultThresholdMs, Timestamp.from(from), Timestamp.from(to));
return result;
@@ -295,34 +295,34 @@ public class PostgresStatsStore implements StatsStore {
@Override
public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
String applicationName, int thresholdMs) {
String applicationId, int thresholdMs) {
String sql = "SELECT route_id, " +
"COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
"FROM executions WHERE start_time >= ? AND start_time < ? " +
"AND application_name = ? GROUP BY route_id";
"AND application_id = ? GROUP BY route_id";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("route_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, thresholdMs, Timestamp.from(from), Timestamp.from(to), applicationName);
}, thresholdMs, Timestamp.from(from), Timestamp.from(to), applicationId);
return result;
}
// ── Top errors ────────────────────────────────────────────────────────
@Override
public List<TopError> topErrors(Instant from, Instant to, String applicationName,
public List<TopError> topErrors(Instant from, Instant to, String applicationId,
String routeId, int limit) {
StringBuilder where = new StringBuilder(
"status = 'FAILED' AND start_time >= ? AND start_time < ?");
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
where.append(" AND application_name = ?");
params.add(applicationName);
if (applicationId != null) {
where.append(" AND application_id = ?");
params.add(applicationId);
}
String table;
@@ -386,16 +386,16 @@ public class PostgresStatsStore implements StatsStore {
}
@Override
public int activeErrorTypes(Instant from, Instant to, String applicationName) {
public int activeErrorTypes(Instant from, Instant to, String applicationId) {
String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, LEFT(error_message, 200))) " +
"FROM executions WHERE status = 'FAILED' AND start_time >= ? AND start_time < ?";
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
if (applicationId != null) {
sql += " AND application_id = ?";
params.add(applicationId);
}
Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
@@ -405,8 +405,8 @@ public class PostgresStatsStore implements StatsStore {
// ── Punchcard ─────────────────────────────────────────────────────────
@Override
public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
String view = applicationName != null ? "stats_1m_app" : "stats_1m_all";
public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationId) {
String view = applicationId != null ? "stats_1m_app" : "stats_1m_all";
String sql = "SELECT EXTRACT(DOW FROM bucket) AS weekday, " +
"EXTRACT(HOUR FROM bucket) AS hour, " +
"COALESCE(SUM(total_count), 0) AS total_count, " +
@@ -416,9 +416,9 @@ public class PostgresStatsStore implements StatsStore {
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
if (applicationId != null) {
sql += " AND application_id = ?";
params.add(applicationId);
}
sql += " GROUP BY weekday, hour ORDER BY weekday, hour";

View File

@@ -29,9 +29,9 @@ public class TestSecurityHelper {
/**
* Registers a test agent and returns a valid JWT access token with AGENT role.
*/
public String registerTestAgent(String agentId) {
agentRegistryService.register(agentId, "test", "test-group", "1.0", List.of(), Map.of());
return jwtService.createAccessToken(agentId, "test-group", List.of("AGENT"));
public String registerTestAgent(String instanceId) {
agentRegistryService.register(instanceId, "test", "test-group", "1.0", List.of(), Map.of());
return jwtService.createAccessToken(instanceId, "test-group", List.of("AGENT"));
}
/**

View File

@@ -41,9 +41,9 @@ class AgentCommandControllerIT extends AbstractPostgresIT {
private ResponseEntity<String> registerAgent(String agentId, String name, String application) {
String json = """
{
"agentId": "%s",
"name": "%s",
"application": "%s",
"instanceId": "%s",
"displayName": "%s",
"applicationId": "%s",
"version": "1.0.0",
"routeIds": ["route-1"],
"capabilities": {}

View File

@@ -39,9 +39,9 @@ class AgentRegistrationControllerIT extends AbstractPostgresIT {
private ResponseEntity<String> registerAgent(String agentId, String name) {
String json = """
{
"agentId": "%s",
"name": "%s",
"application": "test-group",
"instanceId": "%s",
"displayName": "%s",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": ["route-1", "route-2"],
"capabilities": {"tracing": true}
@@ -61,7 +61,7 @@ class AgentRegistrationControllerIT extends AbstractPostgresIT {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("agentId").asText()).isEqualTo("agent-it-1");
assertThat(body.get("instanceId").asText()).isEqualTo("agent-it-1");
assertThat(body.get("sseEndpoint").asText()).isEqualTo("/api/v1/agents/agent-it-1/events");
assertThat(body.get("heartbeatIntervalMs").asLong()).isGreaterThan(0);
assertThat(body.has("serverPublicKey")).isTrue();
@@ -81,7 +81,7 @@ class AgentRegistrationControllerIT extends AbstractPostgresIT {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("agentId").asText()).isEqualTo("agent-it-reregister");
assertThat(body.get("instanceId").asText()).isEqualTo("agent-it-reregister");
}
@Test

View File

@@ -56,9 +56,9 @@ class AgentSseControllerIT extends AbstractPostgresIT {
private ResponseEntity<String> registerAgent(String agentId, String name, String application) {
String json = """
{
"agentId": "%s",
"name": "%s",
"application": "%s",
"instanceId": "%s",
"displayName": "%s",
"applicationId": "%s",
"version": "1.0.0",
"routeIds": ["route-1"],
"capabilities": {}

View File

@@ -50,11 +50,11 @@ class BackpressureIT extends AbstractPostgresIT {
// Fill the metrics buffer completely with a batch of 5
String batchJson = """
[
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:00Z","metricName":"test.metric","metricValue":1.0,"tags":{}},
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:01Z","metricName":"test.metric","metricValue":2.0,"tags":{}},
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:02Z","metricName":"test.metric","metricValue":3.0,"tags":{}},
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:03Z","metricName":"test.metric","metricValue":4.0,"tags":{}},
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:04Z","metricName":"test.metric","metricValue":5.0,"tags":{}}
{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:00Z","metricName":"test.metric","metricValue":1.0,"tags":{}},
{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:01Z","metricName":"test.metric","metricValue":2.0,"tags":{}},
{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:02Z","metricName":"test.metric","metricValue":3.0,"tags":{}},
{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:03Z","metricName":"test.metric","metricValue":4.0,"tags":{}},
{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:04Z","metricName":"test.metric","metricValue":5.0,"tags":{}}
]
""";
@@ -66,7 +66,7 @@ class BackpressureIT extends AbstractPostgresIT {
// Now buffer should be full -- next POST should get 503
String overflowJson = """
[{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:05Z","metricName":"test.metric","metricValue":6.0,"tags":{}}]
[{"instanceId":"bp-agent","collectedAt":"2026-03-11T10:00:05Z","metricName":"test.metric","metricValue":6.0,"tags":{}}]
""";
ResponseEntity<String> response = restTemplate.postForEntity(

View File

@@ -35,7 +35,7 @@ class MetricsControllerIT extends AbstractPostgresIT {
void postMetrics_returns202() {
String json = """
[{
"agentId": "agent-1",
"instanceId": "agent-1",
"collectedAt": "2026-03-11T10:00:00Z",
"metricName": "cpu.usage",
"metricValue": 75.5,
@@ -55,7 +55,7 @@ class MetricsControllerIT extends AbstractPostgresIT {
void postMetrics_dataAppearsAfterFlush() {
String json = """
[{
"agentId": "agent-flush-test",
"instanceId": "agent-flush-test",
"collectedAt": "2026-03-11T10:00:00Z",
"metricName": "memory.used",
"metricValue": 1024.0,
@@ -70,7 +70,7 @@ class MetricsControllerIT extends AbstractPostgresIT {
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM agent_metrics WHERE agent_id = 'agent-flush-test'",
"SELECT count(*) FROM agent_metrics WHERE instance_id = 'agent-flush-test'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});

View File

@@ -27,9 +27,9 @@ class BootstrapTokenIT extends AbstractPostgresIT {
private static final String REGISTRATION_JSON = """
{
"agentId": "bootstrap-test-agent",
"name": "Bootstrap Test",
"application": "test-group",
"instanceId": "bootstrap-test-agent",
"displayName": "Bootstrap Test",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": [],
"capabilities": {}
@@ -95,9 +95,9 @@ class BootstrapTokenIT extends AbstractPostgresIT {
String json = """
{
"agentId": "bootstrap-test-previous",
"name": "Previous Token Test",
"application": "test-group",
"instanceId": "bootstrap-test-previous",
"displayName": "Previous Token Test",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": [],
"capabilities": {}

View File

@@ -37,9 +37,9 @@ class JwtRefreshIT extends AbstractPostgresIT {
private JsonNode registerAndGetTokens(String agentId) throws Exception {
String json = """
{
"agentId": "%s",
"name": "Refresh Test Agent",
"application": "test-group",
"instanceId": "%s",
"displayName": "Refresh Test Agent",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": [],
"capabilities": {}

View File

@@ -30,9 +30,9 @@ class RegistrationSecurityIT extends AbstractPostgresIT {
private ResponseEntity<String> registerAgent(String agentId) {
String json = """
{
"agentId": "%s",
"name": "Security Test Agent",
"application": "test-group",
"instanceId": "%s",
"displayName": "Security Test Agent",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": [],
"capabilities": {}

View File

@@ -83,14 +83,14 @@ class SseSigningIT extends AbstractPostgresIT {
/**
* Registers an agent using the bootstrap token and returns the registration response.
* The response contains: agentId, sseEndpoint, accessToken, refreshToken, serverPublicKey.
* The response contains: instanceId, sseEndpoint, accessToken, refreshToken, serverPublicKey.
*/
private JsonNode registerAgentWithAuth(String agentId) throws Exception {
String json = """
{
"agentId": "%s",
"name": "SSE Signing Test Agent",
"application": "test-group",
"instanceId": "%s",
"displayName": "SSE Signing Test Agent",
"applicationId": "test-group",
"version": "1.0.0",
"routeIds": ["route-1"],
"capabilities": {}

View File

@@ -39,6 +39,9 @@ class ClickHouseAgentEventRepositoryIT {
String ddl = new ClassPathResource("clickhouse/V7__agent_events.sql")
.getContentAsString(StandardCharsets.UTF_8);
jdbc.execute(ddl);
// Apply identity column renames (subset of V9 migration)
jdbc.execute("ALTER TABLE agent_events RENAME COLUMN agent_id TO instance_id");
jdbc.execute("ALTER TABLE agent_events RENAME COLUMN app_id TO application_id");
jdbc.execute("TRUNCATE TABLE agent_events");
repo = new ClickHouseAgentEventRepository(jdbc);
@@ -49,10 +52,10 @@ class ClickHouseAgentEventRepositoryIT {
/**
* Insert a row with an explicit timestamp so tests can control ordering and ranges.
*/
private void insertAt(String agentId, String appId, String eventType, String detail, Instant ts) {
private void insertAt(String instanceId, String applicationId, String eventType, String detail, Instant ts) {
jdbc.update(
"INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
"default", agentId, appId, eventType, detail, Timestamp.from(ts));
"INSERT INTO agent_events (tenant_id, instance_id, application_id, event_type, detail, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
"default", instanceId, applicationId, eventType, detail, Timestamp.from(ts));
}
// ── Tests ─────────────────────────────────────────────────────────────────
@@ -62,7 +65,7 @@ class ClickHouseAgentEventRepositoryIT {
repo.insert("agent-1", "app-a", "CONNECTED", "agent came online");
Long count = jdbc.queryForObject(
"SELECT count() FROM agent_events WHERE agent_id = 'agent-1'",
"SELECT count() FROM agent_events WHERE instance_id = 'agent-1'",
Long.class);
assertThat(count).isEqualTo(1);
}
@@ -75,8 +78,8 @@ class ClickHouseAgentEventRepositoryIT {
List<AgentEventRecord> results = repo.query("app-x", null, null, null, 100);
assertThat(results).hasSize(1);
assertThat(results.get(0).appId()).isEqualTo("app-x");
assertThat(results.get(0).agentId()).isEqualTo("agent-1");
assertThat(results.get(0).applicationId()).isEqualTo("app-x");
assertThat(results.get(0).instanceId()).isEqualTo("agent-1");
}
@Test
@@ -87,7 +90,7 @@ class ClickHouseAgentEventRepositoryIT {
List<AgentEventRecord> results = repo.query(null, "agent-alpha", null, null, 100);
assertThat(results).hasSize(1);
assertThat(results.get(0).agentId()).isEqualTo("agent-alpha");
assertThat(results.get(0).instanceId()).isEqualTo("agent-alpha");
}
@Test

View File

@@ -75,8 +75,8 @@ class ClickHouseChunkPipelineIT {
// Chunk 0: RUNNING with initial processors
ExecutionChunk chunk0 = new ExecutionChunk();
chunk0.setExchangeId("pipeline-1");
chunk0.setApplicationName("order-service");
chunk0.setAgentId("pod-1");
chunk0.setApplicationId("order-service");
chunk0.setInstanceId("pod-1");
chunk0.setRouteId("order-route");
chunk0.setCorrelationId("corr-1");
chunk0.setStatus(ExecutionStatus.RUNNING);
@@ -118,8 +118,8 @@ class ClickHouseChunkPipelineIT {
// Chunk 1: COMPLETED (final)
ExecutionChunk chunk1 = new ExecutionChunk();
chunk1.setExchangeId("pipeline-1");
chunk1.setApplicationName("order-service");
chunk1.setAgentId("pod-1");
chunk1.setApplicationId("order-service");
chunk1.setInstanceId("pod-1");
chunk1.setRouteId("order-route");
chunk1.setCorrelationId("corr-1");
chunk1.setStatus(ExecutionStatus.COMPLETED);
@@ -152,7 +152,7 @@ class ClickHouseChunkPipelineIT {
for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) {
executionStore.insertProcessorBatch(
batch.tenantId(), batch.executionId(),
batch.routeId(), batch.applicationName(),
batch.routeId(), batch.applicationId(),
batch.execStartTime(), batch.processors());
}

View File

@@ -41,6 +41,9 @@ class ClickHouseDiagramStoreIT {
String ddl = new ClassPathResource("clickhouse/V6__route_diagrams.sql")
.getContentAsString(StandardCharsets.UTF_8);
jdbc.execute(ddl);
// Apply identity column renames (subset of V9 migration)
jdbc.execute("ALTER TABLE route_diagrams RENAME COLUMN agent_id TO instance_id");
jdbc.execute("ALTER TABLE route_diagrams RENAME COLUMN application_name TO application_id");
jdbc.execute("TRUNCATE TABLE route_diagrams");
store = new ClickHouseDiagramStore(jdbc);
@@ -60,8 +63,8 @@ class ClickHouseDiagramStoreIT {
return graph;
}
private TaggedDiagram tagged(String agentId, String appName, RouteGraph graph) {
return new TaggedDiagram(agentId, appName, graph);
private TaggedDiagram tagged(String instanceId, String applicationId, RouteGraph graph) {
return new TaggedDiagram(instanceId, applicationId, graph);
}
// ── Tests ─────────────────────────────────────────────────────────────

View File

@@ -105,8 +105,8 @@ class ClickHouseExecutionReadIT {
assertThat(result.get().executionId()).isEqualTo("exec-1");
assertThat(result.get().routeId()).isEqualTo("route-a");
assertThat(result.get().status()).isEqualTo("COMPLETED");
assertThat(result.get().agentId()).isEqualTo("agent-1");
assertThat(result.get().applicationName()).isEqualTo("my-app");
assertThat(result.get().instanceId()).isEqualTo("agent-1");
assertThat(result.get().applicationId()).isEqualTo("my-app");
assertThat(result.get().processorsJson()).isNull();
}

View File

@@ -38,14 +38,14 @@ class ClickHouseMetricsQueryStoreIT {
CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3),
agent_id LowCardinality(String),
instance_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
ORDER BY (tenant_id, instance_id, metric_name, collected_at)
""");
jdbc.execute("TRUNCATE TABLE agent_metrics");
@@ -54,9 +54,9 @@ class ClickHouseMetricsQueryStoreIT {
Instant base = Instant.parse("2026-03-31T10:00:00Z");
for (int i = 0; i < 6; i++) {
Instant ts = base.plusSeconds(i * 600); // every 10 minutes
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
jdbc.update("INSERT INTO agent_metrics (instance_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
"agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts));
jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
jdbc.update("INSERT INTO agent_metrics (instance_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)",
"agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
}

View File

@@ -39,14 +39,14 @@ class ClickHouseMetricsStoreIT {
CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3),
agent_id LowCardinality(String),
instance_id LowCardinality(String),
metric_name LowCardinality(String),
metric_value Float64,
tags Map(String, String) DEFAULT map(),
server_received_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (tenant_id, agent_id, metric_name, collected_at)
ORDER BY (tenant_id, instance_id, metric_name, collected_at)
""");
jdbc.execute("TRUNCATE TABLE agent_metrics");
@@ -66,7 +66,7 @@ class ClickHouseMetricsStoreIT {
store.insertBatch(batch);
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'",
"SELECT count() FROM agent_metrics WHERE instance_id = 'agent-1'",
Integer.class);
assertThat(count).isEqualTo(2);
}
@@ -80,7 +80,7 @@ class ClickHouseMetricsStoreIT {
// Just verify we can read back the row with tags
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'",
"SELECT count() FROM agent_metrics WHERE instance_id = 'agent-2'",
Integer.class);
assertThat(count).isEqualTo(1);
}
@@ -101,7 +101,7 @@ class ClickHouseMetricsStoreIT {
));
Integer count = jdbc.queryForObject(
"SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'",
"SELECT count() FROM agent_metrics WHERE instance_id = 'agent-3'",
Integer.class);
assertThat(count).isEqualTo(1);
}

View File

@@ -43,13 +43,15 @@ class ClickHouseStatsStoreIT {
jdbc = new JdbcTemplate(ds);
// Load DDL from classpath resources
// Load DDL from classpath resources (V2, V3, V4 create tables with old column names)
String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql")
.getContentAsString(StandardCharsets.UTF_8);
String renameDdl = new ClassPathResource("clickhouse/V9__rename_identity_columns.sql")
.getContentAsString(StandardCharsets.UTF_8);
jdbc.execute(executionsDdl);
jdbc.execute(processorsDdl);
@@ -66,10 +68,18 @@ class ClickHouseStatsStoreIT {
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail");
// Strip SQL line comments first (they may contain semicolons),
// then split by ';' and execute non-empty statements.
String cleanedDdl = statsDdl.replaceAll("--[^\n]*", "");
for (String stmt : cleanedDdl.split(";")) {
// Create stats tables and MVs (using old column names from V4)
String cleanedStatsDdl = statsDdl.replaceAll("--[^\n]*", "");
for (String stmt : cleanedStatsDdl.split(";")) {
String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
jdbc.execute(trimmed);
}
}
// Apply identity column renames (V9 migration)
String cleanedRenameDdl = renameDdl.replaceAll("--[^\n]*", "");
for (String stmt : cleanedRenameDdl.split(";")) {
String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
jdbc.execute(trimmed);
@@ -157,27 +167,27 @@ class ClickHouseStatsStoreIT {
"app-1", "route-a", "COMPLETED", 15L);
}
private void insertExecution(String executionId, Instant startTime, String appName,
String routeId, String agentId, String status,
private void insertExecution(String executionId, Instant startTime, String appId,
String routeId, String instanceId, String status,
Long durationMs, String errorType, String errorMessage) {
jdbc.update(
"INSERT INTO executions (tenant_id, execution_id, start_time, route_id, " +
"agent_id, application_name, status, duration_ms, error_type, error_message) " +
"instance_id, application_id, status, duration_ms, error_type, error_message) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)",
executionId, Timestamp.from(startTime), routeId, agentId, appName,
executionId, Timestamp.from(startTime), routeId, instanceId, appId,
status, durationMs, errorType, errorMessage);
}
private void insertProcessor(String executionId, int seq, String processorId,
String processorType, Instant startTime,
String appName, String routeId, String status,
String appId, String routeId, String status,
Long durationMs) {
jdbc.update(
"INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, " +
"processor_type, start_time, route_id, application_name, status, duration_ms) " +
"processor_type, start_time, route_id, application_id, status, duration_ms) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)",
executionId, seq, processorId, processorType, Timestamp.from(startTime),
routeId, appName, status, durationMs);
routeId, appId, status, durationMs);
}
// ── Stats Tests ──────────────────────────────────────────────────────

View File

@@ -54,10 +54,10 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
assertFalse(ts.buckets().isEmpty());
}
private void insertExecution(String id, String routeId, String applicationName,
private void insertExecution(String id, String routeId, String applicationId,
String status, Instant startTime, long durationMs) {
executionStore.upsert(new ExecutionRecord(
id, routeId, "agent-1", applicationName, status, null, null,
id, routeId, "agent-1", applicationId, status, null, null,
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null,
null, null, null, null, null, null,

View File

@@ -3,7 +3,7 @@ package com.cameleer3.server.core.admin;
import java.time.Instant;
public record AppSettings(
String appId,
String applicationId,
int slaThresholdMs,
double healthErrorWarn,
double healthErrorCrit,
@@ -12,8 +12,8 @@ public record AppSettings(
Instant createdAt,
Instant updatedAt) {
public static AppSettings defaults(String appId) {
public static AppSettings defaults(String applicationId) {
Instant now = Instant.now();
return new AppSettings(appId, 300, 1.0, 5.0, 99.0, 95.0, now, now);
return new AppSettings(applicationId, 300, 1.0, 5.0, 99.0, 95.0, now, now);
}
}

View File

@@ -4,8 +4,8 @@ import java.util.List;
import java.util.Optional;
public interface AppSettingsRepository {
Optional<AppSettings> findByAppId(String appId);
Optional<AppSettings> findByApplicationId(String applicationId);
List<AppSettings> findAll();
AppSettings save(AppSettings settings);
void delete(String appId);
void delete(String applicationId);
}

View File

@@ -8,7 +8,7 @@ import java.time.Instant;
* @param id unique command identifier (UUID)
* @param type command type
* @param payload raw JSON payload
* @param targetAgentId target agent identifier
* @param targetInstanceId target agent instance identifier
* @param createdAt when the command was created
* @param status current delivery status
*/
@@ -16,12 +16,12 @@ public record AgentCommand(
String id,
CommandType type,
String payload,
String targetAgentId,
String targetInstanceId,
Instant createdAt,
CommandStatus status
) {
public AgentCommand withStatus(CommandStatus newStatus) {
return new AgentCommand(id, type, payload, targetAgentId, createdAt, newStatus);
return new AgentCommand(id, type, payload, targetInstanceId, createdAt, newStatus);
}
}

View File

@@ -12,8 +12,8 @@ public interface AgentEventListener {
/**
* Called when a new command is ready to be delivered to an agent.
*
* @param agentId the target agent identifier
* @param instanceId the target agent instance identifier
* @param command the command to deliver
*/
void onCommandReady(String agentId, AgentCommand command);
void onCommandReady(String instanceId, AgentCommand command);
}

View File

@@ -4,8 +4,8 @@ import java.time.Instant;
public record AgentEventRecord(
long id,
String agentId,
String appId,
String instanceId,
String applicationId,
String eventType,
String detail,
Instant timestamp

View File

@@ -5,7 +5,7 @@ import java.util.List;
public interface AgentEventRepository {
void insert(String agentId, String appId, String eventType, String detail);
void insert(String instanceId, String applicationId, String eventType, String detail);
List<AgentEventRecord> query(String appId, String agentId, Instant from, Instant to, int limit);
List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit);
}

View File

@@ -16,12 +16,12 @@ public class AgentEventService {
this.repository = repository;
}
public void recordEvent(String agentId, String appId, String eventType, String detail) {
log.debug("Recording agent event: agent={}, app={}, type={}", agentId, appId, eventType);
repository.insert(agentId, appId, eventType, detail);
public void recordEvent(String instanceId, String applicationId, String eventType, String detail) {
log.debug("Recording agent event: instance={}, app={}, type={}", instanceId, applicationId, eventType);
repository.insert(instanceId, applicationId, eventType, detail);
}
public List<AgentEventRecord> queryEvents(String appId, String agentId, Instant from, Instant to, int limit) {
return repository.query(appId, agentId, from, to, limit);
public List<AgentEventRecord> queryEvents(String applicationId, String instanceId, Instant from, Instant to, int limit) {
return repository.query(applicationId, instanceId, from, to, limit);
}
}

View File

@@ -11,9 +11,9 @@ import java.util.Map;
* via {@code computeIfPresent} for thread-safe state transitions. Wither-style methods
* return new instances with the specified field changed.
*
* @param id agent-provided persistent identifier
* @param name human-readable agent name
* @param application application name (e.g., "order-service-prod")
* @param instanceId agent-provided persistent identifier
* @param displayName human-readable agent name
* @param applicationId application identifier (e.g., "order-service-prod")
* @param version agent software version
* @param routeIds list of Camel route IDs managed by this agent
* @param capabilities agent-declared capabilities (free-form)
@@ -23,9 +23,9 @@ import java.util.Map;
* @param staleTransitionTime when the agent transitioned to STALE (null if not STALE/DEAD)
*/
public record AgentInfo(
String id,
String name,
String application,
String instanceId,
String displayName,
String applicationId,
String version,
List<String> routeIds,
Map<String, Object> capabilities,
@@ -36,28 +36,28 @@ public record AgentInfo(
) {
public AgentInfo withState(AgentState newState) {
return new AgentInfo(id, name, application, version, routeIds, capabilities,
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities,
newState, registeredAt, lastHeartbeat, staleTransitionTime);
}
public AgentInfo withLastHeartbeat(Instant newLastHeartbeat) {
return new AgentInfo(id, name, application, version, routeIds, capabilities,
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities,
state, registeredAt, newLastHeartbeat, staleTransitionTime);
}
public AgentInfo withRegisteredAt(Instant newRegisteredAt) {
return new AgentInfo(id, name, application, version, routeIds, capabilities,
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities,
state, newRegisteredAt, lastHeartbeat, staleTransitionTime);
}
public AgentInfo withStaleTransitionTime(Instant newStaleTransitionTime) {
return new AgentInfo(id, name, application, version, routeIds, capabilities,
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities,
state, registeredAt, lastHeartbeat, newStaleTransitionTime);
}
public AgentInfo withMetadata(String name, String application, String version,
public AgentInfo withMetadata(String displayName, String applicationId, String version,
List<String> routeIds, Map<String, Object> capabilities) {
return new AgentInfo(id, name, application, version, routeIds, capabilities,
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities,
state, registeredAt, lastHeartbeat, staleTransitionTime);
}
}

View File

@@ -106,10 +106,39 @@ public class AgentRegistryService {
});
}
/**
* Gracefully shut down an agent. Transitions to SHUTDOWN state,
* which is excluded from the LIVE -> STALE -> DEAD lifecycle.
*
* @return true if the agent was found and transitioned
*/
public boolean shutdown(String id) {
AgentInfo updated = agents.computeIfPresent(id, (key, existing) -> {
log.info("Agent {} graceful shutdown ({} -> SHUTDOWN)", id, existing.state());
return existing.withState(AgentState.SHUTDOWN);
});
return updated != null;
}
/**
* Remove an agent from the registry entirely.
*
* @return true if the agent was found and removed
*/
public boolean deregister(String id) {
AgentInfo removed = agents.remove(id);
if (removed != null) {
commands.remove(id);
log.info("Agent {} deregistered (was {})", id, removed.state());
}
return removed != null;
}
/**
* Check all agents and apply lifecycle transitions:
* LIVE -> STALE when lastHeartbeat exceeds staleThresholdMs,
* STALE -> DEAD when staleTransitionTime exceeds deadThresholdMs.
* SHUTDOWN and DEAD agents are skipped.
*/
public void checkLifecycle() {
Instant now = Instant.now();
@@ -174,7 +203,7 @@ public class AgentRegistryService {
*/
public List<AgentInfo> findByApplication(String application) {
return agents.values().stream()
.filter(a -> application.equals(a.application()))
.filter(a -> application.equals(a.applicationId()))
.collect(Collectors.toList());
}

View File

@@ -6,5 +6,6 @@ package com.cameleer3.server.core.agent;
public enum AgentState {
LIVE,
STALE,
DEAD
DEAD,
SHUTDOWN
}

View File

@@ -35,8 +35,8 @@ public class DetailService {
processors = buildTree(records);
}
return new ExecutionDetail(
exec.executionId(), exec.routeId(), exec.agentId(),
exec.applicationName(),
exec.executionId(), exec.routeId(), exec.instanceId(),
exec.applicationId(),
exec.status(), exec.startTime(), exec.endTime(),
exec.durationMs() != null ? exec.durationMs() : 0L,
exec.correlationId(), exec.exchangeId(),

View File

@@ -12,7 +12,7 @@ import java.util.Map;
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
* @param agentId agent instance that reported the execution
* @param instanceId agent instance that reported the execution
* @param status execution status (COMPLETED, FAILED, RUNNING)
* @param startTime execution start time
* @param endTime execution end time (may be null for RUNNING)
@@ -31,8 +31,8 @@ import java.util.Map;
public record ExecutionDetail(
String executionId,
String routeId,
String agentId,
String applicationName,
String instanceId,
String applicationId,
String status,
Instant startTime,
Instant endTime,

View File

@@ -75,7 +75,7 @@ public class SearchIndexer implements SearchIndexerStats {
.toList();
searchIndex.index(new ExecutionDocument(
exec.executionId(), exec.routeId(), exec.agentId(), exec.applicationName(),
exec.executionId(), exec.routeId(), exec.instanceId(), exec.applicationId(),
exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs,

View File

@@ -55,7 +55,7 @@ public class ChunkAccumulator {
DEFAULT_TENANT,
chunk.getExchangeId(),
chunk.getRouteId(),
chunk.getApplicationName(),
chunk.getApplicationId(),
chunk.getStartTime(),
chunk.getProcessors()));
}
@@ -110,8 +110,8 @@ public class ChunkAccumulator {
private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) {
ExecutionChunk merged = new ExecutionChunk();
merged.setExchangeId(coalesce(newer.getExchangeId(), older.getExchangeId()));
merged.setApplicationName(coalesce(newer.getApplicationName(), older.getApplicationName()));
merged.setAgentId(coalesce(newer.getAgentId(), older.getAgentId()));
merged.setApplicationId(coalesce(newer.getApplicationId(), older.getApplicationId()));
merged.setInstanceId(coalesce(newer.getInstanceId(), older.getInstanceId()));
merged.setRouteId(coalesce(newer.getRouteId(), older.getRouteId()));
merged.setCorrelationId(coalesce(newer.getCorrelationId(), older.getCorrelationId()));
merged.setStatus(coalesce(newer.getStatus(), older.getStatus()));
@@ -146,7 +146,7 @@ public class ChunkAccumulator {
String diagramHash = "";
try {
diagramHash = diagramStore
.findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
.findContentHashForRoute(envelope.getRouteId(), envelope.getInstanceId())
.orElse("");
} catch (Exception e) {
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
@@ -156,8 +156,8 @@ public class ChunkAccumulator {
1L,
envelope.getExchangeId(),
envelope.getRouteId(),
envelope.getAgentId(),
envelope.getApplicationName(),
envelope.getInstanceId(),
envelope.getApplicationId(),
envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING",
envelope.getCorrelationId(),
envelope.getExchangeId(),
@@ -207,7 +207,7 @@ public class ChunkAccumulator {
String tenantId,
String executionId,
String routeId,
String applicationName,
String applicationId,
Instant execStartTime,
List<FlatProcessorRecord> processors
) {}

View File

@@ -42,18 +42,18 @@ public class IngestionService {
this.bodySizeLimit = bodySizeLimit;
}
public void ingestExecution(String agentId, String applicationName, RouteExecution execution) {
ExecutionRecord record = toExecutionRecord(agentId, applicationName, execution);
public void ingestExecution(String instanceId, String applicationId, RouteExecution execution) {
ExecutionRecord record = toExecutionRecord(instanceId, applicationId, execution);
executionStore.upsert(record);
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
List<ProcessorRecord> processors = flattenProcessors(
execution.getProcessors(), record.executionId(),
record.startTime(), applicationName, execution.getRouteId(),
record.startTime(), applicationId, execution.getRouteId(),
null, 0);
executionStore.upsertProcessors(
record.executionId(), record.startTime(),
applicationName, execution.getRouteId(), processors);
applicationId, execution.getRouteId(), processors);
}
eventPublisher.accept(new ExecutionUpdatedEvent(
@@ -76,10 +76,10 @@ public class IngestionService {
return metricsBuffer;
}
private ExecutionRecord toExecutionRecord(String agentId, String applicationName,
private ExecutionRecord toExecutionRecord(String instanceId, String applicationId,
RouteExecution exec) {
String diagramHash = diagramStore
.findContentHashForRoute(exec.getRouteId(), agentId)
.findContentHashForRoute(exec.getRouteId(), instanceId)
.orElse("");
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
@@ -109,7 +109,7 @@ public class IngestionService {
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), agentId, applicationName,
exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
exec.getCorrelationId(), exec.getExchangeId(),
exec.getStartTime(), exec.getEndTime(),
@@ -138,13 +138,13 @@ public class IngestionService {
private List<ProcessorRecord> flattenProcessors(
List<ProcessorExecution> processors, String executionId,
java.time.Instant execStartTime, String applicationName, String routeId,
java.time.Instant execStartTime, String applicationId, String routeId,
String parentProcessorId, int depth) {
List<ProcessorRecord> flat = new ArrayList<>();
for (ProcessorExecution p : processors) {
flat.add(new ProcessorRecord(
executionId, p.getProcessorId(), p.getProcessorType(),
applicationName, routeId,
applicationId, routeId,
depth, parentProcessorId,
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
p.getStartTime() != null ? p.getStartTime() : execStartTime,

View File

@@ -11,8 +11,8 @@ public record MergedExecution(
long version,
String executionId,
String routeId,
String agentId,
String applicationName,
String instanceId,
String applicationId,
String status,
String correlationId,
String exchangeId,

View File

@@ -8,4 +8,4 @@ import com.cameleer3.common.graph.RouteGraph;
 * The agent instance ID is extracted from the SecurityContext in the controller layer
* and carried through the write buffer so the flush scheduler can persist it.
*/
public record TaggedDiagram(String agentId, String applicationName, RouteGraph graph) {}
public record TaggedDiagram(String instanceId, String applicationId, RouteGraph graph) {}

View File

@@ -8,4 +8,4 @@ import com.cameleer3.common.model.RouteExecution;
 * The agent instance ID is extracted from the SecurityContext in the controller layer
* and carried through the write buffer so the flush scheduler can persist it.
*/
public record TaggedExecution(String agentId, RouteExecution execution) {}
public record TaggedExecution(String instanceId, RouteExecution execution) {}

View File

@@ -11,7 +11,7 @@ import java.util.Map;
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
* @param agentId agent instance that reported the execution
* @param instanceId agent instance that reported the execution
* @param status execution status (COMPLETED, FAILED, RUNNING)
* @param startTime execution start time
* @param endTime execution end time (may be null for RUNNING)
@@ -23,8 +23,8 @@ import java.util.Map;
public record ExecutionSummary(
String executionId,
String routeId,
String agentId,
String applicationName,
String instanceId,
String applicationId,
String status,
Instant startTime,
Instant endTime,

View File

@@ -20,10 +20,10 @@ import java.util.List;
* @param textInHeaders full-text search scoped to exchange headers
* @param textInErrors full-text search scoped to error messages and stack traces
* @param routeId exact match on route_id
* @param agentId exact match on agent_id
* @param instanceId exact match on instance_id
* @param processorType matches processor_types array via has()
* @param application application name filter (resolved to agentIds server-side)
* @param agentIds list of agent IDs (resolved from group, used for IN clause)
* @param applicationId application ID filter (resolved to instanceIds server-side)
* @param instanceIds list of instance IDs (resolved from application, used for IN clause)
* @param offset pagination offset (0-based)
* @param limit page size (default 50, max 500)
* @param sortField column to sort by (default: startTime)
@@ -41,10 +41,10 @@ public record SearchRequest(
String textInHeaders,
String textInErrors,
String routeId,
String agentId,
String instanceId,
String processorType,
String application,
List<String> agentIds,
String applicationId,
List<String> instanceIds,
int offset,
int limit,
String sortField,
@@ -55,8 +55,8 @@ public record SearchRequest(
private static final int MAX_LIMIT = 500;
private static final java.util.Set<String> ALLOWED_SORT_FIELDS = java.util.Set.of(
"startTime", "status", "agentId", "routeId", "correlationId",
"durationMs", "executionId", "applicationName"
"startTime", "status", "instanceId", "routeId", "correlationId",
"durationMs", "executionId", "applicationId"
);
/** Maps camelCase API sort field names to OpenSearch field names.
@@ -65,11 +65,11 @@ public record SearchRequest(
java.util.Map.entry("startTime", "start_time"),
java.util.Map.entry("durationMs", "duration_ms"),
java.util.Map.entry("status", "status.keyword"),
java.util.Map.entry("agentId", "agent_id.keyword"),
java.util.Map.entry("instanceId", "instance_id.keyword"),
java.util.Map.entry("routeId", "route_id.keyword"),
java.util.Map.entry("correlationId", "correlation_id.keyword"),
java.util.Map.entry("executionId", "execution_id.keyword"),
java.util.Map.entry("applicationName", "application_name.keyword")
java.util.Map.entry("applicationId", "application_id.keyword")
);
public SearchRequest {
@@ -85,12 +85,12 @@ public record SearchRequest(
return SORT_FIELD_TO_COLUMN.getOrDefault(sortField, "start_time");
}
/** Create a copy with resolved agentIds (from application name lookup). */
public SearchRequest withAgentIds(List<String> resolvedAgentIds) {
/** Create a copy with resolved instanceIds (from application ID lookup). */
public SearchRequest withInstanceIds(List<String> resolvedInstanceIds) {
return new SearchRequest(
status, timeFrom, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors,
routeId, agentId, processorType, application, resolvedAgentIds,
routeId, instanceId, processorType, applicationId, resolvedInstanceIds,
offset, limit, sortField, sortDir
);
}

View File

@@ -29,8 +29,8 @@ public class SearchService {
return statsStore.stats(from, to);
}
public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
return statsStore.statsForApp(from, to, applicationName);
public ExecutionStats statsForApp(Instant from, Instant to, String applicationId) {
return statsStore.statsForApp(from, to, applicationId);
}
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
@@ -41,8 +41,8 @@ public class SearchService {
return statsStore.timeseries(from, to, bucketCount);
}
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
return statsStore.timeseriesForApp(from, to, bucketCount, applicationName);
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId) {
return statsStore.timeseriesForApp(from, to, bucketCount, applicationId);
}
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
@@ -57,13 +57,13 @@ public class SearchService {
}
public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
int bucketCount, String applicationName) {
return statsStore.timeseriesGroupedByRoute(from, to, bucketCount, applicationName);
int bucketCount, String applicationId) {
return statsStore.timeseriesGroupedByRoute(from, to, bucketCount, applicationId);
}
public double slaCompliance(Instant from, Instant to, int thresholdMs,
String applicationName, String routeId) {
return statsStore.slaCompliance(from, to, thresholdMs, applicationName, routeId);
String applicationId, String routeId) {
return statsStore.slaCompliance(from, to, thresholdMs, applicationId, routeId);
}
public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
@@ -71,20 +71,20 @@ public class SearchService {
}
public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
String applicationName, int thresholdMs) {
return statsStore.slaCountsByRoute(from, to, applicationName, thresholdMs);
String applicationId, int thresholdMs) {
return statsStore.slaCountsByRoute(from, to, applicationId, thresholdMs);
}
public List<TopError> topErrors(Instant from, Instant to, String applicationName,
public List<TopError> topErrors(Instant from, Instant to, String applicationId,
String routeId, int limit) {
return statsStore.topErrors(from, to, applicationName, routeId, limit);
return statsStore.topErrors(from, to, applicationId, routeId, limit);
}
public int activeErrorTypes(Instant from, Instant to, String applicationName) {
return statsStore.activeErrorTypes(from, to, applicationName);
public int activeErrorTypes(Instant from, Instant to, String applicationId) {
return statsStore.activeErrorTypes(from, to, applicationId);
}
public List<StatsStore.PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
return statsStore.punchcard(from, to, applicationName);
public List<StatsStore.PunchcardCell> punchcard(Instant from, Instant to, String applicationId) {
return statsStore.punchcard(from, to, applicationId);
}
}

View File

@@ -13,9 +13,9 @@ public interface DiagramStore {
Optional<RouteGraph> findByContentHash(String contentHash);
Optional<String> findContentHashForRoute(String routeId, String agentId);
Optional<String> findContentHashForRoute(String routeId, String instanceId);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> instanceIds);
Map<String, String> findProcessorRouteMapping(String applicationName);
Map<String, String> findProcessorRouteMapping(String applicationId);
}

View File

@@ -9,7 +9,7 @@ public interface ExecutionStore {
void upsert(ExecutionRecord execution);
void upsertProcessors(String executionId, Instant startTime,
String applicationName, String routeId,
String applicationId, String routeId,
List<ProcessorRecord> processors);
Optional<ExecutionRecord> findById(String executionId);
@@ -19,7 +19,7 @@ public interface ExecutionStore {
Optional<ProcessorRecord> findProcessorById(String executionId, String processorId);
record ExecutionRecord(
String executionId, String routeId, String agentId, String applicationName,
String executionId, String routeId, String instanceId, String applicationId,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String diagramContentHash,
@@ -36,7 +36,7 @@ public interface ExecutionStore {
record ProcessorRecord(
String executionId, String processorId, String processorType,
String applicationName, String routeId,
String applicationId, String routeId,
int depth, String parentProcessorId, String status,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,

View File

@@ -7,9 +7,9 @@ import java.util.List;
public interface LogIndex {
List<LogEntryResult> search(String application, String agentId, String level,
List<LogEntryResult> search(String applicationId, String instanceId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit);
void indexBatch(String agentId, String application, List<LogEntry> entries);
void indexBatch(String instanceId, String applicationId, List<LogEntry> entries);
}

View File

@@ -9,6 +9,6 @@ import java.util.Map;
public interface MetricsQueryStore {
Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
String agentId, List<String> metricNames,
String instanceId, List<String> metricNames,
Instant from, Instant to, int buckets);
}

View File

@@ -14,7 +14,7 @@ public interface StatsStore {
ExecutionStats stats(Instant from, Instant to);
// Per-app stats (stats_1m_app)
ExecutionStats statsForApp(Instant from, Instant to, String applicationName);
ExecutionStats statsForApp(Instant from, Instant to, String applicationId);
// Per-route stats (stats_1m_route), optionally scoped to specific agents
ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);
@@ -26,7 +26,7 @@ public interface StatsStore {
StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);
// Per-app timeseries
StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName);
StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId);
// Per-route timeseries, optionally scoped to specific agents
StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
@@ -41,28 +41,28 @@ public interface StatsStore {
// Grouped timeseries by route within an application (for L2 dashboard charts)
Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to, int bucketCount,
String applicationName);
String applicationId);
// SLA compliance: % of completed exchanges with duration <= thresholdMs
double slaCompliance(Instant from, Instant to, int thresholdMs,
String applicationName, String routeId);
String applicationId, String routeId);
// Batch SLA counts by app: {appId -> [compliant, total]}
Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs);
// Batch SLA counts by route within an app: {routeId -> [compliant, total]}
Map<String, long[]> slaCountsByRoute(Instant from, Instant to, String applicationName,
Map<String, long[]> slaCountsByRoute(Instant from, Instant to, String applicationId,
int thresholdMs);
// Top N errors with velocity trend
List<TopError> topErrors(Instant from, Instant to, String applicationName,
List<TopError> topErrors(Instant from, Instant to, String applicationId,
String routeId, int limit);
// Count of distinct error types in window
int activeErrorTypes(Instant from, Instant to, String applicationName);
int activeErrorTypes(Instant from, Instant to, String applicationId);
// Punchcard: aggregate by weekday (0=Sun..6=Sat) x hour (0-23) over last 7 days
List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName);
List<PunchcardCell> punchcard(Instant from, Instant to, String applicationId);
record PunchcardCell(int weekday, int hour, long totalCount, long failedCount) {}
}

View File

@@ -4,7 +4,7 @@ import java.time.Instant;
import java.util.List;
public record ExecutionDocument(
String executionId, String routeId, String agentId, String applicationName,
String executionId, String routeId, String instanceId, String applicationId,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,

View File

@@ -7,7 +7,7 @@ import java.util.Map;
* A single metrics data point from an agent.
*/
public record MetricsSnapshot(
String agentId,
String instanceId,
Instant collectedAt,
String metricName,
double metricValue,

View File

@@ -30,9 +30,9 @@ class AgentRegistryServiceTest {
"1.0.0", List.of("route1", "route2"), Map.of("feature", "tracing"));
assertThat(agent).isNotNull();
assertThat(agent.id()).isEqualTo("agent-1");
assertThat(agent.name()).isEqualTo("Order Agent");
assertThat(agent.application()).isEqualTo("order-svc");
assertThat(agent.instanceId()).isEqualTo("agent-1");
assertThat(agent.displayName()).isEqualTo("Order Agent");
assertThat(agent.applicationId()).isEqualTo("order-svc");
assertThat(agent.version()).isEqualTo("1.0.0");
assertThat(agent.routeIds()).containsExactly("route1", "route2");
assertThat(agent.capabilities()).containsEntry("feature", "tracing");
@@ -50,9 +50,9 @@ class AgentRegistryServiceTest {
AgentInfo updated = registry.register("agent-1", "New Name", "new-group",
"2.0.0", List.of("route1", "route2"), Map.of("new", "cap"));
assertThat(updated.id()).isEqualTo("agent-1");
assertThat(updated.name()).isEqualTo("New Name");
assertThat(updated.application()).isEqualTo("new-group");
assertThat(updated.instanceId()).isEqualTo("agent-1");
assertThat(updated.displayName()).isEqualTo("New Name");
assertThat(updated.applicationId()).isEqualTo("new-group");
assertThat(updated.version()).isEqualTo("2.0.0");
assertThat(updated.routeIds()).containsExactly("route1", "route2");
assertThat(updated.capabilities()).containsEntry("new", "cap");
@@ -192,7 +192,7 @@ class AgentRegistryServiceTest {
List<AgentInfo> all = registry.findAll();
assertThat(all).hasSize(2);
assertThat(all).extracting(AgentInfo::id).containsExactlyInAnyOrder("agent-1", "agent-2");
assertThat(all).extracting(AgentInfo::instanceId).containsExactlyInAnyOrder("agent-1", "agent-2");
}
@Test
@@ -204,8 +204,8 @@ class AgentRegistryServiceTest {
List<AgentInfo> live = registry.findByState(AgentState.LIVE);
List<AgentInfo> stale = registry.findByState(AgentState.STALE);
assertThat(live).hasSize(1).extracting(AgentInfo::id).containsExactly("agent-1");
assertThat(stale).hasSize(1).extracting(AgentInfo::id).containsExactly("agent-2");
assertThat(live).hasSize(1).extracting(AgentInfo::instanceId).containsExactly("agent-1");
assertThat(stale).hasSize(1).extracting(AgentInfo::instanceId).containsExactly("agent-2");
}
@Test
@@ -222,7 +222,7 @@ class AgentRegistryServiceTest {
AgentInfo result = registry.findById("agent-1");
assertThat(result).isNotNull();
assertThat(result.id()).isEqualTo("agent-1");
assertThat(result.instanceId()).isEqualTo("agent-1");
}
}
@@ -239,7 +239,7 @@ class AgentRegistryServiceTest {
assertThat(cmd.id()).isNotNull();
assertThat(cmd.type()).isEqualTo(CommandType.CONFIG_UPDATE);
assertThat(cmd.payload()).isEqualTo("{\"key\":\"val\"}");
assertThat(cmd.targetAgentId()).isEqualTo("agent-1");
assertThat(cmd.targetInstanceId()).isEqualTo("agent-1");
assertThat(cmd.status()).isEqualTo(CommandStatus.PENDING);
assertThat(cmd.createdAt()).isNotNull();
}

View File

@@ -58,7 +58,7 @@ class ChunkAccumulatorTest {
assertThat(batch.tenantId()).isEqualTo("default");
assertThat(batch.executionId()).isEqualTo("ex-1");
assertThat(batch.routeId()).isEqualTo("route-1");
assertThat(batch.applicationName()).isEqualTo("order-service");
assertThat(batch.applicationId()).isEqualTo("order-service");
assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z"));
assertThat(batch.processors()).hasSize(1);
@@ -206,8 +206,8 @@ class ChunkAccumulatorTest {
int chunkSeq, boolean isFinal, List<FlatProcessorRecord> processors) {
ExecutionChunk c = new ExecutionChunk();
c.setExchangeId(exchangeId);
c.setApplicationName(exchangeId.equals("ex-1") ? "order-service" : "app");
c.setAgentId("agent-1");
c.setApplicationId(exchangeId.equals("ex-1") ? "order-service" : "app");
c.setInstanceId("agent-1");
c.setRouteId("route-1");
c.setCorrelationId(null);
c.setStatus(ExecutionStatus.valueOf(status));