Compare commits

3 Commits

Author SHA1 Message Date
hsiegeln
d4dbfa7ae6 fix: populate diagramContentHash in chunked ingestion pipeline
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 43s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
ChunkAccumulator now injects DiagramStore and looks up the content hash
when converting to MergedExecution. Without this, the detail page had
no diagram hash, so the overlay couldn't find the route diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:50:34 +02:00
hsiegeln
59374482bc fix: replace PostgreSQL aggregate functions with ClickHouse -Merge combinators
RouteCatalogController, RouteMetricsController, AgentRegistrationController
all had inline SQL using SUM() on AggregateFunction columns from stats_1m_*
AggregatingMergeTree tables. Replace with countMerge/countIfMerge/sumMerge.
Also fix time_bucket() → toStartOfInterval() and ::double → toFloat64().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:49:06 +02:00
hsiegeln
43e187a023 fix: ChunkIngestionController ObjectMapper missing FAIL_ON_UNKNOWN_PROPERTIES
Adds DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES=false (required
by PROTOCOL.md) and explicit TypeReference<List<ExecutionChunk>> for
array parsing. Without this, batched chunks from ChunkedExporter
(2+ chunks in a JSON array) were silently rejected, causing final:true
chunks to be lost and all exchanges to go stale.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:45:12 +02:00
6 changed files with 94 additions and 56 deletions

View File

@@ -107,10 +107,12 @@ public class StorageBeanConfig {
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
public ChunkAccumulator chunkAccumulator(
WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
DiagramStore diagramStore) {
return new ChunkAccumulator(
executionBuffer::offer,
processorBatchBuffer::offer,
diagramStore,
java.time.Duration.ofMinutes(5));
}

View File

@@ -255,13 +255,15 @@ public class AgentRegistrationController {
Instant now = Instant.now();
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
try {
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
jdbc.query(
"SELECT application_name, " +
"SUM(total_count) AS total, " +
"SUM(failed_count) AS failed, " +
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"COUNT(DISTINCT route_id) AS active_routes " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name",
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
rs -> {
long total = rs.getLong("total");
long failed = rs.getLong("failed");
@@ -269,11 +271,18 @@ public class AgentRegistrationController {
double errorRate = total > 0 ? (double) failed / total : 0.0;
int activeRoutes = rs.getInt("active_routes");
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
},
Timestamp.from(from1m), Timestamp.from(now));
});
} catch (Exception e) {
log.debug("Could not query agent metrics: {}", e.getMessage());
}
return result;
}
/**
 * Formats an {@link Instant} as a quoted ClickHouse DateTime literal,
 * e.g. {@code '2026-04-01 10:49:06'}, truncated to whole seconds.
 *
 * <p>Used because the ClickHouse JDBC driver wraps prepared statements in
 * sub-queries that strip AggregateFunction column types, so time bounds are
 * inlined as literals instead of bind parameters.
 *
 * <p>NOTE(review): the literal is rendered in the JVM default time zone —
 * identical to the previous {@code java.sql.Timestamp.toString()} behavior —
 * but confirm this matches the ClickHouse server/session time zone, otherwise
 * every bucket range filter is shifted by the zone offset.
 */
private static String lit(Instant instant) {
    // java.time formatter instead of java.sql.Timestamp (java.util.Date family);
    // fully qualified to avoid touching the file's import block from this view.
    String ts = java.time.format.DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss")
            .withZone(java.time.ZoneId.systemDefault())
            .format(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS));
    return "'" + ts + "'";
}
}

View File

@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.common.model.ExecutionChunk;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.swagger.v3.oas.annotations.Operation;
@@ -40,6 +41,7 @@ public class ChunkIngestionController {
this.accumulator = accumulator;
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@PostMapping("/executions")
@@ -49,7 +51,7 @@ public class ChunkIngestionController {
String trimmed = body.strip();
List<ExecutionChunk> chunks;
if (trimmed.startsWith("[")) {
chunks = objectMapper.readValue(trimmed, new TypeReference<>() {});
chunks = objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
} else {
ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class);
chunks = List.of(single);

View File

@@ -78,38 +78,37 @@ public class RouteCatalogController {
Instant rangeTo = to != null ? Instant.parse(to) : now;
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
// Route exchange counts from continuous aggregate
// Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver
// wraps prepared statements in sub-queries that strip AggregateFunction column types)
Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name, route_id",
"SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
"FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
" GROUP BY application_name, route_id",
rs -> {
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
routeExchangeCounts.put(key, rs.getLong("cnt"));
Timestamp ts = rs.getTimestamp("last_seen");
if (ts != null) routeLastSeen.put(key, ts.toInstant());
},
Timestamp.from(rangeFrom), Timestamp.from(rangeTo));
});
} catch (Exception e) {
// Continuous aggregate may not exist yet
// AggregatingMergeTree table may not exist yet
}
// Per-agent TPS from the last minute
Map<String, Double> agentTps = new LinkedHashMap<>();
try {
jdbc.query(
"SELECT application_name, SUM(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"GROUP BY application_name",
"SELECT application_name, countMerge(total_count) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
" GROUP BY application_name",
rs -> {
// This gives per-app TPS; we'll distribute among agents below
},
Timestamp.from(from1m), Timestamp.from(now));
});
} catch (Exception e) {
// Continuous aggregate may not exist yet
// AggregatingMergeTree table may not exist yet
}
// Build catalog entries
@@ -158,6 +157,14 @@ public class RouteCatalogController {
.orElse(null);
}
/**
 * Formats an {@link Instant} as a quoted ClickHouse DateTime literal,
 * e.g. {@code '2026-04-01 10:49:06'}, truncated to whole seconds.
 *
 * <p>Used because the ClickHouse JDBC driver wraps prepared statements in
 * sub-queries that strip AggregateFunction column types, so time bounds are
 * inlined as literals instead of bind parameters.
 *
 * <p>NOTE(review): the literal is rendered in the JVM default time zone —
 * identical to the previous {@code java.sql.Timestamp.toString()} behavior —
 * but confirm this matches the ClickHouse server/session time zone, otherwise
 * every bucket range filter is shifted by the zone offset.
 */
private static String lit(Instant instant) {
    // java.time formatter instead of java.sql.Timestamp (java.util.Date family);
    // fully qualified to avoid touching the file's import block from this view.
    String ts = java.time.format.DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss")
            .withZone(java.time.ZoneId.systemDefault())
            .format(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS));
    return "'" + ts + "'";
}
private String computeWorstHealth(List<AgentInfo> agents) {
boolean hasDead = false;
boolean hasStale = false;

View File

@@ -52,20 +52,18 @@ public class RouteMetricsController {
Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
// that strip AggregateFunction column types, breaking -Merge combinators
var sql = new StringBuilder(
"SELECT application_name, route_id, " +
"SUM(total_count) AS total, " +
"SUM(failed_count) AS failed, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " +
"COALESCE(MAX(p99_duration), 0) AS p99_dur " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ?");
var params = new ArrayList<Object>();
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
"countMerge(total_count) AS total, " +
"countIfMerge(failed_count) AS failed, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
"COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " +
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));
if (appId != null) {
sql.append(" AND application_name = ?");
params.add(appId);
sql.append(" AND application_name = " + lit(appId));
}
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
@@ -88,7 +86,7 @@ public class RouteMetricsController {
routeKeys.add(new RouteKey(applicationName, routeId));
return new RouteMetrics(routeId, applicationName, total, successRate,
avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
}, params.toArray());
});
// Fetch sparklines (12 buckets over the time window)
if (!metrics.isEmpty()) {
@@ -98,15 +96,13 @@ public class RouteMetricsController {
for (int i = 0; i < metrics.size(); i++) {
RouteMetrics m = metrics.get(i);
try {
List<Double> sparkline = jdbc.query(
"SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
"COALESCE(SUM(total_count), 0) AS cnt " +
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
"AND application_name = ? AND route_id = ? " +
"GROUP BY period ORDER BY period",
(rs, rowNum) -> rs.getDouble("cnt"),
bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
m.appId(), m.routeId());
String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
"COALESCE(countMerge(total_count), 0) AS cnt " +
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
" AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
" GROUP BY period ORDER BY period";
List<Double> sparkline = jdbc.query(sparkSql,
(rs, rowNum) -> rs.getDouble("cnt"));
metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
@@ -153,25 +149,22 @@ public class RouteMetricsController {
Instant toInstant = to != null ? to : Instant.now();
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
// Literal SQL for AggregatingMergeTree -Merge combinators
var sql = new StringBuilder(
"SELECT processor_id, processor_type, route_id, application_name, " +
"SUM(total_count) AS total_count, " +
"SUM(failed_count) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " +
"MAX(p99_duration) AS p99_duration_ms " +
"countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count, " +
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
"quantileMerge(0.99)(p99_duration) AS p99_duration_ms " +
"FROM stats_1m_processor_detail " +
"WHERE bucket >= ? AND bucket < ? AND route_id = ?");
var params = new ArrayList<Object>();
params.add(Timestamp.from(fromInstant));
params.add(Timestamp.from(toInstant));
params.add(routeId);
"WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
" AND route_id = " + lit(routeId));
if (appId != null) {
sql.append(" AND application_name = ?");
params.add(appId);
sql.append(" AND application_name = " + lit(appId));
}
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
sql.append(" ORDER BY SUM(total_count) DESC");
sql.append(" ORDER BY countMerge(total_count) DESC");
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
long totalCount = rs.getLong("total_count");
@@ -187,8 +180,21 @@ public class RouteMetricsController {
rs.getDouble("avg_duration_ms"),
rs.getDouble("p99_duration_ms"),
errorRate);
}, params.toArray());
});
return ResponseEntity.ok(metrics);
}
/**
 * Formats an {@link Instant} as a quoted ClickHouse DateTime literal,
 * e.g. {@code '2026-04-01 10:49:06'}, truncated to whole seconds.
 *
 * <p>Used because the ClickHouse JDBC driver wraps prepared statements in
 * sub-queries that strip AggregateFunction column types, so time bounds are
 * inlined as literals instead of bind parameters.
 *
 * <p>NOTE(review): the literal is rendered in the JVM default time zone —
 * identical to the previous {@code java.sql.Timestamp.toString()} behavior —
 * but confirm this matches the ClickHouse server/session time zone, otherwise
 * every bucket range filter is shifted by the zone offset.
 */
private static String lit(Instant instant) {
    // java.time formatter instead of java.sql.Timestamp (java.util.Date family);
    // fully qualified to avoid touching the file's import block from this view.
    String ts = java.time.format.DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss")
            .withZone(java.time.ZoneId.systemDefault())
            .format(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS));
    return "'" + ts + "'";
}
/**
 * Formats a string as a single-quoted ClickHouse SQL literal, escaping both
 * backslashes and single quotes.
 *
 * <p>Backslashes must be escaped <em>first</em>: the previous version escaped
 * only {@code '}, so a value ending in {@code \} produced {@code \\'} whose
 * backslash neutralized the quote escape, letting the value break out of the
 * literal — an injection vector, since these literals are concatenated
 * directly into query text (appId / routeId come from request parameters).
 */
private static String lit(String value) {
    String escaped = value.replace("\\", "\\\\").replace("'", "\\'");
    return "'" + escaped + "'";
}
}

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.cameleer3.server.core.storage.DiagramStore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
@@ -29,14 +30,17 @@ public class ChunkAccumulator {
private final Consumer<MergedExecution> executionSink;
private final Consumer<ProcessorBatch> processorSink;
private final DiagramStore diagramStore;
private final Duration staleThreshold;
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
/**
 * Creates an accumulator that merges execution chunks and forwards results.
 *
 * @param executionSink  receives each completed {@code MergedExecution}
 *                       (wired to a {@code WriteBuffer::offer} by the bean config)
 * @param processorSink  receives per-processor record batches
 * @param diagramStore   consulted when converting a finished exchange, to look up
 *                       the route-diagram content hash for the detail-page overlay
 * @param staleThreshold age after which a pending exchange is treated as stale
 *                       — NOTE(review): eviction logic is outside this view; confirm
 */
public ChunkAccumulator(Consumer<MergedExecution> executionSink,
Consumer<ProcessorBatch> processorSink,
DiagramStore diagramStore,
Duration staleThreshold) {
this.executionSink = executionSink;
this.processorSink = processorSink;
this.diagramStore = diagramStore;
this.staleThreshold = staleThreshold;
}
@@ -138,7 +142,15 @@ public class ChunkAccumulator {
// ---- Conversion to MergedExecution ----
private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
private MergedExecution toMergedExecution(ExecutionChunk envelope) {
String diagramHash = "";
try {
diagramHash = diagramStore
.findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
.orElse("");
} catch (Exception e) {
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
}
return new MergedExecution(
DEFAULT_TENANT,
1L,
@@ -158,7 +170,7 @@ public class ChunkAccumulator {
envelope.getErrorCategory(),
envelope.getRootCauseType(),
envelope.getRootCauseMessage(),
"", // diagramContentHash — server-side lookup, not in chunk
diagramHash,
envelope.getEngineLevel(),
"", // inputBody — on processor records now
"", // outputBody