feat(server): REST API over server_metrics for SaaS dashboards
Adds /api/v1/admin/server-metrics/{catalog,instances,query} so SaaS control
planes can build the server-health dashboard without direct ClickHouse
access. One generic /query endpoint covers every panel in the
server-self-metrics doc: aggregation (avg/sum/max/min/latest), group-by-tag,
filter-by-tag, counter-delta mode with per-server_instance_id rotation
handling, and a derived 'mean' statistic for timers. Regex-validated
identifiers, parameterised literals, 31-day range cap, 500-series response
cap. ADMIN-only via the existing /api/v1/admin/** RBAC gate. Docs updated:
all 17 suggested panels now expressed as single-endpoint queries.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import com.cameleer.server.app.storage.ClickHouseRouteCatalogStore;
|
||||
import com.cameleer.server.core.storage.RouteCatalogStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseMetricsQueryStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseMetricsStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseServerMetricsQueryStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseServerMetricsStore;
|
||||
import com.cameleer.server.app.storage.ClickHouseStatsStore;
|
||||
import com.cameleer.server.core.admin.AuditRepository;
|
||||
@@ -74,6 +75,13 @@ public class StorageBeanConfig {
|
||||
return new ClickHouseServerMetricsStore(clickHouseJdbc);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ServerMetricsQueryStore clickHouseServerMetricsQueryStore(
|
||||
TenantProperties tenantProperties,
|
||||
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
|
||||
return new ClickHouseServerMetricsQueryStore(tenantProperties.getId(), clickHouseJdbc);
|
||||
}
|
||||
|
||||
// ── Execution Store ──────────────────────────────────────────────────
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -0,0 +1,135 @@
|
||||
package com.cameleer.server.app.controller;
|
||||
|
||||
import com.cameleer.server.core.storage.ServerMetricsQueryStore;
|
||||
import com.cameleer.server.core.storage.model.ServerInstanceInfo;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricCatalogEntry;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricQueryRequest;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricQueryResponse;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.ExceptionHandler;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Generic read API over the ClickHouse {@code server_metrics} table. Lets
|
||||
* SaaS control planes build server-health dashboards without requiring direct
|
||||
* ClickHouse access.
|
||||
*
|
||||
* <p>Three endpoints cover all 17 panels in {@code docs/server-self-metrics.md}:
|
||||
* <ul>
|
||||
* <li>{@code GET /catalog} — discover available metric names, types, statistics, and tags</li>
|
||||
* <li>{@code POST /query} — generic time-series query with aggregation, grouping, filtering, and counter-delta mode</li>
|
||||
* <li>{@code GET /instances} — list server instances (useful for partitioning counter math)</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>Protected by the {@code /api/v1/admin/**} catch-all in {@code SecurityConfig} — requires ADMIN role.
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/admin/server-metrics")
|
||||
@Tag(name = "Server Self-Metrics",
|
||||
description = "Read API over the server's own Micrometer registry snapshots for dashboards")
|
||||
public class ServerMetricsAdminController {
|
||||
|
||||
/** Default lookback window for catalog/instances when from/to are omitted. */
|
||||
private static final long DEFAULT_LOOKBACK_SECONDS = 3_600L;
|
||||
|
||||
private final ServerMetricsQueryStore store;
|
||||
|
||||
public ServerMetricsAdminController(ServerMetricsQueryStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
@GetMapping("/catalog")
|
||||
@Operation(summary = "List metric names observed in the window",
|
||||
description = "For each metric_name, returns metric_type, the set of statistics emitted, and the union of tag keys.")
|
||||
public ResponseEntity<List<ServerMetricCatalogEntry>> catalog(
|
||||
@RequestParam(required = false) String from,
|
||||
@RequestParam(required = false) String to) {
|
||||
Instant[] window = resolveWindow(from, to);
|
||||
return ResponseEntity.ok(store.catalog(window[0], window[1]));
|
||||
}
|
||||
|
||||
@GetMapping("/instances")
|
||||
@Operation(summary = "List server_instance_id values observed in the window",
|
||||
description = "Returns first/last seen timestamps — use to partition counter-delta computations.")
|
||||
public ResponseEntity<List<ServerInstanceInfo>> instances(
|
||||
@RequestParam(required = false) String from,
|
||||
@RequestParam(required = false) String to) {
|
||||
Instant[] window = resolveWindow(from, to);
|
||||
return ResponseEntity.ok(store.listInstances(window[0], window[1]));
|
||||
}
|
||||
|
||||
@PostMapping("/query")
|
||||
@Operation(summary = "Generic time-series query",
|
||||
description = "Returns bucketed series for a single metric_name. Supports aggregation (avg/sum/max/min/latest), group-by-tag, filter-by-tag, counter delta mode, and a derived 'mean' statistic for timers.")
|
||||
public ResponseEntity<ServerMetricQueryResponse> query(@RequestBody QueryBody body) {
|
||||
ServerMetricQueryRequest request = new ServerMetricQueryRequest(
|
||||
body.metric(),
|
||||
body.statistic(),
|
||||
parseInstant(body.from(), "from"),
|
||||
parseInstant(body.to(), "to"),
|
||||
body.stepSeconds(),
|
||||
body.groupByTags(),
|
||||
body.filterTags(),
|
||||
body.aggregation(),
|
||||
body.mode(),
|
||||
body.serverInstanceIds());
|
||||
return ResponseEntity.ok(store.query(request));
|
||||
}
|
||||
|
||||
@ExceptionHandler(IllegalArgumentException.class)
|
||||
public ResponseEntity<Map<String, String>> handleBadRequest(IllegalArgumentException e) {
|
||||
return ResponseEntity.badRequest().body(Map.of("error", e.getMessage()));
|
||||
}
|
||||
|
||||
private static Instant[] resolveWindow(String from, String to) {
|
||||
Instant toI = to != null ? parseInstant(to, "to") : Instant.now();
|
||||
Instant fromI = from != null
|
||||
? parseInstant(from, "from")
|
||||
: toI.minusSeconds(DEFAULT_LOOKBACK_SECONDS);
|
||||
if (!fromI.isBefore(toI)) {
|
||||
throw new IllegalArgumentException("from must be strictly before to");
|
||||
}
|
||||
return new Instant[]{fromI, toI};
|
||||
}
|
||||
|
||||
private static Instant parseInstant(String raw, String field) {
|
||||
if (raw == null || raw.isBlank()) {
|
||||
throw new IllegalArgumentException(field + " is required");
|
||||
}
|
||||
try {
|
||||
return Instant.parse(raw);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException(
|
||||
field + " must be an ISO-8601 instant (e.g. 2026-04-23T10:00:00Z)");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request body for {@link #query(QueryBody)}. Uses ISO-8601 strings on
|
||||
* the wire so the OpenAPI schema stays language-neutral.
|
||||
*/
|
||||
public record QueryBody(
|
||||
String metric,
|
||||
String statistic,
|
||||
String from,
|
||||
String to,
|
||||
Integer stepSeconds,
|
||||
List<String> groupByTags,
|
||||
Map<String, String> filterTags,
|
||||
String aggregation,
|
||||
String mode,
|
||||
List<String> serverInstanceIds
|
||||
) {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,408 @@
|
||||
package com.cameleer.server.app.storage;
|
||||
|
||||
import com.cameleer.server.core.storage.ServerMetricsQueryStore;
|
||||
import com.cameleer.server.core.storage.model.ServerInstanceInfo;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricCatalogEntry;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricPoint;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricQueryRequest;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricQueryResponse;
|
||||
import com.cameleer.server.core.storage.model.ServerMetricSeries;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.sql.Array;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* ClickHouse-backed {@link ServerMetricsQueryStore}.
|
||||
*
|
||||
* <p>Safety rules for every query:
|
||||
* <ul>
|
||||
* <li>tenant_id always bound as a parameter — no cross-tenant reads.</li>
|
||||
* <li>Identifier-like inputs (metric name, statistic, tag keys,
|
||||
* aggregation, mode) are regex-validated. Tag keys flow through the
|
||||
* query as JDBC parameter-bound values of {@code tags[?]} map lookups,
|
||||
* so even with a "safe" regex they cannot inject SQL.</li>
|
||||
* <li>Literal values ({@code from}, {@code to}, tag filter values,
|
||||
* server_instance_id allow-list) always go through {@code ?}.</li>
|
||||
* <li>The time range is capped at {@link #MAX_RANGE}.</li>
|
||||
* <li>Result cardinality is capped at {@link #MAX_SERIES} series.</li>
|
||||
* </ul>
|
||||
*/
|
||||
public class ClickHouseServerMetricsQueryStore implements ServerMetricsQueryStore {
|
||||
|
||||
private static final Pattern SAFE_IDENTIFIER = Pattern.compile("^[a-zA-Z0-9._]+$");
|
||||
private static final Pattern SAFE_STATISTIC = Pattern.compile("^[a-z_]+$");
|
||||
|
||||
private static final Set<String> AGGREGATIONS = Set.of("avg", "sum", "max", "min", "latest");
|
||||
private static final Set<String> MODES = Set.of("raw", "delta");
|
||||
|
||||
/** Maximum {@code to - from} window accepted by the API. */
|
||||
static final Duration MAX_RANGE = Duration.ofDays(31);
|
||||
|
||||
/** Clamp bounds and default for {@code stepSeconds}. */
|
||||
static final int MIN_STEP = 10;
|
||||
static final int MAX_STEP = 3600;
|
||||
static final int DEFAULT_STEP = 60;
|
||||
|
||||
/** Defence against group-by explosion — limit the series count per response. */
|
||||
static final int MAX_SERIES = 500;
|
||||
|
||||
private final String tenantId;
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public ClickHouseServerMetricsQueryStore(String tenantId, JdbcTemplate jdbc) {
|
||||
this.tenantId = tenantId;
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
// ── catalog ─────────────────────────────────────────────────────────
|
||||
|
||||
@Override
|
||||
public List<ServerMetricCatalogEntry> catalog(Instant from, Instant to) {
|
||||
requireRange(from, to);
|
||||
String sql = """
|
||||
SELECT
|
||||
metric_name,
|
||||
any(metric_type) AS metric_type,
|
||||
arraySort(groupUniqArray(statistic)) AS statistics,
|
||||
arraySort(arrayDistinct(arrayFlatten(groupArray(mapKeys(tags))))) AS tag_keys
|
||||
FROM server_metrics
|
||||
WHERE tenant_id = ?
|
||||
AND collected_at >= ?
|
||||
AND collected_at < ?
|
||||
GROUP BY metric_name
|
||||
ORDER BY metric_name
|
||||
""";
|
||||
return jdbc.query(sql, (rs, n) -> new ServerMetricCatalogEntry(
|
||||
rs.getString("metric_name"),
|
||||
rs.getString("metric_type"),
|
||||
arrayToStringList(rs.getArray("statistics")),
|
||||
arrayToStringList(rs.getArray("tag_keys"))
|
||||
), tenantId, Timestamp.from(from), Timestamp.from(to));
|
||||
}
|
||||
|
||||
// ── instances ───────────────────────────────────────────────────────
|
||||
|
||||
@Override
|
||||
public List<ServerInstanceInfo> listInstances(Instant from, Instant to) {
|
||||
requireRange(from, to);
|
||||
String sql = """
|
||||
SELECT
|
||||
server_instance_id,
|
||||
min(collected_at) AS first_seen,
|
||||
max(collected_at) AS last_seen
|
||||
FROM server_metrics
|
||||
WHERE tenant_id = ?
|
||||
AND collected_at >= ?
|
||||
AND collected_at < ?
|
||||
GROUP BY server_instance_id
|
||||
ORDER BY last_seen DESC
|
||||
""";
|
||||
return jdbc.query(sql, (rs, n) -> new ServerInstanceInfo(
|
||||
rs.getString("server_instance_id"),
|
||||
rs.getTimestamp("first_seen").toInstant(),
|
||||
rs.getTimestamp("last_seen").toInstant()
|
||||
), tenantId, Timestamp.from(from), Timestamp.from(to));
|
||||
}
|
||||
|
||||
// ── query ───────────────────────────────────────────────────────────
|
||||
|
||||
@Override
|
||||
public ServerMetricQueryResponse query(ServerMetricQueryRequest request) {
|
||||
if (request == null) throw new IllegalArgumentException("request is required");
|
||||
String metric = requireSafeIdentifier(request.metric(), "metric");
|
||||
requireRange(request.from(), request.to());
|
||||
|
||||
String aggregation = request.aggregation() != null ? request.aggregation().toLowerCase() : "avg";
|
||||
if (!AGGREGATIONS.contains(aggregation)) {
|
||||
throw new IllegalArgumentException("aggregation must be one of " + AGGREGATIONS);
|
||||
}
|
||||
|
||||
String mode = request.mode() != null ? request.mode().toLowerCase() : "raw";
|
||||
if (!MODES.contains(mode)) {
|
||||
throw new IllegalArgumentException("mode must be one of " + MODES);
|
||||
}
|
||||
|
||||
int step = request.stepSeconds() != null ? request.stepSeconds() : DEFAULT_STEP;
|
||||
if (step < MIN_STEP || step > MAX_STEP) {
|
||||
throw new IllegalArgumentException(
|
||||
"stepSeconds must be in [" + MIN_STEP + "," + MAX_STEP + "]");
|
||||
}
|
||||
|
||||
String statistic = request.statistic();
|
||||
if (statistic != null && !SAFE_STATISTIC.matcher(statistic).matches()) {
|
||||
throw new IllegalArgumentException("statistic contains unsafe characters");
|
||||
}
|
||||
|
||||
List<String> groupByTags = request.groupByTags() != null
|
||||
? request.groupByTags() : List.of();
|
||||
for (String t : groupByTags) requireSafeIdentifier(t, "groupByTag");
|
||||
|
||||
Map<String, String> filterTags = request.filterTags() != null
|
||||
? request.filterTags() : Map.of();
|
||||
for (String t : filterTags.keySet()) requireSafeIdentifier(t, "filterTag key");
|
||||
|
||||
List<String> instanceAllowList = request.serverInstanceIds() != null
|
||||
? request.serverInstanceIds() : List.of();
|
||||
|
||||
boolean isDelta = "delta".equals(mode);
|
||||
boolean isMean = "mean".equals(statistic);
|
||||
|
||||
String sql = isDelta
|
||||
? buildDeltaSql(step, groupByTags, filterTags, instanceAllowList, statistic, isMean)
|
||||
: buildRawSql(step, groupByTags, filterTags, instanceAllowList,
|
||||
statistic, aggregation, isMean);
|
||||
|
||||
List<Object> params = buildParams(groupByTags, metric, statistic, isMean,
|
||||
request.from(), request.to(),
|
||||
filterTags, instanceAllowList);
|
||||
|
||||
List<Row> rows = jdbc.query(sql, (rs, n) -> {
|
||||
int idx = 1;
|
||||
Instant bucket = rs.getTimestamp(idx++).toInstant();
|
||||
List<String> tagValues = new ArrayList<>(groupByTags.size());
|
||||
for (int g = 0; g < groupByTags.size(); g++) {
|
||||
tagValues.add(rs.getString(idx++));
|
||||
}
|
||||
double value = rs.getDouble(idx);
|
||||
return new Row(bucket, tagValues, value);
|
||||
}, params.toArray());
|
||||
|
||||
return assembleSeries(rows, metric, statistic, aggregation, mode, step, groupByTags);
|
||||
}
|
||||
|
||||
// ── SQL builders ────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Builds a single-pass SQL for raw mode:
|
||||
* <pre>{@code
|
||||
* SELECT bucket, tag0, ..., <agg>(metric_value) AS value
|
||||
* FROM server_metrics WHERE ...
|
||||
* GROUP BY bucket, tag0, ...
|
||||
* ORDER BY bucket, tag0, ...
|
||||
* }</pre>
|
||||
* For {@code statistic=mean}, replaces the aggregate with
|
||||
* {@code sumIf(value, statistic IN ('total','total_time')) / nullIf(sumIf(value, statistic='count'), 0)}.
|
||||
*/
|
||||
private String buildRawSql(int step, List<String> groupByTags,
|
||||
Map<String, String> filterTags,
|
||||
List<String> instanceAllowList,
|
||||
String statistic, String aggregation, boolean isMean) {
|
||||
StringBuilder s = new StringBuilder(512);
|
||||
s.append("SELECT\n toDateTime64(toStartOfInterval(collected_at, INTERVAL ")
|
||||
.append(step).append(" SECOND), 3) AS bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) {
|
||||
s.append(",\n tags[?] AS tag").append(i);
|
||||
}
|
||||
s.append(",\n ").append(isMean ? meanExpr() : scalarAggExpr(aggregation))
|
||||
.append(" AS value\nFROM server_metrics\n");
|
||||
appendWhereClause(s, filterTags, instanceAllowList, statistic, isMean);
|
||||
s.append("GROUP BY bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append("\nORDER BY bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
return s.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a three-level SQL for delta mode. Inner fills one
|
||||
* (bucket, instance, tag-group) row via {@code max(metric_value)};
|
||||
* middle computes positive-clipped per-instance differences via a
|
||||
* window function; outer sums across instances.
|
||||
*/
|
||||
private String buildDeltaSql(int step, List<String> groupByTags,
|
||||
Map<String, String> filterTags,
|
||||
List<String> instanceAllowList,
|
||||
String statistic, boolean isMean) {
|
||||
StringBuilder s = new StringBuilder(1024);
|
||||
s.append("SELECT bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append(", sum(delta) AS value FROM (\n");
|
||||
|
||||
// Middle: per-instance positive-clipped delta using window.
|
||||
s.append(" SELECT bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append(", server_instance_id, greatest(0, value - coalesce(any(value) OVER (")
|
||||
.append("PARTITION BY server_instance_id");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append(" ORDER BY bucket ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value)) AS delta FROM (\n");
|
||||
|
||||
// Inner: one representative value per (bucket, instance, tag-group).
|
||||
s.append(" SELECT\n toDateTime64(toStartOfInterval(collected_at, INTERVAL ")
|
||||
.append(step).append(" SECOND), 3) AS bucket,\n server_instance_id");
|
||||
for (int i = 0; i < groupByTags.size(); i++) {
|
||||
s.append(",\n tags[?] AS tag").append(i);
|
||||
}
|
||||
s.append(",\n ").append(isMean ? meanExpr() : "max(metric_value)")
|
||||
.append(" AS value\n FROM server_metrics\n");
|
||||
appendWhereClause(s, filterTags, instanceAllowList, statistic, isMean);
|
||||
s.append(" GROUP BY bucket, server_instance_id");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append("\n ) AS bucketed\n) AS deltas\n");
|
||||
|
||||
s.append("GROUP BY bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
s.append("\nORDER BY bucket");
|
||||
for (int i = 0; i < groupByTags.size(); i++) s.append(", tag").append(i);
|
||||
return s.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* WHERE clause shared by both raw and delta SQL shapes. Appended at the
|
||||
* correct indent under either the single {@code FROM server_metrics}
|
||||
* (raw) or the innermost one (delta).
|
||||
*/
|
||||
private void appendWhereClause(StringBuilder s, Map<String, String> filterTags,
|
||||
List<String> instanceAllowList,
|
||||
String statistic, boolean isMean) {
|
||||
s.append(" WHERE tenant_id = ?\n")
|
||||
.append(" AND metric_name = ?\n");
|
||||
if (isMean) {
|
||||
s.append(" AND statistic IN ('count', 'total', 'total_time')\n");
|
||||
} else if (statistic != null) {
|
||||
s.append(" AND statistic = ?\n");
|
||||
}
|
||||
s.append(" AND collected_at >= ?\n")
|
||||
.append(" AND collected_at < ?\n");
|
||||
for (int i = 0; i < filterTags.size(); i++) {
|
||||
s.append(" AND tags[?] = ?\n");
|
||||
}
|
||||
if (!instanceAllowList.isEmpty()) {
|
||||
s.append(" AND server_instance_id IN (")
|
||||
.append("?,".repeat(instanceAllowList.size() - 1)).append("?)\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL-positional params for both raw and delta queries (same relative
|
||||
* order because the WHERE clause is emitted by {@link #appendWhereClause}
|
||||
* only once, with the {@code tags[?]} select-list placeholders appearing
|
||||
* earlier in the SQL text).
|
||||
*/
|
||||
private List<Object> buildParams(List<String> groupByTags, String metric,
|
||||
String statistic, boolean isMean,
|
||||
Instant from, Instant to,
|
||||
Map<String, String> filterTags,
|
||||
List<String> instanceAllowList) {
|
||||
List<Object> params = new ArrayList<>();
|
||||
// SELECT-list tags[?] placeholders
|
||||
params.addAll(groupByTags);
|
||||
// WHERE
|
||||
params.add(tenantId);
|
||||
params.add(metric);
|
||||
if (!isMean && statistic != null) params.add(statistic);
|
||||
params.add(Timestamp.from(from));
|
||||
params.add(Timestamp.from(to));
|
||||
for (Map.Entry<String, String> e : filterTags.entrySet()) {
|
||||
params.add(e.getKey());
|
||||
params.add(e.getValue());
|
||||
}
|
||||
params.addAll(instanceAllowList);
|
||||
return params;
|
||||
}
|
||||
|
||||
private static String scalarAggExpr(String aggregation) {
|
||||
return switch (aggregation) {
|
||||
case "avg" -> "avg(metric_value)";
|
||||
case "sum" -> "sum(metric_value)";
|
||||
case "max" -> "max(metric_value)";
|
||||
case "min" -> "min(metric_value)";
|
||||
case "latest" -> "argMax(metric_value, collected_at)";
|
||||
default -> throw new IllegalStateException("unreachable: " + aggregation);
|
||||
};
|
||||
}
|
||||
|
||||
private static String meanExpr() {
|
||||
return "sumIf(metric_value, statistic IN ('total', 'total_time'))"
|
||||
+ " / nullIf(sumIf(metric_value, statistic = 'count'), 0)";
|
||||
}
|
||||
|
||||
// ── response assembly ───────────────────────────────────────────────
|
||||
|
||||
private ServerMetricQueryResponse assembleSeries(
|
||||
List<Row> rows, String metric, String statistic,
|
||||
String aggregation, String mode, int step, List<String> groupByTags) {
|
||||
|
||||
Map<List<String>, List<ServerMetricPoint>> bySignature = new LinkedHashMap<>();
|
||||
for (Row r : rows) {
|
||||
if (Double.isNaN(r.value) || Double.isInfinite(r.value)) continue;
|
||||
bySignature.computeIfAbsent(r.tagValues, k -> new ArrayList<>())
|
||||
.add(new ServerMetricPoint(r.bucket, r.value));
|
||||
}
|
||||
|
||||
if (bySignature.size() > MAX_SERIES) {
|
||||
throw new IllegalArgumentException(
|
||||
"query produced " + bySignature.size()
|
||||
+ " series; reduce groupByTags or tighten filterTags (max "
|
||||
+ MAX_SERIES + ")");
|
||||
}
|
||||
|
||||
List<ServerMetricSeries> series = new ArrayList<>(bySignature.size());
|
||||
for (Map.Entry<List<String>, List<ServerMetricPoint>> e : bySignature.entrySet()) {
|
||||
Map<String, String> tags = new LinkedHashMap<>();
|
||||
for (int i = 0; i < groupByTags.size(); i++) {
|
||||
tags.put(groupByTags.get(i), e.getKey().get(i));
|
||||
}
|
||||
series.add(new ServerMetricSeries(Collections.unmodifiableMap(tags), e.getValue()));
|
||||
}
|
||||
|
||||
return new ServerMetricQueryResponse(metric,
|
||||
statistic != null ? statistic : "value",
|
||||
aggregation, mode, step, series);
|
||||
}
|
||||
|
||||
// ── helpers ─────────────────────────────────────────────────────────
|
||||
|
||||
private static void requireRange(Instant from, Instant to) {
|
||||
if (from == null || to == null) {
|
||||
throw new IllegalArgumentException("from and to are required");
|
||||
}
|
||||
if (!from.isBefore(to)) {
|
||||
throw new IllegalArgumentException("from must be strictly before to");
|
||||
}
|
||||
if (Duration.between(from, to).compareTo(MAX_RANGE) > 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"time range exceeds maximum of " + MAX_RANGE.toDays() + " days");
|
||||
}
|
||||
}
|
||||
|
||||
private static String requireSafeIdentifier(String value, String field) {
|
||||
if (value == null || value.isBlank()) {
|
||||
throw new IllegalArgumentException(field + " is required");
|
||||
}
|
||||
if (!SAFE_IDENTIFIER.matcher(value).matches()) {
|
||||
throw new IllegalArgumentException(
|
||||
field + " contains unsafe characters (allowed: [a-zA-Z0-9._])");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
private static List<String> arrayToStringList(Array array) {
|
||||
if (array == null) return List.of();
|
||||
try {
|
||||
Object[] values = (Object[]) array.getArray();
|
||||
Set<String> sorted = new TreeSet<>();
|
||||
for (Object v : values) {
|
||||
if (v != null) sorted.add(v.toString());
|
||||
}
|
||||
return List.copyOf(sorted);
|
||||
} catch (Exception e) {
|
||||
return List.of();
|
||||
} finally {
|
||||
try { array.free(); } catch (Exception ignore) { }
|
||||
}
|
||||
}
|
||||
|
||||
private record Row(Instant bucket, List<String> tagValues, double value) {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,314 @@
|
||||
package com.cameleer.server.app.controller;
|
||||
|
||||
import com.cameleer.server.app.AbstractPostgresIT;
|
||||
import com.cameleer.server.app.TestSecurityHelper;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ServerMetricsAdminControllerIT extends AbstractPostgresIT {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
private TestSecurityHelper securityHelper;
|
||||
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private HttpHeaders adminJson;
|
||||
private HttpHeaders adminGet;
|
||||
private HttpHeaders viewerGet;
|
||||
|
||||
@BeforeEach
|
||||
void seedAndAuth() {
|
||||
adminJson = securityHelper.adminHeaders();
|
||||
adminGet = securityHelper.authHeadersNoBody(securityHelper.adminToken());
|
||||
viewerGet = securityHelper.authHeadersNoBody(securityHelper.viewerToken());
|
||||
|
||||
// Fresh rows for each test. The Spring-context ClickHouse JdbcTemplate
|
||||
// lives in a different bean; reach for it here by executing through
|
||||
// the same JdbcTemplate used by the store via the ClickHouseConfig bean.
|
||||
org.springframework.jdbc.core.JdbcTemplate ch = clickhouseJdbc();
|
||||
ch.execute("TRUNCATE TABLE server_metrics");
|
||||
|
||||
Instant t0 = Instant.parse("2026-04-23T10:00:00Z");
|
||||
// Gauge: cameleer.agents.connected, two states, two buckets.
|
||||
insert(ch, "default", t0, "srv-A", "cameleer.agents.connected", "gauge", "value", 3.0,
|
||||
Map.of("state", "live"));
|
||||
insert(ch, "default", t0.plusSeconds(60), "srv-A", "cameleer.agents.connected", "gauge", "value", 4.0,
|
||||
Map.of("state", "live"));
|
||||
insert(ch, "default", t0, "srv-A", "cameleer.agents.connected", "gauge", "value", 1.0,
|
||||
Map.of("state", "stale"));
|
||||
insert(ch, "default", t0.plusSeconds(60), "srv-A", "cameleer.agents.connected", "gauge", "value", 0.0,
|
||||
Map.of("state", "stale"));
|
||||
|
||||
// Counter: cumulative drops, +5 per minute on srv-A.
|
||||
insert(ch, "default", t0, "srv-A", "cameleer.ingestion.drops", "counter", "count", 0.0, Map.of("reason", "buffer_full"));
|
||||
insert(ch, "default", t0.plusSeconds(60), "srv-A", "cameleer.ingestion.drops", "counter", "count", 5.0, Map.of("reason", "buffer_full"));
|
||||
insert(ch, "default", t0.plusSeconds(120), "srv-A", "cameleer.ingestion.drops", "counter", "count", 10.0, Map.of("reason", "buffer_full"));
|
||||
// Simulated restart to srv-B: counter resets to 0, then climbs to 2.
|
||||
insert(ch, "default", t0.plusSeconds(180), "srv-B", "cameleer.ingestion.drops", "counter", "count", 0.0, Map.of("reason", "buffer_full"));
|
||||
insert(ch, "default", t0.plusSeconds(240), "srv-B", "cameleer.ingestion.drops", "counter", "count", 2.0, Map.of("reason", "buffer_full"));
|
||||
|
||||
// Timer mean inputs: two buckets, 2 samples each (count=2, total_time=30).
|
||||
insert(ch, "default", t0, "srv-A", "cameleer.ingestion.flush.duration", "timer", "count", 2.0, Map.of("type", "execution"));
|
||||
insert(ch, "default", t0, "srv-A", "cameleer.ingestion.flush.duration", "timer", "total_time", 30.0, Map.of("type", "execution"));
|
||||
insert(ch, "default", t0.plusSeconds(60), "srv-A", "cameleer.ingestion.flush.duration", "timer", "count", 4.0, Map.of("type", "execution"));
|
||||
insert(ch, "default", t0.plusSeconds(60), "srv-A", "cameleer.ingestion.flush.duration", "timer", "total_time", 100.0, Map.of("type", "execution"));
|
||||
}
|
||||
|
||||
// ── catalog ─────────────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void catalog_listsSeededMetricsWithStatisticsAndTagKeys() throws Exception {
|
||||
ResponseEntity<String> r = restTemplate.exchange(
|
||||
"/api/v1/admin/server-metrics/catalog?from=2026-04-23T09:00:00Z&to=2026-04-23T11:00:00Z",
|
||||
HttpMethod.GET, new HttpEntity<>(adminGet), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = mapper.readTree(r.getBody());
|
||||
assertThat(body.isArray()).isTrue();
|
||||
|
||||
JsonNode drops = findByField(body, "metricName", "cameleer.ingestion.drops");
|
||||
assertThat(drops.get("metricType").asText()).isEqualTo("counter");
|
||||
assertThat(asStringList(drops.get("statistics"))).contains("count");
|
||||
assertThat(asStringList(drops.get("tagKeys"))).contains("reason");
|
||||
|
||||
JsonNode timer = findByField(body, "metricName", "cameleer.ingestion.flush.duration");
|
||||
assertThat(asStringList(timer.get("statistics"))).contains("count", "total_time");
|
||||
}
|
||||
|
||||
// ── instances ───────────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void instances_listsDistinctServerInstanceIdsWithFirstAndLastSeen() throws Exception {
|
||||
ResponseEntity<String> r = restTemplate.exchange(
|
||||
"/api/v1/admin/server-metrics/instances?from=2026-04-23T09:00:00Z&to=2026-04-23T11:00:00Z",
|
||||
HttpMethod.GET, new HttpEntity<>(adminGet), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = mapper.readTree(r.getBody());
|
||||
assertThat(body.isArray()).isTrue();
|
||||
assertThat(body.size()).isEqualTo(2);
|
||||
// Ordered by last_seen DESC — srv-B saw a later row.
|
||||
assertThat(body.get(0).get("serverInstanceId").asText()).isEqualTo("srv-B");
|
||||
assertThat(body.get(1).get("serverInstanceId").asText()).isEqualTo("srv-A");
|
||||
}
|
||||
|
||||
// ── query — gauge with group-by-tag ─────────────────────────────────
|
||||
|
||||
@Test
|
||||
void query_gaugeWithGroupByTag_returnsSeriesPerTagValue() throws Exception {
|
||||
String requestBody = """
|
||||
{
|
||||
"metric": "cameleer.agents.connected",
|
||||
"statistic": "value",
|
||||
"from": "2026-04-23T09:59:00Z",
|
||||
"to": "2026-04-23T10:02:00Z",
|
||||
"stepSeconds": 60,
|
||||
"groupByTags": ["state"],
|
||||
"aggregation": "avg",
|
||||
"mode": "raw"
|
||||
}
|
||||
""";
|
||||
|
||||
ResponseEntity<String> r = restTemplate.postForEntity(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
new HttpEntity<>(requestBody, adminJson), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = mapper.readTree(r.getBody());
|
||||
assertThat(body.get("metric").asText()).isEqualTo("cameleer.agents.connected");
|
||||
assertThat(body.get("statistic").asText()).isEqualTo("value");
|
||||
assertThat(body.get("mode").asText()).isEqualTo("raw");
|
||||
assertThat(body.get("stepSeconds").asInt()).isEqualTo(60);
|
||||
|
||||
JsonNode series = body.get("series");
|
||||
assertThat(series.isArray()).isTrue();
|
||||
assertThat(series.size()).isEqualTo(2);
|
||||
|
||||
JsonNode live = findByTag(series, "state", "live");
|
||||
assertThat(live.get("points").size()).isEqualTo(2);
|
||||
assertThat(live.get("points").get(0).get("v").asDouble()).isEqualTo(3.0);
|
||||
assertThat(live.get("points").get(1).get("v").asDouble()).isEqualTo(4.0);
|
||||
}
|
||||
|
||||
// ── query — counter delta across instance rotation ──────────────────
|
||||
|
||||
@Test
|
||||
void query_counterDelta_clipsNegativesAcrossInstanceRotation() throws Exception {
|
||||
String requestBody = """
|
||||
{
|
||||
"metric": "cameleer.ingestion.drops",
|
||||
"statistic": "count",
|
||||
"from": "2026-04-23T09:59:00Z",
|
||||
"to": "2026-04-23T10:05:00Z",
|
||||
"stepSeconds": 60,
|
||||
"groupByTags": ["reason"],
|
||||
"aggregation": "sum",
|
||||
"mode": "delta"
|
||||
}
|
||||
""";
|
||||
|
||||
ResponseEntity<String> r = restTemplate.postForEntity(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
new HttpEntity<>(requestBody, adminJson), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = mapper.readTree(r.getBody());
|
||||
JsonNode reason = findByTag(body.get("series"), "reason", "buffer_full");
|
||||
// Deltas: 0 (first bucket on srv-A), 5, 5, 0 (first on srv-B, clipped), 2.
|
||||
// Sum across the window should be 12 if we tally all positive deltas.
|
||||
double sum = 0;
|
||||
for (JsonNode p : reason.get("points")) sum += p.get("v").asDouble();
|
||||
assertThat(sum).isEqualTo(12.0);
|
||||
// No individual point may be negative.
|
||||
for (JsonNode p : reason.get("points")) {
|
||||
assertThat(p.get("v").asDouble()).isGreaterThanOrEqualTo(0.0);
|
||||
}
|
||||
}
|
||||
|
||||
// ── query — derived 'mean' statistic for timers ─────────────────────
|
||||
|
||||
@Test
|
||||
void query_timerMeanStatistic_computesTotalOverCountPerBucket() throws Exception {
|
||||
String requestBody = """
|
||||
{
|
||||
"metric": "cameleer.ingestion.flush.duration",
|
||||
"statistic": "mean",
|
||||
"from": "2026-04-23T09:59:00Z",
|
||||
"to": "2026-04-23T10:02:00Z",
|
||||
"stepSeconds": 60,
|
||||
"groupByTags": ["type"],
|
||||
"aggregation": "avg",
|
||||
"mode": "raw"
|
||||
}
|
||||
""";
|
||||
|
||||
ResponseEntity<String> r = restTemplate.postForEntity(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
new HttpEntity<>(requestBody, adminJson), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
|
||||
JsonNode body = mapper.readTree(r.getBody());
|
||||
JsonNode points = findByTag(body.get("series"), "type", "execution").get("points");
|
||||
// Bucket 0: 30 / 2 = 15.0
|
||||
// Bucket 1: 100 / 4 = 25.0
|
||||
assertThat(points.get(0).get("v").asDouble()).isEqualTo(15.0);
|
||||
assertThat(points.get(1).get("v").asDouble()).isEqualTo(25.0);
|
||||
}
|
||||
|
||||
// ── query — input validation ────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void query_rejectsUnsafeMetricName() {
|
||||
String requestBody = """
|
||||
{
|
||||
"metric": "cameleer.agents; DROP TABLE server_metrics",
|
||||
"from": "2026-04-23T09:59:00Z",
|
||||
"to": "2026-04-23T10:02:00Z"
|
||||
}
|
||||
""";
|
||||
|
||||
ResponseEntity<String> r = restTemplate.postForEntity(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
new HttpEntity<>(requestBody, adminJson), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
|
||||
}
|
||||
|
||||
@Test
|
||||
void query_rejectsRangeBeyondMax() {
|
||||
String requestBody = """
|
||||
{
|
||||
"metric": "cameleer.agents.connected",
|
||||
"from": "2026-01-01T00:00:00Z",
|
||||
"to": "2026-04-23T00:00:00Z"
|
||||
}
|
||||
""";
|
||||
|
||||
ResponseEntity<String> r = restTemplate.postForEntity(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
new HttpEntity<>(requestBody, adminJson), String.class);
|
||||
assertThat(r.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
|
||||
}
|
||||
|
||||
// ── authorization ───────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
void allEndpoints_requireAdminRole() {
|
||||
ResponseEntity<String> catalog = restTemplate.exchange(
|
||||
"/api/v1/admin/server-metrics/catalog",
|
||||
HttpMethod.GET, new HttpEntity<>(viewerGet), String.class);
|
||||
assertThat(catalog.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
|
||||
|
||||
ResponseEntity<String> instances = restTemplate.exchange(
|
||||
"/api/v1/admin/server-metrics/instances",
|
||||
HttpMethod.GET, new HttpEntity<>(viewerGet), String.class);
|
||||
assertThat(instances.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
|
||||
|
||||
HttpHeaders viewerPost = securityHelper.authHeaders(securityHelper.viewerToken());
|
||||
ResponseEntity<String> query = restTemplate.exchange(
|
||||
"/api/v1/admin/server-metrics/query",
|
||||
HttpMethod.POST, new HttpEntity<>("{}", viewerPost), String.class);
|
||||
assertThat(query.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
|
||||
}
|
||||
|
||||
// ── helpers ─────────────────────────────────────────────────────────
|
||||
|
||||
private org.springframework.jdbc.core.JdbcTemplate clickhouseJdbc() {
|
||||
return org.springframework.test.util.AopTestUtils.getTargetObject(
|
||||
applicationContext.getBean("clickHouseJdbcTemplate"));
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private org.springframework.context.ApplicationContext applicationContext;
|
||||
|
||||
private static void insert(org.springframework.jdbc.core.JdbcTemplate jdbc,
|
||||
String tenantId, Instant collectedAt, String serverInstanceId,
|
||||
String metricName, String metricType, String statistic,
|
||||
double value, Map<String, String> tags) {
|
||||
jdbc.update("""
|
||||
INSERT INTO server_metrics
|
||||
(tenant_id, collected_at, server_instance_id,
|
||||
metric_name, metric_type, statistic, metric_value, tags)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
tenantId, Timestamp.from(collectedAt), serverInstanceId,
|
||||
metricName, metricType, statistic, value, tags);
|
||||
}
|
||||
|
||||
private static JsonNode findByField(JsonNode array, String field, String value) {
|
||||
for (JsonNode n : array) {
|
||||
if (value.equals(n.path(field).asText())) return n;
|
||||
}
|
||||
throw new AssertionError("no element with " + field + "=" + value);
|
||||
}
|
||||
|
||||
private static JsonNode findByTag(JsonNode seriesArray, String tagKey, String tagValue) {
|
||||
for (JsonNode s : seriesArray) {
|
||||
if (tagValue.equals(s.path("tags").path(tagKey).asText())) return s;
|
||||
}
|
||||
throw new AssertionError("no series with tag " + tagKey + "=" + tagValue);
|
||||
}
|
||||
|
||||
private static java.util.List<String> asStringList(JsonNode arr) {
|
||||
java.util.List<String> out = new java.util.ArrayList<>();
|
||||
if (arr != null) for (JsonNode n : arr) out.add(n.asText());
|
||||
return out;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user