feat(clickhouse): add read methods to ClickHouseExecutionStore

Implements ExecutionStore interface with findById (FINAL for
ReplacingMergeTree), findProcessors (ORDER BY seq), findProcessorById,
and findProcessorBySeq. Write methods unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 00:04:03 +02:00
parent 190ae2797d
commit 0661fd995f

View File

@@ -1,17 +1,21 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class ClickHouseExecutionStore {
public class ClickHouseExecutionStore implements ExecutionStore {
private final JdbcTemplate jdbc;
private final ObjectMapper objectMapper;
@@ -128,6 +132,187 @@ public class ClickHouseExecutionStore {
}).toList());
}
// --- ExecutionStore interface: read methods ---
@Override
public Optional<ExecutionRecord> findById(String executionId) {
List<ExecutionRecord> results = jdbc.query("""
SELECT execution_id, route_id, agent_id, application_name, status,
correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes,
error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id, has_trace_data, is_replay
FROM executions FINAL
WHERE tenant_id = 'default' AND execution_id = ?
LIMIT 1
""",
(rs, rowNum) -> mapExecutionRecord(rs),
executionId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<ProcessorRecord> findProcessors(String executionId) {
return jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ?
ORDER BY seq
""",
(rs, rowNum) -> mapProcessorRecord(rs),
executionId);
}
@Override
public Optional<ProcessorRecord> findProcessorById(String executionId, String processorId) {
List<ProcessorRecord> results = jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ?
LIMIT 1
""",
(rs, rowNum) -> mapProcessorRecord(rs),
executionId, processorId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
List<ProcessorRecord> results = jdbc.query("""
SELECT execution_id, seq, parent_seq, parent_processor_id,
processor_id, processor_type, start_time, route_id, application_name,
iteration, iteration_size, status, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message,
input_body, output_body, input_headers, output_headers, attributes,
resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message
FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ? AND seq = ?
LIMIT 1
""",
(rs, rowNum) -> mapProcessorRecord(rs),
executionId, seq);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
// --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) ---
@Override
public void upsert(ExecutionRecord execution) {
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
@Override
public void upsertProcessors(String executionId, Instant startTime,
String applicationName, String routeId,
List<ProcessorRecord> processors) {
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
// --- Row mappers ---
private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException {
return new ExecutionRecord(
emptyToNull(rs.getString("execution_id")),
emptyToNull(rs.getString("route_id")),
emptyToNull(rs.getString("agent_id")),
emptyToNull(rs.getString("application_name")),
emptyToNull(rs.getString("status")),
emptyToNull(rs.getString("correlation_id")),
emptyToNull(rs.getString("exchange_id")),
toInstant(rs, "start_time"),
toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
emptyToNull(rs.getString("error_message")),
emptyToNull(rs.getString("error_stacktrace")),
emptyToNull(rs.getString("diagram_content_hash")),
emptyToNull(rs.getString("engine_level")),
emptyToNull(rs.getString("input_body")),
emptyToNull(rs.getString("output_body")),
emptyToNull(rs.getString("input_headers")),
emptyToNull(rs.getString("output_headers")),
emptyToNull(rs.getString("attributes")),
emptyToNull(rs.getString("error_type")),
emptyToNull(rs.getString("error_category")),
emptyToNull(rs.getString("root_cause_type")),
emptyToNull(rs.getString("root_cause_message")),
emptyToNull(rs.getString("trace_id")),
emptyToNull(rs.getString("span_id")),
null, // processorsJson not stored in ClickHouse
rs.getBoolean("has_trace_data"),
rs.getBoolean("is_replay")
);
}
private static ProcessorRecord mapProcessorRecord(ResultSet rs) throws SQLException {
return new ProcessorRecord(
emptyToNull(rs.getString("execution_id")),
emptyToNull(rs.getString("processor_id")),
emptyToNull(rs.getString("processor_type")),
emptyToNull(rs.getString("application_name")),
emptyToNull(rs.getString("route_id")),
0, // depth not stored in ClickHouse
emptyToNull(rs.getString("parent_processor_id")),
emptyToNull(rs.getString("status")),
toInstant(rs, "start_time"),
toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
emptyToNull(rs.getString("error_message")),
emptyToNull(rs.getString("error_stacktrace")),
emptyToNull(rs.getString("input_body")),
emptyToNull(rs.getString("output_body")),
emptyToNull(rs.getString("input_headers")),
emptyToNull(rs.getString("output_headers")),
emptyToNull(rs.getString("attributes")),
null, // loopIndex
null, // loopSize
null, // splitIndex
null, // splitSize
null, // multicastIndex
emptyToNull(rs.getString("resolved_endpoint_uri")),
emptyToNull(rs.getString("error_type")),
emptyToNull(rs.getString("error_category")),
emptyToNull(rs.getString("root_cause_type")),
emptyToNull(rs.getString("root_cause_message")),
null, // errorHandlerType
emptyToNull(rs.getString("circuit_breaker_state")),
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
rs.getObject("seq") != null ? rs.getInt("seq") : null,
rs.getObject("parent_seq") != null ? rs.getInt("parent_seq") : null,
rs.getObject("iteration") != null ? rs.getInt("iteration") : null,
rs.getObject("iteration_size") != null ? rs.getInt("iteration_size") : null,
rs.getObject("filter_matched") != null ? rs.getBoolean("filter_matched") : null,
rs.getObject("duplicate_message") != null ? rs.getBoolean("duplicate_message") : null
);
}
// --- Helpers ---
private static String emptyToNull(String value) {
return (value == null || value.isEmpty()) ? null : value;
}
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);
return ts != null ? ts.toInstant() : null;
}
private static String nullToEmpty(String value) {
return value != null ? value : "";
}