refactor: rename group_name→application_name in DB, OpenSearch, SQL
Consolidate V1-V7 Flyway migrations into single V1__init.sql with all columns renamed from group_name to application_name. Requires fresh database (wipe flyway_schema_history, all data). - DB columns: executions.group_name → application_name, processor_executions.group_name → application_name - Continuous aggregates: all views updated to use application_name - OpenSearch field: group_name → application_name in index/query - All Java SQL strings updated to match new column names - Delete V2-V7 migration files (folded into V1) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -242,19 +242,19 @@ public class AgentRegistrationController {
|
||||
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
|
||||
try {
|
||||
jdbc.query(
|
||||
"SELECT group_name, " +
|
||||
"SELECT application_name, " +
|
||||
"SUM(total_count) AS total, " +
|
||||
"SUM(failed_count) AS failed, " +
|
||||
"COUNT(DISTINCT route_id) AS active_routes " +
|
||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
||||
"GROUP BY group_name",
|
||||
"GROUP BY application_name",
|
||||
rs -> {
|
||||
long total = rs.getLong("total");
|
||||
long failed = rs.getLong("failed");
|
||||
double tps = total / 60.0;
|
||||
double errorRate = total > 0 ? (double) failed / total : 0.0;
|
||||
int activeRoutes = rs.getInt("active_routes");
|
||||
result.put(rs.getString("group_name"), new double[]{tps, errorRate, activeRoutes});
|
||||
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
|
||||
},
|
||||
Timestamp.from(from1m), Timestamp.from(now));
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -73,11 +73,11 @@ public class RouteCatalogController {
|
||||
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
|
||||
try {
|
||||
jdbc.query(
|
||||
"SELECT group_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
|
||||
"SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
|
||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
||||
"GROUP BY group_name, route_id",
|
||||
"GROUP BY application_name, route_id",
|
||||
rs -> {
|
||||
String key = rs.getString("group_name") + "/" + rs.getString("route_id");
|
||||
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
|
||||
routeExchangeCounts.put(key, rs.getLong("cnt"));
|
||||
Timestamp ts = rs.getTimestamp("last_seen");
|
||||
if (ts != null) routeLastSeen.put(key, ts.toInstant());
|
||||
@@ -91,9 +91,9 @@ public class RouteCatalogController {
|
||||
Map<String, Double> agentTps = new LinkedHashMap<>();
|
||||
try {
|
||||
jdbc.query(
|
||||
"SELECT group_name, SUM(total_count) AS cnt " +
|
||||
"SELECT application_name, SUM(total_count) AS cnt " +
|
||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
||||
"GROUP BY group_name",
|
||||
"GROUP BY application_name",
|
||||
rs -> {
|
||||
// This gives per-app TPS; we'll distribute among agents below
|
||||
},
|
||||
|
||||
@@ -44,7 +44,7 @@ public class RouteMetricsController {
|
||||
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
|
||||
|
||||
var sql = new StringBuilder(
|
||||
"SELECT group_name, route_id, " +
|
||||
"SELECT application_name, route_id, " +
|
||||
"SUM(total_count) AS total, " +
|
||||
"SUM(failed_count) AS failed, " +
|
||||
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " +
|
||||
@@ -55,17 +55,17 @@ public class RouteMetricsController {
|
||||
params.add(Timestamp.from(toInstant));
|
||||
|
||||
if (appId != null) {
|
||||
sql.append(" AND group_name = ?");
|
||||
sql.append(" AND application_name = ?");
|
||||
params.add(appId);
|
||||
}
|
||||
sql.append(" GROUP BY group_name, route_id ORDER BY group_name, route_id");
|
||||
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
|
||||
|
||||
// Key struct for sparkline lookup
|
||||
record RouteKey(String appId, String routeId) {}
|
||||
List<RouteKey> routeKeys = new ArrayList<>();
|
||||
|
||||
List<RouteMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
|
||||
String applicationName = rs.getString("group_name");
|
||||
String applicationName = rs.getString("application_name");
|
||||
String routeId = rs.getString("route_id");
|
||||
long total = rs.getLong("total");
|
||||
long failed = rs.getLong("failed");
|
||||
@@ -93,7 +93,7 @@ public class RouteMetricsController {
|
||||
"SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
|
||||
"COALESCE(SUM(total_count), 0) AS cnt " +
|
||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
||||
"AND group_name = ? AND route_id = ? " +
|
||||
"AND application_name = ? AND route_id = ? " +
|
||||
"GROUP BY period ORDER BY period",
|
||||
(rs, rowNum) -> rs.getDouble("cnt"),
|
||||
bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
|
||||
@@ -124,7 +124,7 @@ public class RouteMetricsController {
|
||||
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
|
||||
|
||||
var sql = new StringBuilder(
|
||||
"SELECT processor_id, processor_type, route_id, group_name, " +
|
||||
"SELECT processor_id, processor_type, route_id, application_name, " +
|
||||
"SUM(total_count) AS total_count, " +
|
||||
"SUM(failed_count) AS failed_count, " +
|
||||
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " +
|
||||
@@ -137,10 +137,10 @@ public class RouteMetricsController {
|
||||
params.add(routeId);
|
||||
|
||||
if (appId != null) {
|
||||
sql.append(" AND group_name = ?");
|
||||
sql.append(" AND application_name = ?");
|
||||
params.add(appId);
|
||||
}
|
||||
sql.append(" GROUP BY processor_id, processor_type, route_id, group_name");
|
||||
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
|
||||
sql.append(" ORDER BY SUM(total_count) DESC");
|
||||
|
||||
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
|
||||
@@ -151,7 +151,7 @@ public class RouteMetricsController {
|
||||
rs.getString("processor_id"),
|
||||
rs.getString("processor_type"),
|
||||
rs.getString("route_id"),
|
||||
rs.getString("group_name"),
|
||||
rs.getString("application_name"),
|
||||
totalCount,
|
||||
failedCount,
|
||||
rs.getDouble("avg_duration_ms"),
|
||||
|
||||
@@ -288,7 +288,7 @@ public class OpenSearchIndex implements SearchIndex {
|
||||
map.put("execution_id", doc.executionId());
|
||||
map.put("route_id", doc.routeId());
|
||||
map.put("agent_id", doc.agentId());
|
||||
map.put("group_name", doc.applicationName());
|
||||
map.put("application_name", doc.applicationName());
|
||||
map.put("status", doc.status());
|
||||
map.put("correlation_id", doc.correlationId());
|
||||
map.put("exchange_id", doc.exchangeId());
|
||||
@@ -323,7 +323,7 @@ public class OpenSearchIndex implements SearchIndex {
|
||||
(String) src.get("execution_id"),
|
||||
(String) src.get("route_id"),
|
||||
(String) src.get("agent_id"),
|
||||
(String) src.get("group_name"),
|
||||
(String) src.get("application_name"),
|
||||
(String) src.get("status"),
|
||||
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
|
||||
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
|
||||
|
||||
@@ -24,7 +24,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
@Override
|
||||
public void upsert(ExecutionRecord execution) {
|
||||
jdbc.update("""
|
||||
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
|
||||
INSERT INTO executions (execution_id, route_id, agent_id, application_name,
|
||||
status, correlation_id, exchange_id, start_time, end_time,
|
||||
duration_ms, error_message, error_stacktrace, diagram_content_hash,
|
||||
created_at, updated_at)
|
||||
@@ -59,7 +59,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
List<ProcessorRecord> processors) {
|
||||
jdbc.batchUpdate("""
|
||||
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
|
||||
diagram_node_id, group_name, route_id, depth, parent_processor_id,
|
||||
diagram_node_id, application_name, route_id, depth, parent_processor_id,
|
||||
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
|
||||
input_body, output_body, input_headers, output_headers)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
|
||||
@@ -103,7 +103,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
|
||||
new ExecutionRecord(
|
||||
rs.getString("execution_id"), rs.getString("route_id"),
|
||||
rs.getString("agent_id"), rs.getString("group_name"),
|
||||
rs.getString("agent_id"), rs.getString("application_name"),
|
||||
rs.getString("status"), rs.getString("correlation_id"),
|
||||
rs.getString("exchange_id"),
|
||||
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
|
||||
@@ -115,7 +115,7 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
new ProcessorRecord(
|
||||
rs.getString("execution_id"), rs.getString("processor_id"),
|
||||
rs.getString("processor_type"), rs.getString("diagram_node_id"),
|
||||
rs.getString("group_name"), rs.getString("route_id"),
|
||||
rs.getString("application_name"), rs.getString("route_id"),
|
||||
rs.getInt("depth"), rs.getString("parent_processor_id"),
|
||||
rs.getString("status"),
|
||||
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
|
||||
|
||||
@@ -31,7 +31,7 @@ public class PostgresStatsStore implements StatsStore {
|
||||
@Override
|
||||
public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
|
||||
return queryStats("stats_1m_app", from, to, List.of(
|
||||
new Filter("group_name", applicationName)));
|
||||
new Filter("application_name", applicationName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -58,7 +58,7 @@ public class PostgresStatsStore implements StatsStore {
|
||||
@Override
|
||||
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
|
||||
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
|
||||
new Filter("group_name", applicationName)), true);
|
||||
new Filter("application_name", applicationName)), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user