fix: JmxMetricsBridge always exports JVM metrics even without Camel JMX
JVM metrics (heap, threads, CPU, GC) use ManagementFactory which is always available. Previously the entire bridge was disabled when camel-management was missing. Now it starts the scheduler regardless and skips only Camel route/processor/context metrics. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -109,7 +109,7 @@ class JmxMetricsBridgeTest {
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void start_withJmxUnavailable_doesNotStart() throws Exception {
|
||||
void start_withJmxUnavailable_stillExportsJvmMetrics() throws Exception {
|
||||
camelContext = createContextWithoutJmx();
|
||||
|
||||
CapturingExporter exporter = new CapturingExporter();
|
||||
@@ -117,11 +117,23 @@ class JmxMetricsBridgeTest {
|
||||
|
||||
bridge.start();
|
||||
|
||||
assertFalse(bridge.isAvailable(), "Bridge should not be available when JMX is disabled");
|
||||
// Give a brief window to confirm no exports happen
|
||||
Thread.sleep(1500); // SonarQube: Thread.sleep used for async test synchronization
|
||||
assertTrue(exporter.getExportedSnapshots().isEmpty(),
|
||||
"No metrics should be exported when JMX is unavailable");
|
||||
// Bridge is available even without Camel JMX — JVM metrics still work
|
||||
assertTrue(bridge.isAvailable(), "Bridge should be available (JVM metrics don't need Camel JMX)");
|
||||
|
||||
// Wait for at least one export cycle
|
||||
assertTrue(exporter.awaitExport(5, TimeUnit.SECONDS),
|
||||
"Should export JVM metrics even when Camel JMX is disabled");
|
||||
|
||||
// Verify only JVM metrics are present (no Camel route/processor metrics)
|
||||
List<String> metricNames = exporter.getExportedSnapshots().stream()
|
||||
.map(MetricsSnapshot::getMetricName)
|
||||
.toList();
|
||||
assertTrue(metricNames.contains("jvm.memory.used.value"),
|
||||
"Should contain JVM memory metrics");
|
||||
assertTrue(metricNames.contains("jvm.threads.live.value"),
|
||||
"Should contain JVM thread metrics");
|
||||
assertFalse(metricNames.contains("camel.exchanges.succeeded.count"),
|
||||
"Should NOT contain Camel route metrics when JMX is disabled");
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@@ -32,6 +32,7 @@ public class JmxMetricsBridge implements MetricsBridge {
|
||||
private ScheduledExecutorService scheduler;
|
||||
private final AtomicReference<List<MetricsSnapshot>> latestSnapshots = new AtomicReference<>(List.of());
|
||||
private volatile boolean available = false;
|
||||
private volatile boolean camelJmxAvailable = false;
|
||||
|
||||
public JmxMetricsBridge(CameleerAgentConfig config, CamelContext camelContext,
|
||||
Exporter exporter, String instanceId) {
|
||||
@@ -53,14 +54,17 @@ public class JmxMetricsBridge implements MetricsBridge {
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
// Check JMX management availability before starting scheduler
|
||||
// Check JMX management availability — Camel route/processor metrics need it,
|
||||
// but JVM metrics (heap, threads, CPU, GC) work without it via ManagementFactory
|
||||
var mgmtAgent = camelContext.getManagementStrategy().getManagementAgent();
|
||||
if (mgmtAgent == null) {
|
||||
LOG.warn("Cameleer3: JMX management agent not available — metrics collection disabled. "
|
||||
+ "Add camel-management (or camel-management-starter) to your application's classpath.");
|
||||
return;
|
||||
LOG.info("Cameleer3: Camel JMX management agent not available — "
|
||||
+ "route/processor metrics disabled, JVM metrics still active. "
|
||||
+ "Add camel-management to enable full Camel metrics.");
|
||||
camelJmxAvailable = false;
|
||||
} else {
|
||||
camelJmxAvailable = true;
|
||||
}
|
||||
available = true;
|
||||
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "cameleer3-metrics");
|
||||
@@ -73,7 +77,9 @@ public class JmxMetricsBridge implements MetricsBridge {
|
||||
config.getMetricsIntervalSeconds(),
|
||||
TimeUnit.SECONDS);
|
||||
|
||||
LOG.info("Cameleer3: Metrics bridge started (interval={}s)", config.getMetricsIntervalSeconds());
|
||||
available = true;
|
||||
LOG.info("Cameleer3: Metrics bridge started (interval={}s, camelJmx={})",
|
||||
config.getMetricsIntervalSeconds(), camelJmxAvailable);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -96,68 +102,16 @@ public class JmxMetricsBridge implements MetricsBridge {
|
||||
|
||||
private void collectAndExportMetrics() {
|
||||
try {
|
||||
MBeanServer mbeanServer = camelContext.getManagementStrategy().getManagementAgent().getMBeanServer();
|
||||
if (mbeanServer == null) {
|
||||
LOG.debug("Cameleer3: MBeanServer not available");
|
||||
return;
|
||||
}
|
||||
|
||||
String contextName = camelContext.getManagementName();
|
||||
Instant collectedAt = Instant.now();
|
||||
List<MetricsSnapshot> snapshots = new ArrayList<>();
|
||||
|
||||
// Collect route metrics
|
||||
Set<ObjectName> routeNames = mbeanServer.queryNames(
|
||||
new ObjectName(JMX_DOMAIN_PREFIX + contextName + ",type=routes,name=*"), null);
|
||||
|
||||
for (ObjectName routeName : routeNames) {
|
||||
String routeId = mbeanServer.getAttribute(routeName, "RouteId").toString();
|
||||
Map<String, String> tags = new LinkedHashMap<>();
|
||||
tags.put("routeId", routeId);
|
||||
|
||||
// Micrometer-compatible names: camel.exchanges.* and camel.route.policy.*
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.succeeded.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesCompleted")), tags);
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.failed.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesFailed")), tags);
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.total.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesTotal")), tags);
|
||||
|
||||
// JMX provides mean/min/max directly; Micrometer provides count+total_time+max
|
||||
// Emit total_time (mean * completed) and max for Micrometer compat, plus mean for convenience
|
||||
long completed = toLong(mbeanServer.getAttribute(routeName, "ExchangesCompleted"));
|
||||
long meanMs = toLong(mbeanServer.getAttribute(routeName, "MeanProcessingTime"));
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.total_time",
|
||||
completed * meanMs, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.count",
|
||||
completed, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.max",
|
||||
toLong(mbeanServer.getAttribute(routeName, "MaxProcessingTime")), tags);
|
||||
}
|
||||
|
||||
// Collect processor metrics
|
||||
Set<ObjectName> processorNames = mbeanServer.queryNames(
|
||||
new ObjectName(JMX_DOMAIN_PREFIX + contextName + ",type=processors,name=*"), null);
|
||||
|
||||
for (ObjectName procName : processorNames) {
|
||||
Map<String, String> tags = new LinkedHashMap<>();
|
||||
tags.put("nodeId", mbeanServer.getAttribute(procName, "ProcessorId").toString());
|
||||
tags.put("routeId", mbeanServer.getAttribute(procName, "RouteId").toString());
|
||||
|
||||
// Micrometer-compatible: camel.message.history.* (matches MicrometerMessageHistoryFactory)
|
||||
long procCompleted = toLong(mbeanServer.getAttribute(procName, "ExchangesCompleted"));
|
||||
long procMean = toLong(mbeanServer.getAttribute(procName, "MeanProcessingTime"));
|
||||
addMetric(snapshots, collectedAt, "camel.message.history.count",
|
||||
procCompleted, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.message.history.total_time",
|
||||
procCompleted * procMean, tags);
|
||||
}
|
||||
|
||||
// Collect JVM metrics
|
||||
// JVM metrics always available (ManagementFactory, no Camel JMX needed)
|
||||
collectJvmMetrics(snapshots, collectedAt);
|
||||
|
||||
// Collect context-level metrics
|
||||
collectContextMetrics(snapshots, collectedAt, mbeanServer, contextName);
|
||||
// Camel route/processor/context metrics require JMX management agent
|
||||
if (camelJmxAvailable) {
|
||||
collectCamelMetrics(snapshots, collectedAt);
|
||||
}
|
||||
|
||||
// Store latest snapshots for Prometheus access
|
||||
this.latestSnapshots.set(List.copyOf(snapshots));
|
||||
@@ -168,6 +122,66 @@ public class JmxMetricsBridge implements MetricsBridge {
|
||||
}
|
||||
}
|
||||
|
||||
private void collectCamelMetrics(List<MetricsSnapshot> snapshots, Instant collectedAt) throws Exception {
|
||||
MBeanServer mbeanServer = camelContext.getManagementStrategy().getManagementAgent().getMBeanServer();
|
||||
if (mbeanServer == null) {
|
||||
LOG.debug("Cameleer3: MBeanServer not available");
|
||||
return;
|
||||
}
|
||||
|
||||
String contextName = camelContext.getManagementName();
|
||||
|
||||
// Collect route metrics
|
||||
Set<ObjectName> routeNames = mbeanServer.queryNames(
|
||||
new ObjectName(JMX_DOMAIN_PREFIX + contextName + ",type=routes,name=*"), null);
|
||||
|
||||
for (ObjectName routeName : routeNames) {
|
||||
String routeId = mbeanServer.getAttribute(routeName, "RouteId").toString();
|
||||
Map<String, String> tags = new LinkedHashMap<>();
|
||||
tags.put("routeId", routeId);
|
||||
|
||||
// Micrometer-compatible names: camel.exchanges.* and camel.route.policy.*
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.succeeded.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesCompleted")), tags);
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.failed.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesFailed")), tags);
|
||||
addMetric(snapshots, collectedAt, "camel.exchanges.total.count",
|
||||
toLong(mbeanServer.getAttribute(routeName, "ExchangesTotal")), tags);
|
||||
|
||||
// JMX provides mean/min/max directly; Micrometer provides count+total_time+max
|
||||
// Emit total_time (mean * completed) and max for Micrometer compat
|
||||
long completed = toLong(mbeanServer.getAttribute(routeName, "ExchangesCompleted"));
|
||||
long meanMs = toLong(mbeanServer.getAttribute(routeName, "MeanProcessingTime"));
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.total_time",
|
||||
completed * meanMs, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.count",
|
||||
completed, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.route.policy.max",
|
||||
toLong(mbeanServer.getAttribute(routeName, "MaxProcessingTime")), tags);
|
||||
}
|
||||
|
||||
// Collect processor metrics
|
||||
Set<ObjectName> processorNames = mbeanServer.queryNames(
|
||||
new ObjectName(JMX_DOMAIN_PREFIX + contextName + ",type=processors,name=*"), null);
|
||||
|
||||
for (ObjectName procName : processorNames) {
|
||||
Map<String, String> tags = new LinkedHashMap<>();
|
||||
tags.put("nodeId", mbeanServer.getAttribute(procName, "ProcessorId").toString());
|
||||
tags.put("routeId", mbeanServer.getAttribute(procName, "RouteId").toString());
|
||||
|
||||
// Micrometer-compatible: camel.message.history.* (matches MicrometerMessageHistoryFactory)
|
||||
long procCompleted = toLong(mbeanServer.getAttribute(procName, "ExchangesCompleted"));
|
||||
long procMean = toLong(mbeanServer.getAttribute(procName, "MeanProcessingTime"));
|
||||
addMetric(snapshots, collectedAt, "camel.message.history.count",
|
||||
procCompleted, tags);
|
||||
addMetric(snapshots, collectedAt, "camel.message.history.total_time",
|
||||
procCompleted * procMean, tags);
|
||||
}
|
||||
|
||||
// Collect context-level metrics
|
||||
collectContextMetrics(snapshots, collectedAt, mbeanServer, contextName);
|
||||
}
|
||||
|
||||
private void collectJvmMetrics(List<MetricsSnapshot> snapshots, Instant collectedAt) {
|
||||
Map<String, String> empty = Map.of();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user