feat: ClickHouse Phase 3 — Stats & Analytics (materialized views)
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m8s
CI / docker (push) Successful in 43s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 44s

- DDL for 5 AggregatingMergeTree tables + 5 materialized views
- ClickHouseStatsStore: all 15 StatsStore methods using -Merge combinators
- Stats/timeseries read from pre-aggregated MVs (countMerge, sumMerge, quantileMerge)
- SLA/topErrors/punchcard query raw executions FINAL table
- Feature flag: cameleer.storage.stats (default: clickhouse)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 21:52:13 +02:00
7 changed files with 1121 additions and 0 deletions

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.app.config;
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
import com.cameleer3.server.app.storage.PostgresMetricsStore;
import com.cameleer3.server.core.admin.AuditRepository;
@@ -16,6 +17,7 @@ import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.StatsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -118,4 +120,13 @@ public class StorageBeanConfig {
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseSearchIndex(clickHouseJdbc);
}
// ── ClickHouse Stats Store ─────────────────────────────────────────
@Bean
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse", matchIfMissing = true)
public StatsStore clickHouseStatsStore(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseStatsStore(clickHouseJdbc);
}
}

View File

@@ -0,0 +1,565 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* ClickHouse implementation of {@link StatsStore}.
* Reads from AggregatingMergeTree tables populated by materialized views,
* using {@code -Merge} aggregate combinators to finalize partial states.
*
* <p>Queries against AggregatingMergeTree tables use literal SQL values instead
* of JDBC prepared-statement parameters because the ClickHouse JDBC v2 driver
* (0.9.x) wraps prepared statements in a sub-query that strips the
* {@code AggregateFunction} column type, breaking {@code -Merge} combinators.
* Queries against raw tables ({@code executions FINAL},
* {@code processor_executions}) use normal prepared-statement parameters
* since they have no AggregateFunction columns.</p>
*/
public class ClickHouseStatsStore implements StatsStore {
private static final String TENANT = "default";
private final JdbcTemplate jdbc;
public ClickHouseStatsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
// ── Stats (aggregate) ────────────────────────────────────────────────
@Override
public ExecutionStats stats(Instant from, Instant to) {
return queryStats("stats_1m_all", from, to, List.of(), true);
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String applicationName) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("application_name", applicationName)), true);
}
@Override
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
return queryStats("stats_1m_route", from, to, List.of(
new Filter("route_id", routeId)), true);
}
@Override
public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
return queryProcessorStatsRaw(from, to, routeId, processorType);
}
// ── Timeseries ───────────────────────────────────────────────────────
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationName) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("application_name", applicationName)), true);
}
@Override
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
new Filter("route_id", routeId)), true);
}
@Override
public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
String routeId, String processorType) {
return queryProcessorTimeseriesRaw(from, to, bucketCount, routeId, processorType);
}
// ── Grouped timeseries ───────────────────────────────────────────────
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
return queryGroupedTimeseries("stats_1m_app", "application_name", from, to,
bucketCount, List.of());
}
@Override
public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
int bucketCount, String applicationName) {
return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
bucketCount, List.of(new Filter("application_name", applicationName)));
}
// ── SLA compliance (raw table — prepared statements OK) ──────────────
@Override
public double slaCompliance(Instant from, Instant to, int thresholdMs,
String applicationName, String routeId) {
String sql = "SELECT " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ?";
List<Object> params = new ArrayList<>();
params.add(thresholdMs);
params.add(TENANT);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
}
if (routeId != null) {
sql += " AND route_id = ?";
params.add(routeId);
}
return jdbc.query(sql, (rs, rowNum) -> {
long total = rs.getLong("total");
if (total == 0) return 1.0;
return rs.getLong("compliant") * 100.0 / total;
}, params.toArray()).stream().findFirst().orElse(1.0);
}
@Override
public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
String sql = "SELECT application_name, " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"GROUP BY application_name";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("application_name"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to));
return result;
}
@Override
public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
String applicationName, int thresholdMs) {
String sql = "SELECT route_id, " +
"countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
"countIf(status != 'RUNNING') AS total " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"AND application_name = ? GROUP BY route_id";
Map<String, long[]> result = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
result.put(rs.getString("route_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationName);
return result;
}
// ── Top errors (raw table — prepared statements OK) ──────────────────
@Override
public List<TopError> topErrors(Instant from, Instant to, String applicationName,
String routeId, int limit) {
StringBuilder where = new StringBuilder(
"status = 'FAILED' AND start_time >= ? AND start_time < ?");
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
where.append(" AND application_name = ?");
params.add(applicationName);
}
String table;
String groupId;
if (routeId != null) {
table = "processor_executions";
groupId = "processor_id";
where.append(" AND route_id = ?");
params.add(routeId);
} else {
table = "executions FINAL";
groupId = "route_id";
}
Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES);
Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES);
String sql = "WITH counted AS (" +
" SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
" " + groupId + " AS group_id, " +
" count() AS cnt, max(start_time) AS last_seen " +
" FROM " + table + " WHERE tenant_id = ? AND " + where +
" GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" +
"), velocity AS (" +
" SELECT COALESCE(error_type, substring(error_message, 1, 200)) AS error_key, " +
" countIf(start_time >= ?) AS recent_5m, " +
" countIf(start_time >= ? AND start_time < ?) AS prev_5m " +
" FROM " + table + " WHERE tenant_id = ? AND " + where +
" GROUP BY error_key" +
") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " +
" COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " +
" CASE " +
" WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " +
" WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " +
" ELSE 'stable' END AS trend " +
"FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " +
"ORDER BY c.cnt DESC";
List<Object> fullParams = new ArrayList<>();
fullParams.add(TENANT);
fullParams.addAll(params);
fullParams.add(limit);
fullParams.add(Timestamp.from(fiveMinAgo));
fullParams.add(Timestamp.from(tenMinAgo));
fullParams.add(Timestamp.from(fiveMinAgo));
fullParams.add(TENANT);
fullParams.addAll(params);
return jdbc.query(sql, (rs, rowNum) -> {
String errorKey = rs.getString("error_key");
String gid = rs.getString("group_id");
return new TopError(
errorKey,
routeId != null ? routeId : gid,
routeId != null ? gid : null,
rs.getLong("cnt"),
rs.getDouble("velocity"),
rs.getString("trend"),
rs.getTimestamp("last_seen").toInstant());
}, fullParams.toArray());
}
@Override
public int activeErrorTypes(Instant from, Instant to, String applicationName) {
String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, substring(error_message, 1, 200))) " +
"FROM executions FINAL " +
"WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?";
List<Object> params = new ArrayList<>();
params.add(TENANT);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
if (applicationName != null) {
sql += " AND application_name = ?";
params.add(applicationName);
}
Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
return count != null ? count : 0;
}
// ── Punchcard (AggregatingMergeTree — literal SQL) ───────────────────
@Override
public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationName) {
String view = applicationName != null ? "stats_1m_app" : "stats_1m_all";
String sql = "SELECT toDayOfWeek(bucket, 1) % 7 AS weekday, " +
"toHour(bucket) AS hour, " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count " +
"FROM " + view +
" WHERE tenant_id = " + lit(TENANT) +
" AND bucket >= " + lit(from) +
" AND bucket < " + lit(to);
if (applicationName != null) {
sql += " AND application_name = " + lit(applicationName);
}
sql += " GROUP BY weekday, hour ORDER BY weekday, hour";
return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell(
rs.getInt("weekday"), rs.getInt("hour"),
rs.getLong("total_count"), rs.getLong("failed_count")));
}
// ── Private helpers ──────────────────────────────────────────────────
private record Filter(String column, String value) {}
/**
* Format an Instant as a ClickHouse DateTime literal.
* Uses java.sql.Timestamp to match the JVM→ClickHouse timezone convention
* used by the JDBC driver, then truncates to second precision for DateTime
* column compatibility.
*/
private static String lit(Instant instant) {
// Truncate to seconds — ClickHouse DateTime has second precision
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
// Remove trailing ".0" that Timestamp.toString() always appends
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
/** Format a string as a SQL literal with single-quote escaping. */
private static String lit(String value) {
return "'" + value.replace("'", "\\'") + "'";
}
/** Convert Instant to java.sql.Timestamp for JDBC binding. */
private static Timestamp ts(Instant instant) {
return Timestamp.from(instant);
}
/**
* Build -Merge combinator SQL for the given view and time range.
*/
private String buildStatsSql(String view, Instant rangeFrom, Instant rangeTo,
List<Filter> filters, boolean hasRunning) {
String runningCol = hasRunning ? "countIfMerge(running_count)" : "0";
String sql = "SELECT " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"sumMerge(duration_sum) AS duration_sum, " +
"quantileMerge(0.99)(p99_duration) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view +
" WHERE tenant_id = " + lit(TENANT) +
" AND bucket >= " + lit(rangeFrom) +
" AND bucket < " + lit(rangeTo);
for (Filter f : filters) {
sql += " AND " + f.column() + " = " + lit(f.value());
}
return sql;
}
/**
* Query an AggregatingMergeTree stats table using -Merge combinators.
* Uses literal SQL to avoid ClickHouse JDBC driver PreparedStatement issues.
*/
private ExecutionStats queryStats(String view, Instant from, Instant to,
List<Filter> filters, boolean hasRunning) {
String sql = buildStatsSql(view, from, to, filters, hasRunning);
long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
var currentResult = jdbc.query(sql, (rs, rowNum) -> {
long tc = rs.getLong("total_count");
long fc = rs.getLong("failed_count");
long ds = rs.getLong("duration_sum"); // Nullable → 0 if null
long p99 = (long) rs.getDouble("p99_duration"); // quantileMerge returns Float64
long ac = rs.getLong("active_count");
return new long[]{tc, fc, ds, p99, ac};
});
if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1];
avgDuration = totalCount > 0 ? r[2] / totalCount : 0;
p99Duration = r[3]; activeCount = r[4];
}
// Previous period (shifted back 24h)
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = to.minus(Duration.ofHours(24));
String prevSql = buildStatsSql(view, prevFrom, prevTo, filters, hasRunning);
long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
var prevResult = jdbc.query(prevSql, (rs, rowNum) -> {
long tc = rs.getLong("total_count");
long fc = rs.getLong("failed_count");
long ds = rs.getLong("duration_sum");
long p99 = (long) rs.getDouble("p99_duration");
return new long[]{tc, fc, ds, p99};
});
if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1];
prevAvg = prevTotal > 0 ? r[2] / prevTotal : 0;
prevP99 = r[3];
}
// Today total
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
String todaySql = buildStatsSql(view, todayStart, Instant.now(), filters, hasRunning);
long totalToday = 0;
var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"));
if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats(
totalCount, failedCount, avgDuration, p99Duration, activeCount,
totalToday, prevTotal, prevFailed, prevAvg, prevP99);
}
/**
* Timeseries from AggregatingMergeTree using -Merge combinators.
*/
private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
int bucketCount, List<Filter> filters,
boolean hasRunningCount) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String runningCol = hasRunningCount ? "countIfMerge(running_count)" : "0";
String sql = "SELECT " +
"toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"sumMerge(duration_sum) AS duration_sum, " +
"quantileMerge(0.99)(p99_duration) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view +
" WHERE tenant_id = " + lit(TENANT) +
" AND bucket >= " + lit(from) +
" AND bucket < " + lit(to);
for (Filter f : filters) {
sql += " AND " + f.column() + " = " + lit(f.value());
}
sql += " GROUP BY period ORDER BY period";
List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) -> {
long tc = rs.getLong("total_count");
long ds = rs.getLong("duration_sum");
return new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
tc, rs.getLong("failed_count"),
tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
rs.getLong("active_count"));
});
return new StatsTimeseries(buckets);
}
/**
* Grouped timeseries from AggregatingMergeTree.
*/
private Map<String, StatsTimeseries> queryGroupedTimeseries(
String view, String groupCol, Instant from, Instant to,
int bucketCount, List<Filter> filters) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String sql = "SELECT " +
"toStartOfInterval(bucket, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
groupCol + " AS group_key, " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"sumMerge(duration_sum) AS duration_sum, " +
"quantileMerge(0.99)(p99_duration) AS p99_duration, " +
"countIfMerge(running_count) AS active_count " +
"FROM " + view +
" WHERE tenant_id = " + lit(TENANT) +
" AND bucket >= " + lit(from) +
" AND bucket < " + lit(to);
for (Filter f : filters) {
sql += " AND " + f.column() + " = " + lit(f.value());
}
sql += " GROUP BY period, group_key ORDER BY period, group_key";
Map<String, List<TimeseriesBucket>> grouped = new LinkedHashMap<>();
jdbc.query(sql, (rs) -> {
String key = rs.getString("group_key");
long tc = rs.getLong("total_count");
long ds = rs.getLong("duration_sum");
TimeseriesBucket bucket = new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
tc, rs.getLong("failed_count"),
tc > 0 ? ds / tc : 0, (long) rs.getDouble("p99_duration"),
rs.getLong("active_count"));
grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket);
});
Map<String, StatsTimeseries> result = new LinkedHashMap<>();
grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets)));
return result;
}
/**
* Direct aggregation on processor_executions for processor-level stats.
*/
private ExecutionStats queryProcessorStatsRaw(Instant from, Instant to,
String routeId, String processorType) {
String sql = "SELECT " +
"count() AS total_count, " +
"countIf(status = 'FAILED') AS failed_count, " +
"CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
"quantile(0.99)(duration_ms) AS p99_duration, " +
"0 AS active_count " +
"FROM processor_executions " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"AND route_id = ? AND processor_type = ?";
long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0;
var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
rs.getLong("active_count")
}, TENANT, ts(from), ts(to), routeId, processorType);
if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3];
}
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = to.minus(Duration.ofHours(24));
long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration")
}, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType);
if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
}
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
long totalToday = 0;
var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"),
TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType);
if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats(
totalCount, failedCount, avgDuration, p99Duration, 0,
totalToday, prevTotal, prevFailed, prevAvg, prevP99);
}
/**
* Direct aggregation on processor_executions for processor-level timeseries.
*/
private StatsTimeseries queryProcessorTimeseriesRaw(Instant from, Instant to,
int bucketCount,
String routeId, String processorType) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String sql = "SELECT " +
"toStartOfInterval(start_time, INTERVAL " + intervalSeconds + " SECOND) AS period, " +
"count() AS total_count, " +
"countIf(status = 'FAILED') AS failed_count, " +
"CASE WHEN count() > 0 THEN sum(duration_ms) / count() ELSE 0 END AS avg_duration, " +
"quantile(0.99)(duration_ms) AS p99_duration, " +
"0 AS active_count " +
"FROM processor_executions " +
"WHERE tenant_id = ? AND start_time >= ? AND start_time < ? " +
"AND route_id = ? AND processor_type = ? " +
"GROUP BY period ORDER BY period";
List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
rs.getLong("active_count")
), TENANT, ts(from), ts(to), routeId, processorType);
return new StatsTimeseries(buckets);
}
}

View File

@@ -5,6 +5,7 @@ import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@@ -18,6 +19,7 @@ import java.util.List;
import java.util.Map;
@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres")
public class PostgresStatsStore implements StatsStore {
private final JdbcTemplate jdbc;

View File

@@ -51,6 +51,7 @@ cameleer:
storage:
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
stats: ${CAMELEER_STORAGE_STATS:clickhouse}
security:
access-token-expiry-ms: 3600000

View File

@@ -0,0 +1,165 @@
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.
-- stats_1m_all (global)
CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
tenant_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;
-- stats_1m_app (per-application)
CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
tenant_id,
application_name,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;
-- stats_1m_route (per-route)
CREATE TABLE IF NOT EXISTS stats_1m_route (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
tenant_id,
application_name,
route_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;
-- stats_1m_processor (per-processor-type)
CREATE TABLE IF NOT EXISTS stats_1m_processor (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
processor_type LowCardinality(String),
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
tenant_id,
application_name,
processor_type,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;
-- stats_1m_processor_detail (per-processor-id)
CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
tenant_id LowCardinality(String),
application_name LowCardinality(String),
route_id LowCardinality(String),
processor_id String,
bucket DateTime,
total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)),
duration_max AggregateFunction(max, Nullable(Int64)),
p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
tenant_id,
application_name,
route_id,
processor_id,
toStartOfMinute(start_time) AS bucket,
countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;

View File

@@ -0,0 +1,375 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore.PunchcardCell;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
class ClickHouseStatsStoreIT {
@Container
static final ClickHouseContainer clickhouse =
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
private JdbcTemplate jdbc;
private ClickHouseStatsStore store;
// base time: 2026-03-31T10:00:00Z (a Tuesday)
private static final Instant BASE = Instant.parse("2026-03-31T10:00:00Z");
@BeforeEach
void setUp() throws Exception {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(clickhouse.getJdbcUrl());
ds.setUsername(clickhouse.getUsername());
ds.setPassword(clickhouse.getPassword());
jdbc = new JdbcTemplate(ds);
// Load DDL from classpath resources
String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
.getContentAsString(StandardCharsets.UTF_8);
String statsDdl = new ClassPathResource("clickhouse/V4__stats_tables_and_mvs.sql")
.getContentAsString(StandardCharsets.UTF_8);
jdbc.execute(executionsDdl);
jdbc.execute(processorsDdl);
// Drop MVs first (they reference the stats tables), then recreate everything
jdbc.execute("DROP TABLE IF EXISTS stats_1m_all_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_app_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_route_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail_mv");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_all");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_app");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_route");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor");
jdbc.execute("DROP TABLE IF EXISTS stats_1m_processor_detail");
// Strip SQL line comments first (they may contain semicolons),
// then split by ';' and execute non-empty statements.
String cleanedDdl = statsDdl.replaceAll("--[^\n]*", "");
for (String stmt : cleanedDdl.split(";")) {
String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
jdbc.execute(trimmed);
}
}
// Truncate base tables
jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions");
seedTestData();
// Try the failing query to capture it in query_log, then check
try {
jdbc.queryForMap(
"SELECT countMerge(total_count) AS tc, countIfMerge(failed_count) AS fc, " +
"sumMerge(duration_sum) / greatest(countMerge(total_count), 1) AS avg, " +
"quantileMerge(0.99)(p99_duration) AS p99, " +
"countIfMerge(running_count) AS rc " +
"FROM stats_1m_all WHERE tenant_id = 'default' " +
"AND bucket >= '2026-03-31 09:59:00' AND bucket < '2026-03-31 10:05:00'");
} catch (Exception e) {
System.out.println("Expected error: " + e.getMessage().substring(0, 80));
}
jdbc.execute("SYSTEM FLUSH LOGS");
// Get ALL recent queries to see what the driver sends
var queryLog = jdbc.queryForList(
"SELECT type, substring(query, 1, 200) AS q " +
"FROM system.query_log WHERE event_time > now() - 30 " +
"AND query NOT LIKE '%system.query_log%' AND query NOT LIKE '%FLUSH%' " +
"ORDER BY event_time DESC LIMIT 20");
for (var entry : queryLog) {
System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q"));
}
store = new ClickHouseStatsStore(jdbc);
}
private void seedTestData() {
// 10 executions across 2 apps, 2 routes, spanning 5 minutes
// app-1, route-a: 4 COMPLETED (200ms, 300ms, 400ms, 500ms)
insertExecution("exec-01", BASE.plusSeconds(0), "app-1", "route-a", "agent-1",
"COMPLETED", 200L, "", "");
insertExecution("exec-02", BASE.plusSeconds(60), "app-1", "route-a", "agent-1",
"COMPLETED", 300L, "", "");
insertExecution("exec-03", BASE.plusSeconds(120), "app-1", "route-a", "agent-1",
"COMPLETED", 400L, "", "");
insertExecution("exec-04", BASE.plusSeconds(180), "app-1", "route-a", "agent-1",
"COMPLETED", 500L, "", "");
// app-1, route-a: 2 FAILED (100ms, 150ms) with error_type="NPE"
insertExecution("exec-05", BASE.plusSeconds(60), "app-1", "route-a", "agent-1",
"FAILED", 100L, "NPE", "null ref");
insertExecution("exec-06", BASE.plusSeconds(120), "app-1", "route-a", "agent-1",
"FAILED", 150L, "NPE", "null ref");
// app-1, route-b: 2 COMPLETED (50ms, 60ms)
insertExecution("exec-07", BASE.plusSeconds(60), "app-1", "route-b", "agent-1",
"COMPLETED", 50L, "", "");
insertExecution("exec-08", BASE.plusSeconds(120), "app-1", "route-b", "agent-1",
"COMPLETED", 60L, "", "");
// app-2, route-c: 1 COMPLETED (1000ms)
insertExecution("exec-09", BASE.plusSeconds(60), "app-2", "route-c", "agent-2",
"COMPLETED", 1000L, "", "");
// app-2, route-c: 1 RUNNING (null duration)
insertExecution("exec-10", BASE.plusSeconds(180), "app-2", "route-c", "agent-2",
"RUNNING", null, "", "");
// 5 processor records for processor stats testing
// app-1, route-a, processor_type="to": 3 COMPLETED
insertProcessor("exec-01", 1, "proc-to-1", "to", BASE.plusSeconds(0),
"app-1", "route-a", "COMPLETED", 50L);
insertProcessor("exec-02", 1, "proc-to-2", "to", BASE.plusSeconds(60),
"app-1", "route-a", "COMPLETED", 80L);
insertProcessor("exec-03", 1, "proc-to-3", "to", BASE.plusSeconds(120),
"app-1", "route-a", "COMPLETED", 90L);
// app-1, route-a, processor_type="log": 2 COMPLETED
insertProcessor("exec-01", 2, "proc-log-1", "log", BASE.plusSeconds(1),
"app-1", "route-a", "COMPLETED", 10L);
insertProcessor("exec-02", 2, "proc-log-2", "log", BASE.plusSeconds(61),
"app-1", "route-a", "COMPLETED", 15L);
}
private void insertExecution(String executionId, Instant startTime, String appName,
String routeId, String agentId, String status,
Long durationMs, String errorType, String errorMessage) {
jdbc.update(
"INSERT INTO executions (tenant_id, execution_id, start_time, route_id, " +
"agent_id, application_name, status, duration_ms, error_type, error_message) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)",
executionId, Timestamp.from(startTime), routeId, agentId, appName,
status, durationMs, errorType, errorMessage);
}
private void insertProcessor(String executionId, int seq, String processorId,
String processorType, Instant startTime,
String appName, String routeId, String status,
Long durationMs) {
jdbc.update(
"INSERT INTO processor_executions (tenant_id, execution_id, seq, processor_id, " +
"processor_type, start_time, route_id, application_name, status, duration_ms) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?)",
executionId, seq, processorId, processorType, Timestamp.from(startTime),
routeId, appName, status, durationMs);
}
// ── Stats Tests ──────────────────────────────────────────────────────
@Test
void stats_returnsCorrectGlobalTotals() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
ExecutionStats stats = store.stats(from, to);
assertThat(stats.totalCount()).isEqualTo(10);
assertThat(stats.failedCount()).isEqualTo(2);
assertThat(stats.activeCount()).isEqualTo(1);
assertThat(stats.avgDurationMs()).isGreaterThan(0);
assertThat(stats.p99LatencyMs()).isGreaterThan(0);
}
@Test
void statsForApp_filtersCorrectly() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
ExecutionStats app1 = store.statsForApp(from, to, "app-1");
assertThat(app1.totalCount()).isEqualTo(8);
ExecutionStats app2 = store.statsForApp(from, to, "app-2");
assertThat(app2.totalCount()).isEqualTo(2);
}
@Test
void statsForRoute_filtersCorrectly() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
ExecutionStats routeA = store.statsForRoute(from, to, "route-a", List.of());
assertThat(routeA.totalCount()).isEqualTo(6);
}
// ── Timeseries Tests ─────────────────────────────────────────────────
@Test
void timeseries_returnsBuckets() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
StatsTimeseries ts = store.timeseries(from, to, 5);
assertThat(ts.buckets()).isNotEmpty();
long totalAcrossBuckets = ts.buckets().stream()
.mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
assertThat(totalAcrossBuckets).isEqualTo(10);
}
@Test
void timeseriesForApp_filtersCorrectly() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
StatsTimeseries ts = store.timeseriesForApp(from, to, 5, "app-1");
long totalAcrossBuckets = ts.buckets().stream()
.mapToLong(StatsTimeseries.TimeseriesBucket::totalCount).sum();
assertThat(totalAcrossBuckets).isEqualTo(8);
}
@Test
void timeseriesGroupedByApp_returnsMap() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByApp(from, to, 5);
assertThat(grouped).containsKeys("app-1", "app-2");
}
@Test
void timeseriesGroupedByRoute_returnsMap() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
Map<String, StatsTimeseries> grouped = store.timeseriesGroupedByRoute(from, to, 5, "app-1");
assertThat(grouped).containsKeys("route-a", "route-b");
}
// ── SLA Tests ────────────────────────────────────────────────────────
@Test
void slaCompliance_calculatesCorrectly() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
// threshold=250ms: among 9 non-RUNNING executions:
// compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5
// total non-running: 9
// compliance = 5/9 * 100 ~ 55.56%
double sla = store.slaCompliance(from, to, 250, null, null);
assertThat(sla).isBetween(55.0, 56.0);
}
// ── Top Errors Tests ─────────────────────────────────────────────────
@Test
void topErrors_returnsRankedErrors() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
List<TopError> errors = store.topErrors(from, to, null, null, 10);
assertThat(errors).isNotEmpty();
assertThat(errors.get(0).errorType()).isEqualTo("NPE");
assertThat(errors.get(0).count()).isEqualTo(2);
}
// ── Active Error Types Test ──────────────────────────────────────────
@Test
void activeErrorTypes_countsDistinct() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
int count = store.activeErrorTypes(from, to, "app-1");
assertThat(count).isEqualTo(1); // only "NPE"
}
// ── Punchcard Test ───────────────────────────────────────────────────
@Test
void punchcard_returnsWeekdayHourCells() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
List<PunchcardCell> cells = store.punchcard(from, to, null);
assertThat(cells).isNotEmpty();
long totalCount = cells.stream().mapToLong(PunchcardCell::totalCount).sum();
assertThat(totalCount).isEqualTo(10);
}
@Test
void slaCountsByApp_returnsMap() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
// threshold=250ms
Map<String, long[]> counts = store.slaCountsByApp(from, to, 250);
assertThat(counts).containsKeys("app-1", "app-2");
// app-1: 8 total executions, all non-RUNNING
// compliant (<=250ms): exec-01(200), exec-05(100), exec-06(150), exec-07(50), exec-08(60) = 5
long[] app1 = counts.get("app-1");
assertThat(app1[0]).isEqualTo(5); // compliant
assertThat(app1[1]).isEqualTo(8); // total non-running
// app-2: 1 COMPLETED(1000ms) + 1 RUNNING → 1 non-RUNNING, 0 compliant
long[] app2 = counts.get("app-2");
assertThat(app2[0]).isEqualTo(0); // compliant
assertThat(app2[1]).isEqualTo(1); // total non-running
}
@Test
void slaCountsByRoute_returnsMap() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
Map<String, long[]> counts = store.slaCountsByRoute(from, to, "app-1", 250);
assertThat(counts).containsKeys("route-a", "route-b");
// route-a: exec-01(200)OK, exec-02(300)NO, exec-03(400)NO, exec-04(500)NO,
// exec-05(100)OK, exec-06(150)OK → 3 compliant, 6 total
long[] routeA = counts.get("route-a");
assertThat(routeA[0]).isEqualTo(3); // compliant
assertThat(routeA[1]).isEqualTo(6); // total
// route-b: exec-07(50)OK, exec-08(60)OK → 2 compliant, 2 total
long[] routeB = counts.get("route-b");
assertThat(routeB[0]).isEqualTo(2);
assertThat(routeB[1]).isEqualTo(2);
}
// ── Processor Stats Test ─────────────────────────────────────────────
@Test
void statsForProcessor_filtersCorrectly() {
Instant from = BASE.minusSeconds(60);
Instant to = BASE.plusSeconds(300);
ExecutionStats toStats = store.statsForProcessor(from, to, "route-a", "to");
assertThat(toStats.totalCount()).isEqualTo(3);
assertThat(toStats.activeCount()).isEqualTo(0); // processor stats have no running_count
ExecutionStats logStats = store.statsForProcessor(from, to, "route-a", "log");
assertThat(logStats.totalCount()).isEqualTo(2);
}
}

View File

@@ -93,6 +93,8 @@ spec:
value: "postgres"
- name: CAMELEER_STORAGE_SEARCH
value: "opensearch"
- name: CAMELEER_STORAGE_STATS
value: "clickhouse"
resources:
requests: