Compare commits

5 Commits

Author SHA1 Message Date
hsiegeln
520181d241 test(clickhouse): add integration tests for execution read path and tree reconstruction
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m7s
CI / docker (push) Successful in 46s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Failing after 2m16s
SonarQube / sonarqube (push) Failing after 2m21s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:11:44 +02:00
hsiegeln
95b9dea5c4 feat(clickhouse): wire ClickHouseExecutionStore as active ExecutionStore
Add cameleer.storage.executions feature flag (default: clickhouse).
PostgresExecutionStore activates only when explicitly set to postgres.
Add by-seq snapshot endpoint for iteration-aware processor lookup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:09:14 +02:00
hsiegeln
151b96a680 feat: seq-based tree reconstruction for ClickHouse flat processor model
Dual-mode buildTree: detects seq presence and uses seq/parentSeq linkage
instead of processorId map. Handles duplicate processorIds across
iterations correctly. Old processorId-based mode kept for PG compat.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:07:20 +02:00
hsiegeln
0661fd995f feat(clickhouse): add read methods to ClickHouseExecutionStore
Implements ExecutionStore interface with findById (FINAL for
ReplacingMergeTree), findProcessors (ORDER BY seq), findProcessorById,
and findProcessorBySeq. Write methods unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:04:03 +02:00
hsiegeln
190ae2797d refactor: extend ProcessorRecord with seq/iteration fields for ClickHouse model
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:02:03 +02:00
12 changed files with 664 additions and 11 deletions

View File

@@ -6,6 +6,7 @@ import com.cameleer3.server.app.storage.ClickHouseDiagramStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
import com.cameleer3.server.app.storage.PostgresExecutionStore;
import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
import com.cameleer3.server.app.storage.PostgresMetricsStore;
import com.cameleer3.server.core.admin.AuditRepository;
@@ -96,6 +97,18 @@ public class StorageBeanConfig {
return new ClickHouseExecutionStore(clickHouseJdbc);
}
/**
 * Exposes the ClickHouse-backed store under the {@code ExecutionStore} interface.
 * Active by default ({@code matchIfMissing = true}) and whenever
 * {@code cameleer.storage.executions=clickhouse}.
 */
@Bean
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
public ExecutionStore executionStoreClickHouse(ClickHouseExecutionStore chStore) {
    return chStore; // Same instance, also exposed as ExecutionStore
}
/**
 * PostgreSQL-backed {@code ExecutionStore}; activates only when
 * {@code cameleer.storage.executions} is explicitly set to {@code postgres}
 * (no {@code matchIfMissing}, so it never competes with the ClickHouse bean).
 */
@Bean
@ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "postgres")
public ExecutionStore executionStorePostgres(JdbcTemplate jdbc) {
    return new PostgresExecutionStore(jdbc);
}
@Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ChunkAccumulator chunkAccumulator(

View File

@@ -81,4 +81,16 @@ public class DetailController {
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
/**
 * Returns the exchange snapshot (bodies/headers) for the processor identified
 * by its seq number within the execution, or 404 when absent.
 */
@GetMapping("/{executionId}/processors/by-seq/{seq}/snapshot")
@Operation(summary = "Get exchange snapshot for a processor by seq number")
@ApiResponse(responseCode = "200", description = "Snapshot data")
@ApiResponse(responseCode = "404", description = "Snapshot not found")
public ResponseEntity<Map<String, String>> processorSnapshotBySeq(
        @PathVariable String executionId,
        @PathVariable int seq) {
    Optional<Map<String, String>> snapshot =
            detailService.getProcessorSnapshotBySeq(executionId, seq);
    return snapshot
            .map(ResponseEntity::ok)
            .orElseGet(() -> ResponseEntity.notFound().build());
}
}

View File

@@ -1,17 +1,21 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class ClickHouseExecutionStore {
public class ClickHouseExecutionStore implements ExecutionStore {
private final JdbcTemplate jdbc;
private final ObjectMapper objectMapper;
@@ -128,6 +132,187 @@ public class ClickHouseExecutionStore {
}).toList());
}
// --- ExecutionStore interface: read methods ---
/**
 * Loads a single execution row for the default tenant.
 * FINAL forces ReplacingMergeTree deduplication at read time so the latest
 * version of the row is returned.
 */
@Override
public Optional<ExecutionRecord> findById(String executionId) {
    String sql = """
            SELECT execution_id, route_id, agent_id, application_name, status,
            correlation_id, exchange_id, start_time, end_time, duration_ms,
            error_message, error_stacktrace, diagram_content_hash, engine_level,
            input_body, output_body, input_headers, output_headers, attributes,
            error_type, error_category, root_cause_type, root_cause_message,
            trace_id, span_id, has_trace_data, is_replay
            FROM executions FINAL
            WHERE tenant_id = 'default' AND execution_id = ?
            LIMIT 1
            """;
    return jdbc.query(sql, (rs, row) -> mapExecutionRecord(rs), executionId)
            .stream()
            .findFirst();
}
/**
 * Loads all processor rows of an execution for the default tenant,
 * ordered by seq.
 */
@Override
public List<ProcessorRecord> findProcessors(String executionId) {
    String sql = """
            SELECT execution_id, seq, parent_seq, parent_processor_id,
            processor_id, processor_type, start_time, route_id, application_name,
            iteration, iteration_size, status, end_time, duration_ms,
            error_message, error_stacktrace, error_type, error_category,
            root_cause_type, root_cause_message,
            input_body, output_body, input_headers, output_headers, attributes,
            resolved_endpoint_uri, circuit_breaker_state,
            fallback_triggered, filter_matched, duplicate_message
            FROM processor_executions
            WHERE tenant_id = 'default' AND execution_id = ?
            ORDER BY seq
            """;
    return jdbc.query(sql, (rs, row) -> mapProcessorRecord(rs), executionId);
}
/**
 * Finds a single processor row by processorId within an execution.
 *
 * <p>The same processor_id can occur multiple times per execution (one row per
 * iteration of a split/loop). {@code ORDER BY seq} makes {@code LIMIT 1}
 * deterministic and returns the first occurrence (lowest seq); without an
 * ORDER BY, ClickHouse may return an arbitrary matching row.
 *
 * @param executionId execution to search in (tenant fixed to 'default')
 * @param processorId processor identifier to match
 * @return the lowest-seq matching record, or empty when none exists
 */
@Override
public Optional<ProcessorRecord> findProcessorById(String executionId, String processorId) {
    String sql = """
            SELECT execution_id, seq, parent_seq, parent_processor_id,
            processor_id, processor_type, start_time, route_id, application_name,
            iteration, iteration_size, status, end_time, duration_ms,
            error_message, error_stacktrace, error_type, error_category,
            root_cause_type, root_cause_message,
            input_body, output_body, input_headers, output_headers, attributes,
            resolved_endpoint_uri, circuit_breaker_state,
            fallback_triggered, filter_matched, duplicate_message
            FROM processor_executions
            WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ?
            ORDER BY seq
            LIMIT 1
            """;
    List<ProcessorRecord> results =
            jdbc.query(sql, (rs, rowNum) -> mapProcessorRecord(rs), executionId, processorId);
    return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
/**
 * Finds the processor row with the given seq within an execution.
 * seq is unique per execution in the flat model, so LIMIT 1 suffices.
 */
@Override
public Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
    String sql = """
            SELECT execution_id, seq, parent_seq, parent_processor_id,
            processor_id, processor_type, start_time, route_id, application_name,
            iteration, iteration_size, status, end_time, duration_ms,
            error_message, error_stacktrace, error_type, error_category,
            root_cause_type, root_cause_message,
            input_body, output_body, input_headers, output_headers, attributes,
            resolved_endpoint_uri, circuit_breaker_state,
            fallback_triggered, filter_matched, duplicate_message
            FROM processor_executions
            WHERE tenant_id = 'default' AND execution_id = ? AND seq = ?
            LIMIT 1
            """;
    return jdbc.query(sql, (rs, row) -> mapProcessorRecord(rs), executionId, seq)
            .stream()
            .findFirst();
}
// --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) ---
/**
 * Not supported: ClickHouse execution writes go through the chunked
 * ingestion pipeline, not this interface method.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void upsert(ExecutionRecord execution) {
    throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
/**
 * Not supported: processor writes go through the chunked ingestion
 * pipeline, not this interface method.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void upsertProcessors(String executionId, Instant startTime,
                             String applicationName, String routeId,
                             List<ProcessorRecord> processors) {
    throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
// --- Row mappers ---
/**
 * Maps one result-set row to an {@code ExecutionRecord}.
 *
 * <p>Arguments are positional: the order below must match the component order
 * of the ExecutionRecord record exactly — reorder with care.
 * ClickHouse stores empty strings for absent values, so every String column
 * goes through {@link #emptyToNull(String)} to restore null semantics.
 */
private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException {
    return new ExecutionRecord(
            emptyToNull(rs.getString("execution_id")),
            emptyToNull(rs.getString("route_id")),
            emptyToNull(rs.getString("agent_id")),
            emptyToNull(rs.getString("application_name")),
            emptyToNull(rs.getString("status")),
            emptyToNull(rs.getString("correlation_id")),
            emptyToNull(rs.getString("exchange_id")),
            toInstant(rs, "start_time"),
            toInstant(rs, "end_time"),
            // getObject-then-getLong preserves SQL NULL (getLong alone returns 0)
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            emptyToNull(rs.getString("error_message")),
            emptyToNull(rs.getString("error_stacktrace")),
            emptyToNull(rs.getString("diagram_content_hash")),
            emptyToNull(rs.getString("engine_level")),
            emptyToNull(rs.getString("input_body")),
            emptyToNull(rs.getString("output_body")),
            emptyToNull(rs.getString("input_headers")),
            emptyToNull(rs.getString("output_headers")),
            emptyToNull(rs.getString("attributes")),
            emptyToNull(rs.getString("error_type")),
            emptyToNull(rs.getString("error_category")),
            emptyToNull(rs.getString("root_cause_type")),
            emptyToNull(rs.getString("root_cause_message")),
            emptyToNull(rs.getString("trace_id")),
            emptyToNull(rs.getString("span_id")),
            null, // processorsJson not stored in ClickHouse
            rs.getBoolean("has_trace_data"),
            rs.getBoolean("is_replay")
    );
}
/**
 * Maps one result-set row to a {@code ProcessorRecord}.
 *
 * <p>Arguments are positional: the order below must match the component order
 * of the ProcessorRecord record exactly — reorder with care. Columns absent
 * from the ClickHouse schema (depth, loop/split/multicast indices,
 * errorHandlerType) are filled with fixed placeholders; the seq-based fields
 * at the end carry the ClickHouse flat-model linkage instead.
 */
private static ProcessorRecord mapProcessorRecord(ResultSet rs) throws SQLException {
    return new ProcessorRecord(
            emptyToNull(rs.getString("execution_id")),
            emptyToNull(rs.getString("processor_id")),
            emptyToNull(rs.getString("processor_type")),
            emptyToNull(rs.getString("application_name")),
            emptyToNull(rs.getString("route_id")),
            0, // depth not stored in ClickHouse
            emptyToNull(rs.getString("parent_processor_id")),
            emptyToNull(rs.getString("status")),
            toInstant(rs, "start_time"),
            toInstant(rs, "end_time"),
            // getObject-then-getLong preserves SQL NULL (getLong alone returns 0)
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            emptyToNull(rs.getString("error_message")),
            emptyToNull(rs.getString("error_stacktrace")),
            emptyToNull(rs.getString("input_body")),
            emptyToNull(rs.getString("output_body")),
            emptyToNull(rs.getString("input_headers")),
            emptyToNull(rs.getString("output_headers")),
            emptyToNull(rs.getString("attributes")),
            null, // loopIndex
            null, // loopSize
            null, // splitIndex
            null, // splitSize
            null, // multicastIndex
            emptyToNull(rs.getString("resolved_endpoint_uri")),
            emptyToNull(rs.getString("error_type")),
            emptyToNull(rs.getString("error_category")),
            emptyToNull(rs.getString("root_cause_type")),
            emptyToNull(rs.getString("root_cause_message")),
            null, // errorHandlerType
            emptyToNull(rs.getString("circuit_breaker_state")),
            rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
            // seq-based model fields (nullable — absent for non-ClickHouse data)
            rs.getObject("seq") != null ? rs.getInt("seq") : null,
            rs.getObject("parent_seq") != null ? rs.getInt("parent_seq") : null,
            rs.getObject("iteration") != null ? rs.getInt("iteration") : null,
            rs.getObject("iteration_size") != null ? rs.getInt("iteration_size") : null,
            rs.getObject("filter_matched") != null ? rs.getBoolean("filter_matched") : null,
            rs.getObject("duplicate_message") != null ? rs.getBoolean("duplicate_message") : null
    );
}
// --- Helpers ---
/** Restores null semantics for ClickHouse's empty-string placeholders. */
private static String emptyToNull(String value) {
    if (value == null) {
        return null;
    }
    return value.isEmpty() ? null : value;
}
/** Reads a timestamp column as an Instant, propagating SQL NULL as null. */
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
    Timestamp ts = rs.getTimestamp(column);
    if (ts == null) {
        return null;
    }
    return ts.toInstant();
}
/** Inverse of emptyToNull: ClickHouse columns store "" instead of NULL. */
private static String nullToEmpty(String value) {
    return value == null ? "" : value;
}

View File

@@ -3,7 +3,6 @@ package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -12,7 +11,6 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresExecutionStore implements ExecutionStore {
private final JdbcTemplate jdbc;
@@ -206,7 +204,8 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("error_type"), rs.getString("error_category"),
rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"),
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null);
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
null, null, null, null, null, null);
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);

View File

@@ -55,6 +55,7 @@ cameleer:
diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
executions: ${CAMELEER_STORAGE_EXECUTIONS:clickhouse}
security:
access-token-expiry-ms: 3600000

View File

@@ -0,0 +1,262 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.detail.ExecutionDetail;
import com.cameleer3.server.core.detail.ProcessorNode;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration tests for the ClickHouse execution read path
 * ({@code ClickHouseExecutionStore} read methods) and the seq-based processor
 * tree reconstruction performed by {@code DetailService}.
 *
 * <p>Runs against a real ClickHouse server via Testcontainers; the schema is
 * applied from the migration SQL resources in {@link #setUp()}.
 */
@Testcontainers
class ClickHouseExecutionReadIT {
    // static @Container: one server shared by all tests; per-test isolation is
    // achieved by truncating the tables in setUp().
    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore store;
    private DetailService detailService;
    @BeforeEach
    void setUp() throws Exception {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);
        // Load DDL for both tables
        String execDdl = new ClassPathResource("clickhouse/V2__executions.sql")
                .getContentAsString(StandardCharsets.UTF_8);
        String procDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
                .getContentAsString(StandardCharsets.UTF_8);
        // Also load V5 for replay fields
        String replayDdl = new ClassPathResource("clickhouse/V5__replay_fields.sql")
                .getContentAsString(StandardCharsets.UTF_8);
        jdbc.execute(execDdl);
        jdbc.execute(procDdl);
        // V5 has ALTER TABLE statements — execute each separately
        // NOTE(review): naive split on ';' is fine for plain ALTERs but would
        // break on a semicolon inside a string literal — confirm V5 stays simple.
        for (String stmt : replayDdl.split(";")) {
            String trimmed = stmt.trim();
            if (!trimmed.isEmpty()) jdbc.execute(trimmed);
        }
        // Tables survive across tests (shared container) — truncate for isolation.
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");
        store = new ClickHouseExecutionStore(jdbc);
        detailService = new DetailService(store);
    }
    // --- Helper factory methods ---
    // Minimal, valid execution row. Arguments are positional — the order must
    // match the MergedExecution component order (TODO confirm against its record).
    private MergedExecution minimalExecution(String executionId) {
        return new MergedExecution(
                "default", 1L, executionId, "route-a", "agent-1", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                Instant.parse("2026-04-01T10:00:00Z"),
                Instant.parse("2026-04-01T10:00:01Z"),
                1000L,
                "", "", "", "", "", "",
                "", "REGULAR",
                "", "", "", "", "{}",
                "", "",
                false, false,
                null, null
        );
    }
    // Completed processor with fixed start time and duration; callers set
    // parent/iteration fields as needed per scenario.
    private FlatProcessorRecord processor(int seq, String processorId, String processorType) {
        FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType);
        p.setStatus(ExecutionStatus.COMPLETED);
        p.setStartTime(Instant.parse("2026-04-01T10:00:00Z"));
        p.setDurationMs(10L);
        return p;
    }
    // --- Tests ---
    @Test
    void findById_returnsInsertedExecution() {
        store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
        Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
                store.findById("exec-1");
        assertThat(result).isPresent();
        assertThat(result.get().executionId()).isEqualTo("exec-1");
        assertThat(result.get().routeId()).isEqualTo("route-a");
        assertThat(result.get().status()).isEqualTo("COMPLETED");
        assertThat(result.get().agentId()).isEqualTo("agent-1");
        assertThat(result.get().applicationName()).isEqualTo("my-app");
        assertThat(result.get().processorsJson()).isNull();
    }
    @Test
    void findById_notFound_returnsEmpty() {
        Optional<com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord> result =
                store.findById("nonexistent");
        assertThat(result).isEmpty();
    }
    @Test
    void findProcessors_returnsOrderedBySeq() {
        store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
        FlatProcessorRecord p1 = processor(1, "log-1", "log");
        FlatProcessorRecord p2 = processor(2, "transform-1", "setBody");
        FlatProcessorRecord p3 = processor(3, "to-1", "to");
        p2.setParentSeq(1);
        p2.setParentProcessorId("log-1");
        p3.setParentSeq(1);
        p3.setParentProcessorId("log-1");
        store.insertProcessorBatch(
                "default", "exec-1", "route-a", "my-app",
                Instant.parse("2026-04-01T10:00:00Z"),
                List.of(p1, p2, p3));
        List<ProcessorRecord> records = store.findProcessors("exec-1");
        assertThat(records).hasSize(3);
        assertThat(records.get(0).seq()).isEqualTo(1);
        assertThat(records.get(0).processorId()).isEqualTo("log-1");
        assertThat(records.get(1).seq()).isEqualTo(2);
        assertThat(records.get(1).processorId()).isEqualTo("transform-1");
        assertThat(records.get(1).parentSeq()).isEqualTo(1);
        assertThat(records.get(2).seq()).isEqualTo(3);
        assertThat(records.get(2).processorId()).isEqualTo("to-1");
        assertThat(records.get(2).parentSeq()).isEqualTo(1);
    }
    @Test
    void findProcessorBySeq_returnsCorrectRecord() {
        store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
        FlatProcessorRecord p1 = processor(1, "log-1", "log");
        FlatProcessorRecord p2 = processor(2, "to-1", "to");
        FlatProcessorRecord p3 = processor(3, "log-2", "log");
        store.insertProcessorBatch(
                "default", "exec-1", "route-a", "my-app",
                Instant.parse("2026-04-01T10:00:00Z"),
                List.of(p1, p2, p3));
        Optional<ProcessorRecord> result = store.findProcessorBySeq("exec-1", 2);
        assertThat(result).isPresent();
        assertThat(result.get().seq()).isEqualTo(2);
        assertThat(result.get().processorId()).isEqualTo("to-1");
        assertThat(result.get().processorType()).isEqualTo("to");
    }
    @Test
    void findProcessorById_returnsFirstOccurrence() {
        store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
        // Three processors with the same processorId (iteration scenario)
        FlatProcessorRecord iter0 = processor(1, "to-1", "to");
        iter0.setIteration(0);
        FlatProcessorRecord iter1 = processor(2, "to-1", "to");
        iter1.setIteration(1);
        FlatProcessorRecord iter2 = processor(3, "to-1", "to");
        iter2.setIteration(2);
        store.insertProcessorBatch(
                "default", "exec-1", "route-a", "my-app",
                Instant.parse("2026-04-01T10:00:00Z"),
                List.of(iter0, iter1, iter2));
        Optional<ProcessorRecord> result = store.findProcessorById("exec-1", "to-1");
        assertThat(result).isPresent();
        // ClickHouse LIMIT 1 returns one record — verify it has the correct processorId
        assertThat(result.get().processorId()).isEqualTo("to-1");
        // The returned record should have the lowest seq (first occurrence)
        // NOTE(review): deterministic only if the store's findProcessorById query
        // has ORDER BY seq before LIMIT 1 — otherwise this assertion is flaky.
        assertThat(result.get().seq()).isEqualTo(1);
    }
    @Test
    void detailService_buildTree_withIterations() {
        // Insert an execution
        store.insertExecutionBatch(List.of(minimalExecution("exec-1")));
        // Build a split scenario:
        // seq=1: log-1 (root)
        // seq=2: split-1 (root)
        // seq=3: to-1 (child of split-1, iteration=0)
        // seq=4: to-1 (child of split-1, iteration=1)
        // seq=5: to-1 (child of split-1, iteration=2)
        // seq=6: log-2 (root)
        FlatProcessorRecord log1 = processor(1, "log-1", "log");
        FlatProcessorRecord split1 = processor(2, "split-1", "split");
        split1.setIterationSize(3);
        FlatProcessorRecord to1iter0 = processor(3, "to-1", "to");
        to1iter0.setParentSeq(2);
        to1iter0.setParentProcessorId("split-1");
        to1iter0.setIteration(0);
        FlatProcessorRecord to1iter1 = processor(4, "to-1", "to");
        to1iter1.setParentSeq(2);
        to1iter1.setParentProcessorId("split-1");
        to1iter1.setIteration(1);
        FlatProcessorRecord to1iter2 = processor(5, "to-1", "to");
        to1iter2.setParentSeq(2);
        to1iter2.setParentProcessorId("split-1");
        to1iter2.setIteration(2);
        FlatProcessorRecord log2 = processor(6, "log-2", "log");
        store.insertProcessorBatch(
                "default", "exec-1", "route-a", "my-app",
                Instant.parse("2026-04-01T10:00:00Z"),
                List.of(log1, split1, to1iter0, to1iter1, to1iter2, log2));
        // Invoke DetailService
        Optional<ExecutionDetail> detail = detailService.getDetail("exec-1");
        assertThat(detail).isPresent();
        List<ProcessorNode> roots = detail.get().processors();
        assertThat(roots).hasSize(3);
        assertThat(roots.get(0).getProcessorId()).isEqualTo("log-1");
        assertThat(roots.get(1).getProcessorId()).isEqualTo("split-1");
        assertThat(roots.get(2).getProcessorId()).isEqualTo("log-2");
        // Verify split-1 has 3 children (all with processorId "to-1")
        ProcessorNode splitNode = roots.get(1);
        List<ProcessorNode> children = splitNode.getChildren();
        assertThat(children).hasSize(3);
        assertThat(children).allMatch(c -> "to-1".equals(c.getProcessorId()));
        // Verify iteration values via getLoopIndex() (iteration maps to loopIndex in the seq-based path)
        assertThat(children.get(0).getLoopIndex()).isEqualTo(0);
        assertThat(children.get(1).getLoopIndex()).isEqualTo(1);
        assertThat(children.get(2).getLoopIndex()).isEqualTo(2);
    }
}

View File

@@ -77,13 +77,15 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
now, now.plusMillis(10), 10L, null, null,
"input body", "output body", null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null),
null, null, null, null, null, null, null, null,
null, null, null, null, null, null),
new ProcessorRecord("exec-proc", "proc-2", "to",
"app-1", "route-a", 1, "proc-1", "COMPLETED",
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null)
null, null, null, null, null, null, null, null,
null, null, null, null, null, null)
);
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

View File

@@ -64,6 +64,18 @@ public class DetailService {
});
}
/**
 * Builds the exchange snapshot (bodies and headers) for the processor at the
 * given seq within an execution; empty when no such processor exists.
 * Absent values are omitted from the map rather than mapped to null.
 */
public Optional<Map<String, String>> getProcessorSnapshotBySeq(String executionId, int seq) {
    return executionStore.findProcessorBySeq(executionId, seq).map(record -> {
        Map<String, String> snapshot = new LinkedHashMap<>();
        putIfPresent(snapshot, "inputBody", record.inputBody());
        putIfPresent(snapshot, "outputBody", record.outputBody());
        putIfPresent(snapshot, "inputHeaders", record.inputHeaders());
        putIfPresent(snapshot, "outputHeaders", record.outputHeaders());
        return snapshot;
    });
}

/** Adds the entry only when the value is non-null; insertion order is kept. */
private static void putIfPresent(Map<String, String> target, String key, String value) {
    if (value != null) {
        target.put(key, value);
    }
}
/** Parse the raw processor tree JSON stored alongside the execution. */
private List<ProcessorNode> parseProcessorsJson(String json) {
if (json == null || json.isBlank()) return null;
@@ -104,12 +116,68 @@ public class DetailService {
}
/**
 * Reconstructs the processor tree from flat records.
 * Dispatches on the data shape: any record carrying a seq means the
 * seq-based model (ClickHouse); otherwise the processorId-based model
 * (PostgreSQL) is used.
 */
List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
    if (processors.isEmpty()) {
        return List.of();
    }
    for (ProcessorRecord record : processors) {
        if (record.seq() != null) {
            return buildTreeBySeq(processors);
        }
    }
    return buildTreeByProcessorId(processors);
}
/**
 * Seq-based tree reconstruction for the ClickHouse flat processor model.
 * Links nodes via seq/parentSeq, which correctly handles duplicate
 * processorIds across iterations (e.g. the same processor inside a split
 * running N times).
 *
 * <p>Robustness over the original two-pass linkage:
 * <ul>
 *   <li>duplicate seq values (e.g. ReplacingMergeTree rows not yet merged)
 *       keep the first occurrence only, so a node is never attached twice;</li>
 *   <li>records without a seq (possible in mixed-source data, since buildTree
 *       dispatches here when ANY record has a seq) surface as roots instead of
 *       colliding on a null map key;</li>
 *   <li>a parentSeq pointing at a missing seq — or at the record itself —
 *       degrades to a root rather than dropping or self-linking the node.</li>
 * </ul>
 */
private List<ProcessorNode> buildTreeBySeq(List<ProcessorRecord> processors) {
    Map<Integer, ProcessorNode> nodeBySeq = new LinkedHashMap<>();
    List<ProcessorNode> roots = new ArrayList<>();
    List<ProcessorRecord> linkable = new ArrayList<>(processors.size());
    // Pass 1: create one node per distinct seq (first occurrence wins).
    for (ProcessorRecord p : processors) {
        Integer seq = p.seq();
        if (seq != null && nodeBySeq.containsKey(seq)) {
            continue; // duplicate row for the same seq — skip, first wins
        }
        boolean hasTrace = p.inputBody() != null || p.outputBody() != null
                || p.inputHeaders() != null || p.outputHeaders() != null;
        ProcessorNode node = new ProcessorNode(
                p.processorId(), p.processorType(), p.status(),
                p.startTime(), p.endTime(),
                p.durationMs() != null ? p.durationMs() : 0L,
                p.errorMessage(), p.errorStacktrace(),
                parseAttributes(p.attributes()),
                p.iteration(), p.iterationSize(),
                null, null, null,
                p.resolvedEndpointUri(),
                p.errorType(), p.errorCategory(),
                p.rootCauseType(), p.rootCauseMessage(),
                null, p.circuitBreakerState(),
                p.fallbackTriggered(),
                p.filterMatched(), p.duplicateMessage(),
                hasTrace
        );
        if (seq == null) {
            roots.add(node); // no seq: cannot participate in linkage, keep visible
        } else {
            nodeBySeq.put(seq, node);
            linkable.add(p);
        }
    }
    // Pass 2: link children to parents (a parent may appear after its child
    // in the input list, so linking cannot happen during pass 1).
    for (ProcessorRecord p : linkable) {
        ProcessorNode node = nodeBySeq.get(p.seq());
        Integer parentSeq = p.parentSeq();
        if (parentSeq == null) {
            roots.add(node);
            continue;
        }
        ProcessorNode parent = nodeBySeq.get(parentSeq);
        if (parent != null && parent != node) {
            parent.addChild(node);
        } else {
            roots.add(node); // orphan (or self-referencing) safety
        }
    }
    return roots;
}
/**
* ProcessorId-based tree reconstruction for PostgreSQL flat records.
* Note: this loses iteration context for processors with the same ID across iterations.
*/
private List<ProcessorNode> buildTreeByProcessorId(List<ProcessorRecord> processors) {
Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
for (ProcessorRecord p : processors) {
boolean hasTrace = p.inputBody() != null || p.outputBody() != null

View File

@@ -159,7 +159,8 @@ public class IngestionService {
p.getErrorType(), p.getErrorCategory(),
p.getRootCauseType(), p.getRootCauseMessage(),
p.getErrorHandlerType(), p.getCircuitBreakerState(),
p.getFallbackTriggered()
p.getFallbackTriggered(),
null, null, null, null, null, null
));
}
return flat;

View File

@@ -49,6 +49,17 @@ public interface ExecutionStore {
String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage,
String errorHandlerType, String circuitBreakerState,
Boolean fallbackTriggered
Boolean fallbackTriggered,
// New fields for ClickHouse seq-based model
Integer seq,
Integer parentSeq,
Integer iteration,
Integer iterationSize,
Boolean filterMatched,
Boolean duplicateMessage
) {}
/**
 * Looks up a single processor record by its seq number within an execution.
 * Default implementation returns empty so existing stores without a seq-based
 * model remain source-compatible; seq-aware stores override this.
 */
default Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
    return Optional.empty();
}
}

View File

@@ -29,7 +29,8 @@ class TreeReconstructionTest {
status, NOW, NOW, 10L,
null, null, null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null, null, null
null, null, null, null, null, null, null, null,
null, null, null, null, null, null
);
}
@@ -108,4 +109,100 @@ class TreeReconstructionTest {
List<ProcessorNode> roots = detailService.buildTree(List.of());
assertThat(roots).isEmpty();
}
// --- seq-based model tests (ClickHouse) ---
// Factory for seq-model records (ClickHouse shape): only identity, status,
// timing, and the trailing seq/parentSeq/iteration fields are populated; all
// other components are null placeholders. Arguments are positional — the
// order must match the ProcessorRecord component order (TODO confirm).
private ProcessorRecord procWithSeq(String id, String type, String status,
                                    int seq, Integer parentSeq,
                                    Integer iteration, Integer iterationSize) {
    return new ProcessorRecord(
            "exec-1", id, type, "app", "route1",
            0, null, status, NOW, NOW, 10L,
            null, null, null, null, null, null, null,
            null, null, null, null, null,
            null, null, null, null, null, null, null, null,
            seq, parentSeq, iteration, iterationSize, null, null
    );
}
@Test
void buildTree_seqBasedModel_linearChain() {
    // from(seq 1) -> log1(seq 2) -> to1(seq 3), linked purely via parentSeq.
    List<ProcessorRecord> records = List.of(
            procWithSeq("from", "from", "COMPLETED", 1, null, null, null),
            procWithSeq("log1", "log", "COMPLETED", 2, 1, null, null),
            procWithSeq("to1", "to", "COMPLETED", 3, 2, null, null)
    );
    List<ProcessorNode> roots = detailService.buildTree(records);
    assertThat(roots).hasSize(1);
    ProcessorNode from = roots.get(0);
    assertThat(from.getProcessorId()).isEqualTo("from");
    assertThat(from.getChildren()).hasSize(1);
    ProcessorNode log1 = from.getChildren().get(0);
    assertThat(log1.getProcessorId()).isEqualTo("log1");
    assertThat(log1.getChildren()).hasSize(1);
    ProcessorNode to1 = log1.getChildren().get(0);
    assertThat(to1.getProcessorId()).isEqualTo("to1");
    assertThat(to1.getChildren()).isEmpty();
}
@Test
void buildTree_seqBasedModel_sameProcessorIdMultipleIterations() {
    // A split processor (seq 1) with 3 child processors all having the SAME
    // processorId but different seq values — this is the key scenario that
    // breaks the old processorId-based approach.
    List<ProcessorRecord> records = List.of(
            procWithSeq("split1", "split", "COMPLETED", 1, null, null, null),
            procWithSeq("log-inside", "log", "COMPLETED", 2, 1, 0, 3),
            procWithSeq("log-inside", "log", "COMPLETED", 3, 1, 1, 3),
            procWithSeq("log-inside", "log", "COMPLETED", 4, 1, 2, 3)
    );
    List<ProcessorNode> roots = detailService.buildTree(records);
    assertThat(roots).hasSize(1);
    ProcessorNode split = roots.get(0);
    assertThat(split.getProcessorId()).isEqualTo("split1");
    assertThat(split.getChildren()).hasSize(3);
    // Every child must carry the shared processorId.
    assertThat(split.getChildren())
            .allMatch(child -> "log-inside".equals(child.getProcessorId()));
}
@Test
void buildTree_seqBasedModel_orphanSafety() {
    // A processor whose parentSeq points to a non-existent seq must not be
    // dropped: it falls through to the root list.
    List<ProcessorRecord> records = List.of(
            procWithSeq("root", "from", "COMPLETED", 1, null, null, null),
            procWithSeq("orphan", "log", "COMPLETED", 2, 999, null, null)
    );
    List<ProcessorNode> roots = detailService.buildTree(records);
    assertThat(roots).hasSize(2);
    assertThat(roots.get(0).getProcessorId()).isEqualTo("root");
    assertThat(roots.get(1).getProcessorId()).isEqualTo("orphan");
}
@Test
void buildTree_seqBasedModel_iterationFields() {
    // iteration/iterationSize from the record must surface as loopIndex/loopSize
    // on the reconstructed node.
    List<ProcessorRecord> records = List.of(
            procWithSeq("loop1", "loop", "COMPLETED", 1, null, null, null),
            procWithSeq("body", "log", "COMPLETED", 2, 1, 5, 10)
    );
    List<ProcessorNode> roots = detailService.buildTree(records);
    assertThat(roots).hasSize(1);
    ProcessorNode body = roots.get(0).getChildren().get(0);
    assertThat(body.getLoopIndex()).isEqualTo(5);
    assertThat(body.getLoopSize()).isEqualTo(10);
}
}

View File

@@ -101,6 +101,8 @@ spec:
value: "clickhouse"
- name: CAMELEER_STORAGE_LOGS
value: "clickhouse"
- name: CAMELEER_STORAGE_EXECUTIONS
value: "clickhouse"
resources:
requests: