feat(clickhouse): add ChunkAccumulator for chunked execution ingestion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:10:21 +02:00
parent 81f7f8afe1
commit 62420cf0c2
2 changed files with 431 additions and 0 deletions

View File

@@ -0,0 +1,205 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* Accumulates {@link ExecutionChunk} documents and produces:
* <ul>
* <li>{@link ProcessorBatch} — pushed immediately for each chunk (append-only)</li>
* <li>{@link MergedExecution} — pushed when the final chunk arrives or on stale sweep</li>
* </ul>
*/
public class ChunkAccumulator {

    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);

    private static final String DEFAULT_TENANT = "default";

    // ObjectMapper is thread-safe once configured; share a single cached instance.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final Consumer<MergedExecution> executionSink;
    private final Consumer<ProcessorBatch> processorSink;
    private final Duration staleThreshold;

    // Exchanges still waiting for their final chunk, keyed by exchangeId.
    private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();

    /**
     * @param executionSink  receives one {@link MergedExecution} per exchange, emitted when the
     *                       final chunk arrives or when {@link #sweepStale()} flushes it
     * @param processorSink  receives a {@link ProcessorBatch} for every chunk that carries
     *                       processor records (append-only, pushed immediately)
     * @param staleThreshold how long an exchange may wait for its final chunk before
     *                       {@link #sweepStale()} flushes it anyway
     * @throws NullPointerException if any argument is {@code null}
     */
    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
                            Consumer<ProcessorBatch> processorSink,
                            Duration staleThreshold) {
        this.executionSink = Objects.requireNonNull(executionSink, "executionSink");
        this.processorSink = Objects.requireNonNull(processorSink, "processorSink");
        this.staleThreshold = Objects.requireNonNull(staleThreshold, "staleThreshold");
    }

    /**
     * Process an incoming chunk: push processors immediately,
     * buffer/merge the envelope, and emit when final.
     *
     * @param chunk one chunk of an execution; chunks of the same exchange share an exchangeId
     */
    public void onChunk(ExecutionChunk chunk) {
        // 1. Processor records are append-only: forward them right away, no buffering.
        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
            processorSink.accept(new ProcessorBatch(
                    DEFAULT_TENANT,
                    chunk.exchangeId(),
                    chunk.routeId(),
                    chunk.applicationName(),
                    chunk.startTime(),
                    chunk.processors()));
        }
        // 2. Buffer/merge the exchange envelope.
        if (chunk.isFinal()) {
            // Merge with any pending envelope, then emit.
            PendingExchange existing = pending.remove(chunk.exchangeId());
            ExecutionChunk merged = existing != null
                    ? mergeEnvelopes(existing.envelope(), chunk)
                    : chunk;
            executionSink.accept(toMergedExecution(merged));
        } else {
            // Buffer the envelope for later merging. The atomic merge() handles concurrent
            // chunks of the same exchange without locking.
            pending.merge(chunk.exchangeId(),
                    new PendingExchange(chunk, Instant.now()),
                    (old, incoming) -> new PendingExchange(
                            mergeEnvelopes(old.envelope(), incoming.envelope()),
                            // keep the first arrival time so staleness is measured
                            // from the first chunk, not the latest one
                            old.receivedAt()));
        }
    }

    /**
     * Flush exchanges that have been pending longer than the stale threshold.
     * Called periodically by a scheduled task.
     */
    public void sweepStale() {
        Instant cutoff = Instant.now().minus(staleThreshold);
        pending.forEach((exchangeId, pe) -> {
            if (pe.receivedAt().isBefore(cutoff)) {
                // remove() may race with a concurrent final chunk in onChunk(); the null
                // check guarantees only one of the two paths emits for this exchange.
                PendingExchange removed = pending.remove(exchangeId);
                if (removed != null) {
                    log.info("Flushing stale exchange {} (pending since {})",
                            exchangeId, removed.receivedAt());
                    // NOTE(review): if the final chunk still arrives after this flush, a second
                    // MergedExecution is emitted — presumably deduplicated downstream (e.g.
                    // ClickHouse ReplacingMergeTree); confirm.
                    executionSink.accept(toMergedExecution(removed.envelope()));
                }
            }
        });
    }

    /** Number of exchanges awaiting a final chunk. */
    public int getPendingCount() {
        return pending.size();
    }

    // ---- Merge logic ----

    /**
     * COALESCE merge: for each field, prefer the newer value if non-null, else keep older.
     * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs.
     *
     * <p>Chunks may arrive out of order, so "newer" is decided by {@code chunkSeq}, not by
     * arrival order. On equal sequence numbers the second argument wins, which preserves the
     * in-order behavior (incoming chunk takes precedence over the buffered one).
     */
    private static ExecutionChunk mergeEnvelopes(ExecutionChunk first, ExecutionChunk second) {
        // Re-order by sequence number so an out-of-order older chunk can never
        // overwrite fields already set by a higher-sequence chunk.
        ExecutionChunk newer = second.chunkSeq() >= first.chunkSeq() ? second : first;
        ExecutionChunk older = newer == second ? first : second;
        return new ExecutionChunk(
                coalesce(newer.exchangeId(), older.exchangeId()),
                coalesce(newer.applicationName(), older.applicationName()),
                coalesce(newer.agentId(), older.agentId()),
                coalesce(newer.routeId(), older.routeId()),
                coalesce(newer.correlationId(), older.correlationId()),
                coalesce(newer.status(), older.status()),
                earliest(older.startTime(), newer.startTime()), // true earliest startTime
                coalesce(newer.endTime(), older.endTime()),
                coalesce(newer.durationMs(), older.durationMs()),
                coalesce(newer.engineLevel(), older.engineLevel()),
                coalesce(newer.errorMessage(), older.errorMessage()),
                coalesce(newer.errorStackTrace(), older.errorStackTrace()),
                coalesce(newer.errorType(), older.errorType()),
                coalesce(newer.errorCategory(), older.errorCategory()),
                coalesce(newer.rootCauseType(), older.rootCauseType()),
                coalesce(newer.rootCauseMessage(), older.rootCauseMessage()),
                coalesce(newer.attributes(), older.attributes()),
                coalesce(newer.traceId(), older.traceId()),
                coalesce(newer.spanId(), older.spanId()),
                coalesce(newer.originalExchangeId(), older.originalExchangeId()),
                coalesce(newer.replayExchangeId(), older.replayExchangeId()),
                Math.max(newer.chunkSeq(), older.chunkSeq()),
                newer.isFinal() || older.isFinal(),
                List.of() // processors are handled separately (pushed per-chunk in onChunk)
        );
    }

    /** First non-null of the two arguments. */
    private static <T> T coalesce(T a, T b) {
        return a != null ? a : b;
    }

    /** Earliest of two nullable instants; null only if both are null. */
    private static Instant earliest(Instant a, Instant b) {
        if (a == null) {
            return b;
        }
        if (b == null) {
            return a;
        }
        return a.isBefore(b) ? a : b;
    }

    // ---- Conversion to MergedExecution ----

    /** Maps a fully merged envelope onto the flat {@link MergedExecution} row shape. */
    private static MergedExecution toMergedExecution(ExecutionChunk envelope) {
        return new MergedExecution(
                DEFAULT_TENANT,
                1L, // version — NOTE(review): constant even for re-emits after a stale flush; confirm dedup semantics
                envelope.exchangeId(),
                envelope.routeId(),
                envelope.agentId(),
                envelope.applicationName(),
                envelope.status(),
                envelope.correlationId(),
                envelope.exchangeId(), // second id field mirrors exchangeId — TODO confirm against MergedExecution
                envelope.startTime(),
                envelope.endTime(),
                envelope.durationMs(),
                envelope.errorMessage(),
                envelope.errorStackTrace(),
                envelope.errorType(),
                envelope.errorCategory(),
                envelope.rootCauseType(),
                envelope.rootCauseMessage(),
                "", // diagramContentHash — server-side lookup, not in chunk
                envelope.engineLevel(),
                "", // inputBody — on processor records now
                "", // outputBody
                "", // inputHeaders
                "", // outputHeaders
                serializeAttributes(envelope.attributes()),
                envelope.traceId(),
                envelope.spanId(),
                false, // hasTraceData — not tracked at envelope level
                envelope.replayExchangeId() != null // isReplay
        );
    }

    /**
     * Serializes the attribute map to a JSON object string; never returns null
     * (falls back to {@code "{}"} on null/empty input or serialization failure).
     */
    private static String serializeAttributes(Map<String, String> attributes) {
        if (attributes == null || attributes.isEmpty()) {
            return "{}";
        }
        try {
            return MAPPER.writeValueAsString(attributes);
        } catch (JsonProcessingException e) {
            log.warn("Failed to serialize attributes, falling back to empty object", e);
            return "{}";
        }
    }

    // ---- Inner types ----

    /**
     * A batch of processor records from a single chunk, ready for ClickHouse insertion.
     */
    public record ProcessorBatch(
            String tenantId,
            String executionId,
            String routeId,
            String applicationName,
            Instant execStartTime,
            List<FlatProcessorRecord> processors
    ) {}

    /**
     * Envelope buffered while waiting for the final chunk.
     */
    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {}
}

View File

@@ -0,0 +1,226 @@
package com.cameleer3.server.core.ingestion;
import com.cameleer3.server.core.storage.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.assertj.core.api.Assertions.assertThat;
class ChunkAccumulatorTest {

    private CopyOnWriteArrayList<MergedExecution> executionSink;
    private CopyOnWriteArrayList<ChunkAccumulator.ProcessorBatch> processorSink;
    private ChunkAccumulator accumulator;

    @BeforeEach
    void setUp() {
        executionSink = new CopyOnWriteArrayList<>();
        processorSink = new CopyOnWriteArrayList<>();
        accumulator = new ChunkAccumulator(
                executionSink::add, processorSink::add, Duration.ofMinutes(5));
    }

    @Test
    void singleFinalChunk_producesExecutionAndProcessors() {
        // Full constructor used here because this test exercises attributes and trace fields.
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-1", "order-service", "agent-1", "route-1",
                "corr-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "REGULAR",
                null, null, null, null, null, null,
                Map.of("orderId", "ORD-1"),
                "trace-1", "span-1", null, null,
                0, true,
                List.of(proc(1, null, "log1", "log", "COMPLETED", 5L)));
        accumulator.onChunk(chunk);
        // Processor sink should receive 1 batch with 1 record
        assertThat(processorSink).hasSize(1);
        ChunkAccumulator.ProcessorBatch batch = processorSink.get(0);
        assertThat(batch.tenantId()).isEqualTo("default");
        assertThat(batch.executionId()).isEqualTo("ex-1");
        assertThat(batch.routeId()).isEqualTo("route-1");
        assertThat(batch.applicationName()).isEqualTo("order-service");
        assertThat(batch.execStartTime()).isEqualTo(Instant.parse("2026-03-31T10:00:00Z"));
        assertThat(batch.processors()).hasSize(1);
        // Execution sink should receive 1 merged execution
        assertThat(executionSink).hasSize(1);
        MergedExecution exec = executionSink.get(0);
        assertThat(exec.tenantId()).isEqualTo("default");
        assertThat(exec.version()).isEqualTo(1L);
        assertThat(exec.executionId()).isEqualTo("ex-1");
        assertThat(exec.routeId()).isEqualTo("route-1");
        assertThat(exec.status()).isEqualTo("COMPLETED");
        assertThat(exec.durationMs()).isEqualTo(1000L);
        assertThat(exec.traceId()).isEqualTo("trace-1");
        assertThat(exec.spanId()).isEqualTo("span-1");
        assertThat(exec.attributes()).contains("orderId");
    }

    @Test
    void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
        ExecutionChunk chunk0 = envelope(
                "ex-2", "route-1", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"), null, null,
                0, false,
                List.of(
                        proc(1, null, "log1", "log", "COMPLETED", 5L),
                        proc(2, null, "log2", "log", "COMPLETED", 3L)));
        accumulator.onChunk(chunk0);
        // Processors pushed immediately on chunk 0
        assertThat(processorSink).hasSize(1);
        assertThat(processorSink.get(0).processors()).hasSize(2);
        // No execution yet (not final)
        assertThat(executionSink).isEmpty();
        ExecutionChunk chunk1 = envelope(
                "ex-2", "route-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:02Z"), 2000L,
                1, true,
                List.of(proc(3, null, "log3", "log", "COMPLETED", 7L)));
        accumulator.onChunk(chunk1);
        // Processors from chunk 1 also pushed
        assertThat(processorSink).hasSize(2);
        assertThat(processorSink.get(1).processors()).hasSize(1);
        // Now execution is emitted with the final chunk's values merged in
        assertThat(executionSink).hasSize(1);
        MergedExecution exec = executionSink.get(0);
        assertThat(exec.status()).isEqualTo("COMPLETED");
        assertThat(exec.durationMs()).isEqualTo(2000L);
    }

    @Test
    void staleExchange_flushedBySweep() throws Exception {
        // 1 ms threshold so the buffered exchange goes stale almost immediately.
        ChunkAccumulator staleAccumulator = new ChunkAccumulator(
                executionSink::add, processorSink::add, Duration.ofMillis(1));
        ExecutionChunk chunk = envelope(
                "ex-3", "route-1", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"), null, null,
                0, false,
                List.of());
        staleAccumulator.onChunk(chunk);
        assertThat(executionSink).isEmpty();
        Thread.sleep(5);
        staleAccumulator.sweepStale();
        assertThat(executionSink).hasSize(1);
        MergedExecution exec = executionSink.get(0);
        assertThat(exec.status()).isEqualTo("RUNNING");
        assertThat(exec.executionId()).isEqualTo("ex-3");
    }

    @Test
    void finalChunkWithErrors_populatesErrorFields() {
        // Full constructor used here because this test exercises the six error fields.
        ExecutionChunk chunk = new ExecutionChunk(
                "ex-4", "app", "agent-1", "route-1",
                "ex-4", "FAILED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                "REGULAR",
                "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)",
                "NullPointerException", "RUNTIME",
                "NullPointerException", "null value at index 0",
                null, null, null, null, null,
                0, true,
                List.of());
        accumulator.onChunk(chunk);
        assertThat(executionSink).hasSize(1);
        MergedExecution exec = executionSink.get(0);
        assertThat(exec.status()).isEqualTo("FAILED");
        assertThat(exec.errorMessage()).isEqualTo("NullPointerException");
        assertThat(exec.errorStacktrace()).isEqualTo("at com.foo.Bar.baz(Bar.java:42)");
        assertThat(exec.errorType()).isEqualTo("NullPointerException");
        assertThat(exec.errorCategory()).isEqualTo("RUNTIME");
        assertThat(exec.rootCauseType()).isEqualTo("NullPointerException");
        assertThat(exec.rootCauseMessage()).isEqualTo("null value at index 0");
    }

    @Test
    void getPendingCount_tracksBufferedExchanges() {
        ExecutionChunk running1 = envelope(
                "ex-5", "route-1", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"), null, null,
                0, false, List.of());
        ExecutionChunk running2 = envelope(
                "ex-6", "route-2", "RUNNING",
                Instant.parse("2026-03-31T10:00:00Z"), null, null,
                0, false, List.of());
        accumulator.onChunk(running1);
        accumulator.onChunk(running2);
        assertThat(accumulator.getPendingCount()).isEqualTo(2);
        // Send final for ex-5; only ex-6 should remain pending
        ExecutionChunk final5 = envelope(
                "ex-5", "route-1", "COMPLETED",
                Instant.parse("2026-03-31T10:00:00Z"),
                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
                1, true, List.of());
        accumulator.onChunk(final5);
        assertThat(accumulator.getPendingCount()).isEqualTo(1);
    }

    /**
     * Helper to build an envelope-only ExecutionChunk with the boilerplate fields defaulted:
     * applicationName "app", agentId "agent-1", correlationId == exchangeId, engineLevel
     * "REGULAR", and all error/trace/replay fields null.
     */
    private static ExecutionChunk envelope(String exchangeId, String routeId, String status,
                                           Instant startTime, Instant endTime, Long durationMs,
                                           int chunkSeq, boolean isFinal,
                                           List<FlatProcessorRecord> processors) {
        return new ExecutionChunk(
                exchangeId, "app", "agent-1", routeId,
                exchangeId, status,
                startTime, endTime, durationMs,
                "REGULAR",
                null, null, null, null, null, null,
                null, null, null, null, null,
                chunkSeq, isFinal, processors);
    }

    /** Helper to create a FlatProcessorRecord with minimal fields. */
    private static FlatProcessorRecord proc(int seq, Integer parentSeq,
                                            String processorId, String processorType,
                                            String status, long durationMs) {
        return new FlatProcessorRecord(
                seq, parentSeq, null, processorId, processorType,
                null, null, status,
                Instant.parse("2026-03-31T10:00:00.100Z"), durationMs,
                null, null, null, null, null,
                null, null, null, null, null, null,
                null, null, null, null, null);
    }
}