Fix ClickHouse OOM: PREWHERE on active-count query + per-query memory limits
All checks were successful
CI / build (push) Successful in 1m16s
CI / docker (push) Successful in 41s
CI / deploy (push) Successful in 33s

The active-count query scanned all wide rows on the base table, exceeding
the 3.6 GiB memory limit. Use PREWHERE status = 'RUNNING' so ClickHouse
reads only the status column first. Add SETTINGS max_memory_usage = 1 GB
(10^9 bytes) to all queries so concurrent requests degrade gracefully
instead of crashing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-15 11:55:26 +01:00
parent 61a9549853
commit 48d944354a

View File

@@ -24,6 +24,9 @@ import java.util.List;
*/ */
public class ClickHouseSearchEngine implements SearchEngine { public class ClickHouseSearchEngine implements SearchEngine {
/** Per-query memory cap (1 GB, 10^9 bytes) — prevents a single query from OOMing ClickHouse. */
private static final String SETTINGS = " SETTINGS max_memory_usage = 1000000000";
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) { public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) {
@@ -42,7 +45,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
// Count query // Count query
var countParams = params.toArray(); var countParams = params.toArray();
Long total = jdbcTemplate.queryForObject( Long total = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where, Long.class, countParams); "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, countParams);
if (total == null) total = 0L; if (total == null) total = 0L;
if (total == 0) { if (total == 0) {
@@ -56,7 +59,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " + String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " +
"duration_ms, correlation_id, error_message, diagram_content_hash " + "duration_ms, correlation_id, error_message, diagram_content_hash " +
"FROM route_executions" + where + "FROM route_executions" + where +
" ORDER BY " + request.sortColumn() + " " + orderDir + " LIMIT ? OFFSET ?"; " ORDER BY " + request.sortColumn() + " " + orderDir + " LIMIT ? OFFSET ?" + SETTINGS;
List<ExecutionSummary> data = jdbcTemplate.query(dataSql, (rs, rowNum) -> { List<ExecutionSummary> data = jdbcTemplate.query(dataSql, (rs, rowNum) -> {
Timestamp endTs = rs.getTimestamp("end_time"); Timestamp endTs = rs.getTimestamp("end_time");
@@ -85,7 +88,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
Long result = jdbcTemplate.queryForObject( Long result = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where, Long.class, params.toArray()); "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, params.toArray());
return result != null ? result : 0L; return result != null ? result : 0L;
} }
@@ -112,7 +115,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
"countIfMerge(failed_count) AS failed, " + "countIfMerge(failed_count) AS failed, " +
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + where; "FROM route_execution_stats_5m" + where + SETTINGS;
record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {} record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {}
PeriodStats current = jdbcTemplate.queryForObject(rollupSql, PeriodStats current = jdbcTemplate.queryForObject(rollupSql,
@@ -123,14 +126,13 @@ public class ClickHouseSearchEngine implements SearchEngine {
rs.getLong("p99_ms")), rs.getLong("p99_ms")),
params.toArray()); params.toArray());
// Active count — lightweight real-time query on base table (RUNNING is transient) // Active count — PREWHERE reads only the status column before touching wide rows
var activeConditions = new ArrayList<String>(); var scopeConditions = new ArrayList<String>();
var activeParams = new ArrayList<Object>(); var activeParams = new ArrayList<Object>();
activeConditions.add("status = 'RUNNING'"); addScopeFilters(routeId, agentIds, scopeConditions, activeParams);
addScopeFilters(routeId, agentIds, activeConditions, activeParams); String scopeWhere = scopeConditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", scopeConditions);
String activeWhere = " WHERE " + String.join(" AND ", activeConditions);
Long activeCount = jdbcTemplate.queryForObject( Long activeCount = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + activeWhere, "SELECT count() FROM route_executions PREWHERE status = 'RUNNING'" + scopeWhere + SETTINGS,
Long.class, activeParams.toArray()); Long.class, activeParams.toArray());
// Previous period (same window shifted back 24h) — read from rollup // Previous period (same window shifted back 24h) — read from rollup
@@ -151,7 +153,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
"countIfMerge(failed_count) AS failed, " + "countIfMerge(failed_count) AS failed, " +
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + prevWhere; "FROM route_execution_stats_5m" + prevWhere + SETTINGS;
PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql, PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql,
(rs, rowNum) -> new PeriodStats( (rs, rowNum) -> new PeriodStats(
@@ -171,7 +173,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
String todayWhere = " WHERE " + String.join(" AND ", todayConditions); String todayWhere = " WHERE " + String.join(" AND ", todayConditions);
Long totalToday = jdbcTemplate.queryForObject( Long totalToday = jdbcTemplate.queryForObject(
"SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere, "SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere + SETTINGS,
Long.class, todayParams.toArray()); Long.class, todayParams.toArray());
return new ExecutionStats( return new ExecutionStats(
@@ -210,7 +212,7 @@ public class ClickHouseSearchEngine implements SearchEngine {
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + where + "FROM route_execution_stats_5m" + where +
" GROUP BY ts_bucket ORDER BY ts_bucket"; " GROUP BY ts_bucket ORDER BY ts_bucket" + SETTINGS;
List<StatsTimeseries.TimeseriesBucket> buckets = jdbcTemplate.query(sql, (rs, rowNum) -> List<StatsTimeseries.TimeseriesBucket> buckets = jdbcTemplate.query(sql, (rs, rowNum) ->
new StatsTimeseries.TimeseriesBucket( new StatsTimeseries.TimeseriesBucket(