diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java new file mode 100644 index 00000000..fff9b70f --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java @@ -0,0 +1,187 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; +import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +@Repository +public class PostgresStatsStore implements StatsStore { + + private final JdbcTemplate jdbc; + + public PostgresStatsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public ExecutionStats stats(Instant from, Instant to) { + return queryStats("stats_1m_all", from, to, List.of()); + } + + @Override + public ExecutionStats statsForApp(Instant from, Instant to, String groupName) { + return queryStats("stats_1m_app", from, to, List.of( + new Filter("group_name", groupName))); + } + + @Override + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds) { + // Note: agentIds is accepted for interface compatibility but not filterable + // on the continuous aggregate (it groups by route_id, not agent_id). + // All agents for the same route contribute to the same aggregate. + return queryStats("stats_1m_route", from, to, List.of( + new Filter("route_id", routeId))); + } + + @Override + public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) { + return queryStats("stats_1m_processor", from, to, List.of( + new Filter("route_id", routeId), + new Filter("processor_type", processorType))); + } + + @Override + public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { + return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true); + } + + @Override + public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) { + return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of( + new Filter("group_name", groupName)), true); + } + + @Override + public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, + String routeId, List agentIds) { + return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of( + new Filter("route_id", routeId)), true); + } + + @Override + public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount, + String routeId, String processorType) { + // stats_1m_processor does NOT have running_count column + return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of( + new Filter("route_id", routeId), + new Filter("processor_type", processorType)), false); + } + + private record Filter(String column, String value) {} + + private ExecutionStats queryStats(String view, Instant from, Instant to, List filters) { + // running_count only exists on execution-level aggregates, not processor + boolean hasRunning = !view.equals("stats_1m_processor"); + String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0"; + + String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " + + "COALESCE(SUM(failed_count), 0) AS failed_count, " + + "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + + "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; + + List params = new ArrayList<>(); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + for (Filter f : filters) { + sql += " AND " + f.column() + " = ?"; + params.add(f.value()); + } + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + rs.getLong("avg_duration"), rs.getLong("p99_duration"), + rs.getLong("active_count") + }, params.toArray()); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; + p99Duration = r[3]; activeCount = r[4]; + } + + // Previous period (shifted back 24h) + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + List prevParams = new ArrayList<>(); + prevParams.add(Timestamp.from(prevFrom)); + prevParams.add(Timestamp.from(prevTo)); + for (Filter f : filters) prevParams.add(f.value()); + String prevSql = sql; // same shape, different time params + + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + rs.getLong("avg_duration"), rs.getLong("p99_duration") + }, prevParams.toArray()); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; + } + + // Today total (from midnight UTC) + Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); + List todayParams = new ArrayList<>(); + todayParams.add(Timestamp.from(todayStart)); + todayParams.add(Timestamp.from(Instant.now())); + for (Filter f : filters) todayParams.add(f.value()); + String todaySql = sql; + + long totalToday = 0; + var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"), + todayParams.toArray()); + if (!todayResult.isEmpty()) totalToday = todayResult.get(0); + + return new ExecutionStats( + totalCount, failedCount, avgDuration, p99Duration, activeCount, + totalToday, prevTotal, prevFailed, prevAvg, prevP99); + } + + private StatsTimeseries queryTimeseries(String view, Instant from, Instant to, + int bucketCount, List filters, + boolean hasRunningCount) { + long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); + if (intervalSeconds < 60) intervalSeconds = 60; + + String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0"; + + String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " + + "COALESCE(SUM(total_count), 0) AS total_count, " + + "COALESCE(SUM(failed_count), 0) AS failed_count, " + + "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + + "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; + + List params = new ArrayList<>(); + params.add(intervalSeconds); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + for (Filter f : filters) { + sql += " AND " + f.column() + " = ?"; + params.add(f.value()); + } + sql += " GROUP BY period ORDER BY period"; + + List buckets = jdbc.query(sql, (rs, rowNum) -> + new TimeseriesBucket( + rs.getTimestamp("period").toInstant(), + rs.getLong("total_count"), rs.getLong("failed_count"), + rs.getLong("avg_duration"), rs.getLong("p99_duration"), + rs.getLong("active_count") + ), params.toArray()); + + return new StatsTimeseries(buckets); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java new file mode 100644 index 00000000..d3a1548f --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java @@ -0,0 +1,61 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.StatsStore; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import static org.junit.jupiter.api.Assertions.*; + +class PostgresStatsStoreIT extends AbstractPostgresIT { + + @Autowired StatsStore statsStore; + @Autowired ExecutionStore executionStore; + @Autowired JdbcTemplate jdbc; + + @Test + void statsReturnsCountsForTimeWindow() { + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); + insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L); + insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L); + insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L); + + // Force continuous aggregate refresh + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); + + ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60)); + assertEquals(3, stats.totalCount()); + assertEquals(1, stats.failedCount()); + } + + @Test + void timeseriesReturnsBuckets() { + Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES); + for (int i = 0; i < 10; i++) { + insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED", + now.plusSeconds(i * 30), 100L + i); + } + + jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)"); + + StatsTimeseries ts = statsStore.timeseries(now.minusMinutes(1), now.plusMinutes(10), 5); + assertNotNull(ts); + assertFalse(ts.buckets().isEmpty()); + } + + private void insertExecution(String id, String routeId, String groupName, + String status, Instant startTime, long durationMs) { + executionStore.upsert(new ExecutionRecord( + id, routeId, "agent-1", groupName, status, null, null, + startTime, startTime.plusMillis(durationMs), durationMs, + status.equals("FAILED") ? "error" : null, null, null)); + } +}