Compare commits
3 Commits
bc1c71277c
...
d4dbfa7ae6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4dbfa7ae6 | ||
|
|
59374482bc | ||
|
|
43e187a023 |
@@ -107,10 +107,12 @@ public class StorageBeanConfig {
|
|||||||
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
|
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
|
||||||
public ChunkAccumulator chunkAccumulator(
|
public ChunkAccumulator chunkAccumulator(
|
||||||
WriteBuffer<MergedExecution> executionBuffer,
|
WriteBuffer<MergedExecution> executionBuffer,
|
||||||
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
|
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
|
||||||
|
DiagramStore diagramStore) {
|
||||||
return new ChunkAccumulator(
|
return new ChunkAccumulator(
|
||||||
executionBuffer::offer,
|
executionBuffer::offer,
|
||||||
processorBatchBuffer::offer,
|
processorBatchBuffer::offer,
|
||||||
|
diagramStore,
|
||||||
java.time.Duration.ofMinutes(5));
|
java.time.Duration.ofMinutes(5));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -255,13 +255,15 @@ public class AgentRegistrationController {
|
|||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
|
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
|
||||||
try {
|
try {
|
||||||
|
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
|
||||||
|
// that strip AggregateFunction column types, breaking -Merge combinators
|
||||||
jdbc.query(
|
jdbc.query(
|
||||||
"SELECT application_name, " +
|
"SELECT application_name, " +
|
||||||
"SUM(total_count) AS total, " +
|
"countMerge(total_count) AS total, " +
|
||||||
"SUM(failed_count) AS failed, " +
|
"countIfMerge(failed_count) AS failed, " +
|
||||||
"COUNT(DISTINCT route_id) AS active_routes " +
|
"COUNT(DISTINCT route_id) AS active_routes " +
|
||||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
|
||||||
"GROUP BY application_name",
|
" GROUP BY application_name",
|
||||||
rs -> {
|
rs -> {
|
||||||
long total = rs.getLong("total");
|
long total = rs.getLong("total");
|
||||||
long failed = rs.getLong("failed");
|
long failed = rs.getLong("failed");
|
||||||
@@ -269,11 +271,18 @@ public class AgentRegistrationController {
|
|||||||
double errorRate = total > 0 ? (double) failed / total : 0.0;
|
double errorRate = total > 0 ? (double) failed / total : 0.0;
|
||||||
int activeRoutes = rs.getInt("active_routes");
|
int activeRoutes = rs.getInt("active_routes");
|
||||||
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
|
result.put(rs.getString("application_name"), new double[]{tps, errorRate, activeRoutes});
|
||||||
},
|
});
|
||||||
Timestamp.from(from1m), Timestamp.from(now));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.debug("Could not query agent metrics: {}", e.getMessage());
|
log.debug("Could not query agent metrics: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Format an Instant as a ClickHouse DateTime literal. */
|
||||||
|
private static String lit(Instant instant) {
|
||||||
|
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
|
||||||
|
String ts = new Timestamp(truncated.toEpochMilli()).toString();
|
||||||
|
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
|
||||||
|
return "'" + ts + "'";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
|
|||||||
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
|
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
|
||||||
import com.cameleer3.common.model.ExecutionChunk;
|
import com.cameleer3.common.model.ExecutionChunk;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
@@ -40,6 +41,7 @@ public class ChunkIngestionController {
|
|||||||
this.accumulator = accumulator;
|
this.accumulator = accumulator;
|
||||||
this.objectMapper = new ObjectMapper();
|
this.objectMapper = new ObjectMapper();
|
||||||
this.objectMapper.registerModule(new JavaTimeModule());
|
this.objectMapper.registerModule(new JavaTimeModule());
|
||||||
|
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/executions")
|
@PostMapping("/executions")
|
||||||
@@ -49,7 +51,7 @@ public class ChunkIngestionController {
|
|||||||
String trimmed = body.strip();
|
String trimmed = body.strip();
|
||||||
List<ExecutionChunk> chunks;
|
List<ExecutionChunk> chunks;
|
||||||
if (trimmed.startsWith("[")) {
|
if (trimmed.startsWith("[")) {
|
||||||
chunks = objectMapper.readValue(trimmed, new TypeReference<>() {});
|
chunks = objectMapper.readValue(trimmed, new TypeReference<List<ExecutionChunk>>() {});
|
||||||
} else {
|
} else {
|
||||||
ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class);
|
ExecutionChunk single = objectMapper.readValue(trimmed, ExecutionChunk.class);
|
||||||
chunks = List.of(single);
|
chunks = List.of(single);
|
||||||
|
|||||||
@@ -78,38 +78,37 @@ public class RouteCatalogController {
|
|||||||
Instant rangeTo = to != null ? Instant.parse(to) : now;
|
Instant rangeTo = to != null ? Instant.parse(to) : now;
|
||||||
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
|
Instant from1m = now.minus(1, ChronoUnit.MINUTES);
|
||||||
|
|
||||||
// Route exchange counts from continuous aggregate
|
// Route exchange counts from AggregatingMergeTree (literal SQL — ClickHouse JDBC driver
|
||||||
|
// wraps prepared statements in sub-queries that strip AggregateFunction column types)
|
||||||
Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
|
Map<String, Long> routeExchangeCounts = new LinkedHashMap<>();
|
||||||
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
|
Map<String, Instant> routeLastSeen = new LinkedHashMap<>();
|
||||||
try {
|
try {
|
||||||
jdbc.query(
|
jdbc.query(
|
||||||
"SELECT application_name, route_id, SUM(total_count) AS cnt, MAX(bucket) AS last_seen " +
|
"SELECT application_name, route_id, countMerge(total_count) AS cnt, MAX(bucket) AS last_seen " +
|
||||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
"FROM stats_1m_route WHERE bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo) +
|
||||||
"GROUP BY application_name, route_id",
|
" GROUP BY application_name, route_id",
|
||||||
rs -> {
|
rs -> {
|
||||||
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
|
String key = rs.getString("application_name") + "/" + rs.getString("route_id");
|
||||||
routeExchangeCounts.put(key, rs.getLong("cnt"));
|
routeExchangeCounts.put(key, rs.getLong("cnt"));
|
||||||
Timestamp ts = rs.getTimestamp("last_seen");
|
Timestamp ts = rs.getTimestamp("last_seen");
|
||||||
if (ts != null) routeLastSeen.put(key, ts.toInstant());
|
if (ts != null) routeLastSeen.put(key, ts.toInstant());
|
||||||
},
|
});
|
||||||
Timestamp.from(rangeFrom), Timestamp.from(rangeTo));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Continuous aggregate may not exist yet
|
// AggregatingMergeTree table may not exist yet
|
||||||
}
|
}
|
||||||
|
|
||||||
// Per-agent TPS from the last minute
|
// Per-agent TPS from the last minute
|
||||||
Map<String, Double> agentTps = new LinkedHashMap<>();
|
Map<String, Double> agentTps = new LinkedHashMap<>();
|
||||||
try {
|
try {
|
||||||
jdbc.query(
|
jdbc.query(
|
||||||
"SELECT application_name, SUM(total_count) AS cnt " +
|
"SELECT application_name, countMerge(total_count) AS cnt " +
|
||||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
"FROM stats_1m_route WHERE bucket >= " + lit(from1m) + " AND bucket < " + lit(now) +
|
||||||
"GROUP BY application_name",
|
" GROUP BY application_name",
|
||||||
rs -> {
|
rs -> {
|
||||||
// This gives per-app TPS; we'll distribute among agents below
|
// This gives per-app TPS; we'll distribute among agents below
|
||||||
},
|
});
|
||||||
Timestamp.from(from1m), Timestamp.from(now));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Continuous aggregate may not exist yet
|
// AggregatingMergeTree table may not exist yet
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build catalog entries
|
// Build catalog entries
|
||||||
@@ -158,6 +157,14 @@ public class RouteCatalogController {
|
|||||||
.orElse(null);
|
.orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Format an Instant as a ClickHouse DateTime literal. */
|
||||||
|
private static String lit(Instant instant) {
|
||||||
|
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
|
||||||
|
String ts = new Timestamp(truncated.toEpochMilli()).toString();
|
||||||
|
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
|
||||||
|
return "'" + ts + "'";
|
||||||
|
}
|
||||||
|
|
||||||
private String computeWorstHealth(List<AgentInfo> agents) {
|
private String computeWorstHealth(List<AgentInfo> agents) {
|
||||||
boolean hasDead = false;
|
boolean hasDead = false;
|
||||||
boolean hasStale = false;
|
boolean hasStale = false;
|
||||||
|
|||||||
@@ -52,20 +52,18 @@ public class RouteMetricsController {
|
|||||||
Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
|
Instant fromInstant = from != null ? Instant.parse(from) : toInstant.minus(24, ChronoUnit.HOURS);
|
||||||
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
|
long windowSeconds = Duration.between(fromInstant, toInstant).toSeconds();
|
||||||
|
|
||||||
|
// Literal SQL — ClickHouse JDBC driver wraps prepared statements in sub-queries
|
||||||
|
// that strip AggregateFunction column types, breaking -Merge combinators
|
||||||
var sql = new StringBuilder(
|
var sql = new StringBuilder(
|
||||||
"SELECT application_name, route_id, " +
|
"SELECT application_name, route_id, " +
|
||||||
"SUM(total_count) AS total, " +
|
"countMerge(total_count) AS total, " +
|
||||||
"SUM(failed_count) AS failed, " +
|
"countIfMerge(failed_count) AS failed, " +
|
||||||
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_dur, " +
|
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_dur, " +
|
||||||
"COALESCE(MAX(p99_duration), 0) AS p99_dur " +
|
"COALESCE(quantileMerge(0.99)(p99_duration), 0) AS p99_dur " +
|
||||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ?");
|
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant));
|
||||||
var params = new ArrayList<Object>();
|
|
||||||
params.add(Timestamp.from(fromInstant));
|
|
||||||
params.add(Timestamp.from(toInstant));
|
|
||||||
|
|
||||||
if (appId != null) {
|
if (appId != null) {
|
||||||
sql.append(" AND application_name = ?");
|
sql.append(" AND application_name = " + lit(appId));
|
||||||
params.add(appId);
|
|
||||||
}
|
}
|
||||||
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
|
sql.append(" GROUP BY application_name, route_id ORDER BY application_name, route_id");
|
||||||
|
|
||||||
@@ -88,7 +86,7 @@ public class RouteMetricsController {
|
|||||||
routeKeys.add(new RouteKey(applicationName, routeId));
|
routeKeys.add(new RouteKey(applicationName, routeId));
|
||||||
return new RouteMetrics(routeId, applicationName, total, successRate,
|
return new RouteMetrics(routeId, applicationName, total, successRate,
|
||||||
avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
|
avgDur, p99Dur, errorRate, tps, List.of(), -1.0);
|
||||||
}, params.toArray());
|
});
|
||||||
|
|
||||||
// Fetch sparklines (12 buckets over the time window)
|
// Fetch sparklines (12 buckets over the time window)
|
||||||
if (!metrics.isEmpty()) {
|
if (!metrics.isEmpty()) {
|
||||||
@@ -98,15 +96,13 @@ public class RouteMetricsController {
|
|||||||
for (int i = 0; i < metrics.size(); i++) {
|
for (int i = 0; i < metrics.size(); i++) {
|
||||||
RouteMetrics m = metrics.get(i);
|
RouteMetrics m = metrics.get(i);
|
||||||
try {
|
try {
|
||||||
List<Double> sparkline = jdbc.query(
|
String sparkSql = "SELECT toStartOfInterval(bucket, toIntervalSecond(" + bucketSeconds + ")) AS period, " +
|
||||||
"SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
|
"COALESCE(countMerge(total_count), 0) AS cnt " +
|
||||||
"COALESCE(SUM(total_count), 0) AS cnt " +
|
"FROM stats_1m_route WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
|
||||||
"FROM stats_1m_route WHERE bucket >= ? AND bucket < ? " +
|
" AND application_name = " + lit(m.appId()) + " AND route_id = " + lit(m.routeId()) +
|
||||||
"AND application_name = ? AND route_id = ? " +
|
" GROUP BY period ORDER BY period";
|
||||||
"GROUP BY period ORDER BY period",
|
List<Double> sparkline = jdbc.query(sparkSql,
|
||||||
(rs, rowNum) -> rs.getDouble("cnt"),
|
(rs, rowNum) -> rs.getDouble("cnt"));
|
||||||
bucketSeconds, Timestamp.from(fromInstant), Timestamp.from(toInstant),
|
|
||||||
m.appId(), m.routeId());
|
|
||||||
metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
|
metrics.set(i, new RouteMetrics(m.routeId(), m.appId(), m.exchangeCount(),
|
||||||
m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
|
m.successRate(), m.avgDurationMs(), m.p99DurationMs(),
|
||||||
m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
|
m.errorRate(), m.throughputPerSec(), sparkline, m.slaCompliance()));
|
||||||
@@ -153,25 +149,22 @@ public class RouteMetricsController {
|
|||||||
Instant toInstant = to != null ? to : Instant.now();
|
Instant toInstant = to != null ? to : Instant.now();
|
||||||
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
|
Instant fromInstant = from != null ? from : toInstant.minus(24, ChronoUnit.HOURS);
|
||||||
|
|
||||||
|
// Literal SQL for AggregatingMergeTree -Merge combinators
|
||||||
var sql = new StringBuilder(
|
var sql = new StringBuilder(
|
||||||
"SELECT processor_id, processor_type, route_id, application_name, " +
|
"SELECT processor_id, processor_type, route_id, application_name, " +
|
||||||
"SUM(total_count) AS total_count, " +
|
"countMerge(total_count) AS total_count, " +
|
||||||
"SUM(failed_count) AS failed_count, " +
|
"countIfMerge(failed_count) AS failed_count, " +
|
||||||
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum)::double precision / SUM(total_count) ELSE 0 END AS avg_duration_ms, " +
|
"CASE WHEN countMerge(total_count) > 0 THEN toFloat64(sumMerge(duration_sum)) / countMerge(total_count) ELSE 0 END AS avg_duration_ms, " +
|
||||||
"MAX(p99_duration) AS p99_duration_ms " +
|
"quantileMerge(0.99)(p99_duration) AS p99_duration_ms " +
|
||||||
"FROM stats_1m_processor_detail " +
|
"FROM stats_1m_processor_detail " +
|
||||||
"WHERE bucket >= ? AND bucket < ? AND route_id = ?");
|
"WHERE bucket >= " + lit(fromInstant) + " AND bucket < " + lit(toInstant) +
|
||||||
var params = new ArrayList<Object>();
|
" AND route_id = " + lit(routeId));
|
||||||
params.add(Timestamp.from(fromInstant));
|
|
||||||
params.add(Timestamp.from(toInstant));
|
|
||||||
params.add(routeId);
|
|
||||||
|
|
||||||
if (appId != null) {
|
if (appId != null) {
|
||||||
sql.append(" AND application_name = ?");
|
sql.append(" AND application_name = " + lit(appId));
|
||||||
params.add(appId);
|
|
||||||
}
|
}
|
||||||
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
|
sql.append(" GROUP BY processor_id, processor_type, route_id, application_name");
|
||||||
sql.append(" ORDER BY SUM(total_count) DESC");
|
sql.append(" ORDER BY countMerge(total_count) DESC");
|
||||||
|
|
||||||
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
|
List<ProcessorMetrics> metrics = jdbc.query(sql.toString(), (rs, rowNum) -> {
|
||||||
long totalCount = rs.getLong("total_count");
|
long totalCount = rs.getLong("total_count");
|
||||||
@@ -187,8 +180,21 @@ public class RouteMetricsController {
|
|||||||
rs.getDouble("avg_duration_ms"),
|
rs.getDouble("avg_duration_ms"),
|
||||||
rs.getDouble("p99_duration_ms"),
|
rs.getDouble("p99_duration_ms"),
|
||||||
errorRate);
|
errorRate);
|
||||||
}, params.toArray());
|
});
|
||||||
|
|
||||||
return ResponseEntity.ok(metrics);
|
return ResponseEntity.ok(metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Format an Instant as a ClickHouse DateTime literal. */
|
||||||
|
private static String lit(Instant instant) {
|
||||||
|
Instant truncated = instant.truncatedTo(ChronoUnit.SECONDS);
|
||||||
|
String ts = new Timestamp(truncated.toEpochMilli()).toString();
|
||||||
|
if (ts.endsWith(".0")) ts = ts.substring(0, ts.length() - 2);
|
||||||
|
return "'" + ts + "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Format a string as a SQL literal with single-quote escaping. */
|
||||||
|
private static String lit(String value) {
|
||||||
|
return "'" + value.replace("'", "\\'") + "'";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.cameleer3.server.core.ingestion;
|
|||||||
|
|
||||||
import com.cameleer3.common.model.ExecutionChunk;
|
import com.cameleer3.common.model.ExecutionChunk;
|
||||||
import com.cameleer3.common.model.FlatProcessorRecord;
|
import com.cameleer3.common.model.FlatProcessorRecord;
|
||||||
|
import com.cameleer3.server.core.storage.DiagramStore;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -29,14 +30,17 @@ public class ChunkAccumulator {
|
|||||||
|
|
||||||
private final Consumer<MergedExecution> executionSink;
|
private final Consumer<MergedExecution> executionSink;
|
||||||
private final Consumer<ProcessorBatch> processorSink;
|
private final Consumer<ProcessorBatch> processorSink;
|
||||||
|
private final DiagramStore diagramStore;
|
||||||
private final Duration staleThreshold;
|
private final Duration staleThreshold;
|
||||||
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public ChunkAccumulator(Consumer<MergedExecution> executionSink,
|
public ChunkAccumulator(Consumer<MergedExecution> executionSink,
|
||||||
Consumer<ProcessorBatch> processorSink,
|
Consumer<ProcessorBatch> processorSink,
|
||||||
|
DiagramStore diagramStore,
|
||||||
Duration staleThreshold) {
|
Duration staleThreshold) {
|
||||||
this.executionSink = executionSink;
|
this.executionSink = executionSink;
|
||||||
this.processorSink = processorSink;
|
this.processorSink = processorSink;
|
||||||
|
this.diagramStore = diagramStore;
|
||||||
this.staleThreshold = staleThreshold;
|
this.staleThreshold = staleThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +142,15 @@ public class ChunkAccumulator {
|
|||||||
|
|
||||||
// ---- Conversion to MergedExecution ----
|
// ---- Conversion to MergedExecution ----
|
||||||
|
|
||||||
private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
|
private MergedExecution toMergedExecution(ExecutionChunk envelope) {
|
||||||
|
String diagramHash = "";
|
||||||
|
try {
|
||||||
|
diagramHash = diagramStore
|
||||||
|
.findContentHashForRoute(envelope.getRouteId(), envelope.getAgentId())
|
||||||
|
.orElse("");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
|
||||||
|
}
|
||||||
return new MergedExecution(
|
return new MergedExecution(
|
||||||
DEFAULT_TENANT,
|
DEFAULT_TENANT,
|
||||||
1L,
|
1L,
|
||||||
@@ -158,7 +170,7 @@ public class ChunkAccumulator {
|
|||||||
envelope.getErrorCategory(),
|
envelope.getErrorCategory(),
|
||||||
envelope.getRootCauseType(),
|
envelope.getRootCauseType(),
|
||||||
envelope.getRootCauseMessage(),
|
envelope.getRootCauseMessage(),
|
||||||
"", // diagramContentHash — server-side lookup, not in chunk
|
diagramHash,
|
||||||
envelope.getEngineLevel(),
|
envelope.getEngineLevel(),
|
||||||
"", // inputBody — on processor records now
|
"", // inputBody — on processor records now
|
||||||
"", // outputBody
|
"", // outputBody
|
||||||
|
|||||||
Reference in New Issue
Block a user