fix: replace PostgreSQL aggregate functions with ClickHouse -Merge combinators

RouteCatalogController, RouteMetricsController, AgentRegistrationController
all had inline SQL using SUM() on AggregateFunction columns from stats_1m_*
AggregatingMergeTree tables. Replace with countMerge/countIfMerge/sumMerge.
Also fix time_bucket() → toStartOfInterval() and ::double → toFloat64().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 10:49:06 +02:00
parent 43e187a023
commit 59374482bc
3 changed files with 74 additions and 52 deletions

View File

@@ -255,13 +255,15 @@ public class AgentRegistrationController {
Instant now = Instant.now();
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
try {
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
jdbc.query(
"SELECT application_name, " +
"SUM(total_count) AS total, " +
"SUM(failed_count) AS failed, " +
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"COUNT(DISTINCT route_id) AS active_routes " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name",
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
rs -> {
long total = rs.getLong("total");
long failed = rs.getLong("failed");
@@ -269,11 +271,18 @@ public class AgentRegistrationController {
double errorRate = total > 0 ? (double) failed / total : 0.0;
int activeRoutes = rs.getInt("active_routes");
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
},
Timestamp.from(from1m), Timestamp.from(now));
});
} catch (Exception e) {
log.debug("Could not query agent metrics: {}", e.getMessage());
}
return result;
}
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
}

View File

@@ -78,38 +78,37 @@ public class RouteCatalogController {
Instant rangeTo = to != null ? Instant.parse(to) : now;
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
// Route exchange counts from continuous aggregate
// Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver
// wraps prepared statements in sub-queries that strip AggregateFunction column types)
Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name, route_id",
"SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
" GROUP BY application_name, route_id",
rs -> {
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
routeExchangeCounts.put(key, rs.getLong("cnt"));
Timestamp ts = rs.getTimestamp("last_seen");
if (ts != null) routeLastSeen.put(key, ts.toInstant());
},
Timestamp.from(rangeFrom), Timestamp.from(rangeTo));
});
} catch (Exception e) {
// Continuous aggregate may not exist yet
// AggregatingMergeTree table may not exist yet
}
// Per-agent TPS from the last minute
Map<String, Double> agentTps = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, SUM(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name",
"SELECT application_name, countMerge(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
rs -> {
// This gives per-app TPS; we'll distribute among agents below
},
Timestamp.from(from1m), Timestamp.from(now));
});
} catch (Exception e) {
// Continuous aggregate may not exist yet
// AggregatingMergeTree table may not exist yet
}
// Build catalog entries
@@ -158,6 +157,14 @@ public class RouteCatalogController {
.orElse(null);
}
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
private String computeWorstHealth(List<AgentInfo> agents) {
boolean hasDead = false;
boolean hasStale = false;

View File

@@ -52,20 +52,18 @@ public class RouteMetricsController {
Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
var sql = new StringBuilder(
"SELECT application_name, route_id, " +
"SUM(total_count) AS total, " +
"SUM(failed_count) AS failed, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " +
"COALESCE(MAX(p99_duration), 0) AS p99_dur " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ?");
var params = new ArrayList<Object>();
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
"COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " +
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));
if (appId != null) {
sql.append(" AND application_name = ?");
params.add(appId);
sql.append(" AND application_name = " + lit(appId));
}
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
@@ -88,7 +86,7 @@ public class RouteMetricsController {
routeKeys.add(new RouteKey(applicationName, routeId));
return new RouteMetrics(routeId, applicationName, total, successRate,
avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
}, params.toArray());
});
// Fetch sparklines (12 buckets over the time window)
if (!metrics.isEmpty()) {
@@ -98,15 +96,13 @@ public class RouteMetricsController {
for (int i = 0; i < metrics.size(); i++) {
RouteMetrics m = metrics.get(i);
try {
List<Double> sparkline = jdbc.query(
"SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
"COALESCE(SUM(total_count), 0) AS cnt " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"AND application_name = ? AND route_id = ? " +
"GROUP BY period ORDER BY period",
(rs, rowNum) -> rs.getDouble("cnt"),
bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
m.appId(), m.routeId());
String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
"COALESCE(countMerge(total_count), 0) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
" AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
" GROUP BY period ORDER BY period";
List<Double> sparkline = jdbc.query(sparkSql,
(rs, rowNum) -> rs.getDouble("cnt"));
metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
@@ -153,25 +149,22 @@ public class RouteMetricsController {
Instant toInstant = to != null ? to : Instant.now();
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
// Literal SQL for AggregatingMergeTree -Merge combinators
var sql = new StringBuilder(
"SELECT processor_id, processor_type, route_id, application_name, " +
"SUM(total_count) AS total_count, " +
"SUM(failed_count) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " +
"MAX(p99_duration) AS p99_duration_ms " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
"quantileMerge(0.99)(p99_duration) AS p99_duration_ms " +
"FROM stats_1m_processor_detail " +
"WHERE bucket >= ? AND bucket < ? AND route_id = ?");
var params = new ArrayList<Object>();
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
params.add(routeId);
"WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
" AND route_id = " + lit(routeId));
if (appId != null) {
sql.append(" AND application_name = ?");
params.add(appId);
sql.append(" AND application_name = " + lit(appId));
}
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
sql.append(" ORDER BY SUM(total_count) DESC");
sql.append(" ORDER BY countMerge(total_count) DESC");
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
long totalCount = rs.getLong("total_count");
@@ -187,8 +180,21 @@ public class RouteMetricsController {
rs.getDouble("avg_duration_ms"),
rs.getDouble("p99_duration_ms"),
errorRate);
}, params.toArray());
});
return ResponseEntity.ok(metrics);
}
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
/** Format a string as a SQL literal with single-quote escaping. */
private static String lit(String value) {
return "'" + value.replace("'", "\\'") + "'";
}
}