From 59374482bc719ea858a01e70e87af5759410db78 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 10:49:06 +0200 Subject: [PATCH] fix: replace PostgreSQL aggregate functions with ClickHouse -Merge combinators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RouteCatalogController, RouteMetricsController, AgentRegistrationController all had inline SQL using SUM() on AggregateFunction columns from stats_1m_* AggregatingMergeTree tables. Replace with countMerge/countIfMerge/sumMerge. Also fix time_bucket() → toStartOfInterval() and ::double → toFloat64(). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../AgentRegistrationController.java | 21 ++++-- .../controller/RouteCatalogController.java | 33 +++++---- .../controller/RouteMetricsController.java | 72 ++++++++++--------- 3 files changed, 74 insertions(+), 52 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java index 79d8ae26..bda320d4 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java @@ -255,13 +255,15 @@ public class AgentRegistrationController { Instant now = Instant.now(); Instant from1m = now.minus(1, ChronoUnit.MINUTES); try { + // Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries + // that strip AggregateFunction column types, breaking -Merge combinators jdbc.query( "SELECT application_name, " + - "SUM(total_count) AS total, " + - "SUM(failed_count) AS failed, " + + "countMerge(total_count) AS total, " + + "countIfMerge(failed_count) AS failed, " + "COUNT(DISTINCT route_id) AS active_routes " + - "FROM stats_1m_route WHERE bucket >= 
? AND bucket < ? " + - "GROUP BY application_name", + "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) + + " GROUP BY application_name", rs -> { long total = rs.getLong("total"); long failed = rs.getLong("failed"); @@ -269,11 +271,18 @@ public class AgentRegistrationController { double errorRate = total > 0 ? (double) failed / total : 0.0; int activeRoutes = rs.getInt("active_routes"); result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes}); - }, - Timestamp.from(from1m), Timestamp.from(now)); + }); } catch (Exception e) { log.debug("Could not query agent metrics: {}", e.getMessage()); } return result; } + + /** Format an Instant as a ClickHouse DateTime literal. */ + private static String lit(Instant instant) { + Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS); + String ts = new Timestamp(truncated.toEpochMilli()).toString(); + if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2); + return "'" + ts + "'"; + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java index 0843d2fc..18161e32 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteCatalogController.java @@ -78,38 +78,37 @@ public class RouteCatalogController { Instant rangeTo = to != null ? 
Instant.parse(to) : now; Instant from1m = now.minus(1, ChronoUnit.MINUTES); - // Route exchange counts from continuous aggregate + // Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver + // wraps prepared statements in sub-queries that strip AggregateFunction column types) Map routeExchangeCounts = new LinkedHashMap<>(); Map routeLastSeen = new LinkedHashMap<>(); try { jdbc.query( - "SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " + - "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + - "GROUP BY application_name, route_id", + "SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " + + "FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) + + " GROUP BY application_name, route_id", rs -> { String key = rs.getString("application_name") + "/" + rs.getString("route_id"); routeExchangeCounts.put(key, rs.getLong("cnt")); Timestamp ts = rs.getTimestamp("last_seen"); if (ts != null) routeLastSeen.put(key, ts.toInstant()); - }, - Timestamp.from(rangeFrom), Timestamp.from(rangeTo)); + }); } catch (Exception e) { - // Continuous aggregate may not exist yet + // AggregatingMergeTree table may not exist yet } // Per-agent TPS from the last minute Map agentTps = new LinkedHashMap<>(); try { jdbc.query( - "SELECT application_name, SUM(total_count) AS cnt " + - "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? 
" + - "GROUP BY application_name", + "SELECT application_name, countMerge(total_count) AS cnt " + + "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) + + " GROUP BY application_name", rs -> { // This gives per-app TPS; we'll distribute among agents below - }, - Timestamp.from(from1m), Timestamp.from(now)); + }); } catch (Exception e) { - // Continuous aggregate may not exist yet + // AggregatingMergeTree table may not exist yet } // Build catalog entries @@ -158,6 +157,14 @@ public class RouteCatalogController { .orElse(null); } + /** Format an Instant as a ClickHouse DateTime literal. */ + private static String lit(Instant instant) { + Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS); + String ts = new Timestamp(truncated.toEpochMilli()).toString(); + if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2); + return "'" + ts + "'"; + } + private String computeWorstHealth(List agents) { boolean hasDead = false; boolean hasStale = false; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java index 9f0d3048..83757a0b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/RouteMetricsController.java @@ -52,20 +52,18 @@ public class RouteMetricsController { Instant fromInstant = from != null ? 
Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS); long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds(); + // Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries + // that strip AggregateFunction column types, breaking -Merge combinators var sql = new StringBuilder( "SELECT application_name, route_id, " + - "SUM(total_count) AS total, " + - "SUM(failed_count) AS failed, " + - "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " + - "COALESCE(MAX(p99_duration), 0) AS p99_dur " + - "FROM stats_1m_route WHERE bucket >= ? AND bucket < ?"); - var params = new ArrayList(); - params.add(Timestamp.from(fromInstant)); - params.add(Timestamp.from(toInstant)); + "countMerge(total_count) AS total, " + + "countIfMerge(failed_count) AS failed, " + + "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " + + "COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " + + "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant)); if (appId != null) { - sql.append(" AND application_name = ?"); - params.add(appId); + sql.append(" AND application_name = " + lit(appId)); } sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id"); @@ -88,7 +86,7 @@ public class RouteMetricsController { routeKeys.add(new RouteKey(applicationName, routeId)); return new RouteMetrics(routeId, applicationName, total, successRate, avgDur, p99Dur, errorRate, tps, List.of(), -1.0); - }, params.toArray()); + }); // Fetch sparklines (12 buckets over the time window) if (!metrics.isEmpty()) { @@ -98,15 +96,13 @@ public class RouteMetricsController { for (int i = 0; i < metrics.size(); i++) { RouteMetrics m = metrics.get(i); try { - List sparkline = jdbc.query( - "SELECT time_bucket(? 
* INTERVAL '1 second', bucket) AS period, " + - "COALESCE(SUM(total_count), 0) AS cnt " + - "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + - "AND application_name = ? AND route_id = ? " + - "GROUP BY period ORDER BY period", - (rs, rowNum) -> rs.getDouble("cnt"), - bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant), - m.appId(), m.routeId()); + String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " + + "COALESCE(countMerge(total_count), 0) AS cnt " + + "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) + + " AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) + + " GROUP BY period ORDER BY period"; + List sparkline = jdbc.query(sparkSql, + (rs, rowNum) -> rs.getDouble("cnt")); metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(), m.successRate(), m.avgDurationMs(), m.p99DurationMs(), m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance())); @@ -153,25 +149,22 @@ public class RouteMetricsController { Instant toInstant = to != null ? to : Instant.now(); Instant fromInstant = from != null ? 
from : toInstant.minus(24, ChronoUnit.HOURS); + // Literal SQL for AggregatingMergeTree -Merge combinators var sql = new StringBuilder( "SELECT processor_id, processor_type, route_id, application_name, " + - "SUM(total_count) AS total_count, " + - "SUM(failed_count) AS failed_count, " + - "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " + - "MAX(p99_duration) AS p99_duration_ms " + + "countMerge(total_count) AS total_count, " + + "countIfMerge(failed_count) AS failed_count, " + + "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " + + "quantileMerge(0.99)(p99_duration) AS p99_duration_ms " + "FROM stats_1m_processor_detail " + - "WHERE bucket >= ? AND bucket < ? AND route_id = ?"); - var params = new ArrayList(); - params.add(Timestamp.from(fromInstant)); - params.add(Timestamp.from(toInstant)); - params.add(routeId); + "WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) + + " AND route_id = " + lit(routeId)); if (appId != null) { - sql.append(" AND application_name = ?"); - params.add(appId); + sql.append(" AND application_name = " + lit(appId)); } sql.append(" GROUP BY processor_id, processor_type, route_id, application_name"); - sql.append(" ORDER BY SUM(total_count) DESC"); + sql.append(" ORDER BY countMerge(total_count) DESC"); List metrics = jdbc.query(sql.toString(), (rs, rowNum) -> { long totalCount = rs.getLong("total_count"); @@ -187,8 +180,21 @@ public class RouteMetricsController { rs.getDouble("avg_duration_ms"), rs.getDouble("p99_duration_ms"), errorRate); - }, params.toArray()); + }); return ResponseEntity.ok(metrics); } + + /** Format an Instant as a ClickHouse DateTime literal. 
*/ + private static String lit(Instant instant) { + Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS); + String ts = new Timestamp(truncated.toEpochMilli()).toString(); + if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2); + return "'" + ts + "'"; + } + + /** Format a string as a ClickHouse SQL literal; backslashes are escaped before single quotes so a trailing backslash cannot neutralise the quote escape. */ + private static String lit(String value) { + return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"; + } }