feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:07:33 +02:00
parent b30dfa39f4
commit 81f7f8afe1
3 changed files with 421 additions and 0 deletions

View File

@@ -0,0 +1,151 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Persists merged executions and their flattened processor steps to ClickHouse
 * via JDBC batch inserts.
 *
 * <p>Nullable {@code String} values are normalised to the empty string before
 * insertion — presumably because the target columns are non-Nullable ClickHouse
 * String columns (NOTE(review): confirm against the table DDL). Header and
 * attribute maps on processor records are serialised to JSON strings.
 *
 * <p>This class holds no mutable state; thread-safety follows that of the
 * injected {@link JdbcTemplate} and {@link ObjectMapper}.
 */
public class ClickHouseExecutionStore {

    private final JdbcTemplate jdbc;
    private final ObjectMapper objectMapper;

    /** Creates a store with a private, default-configured {@link ObjectMapper}. */
    public ClickHouseExecutionStore(JdbcTemplate jdbc) {
        this(jdbc, new ObjectMapper());
    }

    /**
     * @param jdbc         template bound to the ClickHouse data source
     * @param objectMapper mapper used to serialise header/attribute maps to JSON
     * @throws NullPointerException if either argument is {@code null}
     */
    public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) {
        this.jdbc = Objects.requireNonNull(jdbc, "jdbc");
        this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
    }

    /**
     * Inserts a batch of merged executions into the {@code executions} table.
     * A {@code null} or empty batch is a no-op.
     *
     * @param executions rows to insert; each element maps 1:1 to a table row
     */
    public void insertExecutionBatch(List<MergedExecution> executions) {
        if (executions == null || executions.isEmpty()) {
            return;
        }
        jdbc.batchUpdate("""
            INSERT INTO executions (
                tenant_id, _version, execution_id, route_id, agent_id, application_name,
                status, correlation_id, exchange_id, start_time, end_time, duration_ms,
                error_message, error_stacktrace, error_type, error_category,
                root_cause_type, root_cause_message, diagram_content_hash, engine_level,
                input_body, output_body, input_headers, output_headers, attributes,
                trace_id, span_id, has_trace_data, is_replay
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            executions.stream().map(e -> new Object[]{
                nullToEmpty(e.tenantId()),
                e.version(),
                nullToEmpty(e.executionId()),
                nullToEmpty(e.routeId()),
                nullToEmpty(e.agentId()),
                nullToEmpty(e.applicationName()),
                nullToEmpty(e.status()),
                nullToEmpty(e.correlationId()),
                nullToEmpty(e.exchangeId()),
                // NOTE(review): assumes startTime is never null — confirm upstream.
                Timestamp.from(e.startTime()),
                e.endTime() != null ? Timestamp.from(e.endTime()) : null,
                e.durationMs(),
                nullToEmpty(e.errorMessage()),
                nullToEmpty(e.errorStacktrace()),
                nullToEmpty(e.errorType()),
                nullToEmpty(e.errorCategory()),
                nullToEmpty(e.rootCauseType()),
                nullToEmpty(e.rootCauseMessage()),
                nullToEmpty(e.diagramContentHash()),
                nullToEmpty(e.engineLevel()),
                nullToEmpty(e.inputBody()),
                nullToEmpty(e.outputBody()),
                nullToEmpty(e.inputHeaders()),
                nullToEmpty(e.outputHeaders()),
                nullToEmpty(e.attributes()),
                nullToEmpty(e.traceId()),
                nullToEmpty(e.spanId()),
                e.hasTraceData(),
                e.isReplay()
            }).toList());
    }

    /**
     * Inserts the flattened processor steps of one execution into the
     * {@code processor_executions} table. A {@code null} or empty list is a no-op.
     *
     * @param tenantId        tenant the execution belongs to
     * @param executionId     id of the parent execution
     * @param routeId         route the processors ran on (denormalised onto each row)
     * @param applicationName owning application (denormalised onto each row)
     * @param execStartTime   execution start, used as the fallback start_time for
     *                        processors that carry no start time of their own
     * @param processors      flattened processor records, one table row each
     */
    public void insertProcessorBatch(String tenantId, String executionId, String routeId,
                                     String applicationName, Instant execStartTime,
                                     List<FlatProcessorRecord> processors) {
        if (processors == null || processors.isEmpty()) {
            return;
        }
        jdbc.batchUpdate("""
            INSERT INTO processor_executions (
                tenant_id, execution_id, seq, parent_seq, parent_processor_id,
                processor_id, processor_type, start_time, route_id, application_name,
                iteration, iteration_size, status, end_time, duration_ms,
                error_message, error_stacktrace, error_type, error_category,
                root_cause_type, root_cause_message,
                input_body, output_body, input_headers, output_headers, attributes,
                resolved_endpoint_uri, circuit_breaker_state,
                fallback_triggered, filter_matched, duplicate_message
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            processors.stream().map(p -> new Object[]{
                nullToEmpty(tenantId),
                nullToEmpty(executionId),
                p.seq(),
                p.parentSeq(),
                nullToEmpty(p.parentProcessorId()),
                nullToEmpty(p.processorId()),
                nullToEmpty(p.processorType()),
                // Fall back to the execution start when the step has no start time.
                Timestamp.from(p.startTime() != null ? p.startTime() : execStartTime),
                nullToEmpty(routeId),
                nullToEmpty(applicationName),
                p.iteration(),
                p.iterationSize(),
                nullToEmpty(p.status()),
                // Derived, not stored on the record: start + duration (or SQL NULL).
                computeEndTime(p.startTime(), p.durationMs()),
                p.durationMs(),
                nullToEmpty(p.errorMessage()),
                nullToEmpty(p.errorStackTrace()),
                nullToEmpty(p.errorType()),
                nullToEmpty(p.errorCategory()),
                nullToEmpty(p.rootCauseType()),
                nullToEmpty(p.rootCauseMessage()),
                nullToEmpty(p.inputBody()),
                nullToEmpty(p.outputBody()),
                mapToJson(p.inputHeaders()),
                mapToJson(p.outputHeaders()),
                mapToJson(p.attributes()),
                nullToEmpty(p.resolvedEndpointUri()),
                nullToEmpty(p.circuitBreakerState()),
                boolOrFalse(p.fallbackTriggered()),
                boolOrFalse(p.filterMatched()),
                boolOrFalse(p.duplicateMessage())
            }).toList());
    }

    /** Normalises null to "" for non-Nullable ClickHouse String columns. */
    private static String nullToEmpty(String value) {
        return value != null ? value : "";
    }

    /** Unboxes a nullable {@link Boolean}, treating null as false. */
    private static boolean boolOrFalse(Boolean value) {
        return value != null && value;
    }

    /**
     * Derives the end timestamp from start + duration; returns {@code null}
     * (SQL NULL) when the start is missing or the duration is non-positive.
     */
    private static Timestamp computeEndTime(Instant startTime, long durationMs) {
        if (startTime != null && durationMs > 0) {
            return Timestamp.from(startTime.plusMillis(durationMs));
        }
        return null;
    }

    /**
     * Serialises a header/attribute map to a JSON string. Best-effort by design:
     * a map that cannot be serialised degrades to "" rather than failing the
     * whole batch. NOTE(review): consider logging the dropped payload so the
     * data loss is at least observable.
     */
    private String mapToJson(Map<String, String> map) {
        if (map == null || map.isEmpty()) {
            return "";
        }
        try {
            return objectMapper.writeValueAsString(map);
        } catch (JsonProcessingException ignored) {
            return "";
        }
    }
}

View File

@@ -0,0 +1,231 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration tests for {@link ClickHouseExecutionStore} against a real
 * ClickHouse server started by Testcontainers.
 *
 * <p>NOTE(review): the positional constructor calls below must match the exact
 * component order of {@code MergedExecution} / {@code FlatProcessorRecord};
 * verify if either record changes shape.
 */
@Testcontainers
class ClickHouseExecutionStoreIT {

    // Static container: started once and shared by all tests in this class.
    @Container
    static final ClickHouseContainer clickhouse =
            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");

    private JdbcTemplate jdbc;
    private ClickHouseExecutionStore store;

    @BeforeEach
    void setUp() throws Exception {
        // A fresh pool per test, pointed at the Testcontainers-provided JDBC URL.
        // NOTE(review): the HikariDataSource is never closed; consider keeping a
        // field and closing it in an @AfterEach to avoid leaking connections.
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(clickhouse.getJdbcUrl());
        ds.setUsername(clickhouse.getUsername());
        ds.setPassword(clickhouse.getPassword());
        jdbc = new JdbcTemplate(ds);
        // Load DDL from classpath resources
        String executionsDdl = new ClassPathResource("clickhouse/V2__executions.sql")
                .getContentAsString(StandardCharsets.UTF_8);
        String processorsDdl = new ClassPathResource("clickhouse/V3__processor_executions.sql")
                .getContentAsString(StandardCharsets.UTF_8);
        jdbc.execute(executionsDdl);
        jdbc.execute(processorsDdl);
        // Tables survive across tests (shared container), so clear them here.
        jdbc.execute("TRUNCATE TABLE executions");
        jdbc.execute("TRUNCATE TABLE processor_executions");
        store = new ClickHouseExecutionStore(jdbc);
    }

    /** A fully-populated execution round-trips into the executions table. */
    @Test
    void insertExecutionBatch_writesToClickHouse() {
        MergedExecution exec = new MergedExecution(
                "default", 1L, "exec-1", "route-a", "agent-1", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"),
                1000L,
                "some error", "stack trace", "IOException", "IO",
                "FileNotFoundException", "file not found",
                "hash-abc", "FULL",
                "{\"key\":\"val\"}", "{\"out\":\"val\"}",
                "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}",
                "{\"attr\":\"val\"}",
                "trace-123", "span-456",
                true, false
        );
        store.insertExecutionBatch(List.of(exec));
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);
    }

    /** A single processor step lands in processor_executions with its seq intact. */
    @Test
    void insertProcessorBatch_writesToClickHouse() {
        FlatProcessorRecord proc = new FlatProcessorRecord(
                1, null, null,
                "proc-1", "to", null, null,
                "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 50L,
                "http://example.com",
                "input body", "output body",
                Map.of("h1", "v1"), Map.of("h2", "v2"),
                null, null, null, null, null, null,
                Map.of("a1", "v1"),
                null, null, null, null
        );
        store.insertProcessorBatch(
                "default", "exec-1", "route-a", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"),
                List.of(proc));
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(count).isEqualTo(1);
        // Verify seq is stored
        Integer seq = jdbc.queryForObject(
                "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'",
                Integer.class);
        assertThat(seq).isEqualTo(1);
    }

    /**
     * A split container (iteration_size = 3) plus its three iteration children
     * are all persisted, and iteration metadata is queryable on both sides.
     */
    @Test
    void insertProcessorBatch_withIterations() {
        // Container step: carries iterationSize but no iteration index.
        FlatProcessorRecord splitContainer = new FlatProcessorRecord(
                1, null, null,
                "split-1", "split", null, 3,
                "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"), 300L,
                null, null, null, null, null,
                null, null, null, null, null, null,
                null, null, null, null, null
        );
        // Children: parentSeq/parentProcessorId point at the split, each with its
        // own iteration index (0, 1, 2).
        FlatProcessorRecord child0 = new FlatProcessorRecord(
                2, 1, "split-1",
                "child-proc", "to", 0, null,
                "COMPLETED",
                Instant.parse("2026-03-31T10:00:00.100Z"), 80L,
                "http://svc-a", "body0", "out0",
                null, null,
                null, null, null, null, null, null,
                null, null, null, null, null
        );
        FlatProcessorRecord child1 = new FlatProcessorRecord(
                3, 1, "split-1",
                "child-proc", "to", 1, null,
                "COMPLETED",
                Instant.parse("2026-03-31T10:00:00.200Z"), 90L,
                "http://svc-a", "body1", "out1",
                null, null,
                null, null, null, null, null, null,
                null, null, null, null, null
        );
        FlatProcessorRecord child2 = new FlatProcessorRecord(
                4, 1, "split-1",
                "child-proc", "to", 2, null,
                "COMPLETED",
                Instant.parse("2026-03-31T10:00:00.300Z"), 100L,
                "http://svc-a", "body2", "out2",
                null, null,
                null, null, null, null, null, null,
                null, null, null, null, null
        );
        store.insertProcessorBatch(
                "default", "exec-2", "route-b", "my-app",
                Instant.parse("2026-03-31T10:00:00Z"),
                List.of(splitContainer, child0, child1, child2));
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-2'",
                Integer.class);
        assertThat(count).isEqualTo(4);
        // Verify iteration data on the split container
        Integer iterationSize = jdbc.queryForObject(
                "SELECT iteration_size FROM processor_executions " +
                        "WHERE execution_id = 'exec-2' AND seq = 1",
                Integer.class);
        assertThat(iterationSize).isEqualTo(3);
        // Verify iteration index on a child
        Integer iteration = jdbc.queryForObject(
                "SELECT iteration FROM processor_executions " +
                        "WHERE execution_id = 'exec-2' AND seq = 3",
                Integer.class);
        assertThat(iteration).isEqualTo(1);
    }

    /** An empty batch is a no-op: no rows written, no exception thrown. */
    @Test
    void insertExecutionBatch_emptyList_doesNothing() {
        store.insertExecutionBatch(List.of());
        Integer count = jdbc.queryForObject(
                "SELECT count() FROM executions", Integer.class);
        assertThat(count).isEqualTo(0);
    }

    /**
     * Two versions of the same execution_id are inserted; after OPTIMIZE FINAL
     * the ReplacingMergeTree keeps only the row with the higher version.
     */
    @Test
    void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
        MergedExecution v1 = new MergedExecution(
                "default", 1L, "exec-r", "route-a", "agent-1", "my-app",
                "RUNNING", "corr-1", "exchange-1",
                Instant.parse("2026-03-31T10:00:00Z"),
                null, null,
                "", "", "", "", "", "",
                "", "FULL",
                "", "", "", "", "",
                "", "",
                false, false
        );
        MergedExecution v2 = new MergedExecution(
                "default", 2L, "exec-r", "route-a", "agent-1", "my-app",
                "COMPLETED", "corr-1", "exchange-1",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:05Z"),
                5000L,
                "", "", "", "", "", "",
                "", "FULL",
                "", "", "", "", "",
                "", "",
                false, false
        );
        store.insertExecutionBatch(List.of(v1));
        store.insertExecutionBatch(List.of(v2));
        // Force merge to apply ReplacingMergeTree deduplication
        jdbc.execute("OPTIMIZE TABLE executions FINAL");
        String status = jdbc.queryForObject(
                "SELECT status FROM executions " +
                        "WHERE execution_id = 'exec-r'",
                String.class);
        assertThat(status).isEqualTo("COMPLETED");
    }
}

View File

@@ -0,0 +1,39 @@
package com.cameleer3.server.core.ingestion;
import java.time.Instant;
/**
 * A merged execution envelope ready for ClickHouse insertion.
 * Produced by ChunkAccumulator after receiving the final chunk.
 *
 * <p>String components may be {@code null}; the storage layer normalises null
 * to "" before insertion. Header/attribute components are pre-serialised JSON
 * strings rather than maps.
 *
 * @param tenantId            owning tenant
 * @param version             row version; used by the ReplacingMergeTree to keep
 *                            the latest row per execution (higher wins —
 *                            inferred from the integration test; confirm DDL)
 * @param executionId         unique id of the execution
 * @param routeId             id of the route that produced the execution
 * @param agentId             id of the reporting agent
 * @param applicationName     name of the owning application
 * @param status              execution status, e.g. "RUNNING"/"COMPLETED"
 * @param correlationId       correlation id across related executions
 * @param exchangeId          exchange id — presumably the Camel exchange; verify
 * @param startTime           execution start (expected non-null by the store)
 * @param endTime             execution end, or null while still running
 * @param durationMs          total duration in ms, or null while still running
 * @param errorMessage        error message, if the execution failed
 * @param errorStacktrace     full stack trace of the failure
 * @param errorType           exception type name, e.g. "IOException"
 * @param errorCategory       coarse error category, e.g. "IO"
 * @param rootCauseType       exception type of the root cause
 * @param rootCauseMessage    message of the root cause
 * @param diagramContentHash  hash identifying the route diagram version
 * @param engineLevel         engine capture level, e.g. "FULL"
 * @param inputBody           input payload body
 * @param outputBody          output payload body
 * @param inputHeaders        input headers as a JSON string
 * @param outputHeaders       output headers as a JSON string
 * @param attributes          additional attributes as a JSON string
 * @param traceId             distributed-tracing trace id
 * @param spanId              distributed-tracing span id
 * @param hasTraceData        whether trace data is attached
 * @param isReplay            whether this execution is a replay
 */
public record MergedExecution(
    String tenantId,
    long version,
    String executionId,
    String routeId,
    String agentId,
    String applicationName,
    String status,
    String correlationId,
    String exchangeId,
    Instant startTime,
    Instant endTime,
    Long durationMs,
    String errorMessage,
    String errorStacktrace,
    String errorType,
    String errorCategory,
    String rootCauseType,
    String rootCauseMessage,
    String diagramContentHash,
    String engineLevel,
    String inputBody,
    String outputBody,
    String inputHeaders,
    String outputHeaders,
    String attributes,
    String traceId,
    String spanId,
    boolean hasTraceData,
    boolean isReplay
) {}