Compare commits

3 Commits: bc1c71277c ... d4dbfa7ae6

| Author | SHA1 | Date |
|---|---|---|
| | d4dbfa7ae6 | |
| | 59374482bc | |
| | 43e187a023 | |
```diff
@@ -107,10 +107,12 @@ public class StorageBeanConfig {
     @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
     public ChunkAccumulator chunkAccumulator(
             WriteBuffer<MergedExecution> executionBuffer,
-            WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
+            WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
+            DiagramStore diagramStore) {
         return new ChunkAccumulator(
                 executionBuffer::offer,
                 processorBatchBuffer::offer,
+                diagramStore,
                 java.time.Duration.ofMinutes(5));
     }

```
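The bean wiring threads the new `DiagramStore` through to `ChunkAccumulator`, while the buffers are still handed over as method references. One Java detail worth noting: `executionBuffer::offer` satisfies a `Consumer` target even if `offer` returns `boolean`, because the return value is discarded when a method reference is adapted to a `void`-returning functional interface. A minimal sketch; the `WriteBuffer` shape here is assumed from the `::offer` usage, not taken from this repository:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the WriteBuffer used above: offer returns boolean,
// yet the method reference below still adapts cleanly to Consumer<T>.
class WriteBuffer<T> {
    private final List<T> items = new ArrayList<>();
    boolean offer(T item) { return items.add(item); }
    int size() { return items.size(); }
}

public class SinkWiringSketch {
    public static void main(String[] args) {
        WriteBuffer<String> buffer = new WriteBuffer<>();
        // The boolean return of offer is silently dropped by the Consumer adaptation.
        Consumer<String> sink = buffer::offer;
        sink.accept("execution-1");
        System.out.println(buffer.size()); // 1
    }
}
```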
```diff
@@ -255,13 +255,15 @@ public class AgentRegistrationController {
         Instant now = Instant.now();
         Instant from1m = now.minus(1, ChronoUnit.MINUTES);
         try {
+            // Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
+            // that strip AggregateFunction column types, breaking -Merge combinators
             jdbc.query(
                 "SELECT application_name, " +
-                "SUM(total_count) AS total, " +
-                "SUM(failed_count) AS failed, " +
+                "countMerge(total_count) AS total, " +
+                "countIfMerge(failed_count) AS failed, " +
                 "COUNT(DISTINCT route_id) AS active_routes " +
-                "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
-                "GROUP BY application_name",
+                "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
+                " GROUP BY application_name",
                 rs -> {
                     long total = rs.getLong("total");
                     long failed = rs.getLong("failed");
@@ -269,11 +271,18 @@ public class AgentRegistrationController {
                     double errorRate = total > 0 ? (double) failed / total : 0.0;
                     int activeRoutes = rs.getInt("active_routes");
                     result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
-                },
-                Timestamp.from(from1m), Timestamp.from(now));
+                });
         } catch (Exception e) {
             log.debug("Could not query agent metrics: {}", e.getMessage());
         }
         return result;
     }

+    /** Format an Instant as a ClickHouse DateTime literal. */
+    private static String lit(Instant instant) {
+        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
+        String ts = new Timestamp(truncated.toEpochMilli()).toString();
+        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
+        return "'" + ts + "'";
+    }
 }
```
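The recurring change across these controllers is the same: every `?` placeholder is replaced by an inlined literal. The commit's own comments give the reason: the ClickHouse JDBC driver rewrites prepared statements in a way that loses the `AggregateFunction` column types of an `AggregatingMergeTree` table, so `-Merge` combinators such as `countMerge` stop type-checking. A condensed before/after sketch; the `stats_1m_route` DDL in the comment is an assumption inferred from the queries, and the class and method names are illustrative:

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.springframework.jdbc.core.JdbcTemplate;

public class MergeQuerySketch {

    // Assumed table shape, inferred from the queries in this diff:
    //   CREATE TABLE stats_1m_route (
    //       bucket            DateTime,
    //       application_name  String,
    //       route_id          String,
    //       total_count       AggregateFunction(count),
    //       failed_count      AggregateFunction(countIf, UInt8),
    //       ...
    //   ) ENGINE = AggregatingMergeTree ...

    // Per the commit comments, this fails with the ClickHouse JDBC driver: the
    // prepared statement gets wrapped in a sub-query, the AggregateFunction
    // state column loses its type, and countMerge() no longer applies.
    static long totalWithPlaceholders(JdbcTemplate jdbc, Instant from, Instant to) {
        return jdbc.queryForObject(
            "SELECT countMerge(total_count) FROM stats_1m_route WHERE bucket >= ? AND bucket < ?",
            Long.class, Timestamp.from(from), Timestamp.from(to));
    }

    // The workaround the commit adopts: inline literals keep it one plain statement.
    static long totalWithLiterals(JdbcTemplate jdbc, Instant from, Instant to) {
        return jdbc.queryForObject(
            "SELECT countMerge(total_count) FROM stats_1m_route WHERE bucket >= "
                + lit(from) + " AND bucket < " + lit(to),
            Long.class);
    }

    // Same helper the commit adds to each controller.
    private static String lit(Instant instant) {
        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
        String ts = new Timestamp(truncated.toEpochMilli()).toString();
        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
        return "'" + ts + "'";
    }
}
```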
```diff
@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
 import com.cameleer3.server.core.ingestion.ChunkAccumulator;
 import com.cameleer3.common.model.ExecutionChunk;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import io.swagger.v3.oas.annotations.Operation;
@@ -40,6 +41,7 @@ public class ChunkIngestionController {
         this.accumulator = accumulator;
         this.objectMapper = new ObjectMapper();
         this.objectMapper.registerModule(new JavaTimeModule());
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }

     @PostMapping("/executions")
@@ -49,7 +51,7 @@ public class ChunkIngestionController {
         String trimmed = body.strip();
         List<ExecutionChunk> chunks;
         if (trimmed.startsWith("[")) {
-            chunks = objectMapper.readValue(trimmed, new TypeReference<>() {});
+            chunks = objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
         } else {
             ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class);
             chunks = List.of(single);
```
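Two small hardening changes here: the element type of the `TypeReference` is spelled out instead of relying on diamond inference, and the mapper now tolerates unknown fields, so agents posting a newer chunk schema do not get their whole batch rejected. A minimal, self-contained illustration of the mapper configuration; the `MiniChunk` record and payload are made up for the example:

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.Instant;

public class LenientMapperSketch {
    // Hypothetical mini-chunk; the real ExecutionChunk has many more fields.
    record MiniChunk(String routeId, Instant timestamp) {}

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())  // java.time support for Instant
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // tolerate new fields

        // "newField" is unknown to MiniChunk; with the feature disabled it is ignored.
        String payload = "{\"routeId\":\"r1\",\"timestamp\":\"2024-01-01T00:00:00Z\",\"newField\":42}";
        MiniChunk chunk = mapper.readValue(payload, MiniChunk.class);
        System.out.println(chunk.routeId()); // r1
    }
}
```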
```diff
@@ -78,38 +78,37 @@ public class RouteCatalogController {
         Instant rangeTo = to != null ? Instant.parse(to) : now;
         Instant from1m = now.minus(1, ChronoUnit.MINUTES);

-        // Route exchange counts from continuous aggregate
+        // Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver
+        // wraps prepared statements in sub-queries that strip AggregateFunction column types)
         Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
         Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
         try {
             jdbc.query(
-                "SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
-                "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
-                "GROUP BY application_name, route_id",
+                "SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
+                "FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
+                " GROUP BY application_name, route_id",
                 rs -> {
                     String key = rs.getString("application_name") + "/" + rs.getString("route_id");
                     routeExchangeCounts.put(key, rs.getLong("cnt"));
                     Timestamp ts = rs.getTimestamp("last_seen");
                     if (ts != null) routeLastSeen.put(key, ts.toInstant());
-                },
-                Timestamp.from(rangeFrom), Timestamp.from(rangeTo));
+                });
         } catch (Exception e) {
-            // Continuous aggregate may not exist yet
+            // AggregatingMergeTree table may not exist yet
         }

         // Per-agent TPS from the last minute
         Map<String, Double> agentTps = new LinkedHashMap<>();
         try {
             jdbc.query(
-                "SELECT application_name, SUM(total_count) AS cnt " +
-                "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
-                "GROUP BY application_name",
+                "SELECT application_name, countMerge(total_count) AS cnt " +
+                "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
+                " GROUP BY application_name",
                 rs -> {
                     // This gives per-app TPS; we'll distribute among agents below
-                },
-                Timestamp.from(from1m), Timestamp.from(now));
+                });
         } catch (Exception e) {
-            // Continuous aggregate may not exist yet
+            // AggregatingMergeTree table may not exist yet
         }

         // Build catalog entries
@@ -158,6 +157,14 @@ public class RouteCatalogController {
                 .orElse(null);
     }

+    /** Format an Instant as a ClickHouse DateTime literal. */
+    private static String lit(Instant instant) {
+        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
+        String ts = new Timestamp(truncated.toEpochMilli()).toString();
+        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
+        return "'" + ts + "'";
+    }
+
     private String computeWorstHealth(List<AgentInfo> agents) {
         boolean hasDead = false;
         boolean hasStale = false;
```
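The `lit(Instant)` helper, duplicated into three controllers by this commit, renders an `Instant` the way ClickHouse expects a `DateTime` literal, e.g. `'2024-05-01 12:34:56'`. One caveat: `java.sql.Timestamp.toString()` formats in the JVM's default time zone, so the emitted literal only matches the stored buckets when the server JVM and the ClickHouse session agree on the zone. A small check, pinning the zone to UTC to keep it deterministic:

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.TimeZone;

public class LitSketch {
    // Copy of the helper added in this commit.
    private static String lit(Instant instant) {
        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
        String ts = new Timestamp(truncated.toEpochMilli()).toString();
        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
        return "'" + ts + "'";
    }

    public static void main(String[] args) {
        // Timestamp.toString() renders in the JVM default time zone; pin it for the demo.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        System.out.println(lit(Instant.parse("2024-05-01T12:34:56.789Z")));
        // -> '2024-05-01 12:34:56'  (sub-second part truncated, ".0" suffix stripped)
    }
}
```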
```diff
@@ -52,20 +52,18 @@ public class RouteMetricsController {
         Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
         long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();

+        // Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
+        // that strip AggregateFunction column types, breaking -Merge combinators
         var sql = new StringBuilder(
             "SELECT application_name, route_id, " +
-            "SUM(total_count) AS total, " +
-            "SUM(failed_count) AS failed, " +
-            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " +
-            "COALESCE(MAX(p99_duration), 0) AS p99_dur " +
-            "FROM stats_1m_route WHERE bucket >= ? AND bucket < ?");
-        var params = new ArrayList<Object>();
-        params.add(Timestamp.from(fromInstant));
-        params.add(Timestamp.from(toInstant));
+            "countMerge(total_count) AS total, " +
+            "countIfMerge(failed_count) AS failed, " +
+            "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
+            "COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " +
+            "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));

         if (appId != null) {
-            sql.append(" AND application_name = ?");
-            params.add(appId);
+            sql.append(" AND application_name = " + lit(appId));
         }
         sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
@@ -88,7 +86,7 @@ public class RouteMetricsController {
             routeKeys.add(new RouteKey(applicationName, routeId));
             return new RouteMetrics(routeId, applicationName, total, successRate,
                 avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
-        }, params.toArray());
+        });

         // Fetch sparklines (12 buckets over the time window)
         if (!metrics.isEmpty()) {
@@ -98,15 +96,13 @@ public class RouteMetricsController {
         for (int i = 0; i < metrics.size(); i++) {
             RouteMetrics m = metrics.get(i);
             try {
-                List<Double> sparkline = jdbc.query(
-                    "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
-                    "COALESCE(SUM(total_count), 0) AS cnt " +
-                    "FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
-                    "AND application_name = ? AND route_id = ? " +
-                    "GROUP BY period ORDER BY period",
-                    (rs, rowNum) -> rs.getDouble("cnt"),
-                    bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
-                    m.appId(), m.routeId());
+                String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
+                    "COALESCE(countMerge(total_count), 0) AS cnt " +
+                    "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
+                    " AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
+                    " GROUP BY period ORDER BY period";
+                List<Double> sparkline = jdbc.query(sparkSql,
+                    (rs, rowNum) -> rs.getDouble("cnt"));
                 metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
                     m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
                     m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
```
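Beyond the literal-SQL change, the sparkline query also swaps TimescaleDB's `time_bucket(...)` for ClickHouse's `toStartOfInterval(bucket, toIntervalSecond(n))`, which rounds each timestamp down to the start of its n-second window. A rough Java rendering of that rounding, under the assumption that the intervals are epoch-aligned; the 7200-second width follows the "12 buckets over the time window" comment for a 24-hour window:

```java
import java.time.Instant;

public class BucketSketch {
    // Approximate Java equivalent of ClickHouse's toStartOfInterval(t, toIntervalSecond(n)):
    // round the epoch-seconds down to a multiple of n.
    static Instant toStartOfInterval(Instant t, long nSeconds) {
        long epoch = t.getEpochSecond();
        return Instant.ofEpochSecond(epoch - Math.floorMod(epoch, nSeconds));
    }

    public static void main(String[] args) {
        // A 24h window split into 12 sparkline buckets -> 7200-second buckets.
        long bucketSeconds = 24 * 3600 / 12;
        Instant sample = Instant.parse("2024-05-01T13:07:42Z");
        System.out.println(toStartOfInterval(sample, bucketSeconds));
        // -> 2024-05-01T12:00:00Z (start of the 2-hour bucket containing the sample)
    }
}
```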
```diff
@@ -153,25 +149,22 @@ public class RouteMetricsController {
         Instant toInstant = to != null ? to : Instant.now();
         Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);

+        // Literal SQL for AggregatingMergeTree -Merge combinators
         var sql = new StringBuilder(
             "SELECT processor_id, processor_type, route_id, application_name, " +
-            "SUM(total_count) AS total_count, " +
-            "SUM(failed_count) AS failed_count, " +
-            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " +
-            "MAX(p99_duration) AS p99_duration_ms " +
+            "countMerge(total_count) AS total_count, " +
+            "countIfMerge(failed_count) AS failed_count, " +
+            "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
+            "quantileMerge(0.99)(p99_duration) AS p99_duration_ms " +
             "FROM stats_1m_processor_detail " +
-            "WHERE bucket >= ? AND bucket < ? AND route_id = ?");
-        var params = new ArrayList<Object>();
-        params.add(Timestamp.from(fromInstant));
-        params.add(Timestamp.from(toInstant));
-        params.add(routeId);
+            "WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
+            " AND route_id = " + lit(routeId));

         if (appId != null) {
-            sql.append(" AND application_name = ?");
-            params.add(appId);
+            sql.append(" AND application_name = " + lit(appId));
         }
         sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
-        sql.append(" ORDER BY SUM(total_count) DESC");
+        sql.append(" ORDER BY countMerge(total_count) DESC");

         List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
             long totalCount = rs.getLong("total_count");
@@ -187,8 +180,21 @@ public class RouteMetricsController {
                 rs.getDouble("avg_duration_ms"),
                 rs.getDouble("p99_duration_ms"),
                 errorRate);
-        }, params.toArray());
+        });

         return ResponseEntity.ok(metrics);
     }

+    /** Format an Instant as a ClickHouse DateTime literal. */
+    private static String lit(Instant instant) {
+        Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
+        String ts = new Timestamp(truncated.toEpochMilli()).toString();
+        if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
+        return "'" + ts + "'";
+    }
+
+    /** Format a string as a SQL literal with single-quote escaping. */
+    private static String lit(String value) {
+        return "'" + value.replace("'", "\\'") + "'";
+    }
 }
```
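Since `appId` and `routeId` now travel inside the SQL string rather than as bound parameters, the `lit(String)` overload has to escape quotes itself; ClickHouse accepts backslash-escaped single quotes inside string literals. A quick illustration with hypothetical values:

```java
public class StringLitSketch {
    // Copy of the String overload added in this commit.
    private static String lit(String value) {
        return "'" + value.replace("'", "\\'") + "'";
    }

    public static void main(String[] args) {
        System.out.println(lit("orders-service"));  // 'orders-service'
        // A quote in the value no longer terminates the literal early:
        System.out.println(lit("o'brien-route"));   // 'o\'brien-route'
    }
}
```

Note that backslashes themselves are not escaped, so a value ending in a backslash would still produce a malformed literal; this presumably never occurs in route and application identifiers.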
```diff
@@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion;

 import com.cameleer3.common.model.ExecutionChunk;
 import com.cameleer3.common.model.FlatProcessorRecord;
+import com.cameleer3.server.core.storage.DiagramStore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
@@ -29,14 +30,17 @@ public class ChunkAccumulator {

     private final Consumer<MergedExecution> executionSink;
     private final Consumer<ProcessorBatch> processorSink;
+    private final DiagramStore diagramStore;
     private final Duration staleThreshold;
     private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();

     public ChunkAccumulator(Consumer<MergedExecution> executionSink,
                             Consumer<ProcessorBatch> processorSink,
+                            DiagramStore diagramStore,
                             Duration staleThreshold) {
         this.executionSink = executionSink;
         this.processorSink = processorSink;
+        this.diagramStore = diagramStore;
         this.staleThreshold = staleThreshold;
     }
@@ -138,7 +142,15 @@ public class ChunkAccumulator {

     // ---- Conversion to MergedExecution ----

-    private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
+    private MergedExecution toMergedExecution(ExecutionChunk envelope) {
+        String diagramHash = "";
+        try {
+            diagramHash = diagramStore
+                .findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
+                .orElse("");
+        } catch (Exception e) {
+            log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
+        }
         return new MergedExecution(
             DEFAULT_TENANT,
             1L,
@@ -158,7 +170,7 @@ public class ChunkAccumulator {
             envelope.getErrorCategory(),
             envelope.getRootCauseType(),
             envelope.getRootCauseMessage(),
-            "", // diagramContentHash — server-side lookup, not in chunk
+            diagramHash,
             envelope.getEngineLevel(),
             "", // inputBody — on processor records now
             "", // outputBody
```
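`toMergedExecution` loses its `static` modifier because it now consults the injected `DiagramStore` to resolve the diagram content hash that previously arrived empty. From the call site, the store's contract can be read off as roughly the following; this is a reconstruction from usage, not the actual source file, and the real interface may declare more members:

```java
import java.util.Optional;

// Inferred from ".findContentHashForRoute(...).orElse("")" above.
public interface DiagramStore {
    /** Content hash of the latest diagram stored for this route/agent pair, if any. */
    Optional<String> findContentHashForRoute(String routeId, String agentId);
}
```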