From 0661fd995fa4f03dbc7eba0127003892d7277edc Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 00:04:03 +0200 Subject: [PATCH] feat(clickhouse): add read methods to ClickHouseExecutionStore Implements ExecutionStore interface with findById (FINAL for ReplacingMergeTree), findProcessors (ORDER BY seq), findProcessorById, and findProcessorBySeq. Write methods unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/storage/ClickHouseExecutionStore.java | 187 +++++++++++++++++- 1 file changed, 186 insertions(+), 1 deletion(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index 76ea8d80..ba70ea0c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -1,17 +1,21 @@ package com.cameleer3.server.app.storage; import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.common.model.FlatProcessorRecord; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.jdbc.core.JdbcTemplate; +import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; -public class ClickHouseExecutionStore { +public class ClickHouseExecutionStore implements ExecutionStore { private final JdbcTemplate jdbc; private final ObjectMapper objectMapper; @@ -128,6 +132,187 @@ public class ClickHouseExecutionStore { }).toList()); } + // --- ExecutionStore interface: read methods --- + + @Override + public Optional findById(String executionId) { + List results = jdbc.query(""" + SELECT execution_id, route_id, agent_id, application_name, status, + correlation_id, exchange_id, start_time, end_time, duration_ms, + error_message, error_stacktrace, diagram_content_hash, engine_level, + input_body, output_body, input_headers, output_headers, attributes, + error_type, error_category, root_cause_type, root_cause_message, + trace_id, span_id, has_trace_data, is_replay + FROM executions FINAL + WHERE tenant_id = 'default' AND execution_id = ? + LIMIT 1 + """, + (rs, rowNum) -> mapExecutionRecord(rs), + executionId); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public List findProcessors(String executionId) { + return jdbc.query(""" + SELECT execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + FROM processor_executions + WHERE tenant_id = 'default' AND execution_id = ? + ORDER BY seq + """, + (rs, rowNum) -> mapProcessorRecord(rs), + executionId); + } + + @Override + public Optional findProcessorById(String executionId, String processorId) { + List results = jdbc.query(""" + SELECT execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + FROM processor_executions + WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ? + LIMIT 1 + """, + (rs, rowNum) -> mapProcessorRecord(rs), + executionId, processorId); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public Optional findProcessorBySeq(String executionId, int seq) { + List results = jdbc.query(""" + SELECT execution_id, seq, parent_seq, parent_processor_id, + processor_id, processor_type, start_time, route_id, application_name, + iteration, iteration_size, status, end_time, duration_ms, + error_message, error_stacktrace, error_type, error_category, + root_cause_type, root_cause_message, + input_body, output_body, input_headers, output_headers, attributes, + resolved_endpoint_uri, circuit_breaker_state, + fallback_triggered, filter_matched, duplicate_message + FROM processor_executions + WHERE tenant_id = 'default' AND execution_id = ? AND seq = ? + LIMIT 1 + """, + (rs, rowNum) -> mapProcessorRecord(rs), + executionId, seq); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + // --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) --- + + @Override + public void upsert(ExecutionRecord execution) { + throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline"); + } + + @Override + public void upsertProcessors(String executionId, Instant startTime, + String applicationName, String routeId, + List processors) { + throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline"); + } + + // --- Row mappers --- + + private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException { + return new ExecutionRecord( + emptyToNull(rs.getString("execution_id")), + emptyToNull(rs.getString("route_id")), + emptyToNull(rs.getString("agent_id")), + emptyToNull(rs.getString("application_name")), + emptyToNull(rs.getString("status")), + emptyToNull(rs.getString("correlation_id")), + emptyToNull(rs.getString("exchange_id")), + toInstant(rs, "start_time"), + toInstant(rs, "end_time"), + rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, + emptyToNull(rs.getString("error_message")), + emptyToNull(rs.getString("error_stacktrace")), + emptyToNull(rs.getString("diagram_content_hash")), + emptyToNull(rs.getString("engine_level")), + emptyToNull(rs.getString("input_body")), + emptyToNull(rs.getString("output_body")), + emptyToNull(rs.getString("input_headers")), + emptyToNull(rs.getString("output_headers")), + emptyToNull(rs.getString("attributes")), + emptyToNull(rs.getString("error_type")), + emptyToNull(rs.getString("error_category")), + emptyToNull(rs.getString("root_cause_type")), + emptyToNull(rs.getString("root_cause_message")), + emptyToNull(rs.getString("trace_id")), + emptyToNull(rs.getString("span_id")), + null, // processorsJson not stored in ClickHouse + rs.getBoolean("has_trace_data"), + rs.getBoolean("is_replay") + ); + } + + private static ProcessorRecord mapProcessorRecord(ResultSet rs) throws SQLException { + return new ProcessorRecord( + emptyToNull(rs.getString("execution_id")), + emptyToNull(rs.getString("processor_id")), + emptyToNull(rs.getString("processor_type")), + emptyToNull(rs.getString("application_name")), + emptyToNull(rs.getString("route_id")), + 0, // depth not stored in ClickHouse + emptyToNull(rs.getString("parent_processor_id")), + emptyToNull(rs.getString("status")), + toInstant(rs, "start_time"), + toInstant(rs, "end_time"), + rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, + emptyToNull(rs.getString("error_message")), + emptyToNull(rs.getString("error_stacktrace")), + emptyToNull(rs.getString("input_body")), + emptyToNull(rs.getString("output_body")), + emptyToNull(rs.getString("input_headers")), + emptyToNull(rs.getString("output_headers")), + emptyToNull(rs.getString("attributes")), + null, // loopIndex + null, // loopSize + null, // splitIndex + null, // splitSize + null, // multicastIndex + emptyToNull(rs.getString("resolved_endpoint_uri")), + emptyToNull(rs.getString("error_type")), + emptyToNull(rs.getString("error_category")), + emptyToNull(rs.getString("root_cause_type")), + emptyToNull(rs.getString("root_cause_message")), + null, // errorHandlerType + emptyToNull(rs.getString("circuit_breaker_state")), + rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null, + rs.getObject("seq") != null ? rs.getInt("seq") : null, + rs.getObject("parent_seq") != null ? rs.getInt("parent_seq") : null, + rs.getObject("iteration") != null ? rs.getInt("iteration") : null, + rs.getObject("iteration_size") != null ? rs.getInt("iteration_size") : null, + rs.getObject("filter_matched") != null ? rs.getBoolean("filter_matched") : null, + rs.getObject("duplicate_message") != null ? rs.getBoolean("duplicate_message") : null + ); + } + + // --- Helpers --- + + private static String emptyToNull(String value) { + return (value == null || value.isEmpty()) ? null : value; + } + + private static Instant toInstant(ResultSet rs, String column) throws SQLException { + Timestamp ts = rs.getTimestamp(column); + return ts != null ? ts.toInstant() : null; + } + private static String nullToEmpty(String value) { return value != null ? value : ""; }