feat: add delta mode for counter metrics using ClickHouse lag()

Counter metrics like chunks.exported.count are monotonically increasing.
Add a mode=delta query parameter to the agent metrics API that computes
per-bucket deltas server-side with the ClickHouse lag() window function:
take max(value) per bucket, then greatest(0, current - previous) to get
the increase per period, with counter resets floored at zero.

The chunks exported/dropped charts now show throughput per bucket
instead of the ever-increasing cumulative total.
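
For illustration, a minimal standalone sketch of the two-stage query shape
this commit uses (table, columns, and metric name are taken from the diff
below; it assumes a ClickHouse version whose window functions accept lag()
with a default argument — lagInFrame is the longer-standing spelling of the
same thing):

    SELECT bucket, metric_name,
           greatest(0, max_val - lag(max_val, 1, max_val)
               OVER (PARTITION BY metric_name ORDER BY bucket)) AS delta
    FROM (
        SELECT toStartOfInterval(collected_at, INTERVAL 60 SECOND) AS bucket,
               metric_name,
               max(metric_value) AS max_val
        FROM agent_metrics
        WHERE metric_name = 'cameleer.chunks.exported.count'
        GROUP BY bucket, metric_name
    )
    ORDER BY bucket
    -- e.g. per-bucket maxima 100, 250, 40, 90 yield deltas 0, 150, 0, 50:
    -- the lag() default zeroes the first bucket instead of reporting the
    -- whole cumulative total, and greatest(0, ...) absorbs the counter
    -- reset at 40.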

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: hsiegeln
Date:   2026-04-13 10:56:06 +02:00
parent  ae908fb382
commit  e57343e3df
5 changed files with 69 additions and 5 deletions

@@ -27,15 +27,17 @@ public class AgentMetricsController {
             @RequestParam String names,
             @RequestParam(required = false) Instant from,
             @RequestParam(required = false) Instant to,
-            @RequestParam(defaultValue = "60") int buckets) {
+            @RequestParam(defaultValue = "60") int buckets,
+            @RequestParam(defaultValue = "gauge") String mode) {
         if (from == null) from = Instant.now().minus(1, ChronoUnit.HOURS);
         if (to == null) to = Instant.now();
         List<String> metricNames = Arrays.asList(names.split(","));
-        Map<String, List<MetricTimeSeries.Bucket>> raw =
-            metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets);
+        Map<String, List<MetricTimeSeries.Bucket>> raw = "delta".equalsIgnoreCase(mode)
+            ? metricsQueryStore.queryTimeSeriesDelta(agentId, metricNames, from, to, buckets)
+            : metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets);
         Map<String, List<MetricBucket>> result = raw.entrySet().stream()
             .collect(Collectors.toMap(

@@ -69,4 +69,57 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
         return result;
     }
+
+    @Override
+    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeriesDelta(
+            String instanceId, List<String> metricNames,
+            Instant from, Instant to, int buckets) {
+        long intervalSeconds = Math.max(60,
+            (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));
+        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
+        for (String name : metricNames) {
+            result.put(name.trim(), new ArrayList<>());
+        }
+        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
+        String placeholders = String.join(", ", Collections.nCopies(namesArray.length, "?"));
+        String finalSql = """
+            SELECT bucket, metric_name,
+                   greatest(0, max_val - lag(max_val, 1, max_val)
+                       OVER (PARTITION BY metric_name ORDER BY bucket)) AS avg_value
+            FROM (
+                SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
+                       metric_name,
+                       max(metric_value) AS max_val
+                FROM agent_metrics
+                WHERE tenant_id = ?
+                  AND instance_id = ?
+                  AND collected_at >= ?
+                  AND collected_at < ?
+                  AND metric_name IN (%s)
+                GROUP BY bucket, metric_name
+            )
+            ORDER BY bucket
+            """.formatted(intervalSeconds, placeholders);
+        List<Object> params = new ArrayList<>();
+        params.add(tenantId);
+        params.add(instanceId);
+        params.add(java.sql.Timestamp.from(from));
+        params.add(java.sql.Timestamp.from(to));
+        Collections.addAll(params, namesArray);
+        jdbc.query(finalSql, rs -> {
+            String metricName = rs.getString("metric_name");
+            Instant bucket = rs.getTimestamp("bucket").toInstant();
+            double value = rs.getDouble("avg_value");
+            result.computeIfAbsent(metricName, k -> new ArrayList<>())
+                .add(new MetricTimeSeries.Bucket(bucket, value));
+        }, params.toArray());
+        return result;
+    }
 }

@@ -11,4 +11,11 @@ public interface MetricsQueryStore {
     Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
         String instanceId, List<String> metricNames,
         Instant from, Instant to, int buckets);
+
+    /** Counter mode: returns per-bucket deltas (max - previous max, floored at 0). */
+    default Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeriesDelta(
+        String instanceId, List<String> metricNames,
+        Instant from, Instant to, int buckets) {
+        return queryTimeSeries(instanceId, metricNames, from, to, buckets);
+    }
 }

@@ -9,15 +9,17 @@ export function useAgentMetrics(
   buckets = 60,
   from?: string,
   to?: string,
+  mode: 'gauge' | 'delta' = 'gauge',
 ) {
   const refetchInterval = useRefreshInterval(30_000);
   return useQuery({
-    queryKey: ['agent-metrics', agentId, names.join(','), buckets, from, to],
+    queryKey: ['agent-metrics', agentId, names.join(','), buckets, from, to, mode],
     queryFn: async () => {
       const token = useAuthStore.getState().accessToken;
       const params = new URLSearchParams({
         names: names.join(','),
         buckets: String(buckets),
+        mode,
       });
       if (from) params.set('from', from);
       if (to) params.set('to', to);

@@ -75,7 +75,7 @@ export default function AgentInstance() {
   const { data: agentMetrics } = useAgentMetrics(
     agent?.instanceId || null,
     ['cameleer.chunks.exported.count', 'cameleer.chunks.dropped.count'],
-    60, timeFrom, timeTo,
+    60, timeFrom, timeTo, 'delta',
   );
   const feedEvents = useMemo<FeedEvent[]>(() => {
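
For reference, the request the hook ends up issuing has roughly this shape.
The path is hypothetical — the controller's request mapping is outside the
hunks shown above — but the query parameters match the controller signature:

    GET /api/agents/{instanceId}/metrics?names=cameleer.chunks.exported.count,cameleer.chunks.dropped.count&buckets=60&mode=delta

from and to are appended only when a time range is selected, mirroring the
conditional params.set calls in the hook.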