feat: add Micrometer Prometheus metrics to server
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 2m36s
CI / deploy (push) Has been cancelled
CI / docker (push) Has been cancelled
CI / deploy-feature (push) Has been cancelled

Adds micrometer-registry-prometheus and exposes /api/v1/prometheus
endpoint (unauthenticated for scraping). ServerMetrics component
provides business metrics beyond default JVM/HTTP:

Gauges: agents by state, SSE connections, buffer depths (execution,
processor, log, metrics), accumulator pending exchanges.

Counters: ingestion drops (buffer_full, no_agent, no_identity),
agent transitions (went_stale, went_dead, recovered), deployment
outcomes (running, failed, degraded), auth failures (invalid_token,
revoked, oidc_rejected).

Timers: ClickHouse flush duration by type, deployment duration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-12 18:23:27 +02:00
parent caaa1ab0cc
commit 6bf7175a6c
12 changed files with 261 additions and 11 deletions

View File

@@ -110,6 +110,9 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar
- `PrometheusLabelBuilder` — generates Prometheus Docker labels (`prometheus.scrape/path/port`) per runtime type for `docker_sd_configs` auto-discovery
- `DisabledRuntimeOrchestrator` — no-op when runtime not enabled
**metrics/** — Prometheus observability
- `ServerMetrics` — centralized business metrics: gauges (agents by state, SSE connections, buffer depths), counters (ingestion drops, agent transitions, deployment outcomes, auth failures), timers (flush duration, deployment duration). Exposed via `/api/v1/prometheus`.
**storage/** — PostgreSQL repositories (JdbcTemplate)
- `PostgresAppRepository`, `PostgresAppVersionRepository`, `PostgresEnvironmentRepository`
- `PostgresDeploymentRepository` — includes JSONB replica_states, deploy_stage, findByContainerId
@@ -300,7 +303,7 @@ In SaaS mode, each tenant's server and its deployed apps are isolated at the Doc
<!-- gitnexus:start -->
# GitNexus — Code Intelligence
This project is indexed by GitNexus as **cameleer3-server** (5968 symbols, 15141 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
This project is indexed by GitNexus as **cameleer3-server** (5987 symbols, 15177 relationships, 300 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

View File

@@ -31,6 +31,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.agent;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.core.agent.AgentEventService;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
@@ -26,11 +27,14 @@ public class AgentLifecycleMonitor {
private final AgentRegistryService registryService;
private final AgentEventService agentEventService;
private final ServerMetrics serverMetrics;
public AgentLifecycleMonitor(AgentRegistryService registryService,
AgentEventService agentEventService) {
AgentEventService agentEventService,
ServerMetrics serverMetrics) {
this.registryService = registryService;
this.agentEventService = agentEventService;
this.serverMetrics = serverMetrics;
}
@Scheduled(fixedDelayString = "${agent-registry.lifecycle-check-interval-ms:10000}")
@@ -53,6 +57,7 @@ public class AgentLifecycleMonitor {
if (eventType != null) {
agentEventService.recordEvent(agent.instanceId(), agent.applicationId(), eventType,
agent.displayName() + " " + before + " -> " + agent.state());
serverMetrics.recordAgentTransition(eventType);
}
}
}

View File

@@ -131,6 +131,10 @@ public class SseConnectionManager implements AgentEventListener {
/**
* Check if an agent has an active SSE connection.
*/
public int getConnectionCount() {
return emitters.size();
}
public boolean isConnected(String agentId) {
return emitters.containsKey(agentId);
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.app.search.ClickHouseLogStore;
import com.cameleer3.server.app.storage.ClickHouseAgentEventRepository;
import com.cameleer3.server.app.storage.ClickHouseUsageTracker;
@@ -113,9 +114,10 @@ public class StorageBeanConfig {
ClickHouseExecutionStore executionStore,
ClickHouseLogStore logStore,
ChunkAccumulator accumulator,
IngestionConfig config) {
IngestionConfig config,
ServerMetrics serverMetrics) {
return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
logBuffer, executionStore, logStore, accumulator, config);
logBuffer, executionStore, logStore, accumulator, config, serverMetrics);
}
@Bean

View File

@@ -1,6 +1,7 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import java.util.List;
import com.cameleer3.server.core.ingestion.WriteBuffer;
@@ -32,13 +33,16 @@ public class LogIngestionController {
private final WriteBuffer<BufferedLogEntry> logBuffer;
private final AgentRegistryService registryService;
private final TenantProperties tenantProperties;
private final ServerMetrics serverMetrics;
public LogIngestionController(WriteBuffer<BufferedLogEntry> logBuffer,
AgentRegistryService registryService,
TenantProperties tenantProperties) {
TenantProperties tenantProperties,
ServerMetrics serverMetrics) {
this.logBuffer = logBuffer;
this.registryService = registryService;
this.tenantProperties = tenantProperties;
this.serverMetrics = serverMetrics;
}
@PostMapping("/logs")
@@ -49,6 +53,7 @@ public class LogIngestionController {
String instanceId = extractAgentId();
if (instanceId == null || instanceId.isBlank()) {
log.warn("Log ingestion rejected: no agent identity in request (unauthenticated or missing principal)");
serverMetrics.recordIngestionDrop("no_identity");
return ResponseEntity.accepted().build();
}
@@ -61,6 +66,7 @@ public class LogIngestionController {
if (agent == null) {
log.warn("Log ingestion from instance={}: agent not found in registry (not registered or expired). {} entries dropped.",
instanceId, entries.size());
serverMetrics.recordIngestionDrops("no_agent", entries.size());
return ResponseEntity.accepted().build();
}
@@ -89,6 +95,7 @@ public class LogIngestionController {
if (dropped > 0) {
log.warn("Log buffer full: accepted={}, dropped={} from instance={}, app={}",
accepted, dropped, instanceId, applicationId);
serverMetrics.recordIngestionDrops("buffer_full", dropped);
} else {
log.debug("Accepted {} log entries from instance={}, app={}", accepted, instanceId, applicationId);
}

View File

@@ -12,6 +12,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import com.cameleer3.server.app.metrics.ServerMetrics;
import java.time.Duration;
import java.util.List;
/**
@@ -33,6 +36,7 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
private final ClickHouseExecutionStore executionStore;
private final ClickHouseLogStore logStore;
private final ChunkAccumulator accumulator;
private final ServerMetrics serverMetrics;
private final int batchSize;
private volatile boolean running = false;
@@ -42,7 +46,8 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
ClickHouseExecutionStore executionStore,
ClickHouseLogStore logStore,
ChunkAccumulator accumulator,
IngestionConfig config) {
IngestionConfig config,
ServerMetrics serverMetrics) {
this.executionBuffer = executionBuffer;
this.processorBuffer = processorBuffer;
this.logBuffer = logBuffer;
@@ -50,14 +55,18 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
this.logStore = logStore;
this.accumulator = accumulator;
this.batchSize = config.getBatchSize();
this.serverMetrics = serverMetrics;
}
@Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
@Scheduled(fixedDelayString = "${cameleer.server.ingestion.flushintervalms:1000}")
public void flush() {
try {
List<MergedExecution> executions = executionBuffer.drain(batchSize);
if (!executions.isEmpty()) {
long start = System.nanoTime();
executionStore.insertExecutionBatch(executions);
serverMetrics.recordFlushDuration("execution",
Duration.ofNanos(System.nanoTime() - start));
log.debug("Flushed {} executions to ClickHouse", executions.size());
}
} catch (Exception e) {
@@ -67,7 +76,10 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
try {
List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
if (!batches.isEmpty()) {
long start = System.nanoTime();
executionStore.insertProcessorBatches(batches);
serverMetrics.recordFlushDuration("processor",
Duration.ofNanos(System.nanoTime() - start));
log.debug("Flushed {} processor batches to ClickHouse", batches.size());
}
} catch (Exception e) {
@@ -77,7 +89,10 @@ public class ExecutionFlushScheduler implements SmartLifecycle {
try {
List<BufferedLogEntry> logEntries = logBuffer.drain(batchSize);
if (!logEntries.isEmpty()) {
long start = System.nanoTime();
logStore.insertBufferedBatch(logEntries);
serverMetrics.recordFlushDuration("log",
Duration.ofNanos(System.nanoTime() - start));
log.debug("Flushed {} log entries to ClickHouse", logEntries.size());
}
} catch (Exception e) {

View File

@@ -0,0 +1,190 @@
package com.cameleer3.server.app.metrics;
import com.cameleer3.server.app.agent.SseConnectionManager;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.agent.AgentState;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@Component
public class ServerMetrics {
private final MeterRegistry registry;
// Counters
private final Counter ingestionDropsBufferFull;
private final Counter ingestionDropsNoAgent;
private final Counter ingestionDropsNoIdentity;
private final Counter transitionsWentStale;
private final Counter transitionsWentDead;
private final Counter transitionsRecovered;
private final Counter deploymentsRunning;
private final Counter deploymentsFailed;
private final Counter deploymentsDegraded;
private final Counter authFailuresInvalidToken;
private final Counter authFailuresRevoked;
private final Counter authFailuresOidc;
// Timers
private final Timer flushExecutionTimer;
private final Timer flushProcessorTimer;
private final Timer flushLogTimer;
private final Timer deploymentDurationTimer;
public ServerMetrics(MeterRegistry registry,
AgentRegistryService registryService,
SseConnectionManager sseManager,
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
WriteBuffer<BufferedLogEntry> logBuffer,
WriteBuffer<MetricsSnapshot> metricsBuffer,
ChunkAccumulator accumulator) {
this.registry = registry;
// ── Gauges (auto-polled) ────────────────────────────────────────
for (AgentState state : AgentState.values()) {
Gauge.builder("cameleer.agents.connected", registryService,
r -> r.findByState(state).size())
.tag("state", state.name().toLowerCase())
.register(registry);
}
Gauge.builder("cameleer.agents.sse.active", sseManager,
SseConnectionManager::getConnectionCount)
.register(registry);
Gauge.builder("cameleer.ingestion.buffer.size", executionBuffer, WriteBuffer::size)
.tag("type", "execution")
.register(registry);
Gauge.builder("cameleer.ingestion.buffer.size", processorBuffer, WriteBuffer::size)
.tag("type", "processor")
.register(registry);
Gauge.builder("cameleer.ingestion.buffer.size", logBuffer, WriteBuffer::size)
.tag("type", "log")
.register(registry);
Gauge.builder("cameleer.ingestion.buffer.size", metricsBuffer, WriteBuffer::size)
.tag("type", "metrics")
.register(registry);
Gauge.builder("cameleer.ingestion.accumulator.pending", accumulator,
ChunkAccumulator::getPendingCount)
.register(registry);
// ── Counters ────────────────────────────────────────────────────
ingestionDropsBufferFull = Counter.builder("cameleer.ingestion.drops")
.tag("reason", "buffer_full").register(registry);
ingestionDropsNoAgent = Counter.builder("cameleer.ingestion.drops")
.tag("reason", "no_agent").register(registry);
ingestionDropsNoIdentity = Counter.builder("cameleer.ingestion.drops")
.tag("reason", "no_identity").register(registry);
transitionsWentStale = Counter.builder("cameleer.agents.transitions")
.tag("transition", "went_stale").register(registry);
transitionsWentDead = Counter.builder("cameleer.agents.transitions")
.tag("transition", "went_dead").register(registry);
transitionsRecovered = Counter.builder("cameleer.agents.transitions")
.tag("transition", "recovered").register(registry);
deploymentsRunning = Counter.builder("cameleer.deployments.outcome")
.tag("status", "running").register(registry);
deploymentsFailed = Counter.builder("cameleer.deployments.outcome")
.tag("status", "failed").register(registry);
deploymentsDegraded = Counter.builder("cameleer.deployments.outcome")
.tag("status", "degraded").register(registry);
authFailuresInvalidToken = Counter.builder("cameleer.auth.failures")
.tag("reason", "invalid_token").register(registry);
authFailuresRevoked = Counter.builder("cameleer.auth.failures")
.tag("reason", "revoked").register(registry);
authFailuresOidc = Counter.builder("cameleer.auth.failures")
.tag("reason", "oidc_rejected").register(registry);
// ── Timers ──────────────────────────────────────────────────────
flushExecutionTimer = Timer.builder("cameleer.ingestion.flush.duration")
.tag("type", "execution").register(registry);
flushProcessorTimer = Timer.builder("cameleer.ingestion.flush.duration")
.tag("type", "processor").register(registry);
flushLogTimer = Timer.builder("cameleer.ingestion.flush.duration")
.tag("type", "log").register(registry);
deploymentDurationTimer = Timer.builder("cameleer.deployments.duration")
.register(registry);
}
// ── Ingestion ───────────────────────────────────────────────────────
public void recordIngestionDrop(String reason) {
switch (reason) {
case "buffer_full" -> ingestionDropsBufferFull.increment();
case "no_agent" -> ingestionDropsNoAgent.increment();
case "no_identity" -> ingestionDropsNoIdentity.increment();
}
}
public void recordIngestionDrops(String reason, int count) {
switch (reason) {
case "buffer_full" -> ingestionDropsBufferFull.increment(count);
case "no_agent" -> ingestionDropsNoAgent.increment(count);
case "no_identity" -> ingestionDropsNoIdentity.increment(count);
}
}
// ── Agent lifecycle ─────────────────────────────────────────────────
public void recordAgentTransition(String transition) {
switch (transition) {
case "WENT_STALE" -> transitionsWentStale.increment();
case "WENT_DEAD" -> transitionsWentDead.increment();
case "RECOVERED" -> transitionsRecovered.increment();
}
}
// ── Deployments ─────────────────────────────────────────────────────
public void recordDeploymentOutcome(String status) {
switch (status) {
case "RUNNING" -> deploymentsRunning.increment();
case "FAILED" -> deploymentsFailed.increment();
case "DEGRADED" -> deploymentsDegraded.increment();
}
}
public void recordDeploymentDuration(long startMillis) {
deploymentDurationTimer.record(
System.currentTimeMillis() - startMillis, TimeUnit.MILLISECONDS);
}
// ── Flush timers ────────────────────────────────────────────────────
public void recordFlushDuration(String type, Duration duration) {
switch (type) {
case "execution" -> flushExecutionTimer.record(duration);
case "processor" -> flushProcessorTimer.record(duration);
case "log" -> flushLogTimer.record(duration);
}
}
// ── Auth ────────────────────────────────────────────────────────────
public void recordAuthFailure(String reason) {
switch (reason) {
case "invalid_token" -> authFailuresInvalidToken.increment();
case "revoked" -> authFailuresRevoked.increment();
case "oidc_rejected" -> authFailuresOidc.increment();
}
}
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.runtime;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
import com.cameleer3.server.core.runtime.*;
import org.slf4j.Logger;
@@ -67,6 +68,9 @@ public class DeploymentExecutor {
@Value("${cameleer.server.tenant.id:default}")
private String tenantId;
@Autowired
private ServerMetrics serverMetrics;
public DeploymentExecutor(RuntimeOrchestrator orchestrator,
DeploymentService deploymentService,
AppService appService,
@@ -82,6 +86,7 @@ public class DeploymentExecutor {
@Async("deploymentTaskExecutor")
public void executeAsync(Deployment deployment) {
long deployStart = System.currentTimeMillis();
try {
App app = appService.getById(deployment.appId());
Environment env = envService.getById(deployment.environmentId());
@@ -215,6 +220,8 @@ public class DeploymentExecutor {
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
deploymentService.markFailed(deployment.id(), "No replicas passed health check within " + healthCheckTimeout + "s");
serverMetrics.recordDeploymentOutcome("FAILED");
serverMetrics.recordDeploymentDuration(deployStart);
return;
}
@@ -245,6 +252,8 @@ public class DeploymentExecutor {
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
serverMetrics.recordDeploymentOutcome(finalStatus.name());
serverMetrics.recordDeploymentDuration(deployStart);
log.info("Deployment {} is {} ({}/{} replicas healthy)",
deployment.id(), finalStatus, healthyCount, config.replicas());
@@ -252,6 +261,8 @@ public class DeploymentExecutor {
log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e);
pgDeployRepo.updateDeployStage(deployment.id(), null);
deploymentService.markFailed(deployment.id(), e.getMessage());
serverMetrics.recordDeploymentOutcome("FAILED");
serverMetrics.recordDeploymentDuration(deployStart);
}
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.security;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.rbac.SystemRole;
import com.cameleer3.server.core.security.JwtService;
@@ -45,15 +46,18 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
private final AgentRegistryService agentRegistryService;
private final JwtDecoder oidcDecoder;
private final UserRepository userRepository;
private final ServerMetrics serverMetrics;
public JwtAuthenticationFilter(JwtService jwtService,
AgentRegistryService agentRegistryService,
JwtDecoder oidcDecoder,
UserRepository userRepository) {
UserRepository userRepository,
ServerMetrics serverMetrics) {
this.jwtService = jwtService;
this.agentRegistryService = agentRegistryService;
this.oidcDecoder = oidcDecoder;
this.userRepository = userRepository;
this.serverMetrics = serverMetrics;
}
@Override
@@ -85,6 +89,7 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
userRepository.findById(subject).ifPresent(user -> {
Instant revoked = user.tokenRevokedBefore();
if (revoked != null && result.issuedAt().isBefore(revoked)) {
serverMetrics.recordAuthFailure("revoked");
throw new com.cameleer3.server.core.security.InvalidTokenException("Token revoked");
}
});
@@ -117,6 +122,7 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
SecurityContextHolder.getContext().setAuthentication(auth);
} catch (Exception e) {
log.debug("OIDC token validation failed: {}", e.getMessage());
serverMetrics.recordAuthFailure("oidc_rejected");
}
}

View File

@@ -1,5 +1,6 @@
package com.cameleer3.server.app.security;
import com.cameleer3.server.app.metrics.ServerMetrics;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.security.JwtService;
import com.cameleer3.server.core.security.UserRepository;
@@ -61,7 +62,8 @@ public class SecurityConfig {
AgentRegistryService registryService,
SecurityProperties securityProperties,
CorsConfigurationSource corsConfigurationSource,
UserRepository userRepository) throws Exception {
UserRepository userRepository,
ServerMetrics serverMetrics) throws Exception {
JwtDecoder oidcDecoder = null;
String issuer = securityProperties.getOidc().getIssuerUri();
if (issuer != null && !issuer.isBlank()) {
@@ -83,6 +85,7 @@ public class SecurityConfig {
// Public endpoints
.requestMatchers(
"/api/v1/health",
"/api/v1/prometheus",
"/api/v1/branding/**",
"/api/v1/agents/register",
"/api/v1/agents/*/refresh",
@@ -142,7 +145,7 @@ public class SecurityConfig {
.authenticationEntryPoint(new HttpStatusEntryPoint(HttpStatus.UNAUTHORIZED))
)
.addFilterBefore(
new JwtAuthenticationFilter(jwtService, registryService, oidcDecoder, userRepository),
new JwtAuthenticationFilter(jwtService, registryService, oidcDecoder, userRepository, serverMetrics),
UsernamePasswordAuthenticationFilter.class
);

View File

@@ -100,7 +100,7 @@ management:
web:
base-path: /api/v1
exposure:
include: health
include: health,prometheus
endpoint:
health:
show-details: always