Compare commits

3 Commits

Author SHA1 Message Date
hsiegeln
d4dbfa7ae6 fix: populate diagramContentHash in chunked ingestion pipeline
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 43s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
ChunkAccumulator now injects DiagramStore and looks up the content hash
when converting to MergedExecution. Without this, the detail page had
no diagram hash, so the overlay couldn't find the route diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:50:34 +02:00
hsiegeln
59374482bc fix: replace PostgreSQL aggregate functions with ClickHouse -Merge combinators
RouteCatalogController, RouteMetricsController, AgentRegistrationController
all had inline SQL using SUM() on AggregateFunction columns from stats_1m_*
AggregatingMergeTree tables. Replace with countMerge/countIfMerge/sumMerge.
Also fix time_bucket() → toStartOfInterval() and ::double → toFloat64().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:49:06 +02:00
hsiegeln
43e187a023 fix: ChunkIngestionController ObjectMapper missing FAIL_ON_UNKNOWN_PROPERTIES
Adds DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES=false (required
by PROTOCOL.md) and explicit TypeReference<List<ExecutionChunk>> for
array parsing. Without this, batched chunks from ChunkedExporter
(2+ chunks in a JSON array) were silently rejected, causing final:true
chunks to be lost and all exchanges to go stale.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:45:12 +02:00
6 changed files with 94 additions and 56 deletions

View File

@@ -107,10 +107,12 @@ public class StorageBeanConfig {
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
public ChunkAccumulator chunkAccumulator( public ChunkAccumulator chunkAccumulator(
WriteBuffer<MergedExecution> executionBuffer, WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) { WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
DiagramStore diagramStore) {
return new ChunkAccumulator( return new ChunkAccumulator(
executionBuffer::offer, executionBuffer::offer,
processorBatchBuffer::offer, processorBatchBuffer::offer,
diagramStore,
java.time.Duration.ofMinutes(5)); java.time.Duration.ofMinutes(5));
} }

View File

@@ -255,12 +255,14 @@ public class AgentRegistrationController {
Instant now = Instant.now(); Instant now = Instant.now();
Instant from1m = now.minus(1, ChronoUnit.MINUTES); Instant from1m = now.minus(1, ChronoUnit.MINUTES);
try { try {
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
jdbc.query( jdbc.query(
"SELECT application_name, " + "SELECT application_name, " +
"SUM(total_count) AS total, " + "countMerge(total_count) AS total, " +
"SUM(failed_count) AS failed, " + "countIfMerge(failed_count) AS failed, " +
"COUNT(DISTINCT route_id) AS active_routes " + "COUNT(DISTINCT route_id) AS active_routes " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name", " GROUP BY application_name",
rs -> { rs -> {
long total = rs.getLong("total"); long total = rs.getLong("total");
@@ -269,11 +271,18 @@ public class AgentRegistrationController {
double errorRate = total > 0 ? (double) failed / total : 0.0; double errorRate = total > 0 ? (double) failed / total : 0.0;
int activeRoutes = rs.getInt("active_routes"); int activeRoutes = rs.getInt("active_routes");
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes}); result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
}, });
Timestamp.from(from1m), Timestamp.from(now));
} catch (Exception e) { } catch (Exception e) {
log.debug("Could not query agent metrics: {}", e.getMessage()); log.debug("Could not query agent metrics: {}", e.getMessage());
} }
return result; return result;
} }
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
} }

View File

@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.common.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@@ -40,6 +41,7 @@ public class ChunkIngestionController {
this.accumulator = accumulator; this.accumulator = accumulator;
this.objectMapper = new ObjectMapper(); this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule()); this.objectMapper.registerModule(new JavaTimeModule());
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
} }
@PostMapping("/executions") @PostMapping("/executions")
@@ -49,7 +51,7 @@ public class ChunkIngestionController {
String trimmed = body.strip(); String trimmed = body.strip();
List<ExecutionChunk> chunks; List<ExecutionChunk> chunks;
if (trimmed.startsWith("[")) { if (trimmed.startsWith("[")) {
chunks = objectMapper.readValue(trimmed, new TypeReference<>() {}); chunks = objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
} else { } else {
ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class); ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class);
chunks = List.of(single); chunks = List.of(single);

View File

@@ -78,38 +78,37 @@ public class RouteCatalogController {
Instant rangeTo = to != null ? Instant.parse(to) : now; Instant rangeTo = to != null ? Instant.parse(to) : now;
Instant from1m = now.minus(1, ChronoUnit.MINUTES); Instant from1m = now.minus(1, ChronoUnit.MINUTES);
// Route exchange counts from continuous aggregate // Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver
// wraps prepared statements in sub-queries that strip AggregateFunction column types)
Map<String, Long> routeExchangeCounts = new LinkedHashMap<>(); Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
Map<String, Instant> routeLastSeen = new LinkedHashMap<>(); Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
try { try {
jdbc.query( jdbc.query(
"SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " + "SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + "FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
" GROUP BY application_name, route_id", " GROUP BY application_name, route_id",
rs -> { rs -> {
String key = rs.getString("application_name") + "/" + rs.getString("route_id"); String key = rs.getString("application_name") + "/" + rs.getString("route_id");
routeExchangeCounts.put(key, rs.getLong("cnt")); routeExchangeCounts.put(key, rs.getLong("cnt"));
Timestamp ts = rs.getTimestamp("last_seen"); Timestamp ts = rs.getTimestamp("last_seen");
if (ts != null) routeLastSeen.put(key, ts.toInstant()); if (ts != null) routeLastSeen.put(key, ts.toInstant());
}, });
Timestamp.from(rangeFrom), Timestamp.from(rangeTo));
} catch (Exception e) { } catch (Exception e) {
// Continuous aggregate may not exist yet // AggregatingMergeTree table may not exist yet
} }
// Per-agent TPS from the last minute // Per-agent TPS from the last minute
Map<String, Double> agentTps = new LinkedHashMap<>(); Map<String, Double> agentTps = new LinkedHashMap<>();
try { try {
jdbc.query( jdbc.query(
"SELECT application_name, SUM(total_count) AS cnt " + "SELECT application_name, countMerge(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + "FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name", " GROUP BY application_name",
rs -> { rs -> {
// This gives per-app TPS; we'll distribute among agents below // This gives per-app TPS; we'll distribute among agents below
}, });
Timestamp.from(from1m), Timestamp.from(now));
} catch (Exception e) { } catch (Exception e) {
// Continuous aggregate may not exist yet // AggregatingMergeTree table may not exist yet
} }
// Build catalog entries // Build catalog entries
@@ -158,6 +157,14 @@ public class RouteCatalogController {
.orElse(null); .orElse(null);
} }
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
private String computeWorstHealth(List<AgentInfo> agents) { private String computeWorstHealth(List<AgentInfo> agents) {
boolean hasDead = false; boolean hasDead = false;
boolean hasStale = false; boolean hasStale = false;

View File

@@ -52,20 +52,18 @@ public class RouteMetricsController {
Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS); Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds(); long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
var sql = new StringBuilder( var sql = new StringBuilder(
"SELECT application_name, route_id, " + "SELECT application_name, route_id, " +
"SUM(total_count) AS total, " + "countMerge(total_count) AS total, " +
"SUM(failed_count) AS failed, " + "countIfMerge(failed_count) AS failed, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " + "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
"COALESCE(MAX(p99_duration), 0) AS p99_dur " + "COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ?"); "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));
var params = new ArrayList<Object>();
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
if (appId != null) { if (appId != null) {
sql.append(" AND application_name = ?"); sql.append(" AND application_name = " + lit(appId));
params.add(appId);
} }
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id"); sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
@@ -88,7 +86,7 @@ public class RouteMetricsController {
routeKeys.add(new RouteKey(applicationName, routeId)); routeKeys.add(new RouteKey(applicationName, routeId));
return new RouteMetrics(routeId, applicationName, total, successRate, return new RouteMetrics(routeId, applicationName, total, successRate,
avgDur, p99Dur, errorRate, tps, List.of(), -1.0); avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
}, params.toArray()); });
// Fetch sparklines (12 buckets over the time window) // Fetch sparklines (12 buckets over the time window)
if (!metrics.isEmpty()) { if (!metrics.isEmpty()) {
@@ -98,15 +96,13 @@ public class RouteMetricsController {
for (int i = 0; i < metrics.size(); i++) { for (int i = 0; i < metrics.size(); i++) {
RouteMetrics m = metrics.get(i); RouteMetrics m = metrics.get(i);
try { try {
List<Double> sparkline = jdbc.query( String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
"SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " + "COALESCE(countMerge(total_count), 0) AS cnt " +
"COALESCE(SUM(total_count), 0) AS cnt " + "FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " + " AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
"AND application_name = ? AND route_id = ? " + " GROUP BY period ORDER BY period";
"GROUP BY period ORDER BY period", List<Double> sparkline = jdbc.query(sparkSql,
(rs, rowNum) -> rs.getDouble("cnt"), (rs, rowNum) -> rs.getDouble("cnt"));
bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
m.appId(), m.routeId());
metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(), metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
m.successRate(), m.avgDurationMs(), m.p99DurationMs(), m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance())); m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
@@ -153,25 +149,22 @@ public class RouteMetricsController {
Instant toInstant = to != null ? to : Instant.now(); Instant toInstant = to != null ? to : Instant.now();
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS); Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
// Literal SQL for AggregatingMergeTree -Merge combinators
var sql = new StringBuilder( var sql = new StringBuilder(
"SELECT processor_id, processor_type, route_id, application_name, " + "SELECT processor_id, processor_type, route_id, application_name, " +
"SUM(total_count) AS total_count, " + "countMerge(total_count) AS total_count, " +
"SUM(failed_count) AS failed_count, " + "countIfMerge(failed_count) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " + "CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
"MAX(p99_duration) AS p99_duration_ms " + "quantileMerge(0.99)(p99_duration) AS p99_duration_ms " +
"FROM stats_1m_processor_detail " + "FROM stats_1m_processor_detail " +
"WHERE bucket >= ? AND bucket < ? AND route_id = ?"); "WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
var params = new ArrayList<Object>(); " AND route_id = " + lit(routeId));
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
params.add(routeId);
if (appId != null) { if (appId != null) {
sql.append(" AND application_name = ?"); sql.append(" AND application_name = " + lit(appId));
params.add(appId);
} }
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name"); sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
sql.append(" ORDER BY SUM(total_count) DESC"); sql.append(" ORDER BY countMerge(total_count) DESC");
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> { List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
long totalCount = rs.getLong("total_count"); long totalCount = rs.getLong("total_count");
@@ -187,8 +180,21 @@ public class RouteMetricsController {
rs.getDouble("avg_duration_ms"), rs.getDouble("avg_duration_ms"),
rs.getDouble("p99_duration_ms"), rs.getDouble("p99_duration_ms"),
errorRate); errorRate);
}, params.toArray()); });
return ResponseEntity.ok(metrics); return ResponseEntity.ok(metrics);
} }
/** Format an Instant as a ClickHouse DateTime literal. */
private static String lit(Instant instant) {
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
String ts = new Timestamp(truncated.toEpochMilli()).toString();
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
return "'" + ts + "'";
}
/** Format a string as a SQL literal with single-quote escaping. */
private static String lit(String value) {
return "'" + value.replace("'", "\\'") + "'";
}
} }

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.common.model.FlatProcessorRecord; import com.cameleer3.common.model.FlatProcessorRecord;
import com.cameleer3.server.core.storage.DiagramStore;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -29,14 +30,17 @@ public class ChunkAccumulator {
private final Consumer<MergedExecution> executionSink; private final Consumer<MergedExecution> executionSink;
private final Consumer<ProcessorBatch> processorSink; private final Consumer<ProcessorBatch> processorSink;
private final DiagramStore diagramStore;
private final Duration staleThreshold; private final Duration staleThreshold;
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
public ChunkAccumulator(Consumer<MergedExecution> executionSink, public ChunkAccumulator(Consumer<MergedExecution> executionSink,
Consumer<ProcessorBatch> processorSink, Consumer<ProcessorBatch> processorSink,
DiagramStore diagramStore,
Duration staleThreshold) { Duration staleThreshold) {
this.executionSink = executionSink; this.executionSink = executionSink;
this.processorSink = processorSink; this.processorSink = processorSink;
this.diagramStore = diagramStore;
this.staleThreshold = staleThreshold; this.staleThreshold = staleThreshold;
} }
@@ -138,7 +142,15 @@ public class ChunkAccumulator {
// ---- Conversion to MergedExecution ---- // ---- Conversion to MergedExecution ----
private static MergedExecution toMergedExecution(ExecutionChunk envelope) { private MergedExecution toMergedExecution(ExecutionChunk envelope) {
String diagramHash = "";
try {
diagramHash = diagramStore
.findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
.orElse("");
} catch (Exception e) {
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
}
return new MergedExecution( return new MergedExecution(
DEFAULT_TENANT, DEFAULT_TENANT,
1L, 1L,
@@ -158,7 +170,7 @@ public class ChunkAccumulator {
envelope.getErrorCategory(), envelope.getErrorCategory(),
envelope.getRootCauseType(), envelope.getRootCauseType(),
envelope.getRootCauseMessage(), envelope.getRootCauseMessage(),
"", // diagramContentHash — server-side lookup, not in chunk diagramHash,
envelope.getEngineLevel(), envelope.getEngineLevel(),
"", // inputBody — on processor records now "", // inputBody — on processor records now
"", // outputBody "", // outputBody