feat: implement PostgresStatsStore querying continuous aggregates

This commit is contained in:
hsiegeln
2026-03-16 18:22:44 +01:00
parent 9fd02c4edb
commit 527e2cf017
2 changed files with 248 additions and 0 deletions

View File

@@ -0,0 +1,187 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
@Repository
public class PostgresStatsStore implements StatsStore {
private final JdbcTemplate jdbc;
public PostgresStatsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public ExecutionStats stats(Instant from, Instant to) {
return queryStats("stats_1m_all", from, to, List.of());
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("group_name", groupName)));
}
@Override
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
// Note: agentIds is accepted for interface compatibility but not filterable
// on the continuous aggregate (it groups by route_id, not agent_id).
// All agents for the same route contribute to the same aggregate.
return queryStats("stats_1m_route", from, to, List.of(
new Filter("route_id", routeId)));
}
@Override
public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
return queryStats("stats_1m_processor", from, to, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)));
}
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("group_name", groupName)), true);
}
@Override
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
new Filter("route_id", routeId)), true);
}
@Override
public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
String routeId, String processorType) {
// stats_1m_processor does NOT have running_count column
return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)), false);
}
private record Filter(String column, String value) {}
private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
// running_count only exists on execution-level aggregates, not processor
boolean hasRunning = !view.equals("stats_1m_processor");
String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
}, params.toArray());
if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
p99Duration = r[3]; activeCount = r[4];
}
// Previous period (shifted back 24h)
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = to.minus(Duration.ofHours(24));
List<Object> prevParams = new ArrayList<>();
prevParams.add(Timestamp.from(prevFrom));
prevParams.add(Timestamp.from(prevTo));
for (Filter f : filters) prevParams.add(f.value());
String prevSql = sql; // same shape, different time params
long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration")
}, prevParams.toArray());
if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
}
// Today total (from midnight UTC)
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
List<Object> todayParams = new ArrayList<>();
todayParams.add(Timestamp.from(todayStart));
todayParams.add(Timestamp.from(Instant.now()));
for (Filter f : filters) todayParams.add(f.value());
String todaySql = sql;
long totalToday = 0;
var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
todayParams.toArray());
if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats(
totalCount, failedCount, avgDuration, p99Duration, activeCount,
totalToday, prevTotal, prevFailed, prevAvg, prevP99);
}
private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
int bucketCount, List<Filter> filters,
boolean hasRunningCount) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
"COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(intervalSeconds);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
sql += " GROUP BY period ORDER BY period";
List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
), params.toArray());
return new StatsTimeseries(buckets);
}
}

View File

@@ -0,0 +1,61 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import static org.junit.jupiter.api.Assertions.*;
class PostgresStatsStoreIT extends AbstractPostgresIT {
@Autowired StatsStore statsStore;
@Autowired ExecutionStore executionStore;
@Autowired JdbcTemplate jdbc;
@Test
void statsReturnsCountsForTimeWindow() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
// Force continuous aggregate refresh
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
assertEquals(3, stats.totalCount());
assertEquals(1, stats.failedCount());
}
@Test
void timeseriesReturnsBuckets() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
for (int i = 0; i < 10; i++) {
insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
now.plusSeconds(i * 30), 100L + i);
}
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
StatsTimeseries ts = statsStore.timeseries(now.minusMinutes(1), now.plusMinutes(10), 5);
assertNotNull(ts);
assertFalse(ts.buckets().isEmpty());
}
private void insertExecution(String id, String routeId, String groupName,
String status, Instant startTime, long durationMs) {
executionStore.upsert(new ExecutionRecord(
id, routeId, "agent-1", groupName, status, null, null,
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null));
}
}