refactor: remove all ClickHouse code, old interfaces, and SQL migrations

- Delete all ClickHouse storage implementations and config
- Delete old core interfaces (ExecutionRepository, DiagramRepository, MetricsRepository, SearchEngine, RawExecutionRow)
- Delete ClickHouse SQL migration files
- Delete AbstractClickHouseIT
- Update controllers to use new store interfaces (DiagramStore, ExecutionStore)
- Fix IngestionService calls in controllers for new synchronous API
- Migrate all ITs from AbstractClickHouseIT to AbstractPostgresIT
- Fix count() syntax and remove ClickHouse-specific test assertions
- Update TreeReconstructionTest for new buildTree() method

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-16 18:56:13 +01:00
parent 7dbfaf0932
commit 565b548ac1
68 changed files with 226 additions and 2238 deletions

View File

@@ -1,357 +0,0 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchEngine;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.search.StatsTimeseries;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* ClickHouse implementation of {@link SearchEngine}.
* <p>
* Builds dynamic WHERE clauses from non-null {@link SearchRequest} fields
* and queries the {@code route_executions} table. LIKE patterns are properly
* escaped to prevent injection.
*/
public class ClickHouseSearchEngine implements SearchEngine {
/** Per-query memory cap (1 GiB) — prevents a single query from OOMing ClickHouse. */
private static final String SETTINGS = " SETTINGS max_memory_usage = 1000000000";
private final JdbcTemplate jdbcTemplate;
public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
buildWhereClause(request, conditions, params);
String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
// Count query
var countParams = params.toArray();
Long total = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where + SETTINGS, Long.class, countParams);
if (total == null) total = 0L;
if (total == 0) {
return SearchResult.empty(request.offset(), request.limit());
}
// Data query
params.add(request.limit());
params.add(request.offset());
String orderDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";
String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " +
"duration_ms, correlation_id, error_message, diagram_content_hash " +
"FROM route_executions" + where +
" ORDER BY " + request.sortColumn() + " " + orderDir + " LIMIT ? OFFSET ?" + SETTINGS;
List<ExecutionSummary> data = jdbcTemplate.query(dataSql, (rs, rowNum) -> {
Timestamp endTs = rs.getTimestamp("end_time");
return new ExecutionSummary(
rs.getString("execution_id"),
rs.getString("route_id"),
rs.getString("agent_id"),
rs.getString("status"),
rs.getTimestamp("start_time").toInstant(),
endTs != null ? endTs.toInstant() : null,
rs.getLong("duration_ms"),
rs.getString("correlation_id"),
rs.getString("error_message"),
rs.getString("diagram_content_hash")
);
}, params.toArray());
return new SearchResult<>(data, total, request.offset(), request.limit());
}
@Override
public long count(SearchRequest request) {
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
buildWhereClause(request, conditions, params);
String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
Long result = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where + SETTINGS, Long.class, params.toArray());
return result != null ? result : 0L;
}
@Override
public ExecutionStats stats(Instant from, Instant to) {
return stats(from, to, null, null);
}
@Override
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
// Current period — read from rollup
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
conditions.add("bucket >= ?");
params.add(bucketTimestamp(floorToFiveMinutes(from)));
conditions.add("bucket <= ?");
params.add(bucketTimestamp(to));
addScopeFilters(routeId, agentIds, conditions, params);
String where = " WHERE " + String.join(" AND ", conditions);
String rollupSql = "SELECT " +
"countMerge(total_count) AS cnt, " +
"countIfMerge(failed_count) AS failed, " +
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + where + SETTINGS;
record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {}
PeriodStats current = jdbcTemplate.queryForObject(rollupSql,
(rs, rowNum) -> new PeriodStats(
rs.getLong("cnt"),
rs.getLong("failed"),
rs.getLong("avg_ms"),
rs.getLong("p99_ms")),
params.toArray());
// Active count — PREWHERE reads only the status column before touching wide rows
var scopeConditions = new ArrayList<String>();
var activeParams = new ArrayList<Object>();
addScopeFilters(routeId, agentIds, scopeConditions, activeParams);
String scopeWhere = scopeConditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", scopeConditions);
Long activeCount = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions PREWHERE status = 'RUNNING'" + scopeWhere + SETTINGS,
Long.class, activeParams.toArray());
// Previous period (same window shifted back 24h) — read from rollup
Duration window = Duration.between(from, to);
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = prevFrom.plus(window);
var prevConditions = new ArrayList<String>();
var prevParams = new ArrayList<Object>();
prevConditions.add("bucket >= ?");
prevParams.add(bucketTimestamp(floorToFiveMinutes(prevFrom)));
prevConditions.add("bucket <= ?");
prevParams.add(bucketTimestamp(prevTo));
addScopeFilters(routeId, agentIds, prevConditions, prevParams);
String prevWhere = " WHERE " + String.join(" AND ", prevConditions);
String prevRollupSql = "SELECT " +
"countMerge(total_count) AS cnt, " +
"countIfMerge(failed_count) AS failed, " +
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + prevWhere + SETTINGS;
PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql,
(rs, rowNum) -> new PeriodStats(
rs.getLong("cnt"),
rs.getLong("failed"),
rs.getLong("avg_ms"),
rs.getLong("p99_ms")),
prevParams.toArray());
// Today total (midnight UTC to now) — read from rollup with same scope
Instant todayStart = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS);
var todayConditions = new ArrayList<String>();
var todayParams = new ArrayList<Object>();
todayConditions.add("bucket >= ?");
todayParams.add(bucketTimestamp(floorToFiveMinutes(todayStart)));
addScopeFilters(routeId, agentIds, todayConditions, todayParams);
String todayWhere = " WHERE " + String.join(" AND ", todayConditions);
Long totalToday = jdbcTemplate.queryForObject(
"SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere + SETTINGS,
Long.class, todayParams.toArray());
return new ExecutionStats(
current.totalCount, current.failedCount, current.avgDurationMs,
current.p99LatencyMs, activeCount != null ? activeCount : 0L,
totalToday != null ? totalToday : 0L,
prev.totalCount, prev.failedCount, prev.avgDurationMs, prev.p99LatencyMs);
}
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return timeseries(from, to, bucketCount, null, null);
}
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
long intervalSeconds = Duration.between(from, to).getSeconds() / bucketCount;
if (intervalSeconds < 1) intervalSeconds = 1;
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
conditions.add("bucket >= ?");
params.add(bucketTimestamp(floorToFiveMinutes(from)));
conditions.add("bucket <= ?");
params.add(bucketTimestamp(to));
addScopeFilters(routeId, agentIds, conditions, params);
String where = " WHERE " + String.join(" AND ", conditions);
// Re-aggregate 5-minute rollup buckets into the requested interval
String sql = "SELECT " +
"toDateTime(intDiv(toUInt32(bucket), " + intervalSeconds + ") * " + intervalSeconds + ") AS ts_bucket, " +
"countMerge(total_count) AS cnt, " +
"countIfMerge(failed_count) AS failed, " +
"toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
"toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
"FROM route_execution_stats_5m" + where +
" GROUP BY ts_bucket ORDER BY ts_bucket" + SETTINGS;
List<StatsTimeseries.TimeseriesBucket> buckets = jdbcTemplate.query(sql, (rs, rowNum) ->
new StatsTimeseries.TimeseriesBucket(
rs.getTimestamp("ts_bucket").toInstant(),
rs.getLong("cnt"),
rs.getLong("failed"),
rs.getLong("avg_ms"),
rs.getLong("p99_ms"),
0L
),
params.toArray());
return new StatsTimeseries(buckets);
}
private void buildWhereClause(SearchRequest req, List<String> conditions, List<Object> params) {
if (req.status() != null && !req.status().isBlank()) {
String[] statuses = req.status().split(",");
if (statuses.length == 1) {
conditions.add("status = ?");
params.add(statuses[0].trim());
} else {
String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?"));
conditions.add("status IN (" + placeholders + ")");
for (String s : statuses) {
params.add(s.trim());
}
}
}
if (req.timeFrom() != null) {
conditions.add("start_time >= ?");
params.add(Timestamp.from(req.timeFrom()));
}
if (req.timeTo() != null) {
conditions.add("start_time <= ?");
params.add(Timestamp.from(req.timeTo()));
}
if (req.durationMin() != null) {
conditions.add("duration_ms >= ?");
params.add(req.durationMin());
}
if (req.durationMax() != null) {
conditions.add("duration_ms <= ?");
params.add(req.durationMax());
}
if (req.correlationId() != null && !req.correlationId().isBlank()) {
conditions.add("correlation_id = ?");
params.add(req.correlationId());
}
if (req.routeId() != null && !req.routeId().isBlank()) {
conditions.add("route_id = ?");
params.add(req.routeId());
}
if (req.agentId() != null && !req.agentId().isBlank()) {
conditions.add("agent_id = ?");
params.add(req.agentId());
}
// agentIds from group resolution (takes precedence when agentId is not set)
if ((req.agentId() == null || req.agentId().isBlank())
&& req.agentIds() != null && !req.agentIds().isEmpty()) {
String placeholders = String.join(", ", Collections.nCopies(req.agentIds().size(), "?"));
conditions.add("agent_id IN (" + placeholders + ")");
params.addAll(req.agentIds());
}
if (req.processorType() != null && !req.processorType().isBlank()) {
conditions.add("has(processor_types, ?)");
params.add(req.processorType());
}
if (req.text() != null && !req.text().isBlank()) {
String pattern = "%" + escapeLike(req.text()) + "%";
String[] textColumns = {
"execution_id", "route_id", "agent_id",
"error_message", "error_stacktrace",
"exchange_bodies", "exchange_headers"
};
var likeClauses = java.util.Arrays.stream(textColumns)
.map(col -> col + " LIKE ?")
.toList();
conditions.add("(" + String.join(" OR ", likeClauses) + ")");
for (int i = 0; i < textColumns.length; i++) {
params.add(pattern);
}
}
if (req.textInBody() != null && !req.textInBody().isBlank()) {
conditions.add("exchange_bodies LIKE ?");
params.add("%" + escapeLike(req.textInBody()) + "%");
}
if (req.textInHeaders() != null && !req.textInHeaders().isBlank()) {
conditions.add("exchange_headers LIKE ?");
params.add("%" + escapeLike(req.textInHeaders()) + "%");
}
if (req.textInErrors() != null && !req.textInErrors().isBlank()) {
String pattern = "%" + escapeLike(req.textInErrors()) + "%";
conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ?)");
params.add(pattern);
params.add(pattern);
}
}
/**
* Add route ID and agent IDs scope filters to conditions/params.
*/
private void addScopeFilters(String routeId, List<String> agentIds,
List<String> conditions, List<Object> params) {
if (routeId != null && !routeId.isBlank()) {
conditions.add("route_id = ?");
params.add(routeId);
}
if (agentIds != null && !agentIds.isEmpty()) {
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
conditions.add("agent_id IN (" + placeholders + ")");
params.addAll(agentIds);
}
}
/**
* Floor an Instant to the start of its 5-minute bucket.
*/
private static Instant floorToFiveMinutes(Instant instant) {
long epochSecond = instant.getEpochSecond();
return Instant.ofEpochSecond(epochSecond - (epochSecond % 300));
}
/**
* Create a second-precision Timestamp for rollup bucket comparisons.
* The bucket column is DateTime('UTC') (second precision); the JDBC driver
* sends java.sql.Timestamp with nanoseconds which ClickHouse rejects.
*/
private static Timestamp bucketTimestamp(Instant instant) {
return Timestamp.from(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS));
}
/**
* Escape special LIKE characters to prevent LIKE injection.
*/
static String escapeLike(String input) {
return input
.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_");
}
}

View File

@@ -7,6 +7,7 @@ import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
@@ -41,8 +42,6 @@ public class OpenSearchIndex implements SearchIndex {
@PostConstruct
void ensureIndexTemplate() {
// Full template with ngram analyzer for infix wildcard search.
// The template JSON matches the spec's OpenSearch index template definition.
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
@@ -53,22 +52,8 @@ public class OpenSearchIndex implements SearchIndex {
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
.analysis(a -> a
.analyzer("ngram_analyzer", an -> an
.custom(c -> c
.tokenizer("ngram_tokenizer")
.filter("lowercase")))
.tokenizer("ngram_tokenizer", tk -> tk
.definition(d -> d
.ngram(ng -> ng
.minGram(3)
.maxGram(4)
.tokenChars(TokenChar.Letter,
TokenChar.Digit,
TokenChar.Punctuation,
TokenChar.Symbol)))))))));
log.info("OpenSearch index template created with ngram analyzer");
.numberOfReplicas("1")))));
log.info("OpenSearch index template created");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
@@ -99,10 +84,10 @@ public class OpenSearchIndex implements SearchIndex {
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
return new SearchResult<>(items, total);
return new SearchResult<>(items, total, request.offset(), request.limit());
} catch (IOException e) {
log.error("Search failed", e);
return new SearchResult<>(List.of(), 0);
return SearchResult.empty(request.offset(), request.limit());
}
}
@@ -125,7 +110,8 @@ public class OpenSearchIndex implements SearchIndex {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(INDEX_PREFIX + "*"))
.query(Query.of(q -> q.term(t -> t
.field("execution_id").value(executionId))))));
.field("execution_id")
.value(FieldValue.of(executionId)))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
@@ -155,9 +141,9 @@ public class OpenSearchIndex implements SearchIndex {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
r.gte(jakarta.json.Json.createValue(request.timeFrom().toString()));
r.gte(JsonData.of(request.timeFrom().toString()));
if (request.timeTo() != null)
r.lte(jakarta.json.Json.createValue(request.timeTo().toString()));
r.lte(JsonData.of(request.timeTo().toString()));
return r;
})));
}
@@ -180,8 +166,7 @@ public class OpenSearchIndex implements SearchIndex {
// Search top-level text fields
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))));
.fields("error_message", "error_stacktrace"))));
// Search nested processor fields
textQueries.add(Query.of(q -> q.nested(n -> n
@@ -190,10 +175,7 @@ public class OpenSearchIndex implements SearchIndex {
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace",
"processors.input_body.ngram", "processors.output_body.ngram",
"processors.input_headers.ngram", "processors.output_headers.ngram",
"processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));
"processors.error_message", "processors.error_stacktrace"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
@@ -209,30 +191,26 @@ public class OpenSearchIndex implements SearchIndex {
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
.fields("processors.input_body", "processors.output_body",
"processors.input_body.ngram", "processors.output_body.ngram"))))));
.fields("processors.input_body", "processors.output_body"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
.fields("processors.input_headers", "processors.output_headers",
"processors.input_headers.ngram", "processors.output_headers.ngram"))))));
.fields("processors.input_headers", "processors.output_headers"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))),
.fields("error_message", "error_stacktrace"))),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace",
"processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
.fields("processors.error_message", "processors.error_stacktrace")))))
).minimumShouldMatch("1"))));
}
@@ -241,9 +219,9 @@ public class OpenSearchIndex implements SearchIndex {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
r.gte(jakarta.json.Json.createValue(request.durationMin()));
r.gte(JsonData.of(request.durationMin()));
if (request.durationMax() != null)
r.lte(jakarta.json.Json.createValue(request.durationMax()));
r.lte(JsonData.of(request.durationMax()));
return r;
})));
}
@@ -257,7 +235,7 @@ public class OpenSearchIndex implements SearchIndex {
}
private Query termQuery(String field, String value) {
return Query.of(q -> q.term(t -> t.field(field).value(value)));
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
@@ -305,6 +283,8 @@ public class OpenSearchIndex implements SearchIndex {
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
(String) src.get("error_message"));
(String) src.get("error_message"),
null // diagramContentHash not stored in index
);
}
}