diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 9a971357..44a78555 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; @@ -11,6 +12,7 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; @Configuration public class StorageBeanConfig { @@ -41,4 +43,9 @@ public class StorageBeanConfig { return new IngestionService(executionStore, diagramStore, metricsBuffer, searchIndexer::onExecutionUpdated, bodySizeLimit); } + + @Bean + public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java index 032cfea1..78edd94f 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java @@ -2,22 +2,23 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.dto.AgentMetricsResponse; import com.cameleer3.server.app.dto.MetricBucket; -import org.springframework.jdbc.core.JdbcTemplate; +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; import org.springframework.web.bind.annotation.*; -import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.stream.Collectors; @RestController @RequestMapping("/api/v1/agents/{agentId}/metrics") public class AgentMetricsController { - private final JdbcTemplate jdbc; + private final MetricsQueryStore metricsQueryStore; - public AgentMetricsController(JdbcTemplate jdbc) { - this.jdbc = jdbc; + public AgentMetricsController(MetricsQueryStore metricsQueryStore) { + this.metricsQueryStore = metricsQueryStore; } @GetMapping @@ -32,34 +33,18 @@ public class AgentMetricsController { if (to == null) to = Instant.now(); List metricNames = Arrays.asList(names.split(",")); - long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); - String intervalStr = intervalMs + " milliseconds"; - Map> result = new LinkedHashMap<>(); - for (String name : metricNames) { - result.put(name.trim(), new ArrayList<>()); - } + Map> raw = + metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets); - String sql = """ - SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, - metric_name, - AVG(metric_value) AS avg_value - FROM agent_metrics - WHERE agent_id = ? - AND collected_at >= ? AND collected_at < ? - AND metric_name = ANY(?) - GROUP BY bucket, metric_name - ORDER BY bucket - """; - - String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); - jdbc.query(sql, rs -> { - String metricName = rs.getString("metric_name"); - Instant bucket = rs.getTimestamp("bucket").toInstant(); - double value = rs.getDouble("avg_value"); - result.computeIfAbsent(metricName, k -> new ArrayList<>()) - .add(new MetricBucket(bucket, value)); - }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + Map> result = raw.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream() + .map(b -> new MetricBucket(b.time(), b.value())) + .toList(), + (a, b) -> a, + LinkedHashMap::new)); return new AgentMetricsResponse(result); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java new file mode 100644 index 00000000..bcd5d2bc --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java @@ -0,0 +1,55 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +public class PostgresMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public PostgresMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); + String intervalStr = intervalMs + " milliseconds"; + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String sql = """ + SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, + metric_name, + AVG(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? AND collected_at < ? + AND metric_name = ANY(?) + GROUP BY bucket, metric_name + ORDER BY bucket + """; + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + jdbc.query(sql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + + return result; + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java new file mode 100644 index 00000000..650e5b3b --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java @@ -0,0 +1,14 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public interface MetricsQueryStore { + + Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java new file mode 100644 index 00000000..6107fa0e --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java @@ -0,0 +1,9 @@ +package com.cameleer3.server.core.storage.model; + +import java.time.Instant; +import java.util.List; + +public record MetricTimeSeries(String metricName, List buckets) { + + public record Bucket(Instant time, double value) {} +}