Compare commits
5 Commits
968117c41a
...
520181d241
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
520181d241 | ||
|
|
95b9dea5c4 | ||
|
|
151b96a680 | ||
|
|
0661fd995f | ||
|
|
190ae2797d |
@@ -6,6 +6,7 @@ import com.cameleer3.server.app.storage.ClickHouseDiagramStore;
|
||||
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
|
||||
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
|
||||
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
|
||||
import com.cameleer3.server.app.storage.PostgresExecutionStore;
|
||||
import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
|
||||
import com.cameleer3.server.app.storage.PostgresMetricsStore;
|
||||
import com.cameleer3.server.core.admin.AuditRepository;
|
||||
@@ -96,6 +97,18 @@ public class StorageBeanConfig {
|
||||
return new ClickHouseExecutionStore(clickHouseJdbc);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
|
||||
public ExecutionStore executionStoreClickHouse(ClickHouseExecutionStore chStore) {
|
||||
return chStore; // Same instance, also exposed as ExecutionStore
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "postgres")
|
||||
public ExecutionStore executionStorePostgres(JdbcTemplate jdbc) {
|
||||
return new PostgresExecutionStore(jdbc);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
|
||||
public ChunkAccumulator chunkAccumulator(
|
||||
|
||||
@@ -81,4 +81,16 @@ public class DetailController {
|
||||
.map(ResponseEntity::ok)
|
||||
.orElse(ResponseEntity.notFound().build());
|
||||
}
|
||||
|
||||
@GetMapping("/{executionId}/processors/by-seq/{seq}/snapshot")
|
||||
@Operation(summary = "Get exchange snapshot for a processor by seq number")
|
||||
@ApiResponse(responseCode = "200", description = "Snapshot data")
|
||||
@ApiResponse(responseCode = "404", description = "Snapshot not found")
|
||||
public ResponseEntity<Map<String, String>> processorSnapshotBySeq(
|
||||
@PathVariable String executionId,
|
||||
@PathVariable int seq) {
|
||||
return detailService.getProcessorSnapshotBySeq(executionId, seq)
|
||||
.map(ResponseEntity::ok)
|
||||
.orElse(ResponseEntity.notFound().build());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,21 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.ingestion.MergedExecution;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore;
|
||||
import com.cameleer3.common.model.FlatProcessorRecord;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class ClickHouseExecutionStore {
|
||||
public class ClickHouseExecutionStore implements ExecutionStore {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
private final ObjectMapper objectMapper;
|
||||
@@ -128,6 +132,187 @@ public class ClickHouseExecutionStore {
|
||||
}).toList());
|
||||
}
|
||||
|
||||
// --- ExecutionStore interface: read methods ---
|
||||
|
||||
@Override
|
||||
public Optional<ExecutionRecord> findById(String executionId) {
|
||||
List<ExecutionRecord> results = jdbc.query("""
|
||||
SELECT execution_id, route_id, agent_id, application_name, status,
|
||||
correlation_id, exchange_id, start_time, end_time, duration_ms,
|
||||
error_message, error_stacktrace, diagram_content_hash, engine_level,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
error_type, error_category, root_cause_type, root_cause_message,
|
||||
trace_id, span_id, has_trace_data, is_replay
|
||||
FROM executions FINAL
|
||||
WHERE tenant_id = 'default' AND execution_id = ?
|
||||
LIMIT 1
|
||||
""",
|
||||
(rs, rowNum) -> mapExecutionRecord(rs),
|
||||
executionId);
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ProcessorRecord> findProcessors(String executionId) {
|
||||
return jdbc.query("""
|
||||
SELECT execution_id, seq, parent_seq, parent_processor_id,
|
||||
processor_id, processor_type, start_time, route_id, application_name,
|
||||
iteration, iteration_size, status, end_time, duration_ms,
|
||||
error_message, error_stacktrace, error_type, error_category,
|
||||
root_cause_type, root_cause_message,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
resolved_endpoint_uri, circuit_breaker_state,
|
||||
fallback_triggered, filter_matched, duplicate_message
|
||||
FROM processor_executions
|
||||
WHERE tenant_id = 'default' AND execution_id = ?
|
||||
ORDER BY seq
|
||||
""",
|
||||
(rs, rowNum) -> mapProcessorRecord(rs),
|
||||
executionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ProcessorRecord> findProcessorById(String executionId, String processorId) {
|
||||
List<ProcessorRecord> results = jdbc.query("""
|
||||
SELECT execution_id, seq, parent_seq, parent_processor_id,
|
||||
processor_id, processor_type, start_time, route_id, application_name,
|
||||
iteration, iteration_size, status, end_time, duration_ms,
|
||||
error_message, error_stacktrace, error_type, error_category,
|
||||
root_cause_type, root_cause_message,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
resolved_endpoint_uri, circuit_breaker_state,
|
||||
fallback_triggered, filter_matched, duplicate_message
|
||||
FROM processor_executions
|
||||
WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ?
|
||||
LIMIT 1
|
||||
""",
|
||||
(rs, rowNum) -> mapProcessorRecord(rs),
|
||||
executionId, processorId);
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
|
||||
List<ProcessorRecord> results = jdbc.query("""
|
||||
SELECT execution_id, seq, parent_seq, parent_processor_id,
|
||||
processor_id, processor_type, start_time, route_id, application_name,
|
||||
iteration, iteration_size, status, end_time, duration_ms,
|
||||
error_message, error_stacktrace, error_type, error_category,
|
||||
root_cause_type, root_cause_message,
|
||||
input_body, output_body, input_headers, output_headers, attributes,
|
||||
resolved_endpoint_uri, circuit_breaker_state,
|
||||
fallback_triggered, filter_matched, duplicate_message
|
||||
FROM processor_executions
|
||||
WHERE tenant_id = 'default' AND execution_id = ? AND seq = ?
|
||||
LIMIT 1
|
||||
""",
|
||||
(rs, rowNum) -> mapProcessorRecord(rs),
|
||||
executionId, seq);
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
// --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) ---
|
||||
|
||||
@Override
|
||||
public void upsert(ExecutionRecord execution) {
|
||||
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsertProcessors(String executionId, Instant startTime,
|
||||
String applicationName, String routeId,
|
||||
List<ProcessorRecord> processors) {
|
||||
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
|
||||
}
|
||||
|
||||
// --- Row mappers ---
|
||||
|
||||
private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException {
|
||||
return new ExecutionRecord(
|
||||
emptyToNull(rs.getString("execution_id")),
|
||||
emptyToNull(rs.getString("route_id")),
|
||||
emptyToNull(rs.getString("agent_id")),
|
||||
emptyToNull(rs.getString("application_name")),
|
||||
emptyToNull(rs.getString("status")),
|
||||
emptyToNull(rs.getString("correlation_id")),
|
||||
emptyToNull(rs.getString("exchange_id")),
|
||||
toInstant(rs, "start_time"),
|
||||
toInstant(rs, "end_time"),
|
||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||
emptyToNull(rs.getString("error_message")),
|
||||
emptyToNull(rs.getString("error_stacktrace")),
|
||||
emptyToNull(rs.getString("diagram_content_hash")),
|
||||
emptyToNull(rs.getString("engine_level")),
|
||||
emptyToNull(rs.getString("input_body")),
|
||||
emptyToNull(rs.getString("output_body")),
|
||||
emptyToNull(rs.getString("input_headers")),
|
||||
emptyToNull(rs.getString("output_headers")),
|
||||
emptyToNull(rs.getString("attributes")),
|
||||
emptyToNull(rs.getString("error_type")),
|
||||
emptyToNull(rs.getString("error_category")),
|
||||
emptyToNull(rs.getString("root_cause_type")),
|
||||
emptyToNull(rs.getString("root_cause_message")),
|
||||
emptyToNull(rs.getString("trace_id")),
|
||||
emptyToNull(rs.getString("span_id")),
|
||||
null, // processorsJson not stored in ClickHouse
|
||||
rs.getBoolean("has_trace_data"),
|
||||
rs.getBoolean("is_replay")
|
||||
);
|
||||
}
|
||||
|
||||
private static ProcessorRecord mapProcessorRecord(ResultSet rs) throws SQLException {
|
||||
return new ProcessorRecord(
|
||||
emptyToNull(rs.getString("execution_id")),
|
||||
emptyToNull(rs.getString("processor_id")),
|
||||
emptyToNull(rs.getString("processor_type")),
|
||||
emptyToNull(rs.getString("application_name")),
|
||||
emptyToNull(rs.getString("route_id")),
|
||||
0, // depth not stored in ClickHouse
|
||||
emptyToNull(rs.getString("parent_processor_id")),
|
||||
emptyToNull(rs.getString("status")),
|
||||
toInstant(rs, "start_time"),
|
||||
toInstant(rs, "end_time"),
|
||||
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
|
||||
emptyToNull(rs.getString("error_message")),
|
||||
emptyToNull(rs.getString("error_stacktrace")),
|
||||
emptyToNull(rs.getString("input_body")),
|
||||
emptyToNull(rs.getString("output_body")),
|
||||
emptyToNull(rs.getString("input_headers")),
|
||||
emptyToNull(rs.getString("output_headers")),
|
||||
emptyToNull(rs.getString("attributes")),
|
||||
null, // loopIndex
|
||||
null, // loopSize
|
||||
null, // splitIndex
|
||||
null, // splitSize
|
||||
null, // multicastIndex
|
||||
emptyToNull(rs.getString("resolved_endpoint_uri")),
|
||||
emptyToNull(rs.getString("error_type")),
|
||||
emptyToNull(rs.getString("error_category")),
|
||||
emptyToNull(rs.getString("root_cause_type")),
|
||||
emptyToNull(rs.getString("root_cause_message")),
|
||||
null, // errorHandlerType
|
||||
emptyToNull(rs.getString("circuit_breaker_state")),
|
||||
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
|
||||
rs.getObject("seq") != null ? rs.getInt("seq") : null,
|
||||
rs.getObject("parent_seq") != null ? rs.getInt("parent_seq") : null,
|
||||
rs.getObject("iteration") != null ? rs.getInt("iteration") : null,
|
||||
rs.getObject("iteration_size") != null ? rs.getInt("iteration_size") : null,
|
||||
rs.getObject("filter_matched") != null ? rs.getBoolean("filter_matched") : null,
|
||||
rs.getObject("duplicate_message") != null ? rs.getBoolean("duplicate_message") : null
|
||||
);
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
private static String emptyToNull(String value) {
|
||||
return (value == null || value.isEmpty()) ? null : value;
|
||||
}
|
||||
|
||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||
Timestamp ts = rs.getTimestamp(column);
|
||||
return ts != null ? ts.toInstant() : null;
|
||||
}
|
||||
|
||||
private static String nullToEmpty(String value) {
|
||||
return value != null ? value : "";
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.cameleer3.server.app.storage;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
@@ -12,7 +11,6 @@ import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Repository
|
||||
public class PostgresExecutionStore implements ExecutionStore {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
@@ -206,7 +204,8 @@ public class PostgresExecutionStore implements ExecutionStore {
|
||||
rs.getString("error_type"), rs.getString("error_category"),
|
||||
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
|
||||
rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"),
|
||||
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null);
|
||||
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
|
||||
null, null, null, null, null, null);
|
||||
|
||||
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
|
||||
Timestamp ts = rs.getTimestamp(column);
|
||||
|
||||
@@ -55,6 +55,7 @@ cameleer:
|
||||
diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
|
||||
events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
|
||||
logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
|
||||
executions: ${CAMELEER_STORAGE_EXECUTIONS:clickhouse}
|
||||
|
||||
security:
|
||||
access-token-expiry-ms: 3600000
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.model.ExecutionStatus;
|
||||
import com.cameleer3.common.model.FlatProcessorRecord;
|
||||
import com.cameleer3.server.core.detail.DetailService;
|
||||
import com.cameleer3.server.core.detail.ExecutionDetail;
|
||||
import com.cameleer3.server.core.detail.ProcessorNode;
|
||||
import com.cameleer3.server.core.ingestion.MergedExecution;
|
||||
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.testcontainers.clickhouse.ClickHouseContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Testcontainers
|
||||
class ClickHouseExecutionReadIT {
|
||||
|
||||
@Container
|
||||
static final ClickHouseContainer clickhouse =
|
||||
new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
|
||||
|
||||
private JdbcTemplate jdbc;
|
||||
private ClickHouseExecutionStore store;
|
||||
private DetailService detailService;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
HikariDataSource ds = new HikariDataSource();
|
||||
ds.setJdbcUrl(clickhouse.getJdbcUrl());
|
||||
ds.setUsername(clickhouse.getUsername());
|
||||
ds.setPassword(clickhouse.getPassword());
|
||||
jdbc = new JdbcTemplate(ds);
|
||||
|
||||
// Load DDL for both tables
|
||||
String execDdl = new ClassPathResource("clickhouse/V2__executions.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
String procDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
// Also load V5 for replay fields
|
||||
String replayDdl = new ClassPathResource("clickhouse/V5__replay_fields.sql")
|
||||
.getContentAsString(StandardCharsets.UTF_8);
|
||||
|
||||
jdbc.execute(execDdl);
|
||||
jdbc.execute(procDdl);
|
||||
// V5 has ALTER TABLE statements — execute each separately
|
||||
for (String stmt : replayDdl.split(";")) {
|
||||
String trimmed = stmt.trim();
|
||||
if (!trimmed.isEmpty()) jdbc.execute(trimmed);
|
||||
}
|
||||
|
||||
jdbc.execute("TRUNCATE TABLE executions");
|
||||
jdbc.execute("TRUNCATE TABLE processor_executions");
|
||||
|
||||
store = new ClickHouseExecutionStore(jdbc);
|
||||
detailService = new DetailService(store);
|
||||
}
|
||||
|
||||
// --- Helper factory methods ---
|
||||
|
||||
private MergedExecution minimalExecution(String executionId) {
|
||||
return new MergedExecution(
|
||||
"default", 1L, executionId, "route-a", "agent-1", "my-app",
|
||||
"COMPLETED", "corr-1", "exchange-1",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
Instant.parse("2026-04-01T10:00:01Z"),
|
||||
1000L,
|
||||
"", "", "", "", "", "",
|
||||
"", "REGULAR",
|
||||
"", "", "", "", "{}",
|
||||
"", "",
|
||||
false, false,
|
||||
null, null
|
||||
);
|
||||
}
|
||||
|
||||
private FlatProcessorRecord processor(int seq, String processorId, String processorType) {
|
||||
FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType);
|
||||
p.setStatus(ExecutionStatus.COMPLETED);
|
||||
p.setStartTime(Instant.parse("2026-04-01T10:00:00Z"));
|
||||
p.setDurationMs(10L);
|
||||
return p;
|
||||
}
|
||||
|
||||
// --- Tests ---
|
||||
|
||||
@Test
|
||||
void findById_returnsInsertedExecution() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
|
||||
store.findById("exec-1");
|
||||
|
||||
assertThat(result).isPresent();
|
||||
assertThat(result.get().executionId()).isEqualTo("exec-1");
|
||||
assertThat(result.get().routeId()).isEqualTo("route-a");
|
||||
assertThat(result.get().status()).isEqualTo("COMPLETED");
|
||||
assertThat(result.get().agentId()).isEqualTo("agent-1");
|
||||
assertThat(result.get().applicationName()).isEqualTo("my-app");
|
||||
assertThat(result.get().processorsJson()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findById_notFound_returnsEmpty() {
|
||||
Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
|
||||
store.findById("nonexistent");
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessors_returnsOrderedBySeq() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
FlatProcessorRecord p1 = processor(1, "log-1", "log");
|
||||
FlatProcessorRecord p2 = processor(2, "transform-1", "setBody");
|
||||
FlatProcessorRecord p3 = processor(3, "to-1", "to");
|
||||
p2.setParentSeq(1);
|
||||
p2.setParentProcessorId("log-1");
|
||||
p3.setParentSeq(1);
|
||||
p3.setParentProcessorId("log-1");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(p1, p2, p3));
|
||||
|
||||
List<ProcessorRecord> records = store.findProcessors("exec-1");
|
||||
|
||||
assertThat(records).hasSize(3);
|
||||
assertThat(records.get(0).seq()).isEqualTo(1);
|
||||
assertThat(records.get(0).processorId()).isEqualTo("log-1");
|
||||
assertThat(records.get(1).seq()).isEqualTo(2);
|
||||
assertThat(records.get(1).processorId()).isEqualTo("transform-1");
|
||||
assertThat(records.get(1).parentSeq()).isEqualTo(1);
|
||||
assertThat(records.get(2).seq()).isEqualTo(3);
|
||||
assertThat(records.get(2).processorId()).isEqualTo("to-1");
|
||||
assertThat(records.get(2).parentSeq()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessorBySeq_returnsCorrectRecord() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
FlatProcessorRecord p1 = processor(1, "log-1", "log");
|
||||
FlatProcessorRecord p2 = processor(2, "to-1", "to");
|
||||
FlatProcessorRecord p3 = processor(3, "log-2", "log");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(p1, p2, p3));
|
||||
|
||||
Optional<ProcessorRecord> result = store.findProcessorBySeq("exec-1", 2);
|
||||
|
||||
assertThat(result).isPresent();
|
||||
assertThat(result.get().seq()).isEqualTo(2);
|
||||
assertThat(result.get().processorId()).isEqualTo("to-1");
|
||||
assertThat(result.get().processorType()).isEqualTo("to");
|
||||
}
|
||||
|
||||
@Test
|
||||
void findProcessorById_returnsFirstOccurrence() {
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
// Three processors with the same processorId (iteration scenario)
|
||||
FlatProcessorRecord iter0 = processor(1, "to-1", "to");
|
||||
iter0.setIteration(0);
|
||||
|
||||
FlatProcessorRecord iter1 = processor(2, "to-1", "to");
|
||||
iter1.setIteration(1);
|
||||
|
||||
FlatProcessorRecord iter2 = processor(3, "to-1", "to");
|
||||
iter2.setIteration(2);
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(iter0, iter1, iter2));
|
||||
|
||||
Optional<ProcessorRecord> result = store.findProcessorById("exec-1", "to-1");
|
||||
|
||||
assertThat(result).isPresent();
|
||||
// ClickHouse LIMIT 1 returns one record — verify it has the correct processorId
|
||||
assertThat(result.get().processorId()).isEqualTo("to-1");
|
||||
// The returned record should have the lowest seq (first occurrence)
|
||||
assertThat(result.get().seq()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void detailService_buildTree_withIterations() {
|
||||
// Insert an execution
|
||||
store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
|
||||
|
||||
// Build a split scenario:
|
||||
// seq=1: log-1 (root)
|
||||
// seq=2: split-1 (root)
|
||||
// seq=3: to-1 (child of split-1, iteration=0)
|
||||
// seq=4: to-1 (child of split-1, iteration=1)
|
||||
// seq=5: to-1 (child of split-1, iteration=2)
|
||||
// seq=6: log-2 (root)
|
||||
|
||||
FlatProcessorRecord log1 = processor(1, "log-1", "log");
|
||||
|
||||
FlatProcessorRecord split1 = processor(2, "split-1", "split");
|
||||
split1.setIterationSize(3);
|
||||
|
||||
FlatProcessorRecord to1iter0 = processor(3, "to-1", "to");
|
||||
to1iter0.setParentSeq(2);
|
||||
to1iter0.setParentProcessorId("split-1");
|
||||
to1iter0.setIteration(0);
|
||||
|
||||
FlatProcessorRecord to1iter1 = processor(4, "to-1", "to");
|
||||
to1iter1.setParentSeq(2);
|
||||
to1iter1.setParentProcessorId("split-1");
|
||||
to1iter1.setIteration(1);
|
||||
|
||||
FlatProcessorRecord to1iter2 = processor(5, "to-1", "to");
|
||||
to1iter2.setParentSeq(2);
|
||||
to1iter2.setParentProcessorId("split-1");
|
||||
to1iter2.setIteration(2);
|
||||
|
||||
FlatProcessorRecord log2 = processor(6, "log-2", "log");
|
||||
|
||||
store.insertProcessorBatch(
|
||||
"default", "exec-1", "route-a", "my-app",
|
||||
Instant.parse("2026-04-01T10:00:00Z"),
|
||||
List.of(log1, split1, to1iter0, to1iter1, to1iter2, log2));
|
||||
|
||||
// Invoke DetailService
|
||||
Optional<ExecutionDetail> detail = detailService.getDetail("exec-1");
|
||||
|
||||
assertThat(detail).isPresent();
|
||||
|
||||
List<ProcessorNode> roots = detail.get().processors();
|
||||
assertThat(roots).hasSize(3);
|
||||
assertThat(roots.get(0).getProcessorId()).isEqualTo("log-1");
|
||||
assertThat(roots.get(1).getProcessorId()).isEqualTo("split-1");
|
||||
assertThat(roots.get(2).getProcessorId()).isEqualTo("log-2");
|
||||
|
||||
// Verify split-1 has 3 children (all with processorId "to-1")
|
||||
ProcessorNode splitNode = roots.get(1);
|
||||
List<ProcessorNode> children = splitNode.getChildren();
|
||||
assertThat(children).hasSize(3);
|
||||
assertThat(children).allMatch(c -> "to-1".equals(c.getProcessorId()));
|
||||
|
||||
// Verify iteration values via getLoopIndex() (iteration maps to loopIndex in the seq-based path)
|
||||
assertThat(children.get(0).getLoopIndex()).isEqualTo(0);
|
||||
assertThat(children.get(1).getLoopIndex()).isEqualTo(1);
|
||||
assertThat(children.get(2).getLoopIndex()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
@@ -77,13 +77,15 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
|
||||
now, now.plusMillis(10), 10L, null, null,
|
||||
"input body", "output body", null, null, null,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null),
|
||||
null, null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null),
|
||||
new ProcessorRecord("exec-proc", "proc-2", "to",
|
||||
"app-1", "route-a", 1, "proc-1", "COMPLETED",
|
||||
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null)
|
||||
null, null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null)
|
||||
);
|
||||
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);
|
||||
|
||||
|
||||
@@ -64,6 +64,18 @@ public class DetailService {
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<Map<String, String>> getProcessorSnapshotBySeq(String executionId, int seq) {
|
||||
return executionStore.findProcessorBySeq(executionId, seq)
|
||||
.map(p -> {
|
||||
Map<String, String> snapshot = new LinkedHashMap<>();
|
||||
if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody());
|
||||
if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody());
|
||||
if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders());
|
||||
if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders());
|
||||
return snapshot;
|
||||
});
|
||||
}
|
||||
|
||||
/** Parse the raw processor tree JSON stored alongside the execution. */
|
||||
private List<ProcessorNode> parseProcessorsJson(String json) {
|
||||
if (json == null || json.isBlank()) return null;
|
||||
@@ -104,12 +116,68 @@ public class DetailService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Fallback: reconstruct processor tree from flat records.
|
||||
* Note: this loses iteration context for processors with the same ID across iterations.
|
||||
* Reconstruct processor tree from flat records.
|
||||
* Detects whether records use the seq-based model (ClickHouse) or
|
||||
* processorId-based model (PostgreSQL) and delegates accordingly.
|
||||
*/
|
||||
List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
|
||||
if (processors.isEmpty()) return List.of();
|
||||
boolean hasSeq = processors.stream().anyMatch(p -> p.seq() != null);
|
||||
return hasSeq ? buildTreeBySeq(processors) : buildTreeByProcessorId(processors);
|
||||
}
|
||||
|
||||
/**
|
||||
* Seq-based tree reconstruction for ClickHouse flat processor model.
|
||||
* Uses seq/parentSeq linkage, correctly handling duplicate processorIds
|
||||
* across iterations (e.g., the same processor inside a split running N times).
|
||||
*/
|
||||
private List<ProcessorNode> buildTreeBySeq(List<ProcessorRecord> processors) {
|
||||
Map<Integer, ProcessorNode> nodeBySeq = new LinkedHashMap<>();
|
||||
|
||||
for (ProcessorRecord p : processors) {
|
||||
boolean hasTrace = p.inputBody() != null || p.outputBody() != null
|
||||
|| p.inputHeaders() != null || p.outputHeaders() != null;
|
||||
ProcessorNode node = new ProcessorNode(
|
||||
p.processorId(), p.processorType(), p.status(),
|
||||
p.startTime(), p.endTime(),
|
||||
p.durationMs() != null ? p.durationMs() : 0L,
|
||||
p.errorMessage(), p.errorStacktrace(),
|
||||
parseAttributes(p.attributes()),
|
||||
p.iteration(), p.iterationSize(),
|
||||
null, null, null,
|
||||
p.resolvedEndpointUri(),
|
||||
p.errorType(), p.errorCategory(),
|
||||
p.rootCauseType(), p.rootCauseMessage(),
|
||||
null, p.circuitBreakerState(),
|
||||
p.fallbackTriggered(),
|
||||
p.filterMatched(), p.duplicateMessage(),
|
||||
hasTrace
|
||||
);
|
||||
nodeBySeq.put(p.seq(), node);
|
||||
}
|
||||
|
||||
List<ProcessorNode> roots = new ArrayList<>();
|
||||
for (ProcessorRecord p : processors) {
|
||||
ProcessorNode node = nodeBySeq.get(p.seq());
|
||||
if (p.parentSeq() == null) {
|
||||
roots.add(node);
|
||||
} else {
|
||||
ProcessorNode parent = nodeBySeq.get(p.parentSeq());
|
||||
if (parent != null) {
|
||||
parent.addChild(node);
|
||||
} else {
|
||||
roots.add(node); // orphan safety
|
||||
}
|
||||
}
|
||||
}
|
||||
return roots;
|
||||
}
|
||||
|
||||
/**
|
||||
* ProcessorId-based tree reconstruction for PostgreSQL flat records.
|
||||
* Note: this loses iteration context for processors with the same ID across iterations.
|
||||
*/
|
||||
private List<ProcessorNode> buildTreeByProcessorId(List<ProcessorRecord> processors) {
|
||||
Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
|
||||
for (ProcessorRecord p : processors) {
|
||||
boolean hasTrace = p.inputBody() != null || p.outputBody() != null
|
||||
|
||||
@@ -159,7 +159,8 @@ public class IngestionService {
|
||||
p.getErrorType(), p.getErrorCategory(),
|
||||
p.getRootCauseType(), p.getRootCauseMessage(),
|
||||
p.getErrorHandlerType(), p.getCircuitBreakerState(),
|
||||
p.getFallbackTriggered()
|
||||
p.getFallbackTriggered(),
|
||||
null, null, null, null, null, null
|
||||
));
|
||||
}
|
||||
return flat;
|
||||
|
||||
@@ -49,6 +49,17 @@ public interface ExecutionStore {
|
||||
String errorType, String errorCategory,
|
||||
String rootCauseType, String rootCauseMessage,
|
||||
String errorHandlerType, String circuitBreakerState,
|
||||
Boolean fallbackTriggered
|
||||
Boolean fallbackTriggered,
|
||||
// New fields for ClickHouse seq-based model
|
||||
Integer seq,
|
||||
Integer parentSeq,
|
||||
Integer iteration,
|
||||
Integer iterationSize,
|
||||
Boolean filterMatched,
|
||||
Boolean duplicateMessage
|
||||
) {}
|
||||
|
||||
default Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,8 @@ class TreeReconstructionTest {
|
||||
status, NOW, NOW, 10L,
|
||||
null, null, null, null, null, null, null,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null
|
||||
null, null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null
|
||||
);
|
||||
}
|
||||
|
||||
@@ -108,4 +109,100 @@ class TreeReconstructionTest {
|
||||
List<ProcessorNode> roots = detailService.buildTree(List.of());
|
||||
assertThat(roots).isEmpty();
|
||||
}
|
||||
|
||||
// --- seq-based model tests (ClickHouse) ---
|
||||
|
||||
private ProcessorRecord procWithSeq(String id, String type, String status,
|
||||
int seq, Integer parentSeq,
|
||||
Integer iteration, Integer iterationSize) {
|
||||
return new ProcessorRecord(
|
||||
"exec-1", id, type, "app", "route1",
|
||||
0, null, status, NOW, NOW, 10L,
|
||||
null, null, null, null, null, null, null,
|
||||
null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null,
|
||||
seq, parentSeq, iteration, iterationSize, null, null
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildTree_seqBasedModel_linearChain() {
|
||||
List<ProcessorRecord> processors = List.of(
|
||||
procWithSeq("from", "from", "COMPLETED", 1, null, null, null),
|
||||
procWithSeq("log1", "log", "COMPLETED", 2, 1, null, null),
|
||||
procWithSeq("to1", "to", "COMPLETED", 3, 2, null, null)
|
||||
);
|
||||
|
||||
List<ProcessorNode> roots = detailService.buildTree(processors);
|
||||
|
||||
assertThat(roots).hasSize(1);
|
||||
ProcessorNode root = roots.get(0);
|
||||
assertThat(root.getProcessorId()).isEqualTo("from");
|
||||
assertThat(root.getChildren()).hasSize(1);
|
||||
|
||||
ProcessorNode child = root.getChildren().get(0);
|
||||
assertThat(child.getProcessorId()).isEqualTo("log1");
|
||||
assertThat(child.getChildren()).hasSize(1);
|
||||
|
||||
ProcessorNode grandchild = child.getChildren().get(0);
|
||||
assertThat(grandchild.getProcessorId()).isEqualTo("to1");
|
||||
assertThat(grandchild.getChildren()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildTree_seqBasedModel_sameProcessorIdMultipleIterations() {
|
||||
// A split processor (seq 1) with 3 child processors all having the SAME
|
||||
// processorId but different seq values — this is the key scenario that
|
||||
// breaks the old processorId-based approach.
|
||||
List<ProcessorRecord> processors = List.of(
|
||||
procWithSeq("split1", "split", "COMPLETED", 1, null, null, null),
|
||||
procWithSeq("log-inside", "log", "COMPLETED", 2, 1, 0, 3),
|
||||
procWithSeq("log-inside", "log", "COMPLETED", 3, 1, 1, 3),
|
||||
procWithSeq("log-inside", "log", "COMPLETED", 4, 1, 2, 3)
|
||||
);
|
||||
|
||||
List<ProcessorNode> roots = detailService.buildTree(processors);
|
||||
|
||||
assertThat(roots).hasSize(1);
|
||||
ProcessorNode split = roots.get(0);
|
||||
assertThat(split.getProcessorId()).isEqualTo("split1");
|
||||
assertThat(split.getChildren()).hasSize(3);
|
||||
|
||||
// All three children should have the same processorId
|
||||
for (ProcessorNode child : split.getChildren()) {
|
||||
assertThat(child.getProcessorId()).isEqualTo("log-inside");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildTree_seqBasedModel_orphanSafety() {
|
||||
// A processor whose parentSeq points to a non-existent seq
|
||||
List<ProcessorRecord> processors = List.of(
|
||||
procWithSeq("root", "from", "COMPLETED", 1, null, null, null),
|
||||
procWithSeq("orphan", "log", "COMPLETED", 2, 999, null, null)
|
||||
);
|
||||
|
||||
List<ProcessorNode> roots = detailService.buildTree(processors);
|
||||
|
||||
// Both should be roots — the orphan falls through to root list
|
||||
assertThat(roots).hasSize(2);
|
||||
assertThat(roots.get(0).getProcessorId()).isEqualTo("root");
|
||||
assertThat(roots.get(1).getProcessorId()).isEqualTo("orphan");
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildTree_seqBasedModel_iterationFields() {
|
||||
// Verify iteration/iterationSize are populated as loopIndex/loopSize
|
||||
List<ProcessorRecord> processors = List.of(
|
||||
procWithSeq("loop1", "loop", "COMPLETED", 1, null, null, null),
|
||||
procWithSeq("body", "log", "COMPLETED", 2, 1, 5, 10)
|
||||
);
|
||||
|
||||
List<ProcessorNode> roots = detailService.buildTree(processors);
|
||||
|
||||
assertThat(roots).hasSize(1);
|
||||
ProcessorNode child = roots.get(0).getChildren().get(0);
|
||||
assertThat(child.getLoopIndex()).isEqualTo(5);
|
||||
assertThat(child.getLoopSize()).isEqualTo(10);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +101,8 @@ spec:
|
||||
value: "clickhouse"
|
||||
- name: CAMELEER_STORAGE_LOGS
|
||||
value: "clickhouse"
|
||||
- name: CAMELEER_STORAGE_EXECUTIONS
|
||||
value: "clickhouse"
|
||||
|
||||
resources:
|
||||
requests:
|
||||
|
||||
Reference in New Issue
Block a user