add metrics ingestion diagnostics and upgrade cameleer3-common to 0.0.3
- Add logging to MetricsController: warn on parse failures, debug on received metrics, buffer depth on 503 - Add GET /api/v1/admin/database/metrics-pipeline diagnostic endpoint (buffer depth, row count, distinct agents/metrics, latest timestamp) - Fix BackpressureIT test JSON to match actual MetricsSnapshot schema (collectedAt/metricName/metricValue instead of timestamp/metrics) - Upgrade cameleer3-common from 1.0-SNAPSHOT to 0.0.3 (adds engineLevel) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import com.cameleer3.server.app.dto.TableSizeResponse;
|
|||||||
import com.cameleer3.server.core.admin.AuditCategory;
|
import com.cameleer3.server.core.admin.AuditCategory;
|
||||||
import com.cameleer3.server.core.admin.AuditResult;
|
import com.cameleer3.server.core.admin.AuditResult;
|
||||||
import com.cameleer3.server.core.admin.AuditService;
|
import com.cameleer3.server.core.admin.AuditService;
|
||||||
|
import com.cameleer3.server.core.ingestion.IngestionService;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
import com.zaxxer.hikari.HikariPoolMXBean;
|
import com.zaxxer.hikari.HikariPoolMXBean;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
@@ -24,7 +25,9 @@ import org.springframework.web.bind.annotation.RestController;
|
|||||||
import org.springframework.web.server.ResponseStatusException;
|
import org.springframework.web.server.ResponseStatusException;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/api/v1/admin/database")
|
@RequestMapping("/api/v1/admin/database")
|
||||||
@@ -35,11 +38,14 @@ public class DatabaseAdminController {
|
|||||||
private final JdbcTemplate jdbc;
|
private final JdbcTemplate jdbc;
|
||||||
private final DataSource dataSource;
|
private final DataSource dataSource;
|
||||||
private final AuditService auditService;
|
private final AuditService auditService;
|
||||||
|
private final IngestionService ingestionService;
|
||||||
|
|
||||||
public DatabaseAdminController(JdbcTemplate jdbc, DataSource dataSource, AuditService auditService) {
|
public DatabaseAdminController(JdbcTemplate jdbc, DataSource dataSource,
|
||||||
|
AuditService auditService, IngestionService ingestionService) {
|
||||||
this.jdbc = jdbc;
|
this.jdbc = jdbc;
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
this.auditService = auditService;
|
this.auditService = auditService;
|
||||||
|
this.ingestionService = ingestionService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/status")
|
@GetMapping("/status")
|
||||||
@@ -117,6 +123,29 @@ public class DatabaseAdminController {
|
|||||||
return ResponseEntity.ok().build();
|
return ResponseEntity.ok().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/metrics-pipeline")
|
||||||
|
@Operation(summary = "Get metrics ingestion pipeline diagnostics")
|
||||||
|
public ResponseEntity<Map<String, Object>> getMetricsPipeline() {
|
||||||
|
int bufferDepth = ingestionService.getMetricsBufferDepth();
|
||||||
|
|
||||||
|
Long totalRows = jdbc.queryForObject(
|
||||||
|
"SELECT count(*) FROM agent_metrics", Long.class);
|
||||||
|
List<String> agentIds = jdbc.queryForList(
|
||||||
|
"SELECT DISTINCT agent_id FROM agent_metrics ORDER BY agent_id", String.class);
|
||||||
|
Instant latestCollected = jdbc.queryForObject(
|
||||||
|
"SELECT max(collected_at) FROM agent_metrics", Instant.class);
|
||||||
|
List<String> metricNames = jdbc.queryForList(
|
||||||
|
"SELECT DISTINCT metric_name FROM agent_metrics ORDER BY metric_name", String.class);
|
||||||
|
|
||||||
|
return ResponseEntity.ok(Map.of(
|
||||||
|
"bufferDepth", bufferDepth,
|
||||||
|
"totalRows", totalRows != null ? totalRows : 0,
|
||||||
|
"distinctAgents", agentIds,
|
||||||
|
"distinctMetrics", metricNames,
|
||||||
|
"latestCollectedAt", latestCollected != null ? latestCollected.toString() : "none"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
private String extractHost(DataSource ds) {
|
private String extractHost(DataSource ds) {
|
||||||
try {
|
try {
|
||||||
if (ds instanceof HikariDataSource hds) {
|
if (ds instanceof HikariDataSource hds) {
|
||||||
|
|||||||
@@ -44,13 +44,23 @@ public class MetricsController {
|
|||||||
@Operation(summary = "Ingest agent metrics",
|
@Operation(summary = "Ingest agent metrics",
|
||||||
description = "Accepts an array of MetricsSnapshot objects")
|
description = "Accepts an array of MetricsSnapshot objects")
|
||||||
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
|
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
|
||||||
|
@ApiResponse(responseCode = "400", description = "Invalid payload")
|
||||||
@ApiResponse(responseCode = "503", description = "Buffer full, retry later")
|
@ApiResponse(responseCode = "503", description = "Buffer full, retry later")
|
||||||
public ResponseEntity<Void> ingestMetrics(@RequestBody String body) throws JsonProcessingException {
|
public ResponseEntity<Void> ingestMetrics(@RequestBody String body) {
|
||||||
List<MetricsSnapshot> metrics = parsePayload(body);
|
List<MetricsSnapshot> metrics;
|
||||||
boolean accepted = ingestionService.acceptMetrics(metrics);
|
try {
|
||||||
|
metrics = parsePayload(body);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
log.warn("Failed to parse metrics payload: {}", e.getMessage());
|
||||||
|
return ResponseEntity.badRequest().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("Received {} metric(s) from agent(s)", metrics.size());
|
||||||
|
|
||||||
|
boolean accepted = ingestionService.acceptMetrics(metrics);
|
||||||
if (!accepted) {
|
if (!accepted) {
|
||||||
log.warn("Metrics buffer full, returning 503");
|
log.warn("Metrics buffer full ({} items), returning 503",
|
||||||
|
ingestionService.getMetricsBufferDepth());
|
||||||
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
|
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
|
||||||
.header("Retry-After", "5")
|
.header("Retry-After", "5")
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
@@ -50,11 +50,11 @@ class BackpressureIT extends AbstractPostgresIT {
|
|||||||
// Fill the metrics buffer completely with a batch of 5
|
// Fill the metrics buffer completely with a batch of 5
|
||||||
String batchJson = """
|
String batchJson = """
|
||||||
[
|
[
|
||||||
{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:00Z","metrics":{}},
|
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:00Z","metricName":"test.metric","metricValue":1.0,"tags":{}},
|
||||||
{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:01Z","metrics":{}},
|
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:01Z","metricName":"test.metric","metricValue":2.0,"tags":{}},
|
||||||
{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:02Z","metrics":{}},
|
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:02Z","metricName":"test.metric","metricValue":3.0,"tags":{}},
|
||||||
{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:03Z","metrics":{}},
|
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:03Z","metricName":"test.metric","metricValue":4.0,"tags":{}},
|
||||||
{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:04Z","metrics":{}}
|
{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:04Z","metricName":"test.metric","metricValue":5.0,"tags":{}}
|
||||||
]
|
]
|
||||||
""";
|
""";
|
||||||
|
|
||||||
@@ -66,7 +66,7 @@ class BackpressureIT extends AbstractPostgresIT {
|
|||||||
|
|
||||||
// Now buffer should be full -- next POST should get 503
|
// Now buffer should be full -- next POST should get 503
|
||||||
String overflowJson = """
|
String overflowJson = """
|
||||||
[{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:05Z","metrics":{}}]
|
[{"agentId":"bp-agent","collectedAt":"2026-03-11T10:00:05Z","metricName":"test.metric","metricValue":6.0,"tags":{}}]
|
||||||
""";
|
""";
|
||||||
|
|
||||||
ResponseEntity<String> response = restTemplate.postForEntity(
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
||||||
|
|||||||
2
pom.xml
2
pom.xml
@@ -28,7 +28,7 @@
|
|||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<java.version>17</java.version>
|
<java.version>17</java.version>
|
||||||
<jackson.version>2.17.3</jackson.version>
|
<jackson.version>2.17.3</jackson.version>
|
||||||
<cameleer3-common.version>1.0-SNAPSHOT</cameleer3-common.version>
|
<cameleer3-common.version>0.0.3</cameleer3-common.version>
|
||||||
<testcontainers.version>2.0.3</testcontainers.version>
|
<testcontainers.version>2.0.3</testcontainers.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user