refactor: replace server-side DTOs with cameleer3-common ExecutionChunk and FlatProcessorRecord

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 19:33:49 +02:00
parent 38551eac9d
commit 07f215b0fd
10 changed files with 270 additions and 448 deletions

View File

@@ -1,7 +1,7 @@
package com.cameleer3.server.app.controller; package com.cameleer3.server.app.controller;
import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.storage.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

View File

@@ -1,7 +1,7 @@
package com.cameleer3.server.app.storage; package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@@ -93,35 +93,35 @@ public class ClickHouseExecutionStore {
processors.stream().map(p -> new Object[]{ processors.stream().map(p -> new Object[]{
nullToEmpty(tenantId), nullToEmpty(tenantId),
nullToEmpty(executionId), nullToEmpty(executionId),
p.seq(), p.getSeq(),
p.parentSeq(), p.getParentSeq(),
nullToEmpty(p.parentProcessorId()), nullToEmpty(p.getParentProcessorId()),
nullToEmpty(p.processorId()), nullToEmpty(p.getProcessorId()),
nullToEmpty(p.processorType()), nullToEmpty(p.getProcessorType()),
Timestamp.from(p.startTime() != null ? p.startTime() : execStartTime), Timestamp.from(p.getStartTime() != null ? p.getStartTime() : execStartTime),
nullToEmpty(routeId), nullToEmpty(routeId),
nullToEmpty(applicationName), nullToEmpty(applicationName),
p.iteration(), p.getIteration(),
p.iterationSize(), p.getIterationSize(),
nullToEmpty(p.status()), p.getStatus() != null ? p.getStatus().name() : "",
computeEndTime(p.startTime(), p.durationMs()), computeEndTime(p.getStartTime(), p.getDurationMs()),
p.durationMs(), p.getDurationMs(),
nullToEmpty(p.errorMessage()), nullToEmpty(p.getErrorMessage()),
nullToEmpty(p.errorStackTrace()), nullToEmpty(p.getErrorStackTrace()),
nullToEmpty(p.errorType()), nullToEmpty(p.getErrorType()),
nullToEmpty(p.errorCategory()), nullToEmpty(p.getErrorCategory()),
nullToEmpty(p.rootCauseType()), nullToEmpty(p.getRootCauseType()),
nullToEmpty(p.rootCauseMessage()), nullToEmpty(p.getRootCauseMessage()),
nullToEmpty(p.inputBody()), nullToEmpty(p.getInputBody()),
nullToEmpty(p.outputBody()), nullToEmpty(p.getOutputBody()),
mapToJson(p.inputHeaders()), mapToJson(p.getInputHeaders()),
mapToJson(p.outputHeaders()), mapToJson(p.getOutputHeaders()),
mapToJson(p.attributes()), mapToJson(p.getAttributes()),
nullToEmpty(p.resolvedEndpointUri()), nullToEmpty(p.getResolvedEndpointUri()),
nullToEmpty(p.circuitBreakerState()), nullToEmpty(p.getCircuitBreakerState()),
boolOrFalse(p.fallbackTriggered()), boolOrFalse(p.getFallbackTriggered()),
boolOrFalse(p.filterMatched()), boolOrFalse(p.getFilterMatched()),
boolOrFalse(p.duplicateMessage()) boolOrFalse(p.getDuplicateMessage())
}).toList()); }).toList());
} }

View File

@@ -5,7 +5,8 @@ import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult; import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -106,17 +107,13 @@ class ClickHouseSearchIndexIT {
store.insertExecutionBatch(List.of(exec1, exec2, exec3)); store.insertExecutionBatch(List.of(exec1, exec2, exec3));
// Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token
FlatProcessorRecord proc1 = new FlatProcessorRecord( FlatProcessorRecord proc1 = new FlatProcessorRecord(1, "proc-1", "to");
1, null, null, proc1.setStatus(ExecutionStatus.COMPLETED);
"proc-1", "to", null, null, proc1.setStartTime(baseTime);
"COMPLETED", proc1.setDurationMs(50L);
baseTime, 50L, proc1.setInputBody("Hello World request body");
null, proc1.setOutputBody("");
"Hello World request body", "", proc1.setInputHeaders(Map.of("Authorization", "Bearer secret-token"));
Map.of("Authorization", "Bearer secret-token"), null,
null, null, null, null, null, null,
null, null, null, null, null
);
store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1)); store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1));
} }

View File

@@ -6,8 +6,9 @@ import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult; import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -70,53 +71,76 @@ class ClickHouseChunkPipelineIT {
Instant start = Instant.parse("2026-03-31T12:00:00Z"); Instant start = Instant.parse("2026-03-31T12:00:00Z");
// Chunk 0: RUNNING with initial processors // Chunk 0: RUNNING with initial processors
accumulator.onChunk(new ExecutionChunk( ExecutionChunk chunk0 = new ExecutionChunk();
"pipeline-1", "order-service", "pod-1", "order-route", chunk0.setExchangeId("pipeline-1");
"corr-1", "RUNNING", chunk0.setApplicationName("order-service");
start, null, null, "DEEP", chunk0.setAgentId("pod-1");
null, null, null, null, null, null, chunk0.setRouteId("order-route");
Map.of("orderId", "ORD-123"), chunk0.setCorrelationId("corr-1");
null, null, null, null, chunk0.setStatus(ExecutionStatus.RUNNING);
0, false, chunk0.setStartTime(start);
List.of( chunk0.setEngineLevel("DEEP");
new FlatProcessorRecord(1, null, null, "log1", "log", chunk0.setAttributes(Map.of("orderId", "ORD-123"));
null, null, "COMPLETED", start, 2L, chunk0.setChunkSeq(0);
null, null, null, null, null, chunk0.setFinal(false);
null, null, null, null, null, null,
null, null, null, null, null), FlatProcessorRecord p1 = new FlatProcessorRecord(1, "log1", "log");
new FlatProcessorRecord(2, null, null, "split1", "split", p1.setStatus(ExecutionStatus.COMPLETED);
null, 3, "COMPLETED", start.plusMillis(2), 100L, p1.setStartTime(start);
null, null, null, null, null, p1.setDurationMs(2L);
null, null, null, null, null, null,
null, null, null, null, null), FlatProcessorRecord p2 = new FlatProcessorRecord(2, "split1", "split");
new FlatProcessorRecord(3, 2, "split1", "to1", "to", p2.setIterationSize(3);
0, null, "COMPLETED", start.plusMillis(5), 30L, p2.setStatus(ExecutionStatus.COMPLETED);
"http://inventory/api", p2.setStartTime(start.plusMillis(2));
"order ABC-123 check stock", "stock available", p2.setDurationMs(100L);
null, null,
null, null, null, null, null, null, FlatProcessorRecord p3 = new FlatProcessorRecord(3, "to1", "to");
null, null, null, null, null)))); p3.setParentSeq(2);
p3.setParentProcessorId("split1");
p3.setIteration(0);
p3.setStatus(ExecutionStatus.COMPLETED);
p3.setStartTime(start.plusMillis(5));
p3.setDurationMs(30L);
p3.setResolvedEndpointUri("http://inventory/api");
p3.setInputBody("order ABC-123 check stock");
p3.setOutputBody("stock available");
chunk0.setProcessors(List.of(p1, p2, p3));
accumulator.onChunk(chunk0);
// Processors should be buffered immediately // Processors should be buffered immediately
assertThat(processorBuffer).hasSize(1); assertThat(processorBuffer).hasSize(1);
assertThat(executionBuffer).isEmpty(); assertThat(executionBuffer).isEmpty();
// Chunk 1: COMPLETED (final) // Chunk 1: COMPLETED (final)
accumulator.onChunk(new ExecutionChunk( ExecutionChunk chunk1 = new ExecutionChunk();
"pipeline-1", "order-service", "pod-1", "order-route", chunk1.setExchangeId("pipeline-1");
"corr-1", "COMPLETED", chunk1.setApplicationName("order-service");
start, start.plusMillis(750), 750L, "DEEP", chunk1.setAgentId("pod-1");
null, null, null, null, null, null, chunk1.setRouteId("order-route");
null, null, null, null, null, chunk1.setCorrelationId("corr-1");
1, true, chunk1.setStatus(ExecutionStatus.COMPLETED);
List.of( chunk1.setStartTime(start);
new FlatProcessorRecord(4, 2, "split1", "to1", "to", chunk1.setEndTime(start.plusMillis(750));
1, null, "COMPLETED", start.plusMillis(40), 25L, chunk1.setDurationMs(750L);
"http://inventory/api", chunk1.setEngineLevel("DEEP");
"order DEF-456 check stock", "stock available", chunk1.setChunkSeq(1);
null, null, chunk1.setFinal(true);
null, null, null, null, null, null,
null, null, null, null, null)))); FlatProcessorRecord p4 = new FlatProcessorRecord(4, "to1", "to");
p4.setParentSeq(2);
p4.setParentProcessorId("split1");
p4.setIteration(1);
p4.setStatus(ExecutionStatus.COMPLETED);
p4.setStartTime(start.plusMillis(40));
p4.setDurationMs(25L);
p4.setResolvedEndpointUri("http://inventory/api");
p4.setInputBody("order DEF-456 check stock");
p4.setOutputBody("stock available");
chunk1.setProcessors(List.of(p4));
accumulator.onChunk(chunk1);
assertThat(executionBuffer).hasSize(1); assertThat(executionBuffer).hasSize(1);
assertThat(processorBuffer).hasSize(2); assertThat(processorBuffer).hasSize(2);

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.app.storage; package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.ingestion.MergedExecution;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.FlatProcessorRecord;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -80,18 +81,16 @@ class ClickHouseExecutionStoreIT {
@Test @Test
void insertProcessorBatch_writesToClickHouse() { void insertProcessorBatch_writesToClickHouse() {
FlatProcessorRecord proc = new FlatProcessorRecord( FlatProcessorRecord proc = new FlatProcessorRecord(1, "proc-1", "to");
1, null, null, proc.setStatus(ExecutionStatus.COMPLETED);
"proc-1", "to", null, null, proc.setStartTime(Instant.parse("2026-03-31T10:00:00Z"));
"COMPLETED", proc.setDurationMs(50L);
Instant.parse("2026-03-31T10:00:00Z"), 50L, proc.setResolvedEndpointUri("http://example.com");
"http://example.com", proc.setInputBody("input body");
"input body", "output body", proc.setOutputBody("output body");
Map.of("h1", "v1"), Map.of("h2", "v2"), proc.setInputHeaders(Map.of("h1", "v1"));
null, null, null, null, null, null, proc.setOutputHeaders(Map.of("h2", "v2"));
Map.of("a1", "v1"), proc.setAttributes(Map.of("a1", "v1"));
null, null, null, null
);
store.insertProcessorBatch( store.insertProcessorBatch(
"default", "exec-1", "route-a", "my-app", "default", "exec-1", "route-a", "my-app",
@@ -112,48 +111,44 @@ class ClickHouseExecutionStoreIT {
@Test @Test
void insertProcessorBatch_withIterations() { void insertProcessorBatch_withIterations() {
FlatProcessorRecord splitContainer = new FlatProcessorRecord( FlatProcessorRecord splitContainer = new FlatProcessorRecord(1, "split-1", "split");
1, null, null, splitContainer.setIterationSize(3);
"split-1", "split", null, 3, splitContainer.setStatus(ExecutionStatus.COMPLETED);
"COMPLETED", splitContainer.setStartTime(Instant.parse("2026-03-31T10:00:00Z"));
Instant.parse("2026-03-31T10:00:00Z"), 300L, splitContainer.setDurationMs(300L);
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null
);
FlatProcessorRecord child0 = new FlatProcessorRecord( FlatProcessorRecord child0 = new FlatProcessorRecord(2, "child-proc", "to");
2, 1, "split-1", child0.setParentSeq(1);
"child-proc", "to", 0, null, child0.setParentProcessorId("split-1");
"COMPLETED", child0.setIteration(0);
Instant.parse("2026-03-31T10:00:00.100Z"), 80L, child0.setStatus(ExecutionStatus.COMPLETED);
"http://svc-a", "body0", "out0", child0.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z"));
null, null, child0.setDurationMs(80L);
null, null, null, null, null, null, child0.setResolvedEndpointUri("http://svc-a");
null, null, null, null, null child0.setInputBody("body0");
); child0.setOutputBody("out0");
FlatProcessorRecord child1 = new FlatProcessorRecord( FlatProcessorRecord child1 = new FlatProcessorRecord(3, "child-proc", "to");
3, 1, "split-1", child1.setParentSeq(1);
"child-proc", "to", 1, null, child1.setParentProcessorId("split-1");
"COMPLETED", child1.setIteration(1);
Instant.parse("2026-03-31T10:00:00.200Z"), 90L, child1.setStatus(ExecutionStatus.COMPLETED);
"http://svc-a", "body1", "out1", child1.setStartTime(Instant.parse("2026-03-31T10:00:00.200Z"));
null, null, child1.setDurationMs(90L);
null, null, null, null, null, null, child1.setResolvedEndpointUri("http://svc-a");
null, null, null, null, null child1.setInputBody("body1");
); child1.setOutputBody("out1");
FlatProcessorRecord child2 = new FlatProcessorRecord( FlatProcessorRecord child2 = new FlatProcessorRecord(4, "child-proc", "to");
4, 1, "split-1", child2.setParentSeq(1);
"child-proc", "to", 2, null, child2.setParentProcessorId("split-1");
"COMPLETED", child2.setIteration(2);
Instant.parse("2026-03-31T10:00:00.300Z"), 100L, child2.setStatus(ExecutionStatus.COMPLETED);
"http://svc-a", "body2", "out2", child2.setStartTime(Instant.parse("2026-03-31T10:00:00.300Z"));
null, null, child2.setDurationMs(100L);
null, null, null, null, null, null, child2.setResolvedEndpointUri("http://svc-a");
null, null, null, null, null child2.setInputBody("body2");
); child2.setOutputBody("out2");
store.insertProcessorBatch( store.insertProcessorBatch(
"default", "exec-2", "route-b", "my-app", "default", "exec-2", "route-b", "my-app",

View File

@@ -1,7 +1,7 @@
package com.cameleer3.server.core.ingestion; package com.cameleer3.server.core.ingestion;
import com.cameleer3.server.core.storage.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.FlatProcessorRecord;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -46,27 +46,27 @@ public class ChunkAccumulator {
*/ */
public void onChunk(ExecutionChunk chunk) { public void onChunk(ExecutionChunk chunk) {
// 1. Push processor records immediately (append-only) // 1. Push processor records immediately (append-only)
if (chunk.processors() != null && !chunk.processors().isEmpty()) { if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) {
processorSink.accept(new ProcessorBatch( processorSink.accept(new ProcessorBatch(
DEFAULT_TENANT, DEFAULT_TENANT,
chunk.exchangeId(), chunk.getExchangeId(),
chunk.routeId(), chunk.getRouteId(),
chunk.applicationName(), chunk.getApplicationName(),
chunk.startTime(), chunk.getStartTime(),
chunk.processors())); chunk.getProcessors()));
} }
// 2. Buffer/merge the exchange envelope // 2. Buffer/merge the exchange envelope
if (chunk.isFinal()) { if (chunk.isFinal()) {
// Merge with any pending envelope, then emit // Merge with any pending envelope, then emit
PendingExchange existing = pending.remove(chunk.exchangeId()); PendingExchange existing = pending.remove(chunk.getExchangeId());
ExecutionChunk merged = existing != null ExecutionChunk merged = existing != null
? mergeEnvelopes(existing.envelope(), chunk) ? mergeEnvelopes(existing.envelope(), chunk)
: chunk; : chunk;
executionSink.accept(toMergedExecution(merged)); executionSink.accept(toMergedExecution(merged));
} else { } else {
// Buffer the envelope for later merging // Buffer the envelope for later merging
pending.merge(chunk.exchangeId(), pending.merge(chunk.getExchangeId(),
new PendingExchange(chunk, Instant.now()), new PendingExchange(chunk, Instant.now()),
(old, incoming) -> new PendingExchange( (old, incoming) -> new PendingExchange(
mergeEnvelopes(old.envelope(), incoming.envelope()), mergeEnvelopes(old.envelope(), incoming.envelope()),
@@ -104,32 +104,32 @@ public class ChunkAccumulator {
* The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs. * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs.
*/ */
private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) { private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) {
return new ExecutionChunk( ExecutionChunk merged = new ExecutionChunk();
coalesce(newer.exchangeId(), older.exchangeId()), merged.setExchangeId(coalesce(newer.getExchangeId(), older.getExchangeId()));
coalesce(newer.applicationName(), older.applicationName()), merged.setApplicationName(coalesce(newer.getApplicationName(), older.getApplicationName()));
coalesce(newer.agentId(), older.agentId()), merged.setAgentId(coalesce(newer.getAgentId(), older.getAgentId()));
coalesce(newer.routeId(), older.routeId()), merged.setRouteId(coalesce(newer.getRouteId(), older.getRouteId()));
coalesce(newer.correlationId(), older.correlationId()), merged.setCorrelationId(coalesce(newer.getCorrelationId(), older.getCorrelationId()));
coalesce(newer.status(), older.status()), merged.setStatus(coalesce(newer.getStatus(), older.getStatus()));
coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime merged.setStartTime(coalesce(older.getStartTime(), newer.getStartTime())); // prefer earliest startTime
coalesce(newer.endTime(), older.endTime()), merged.setEndTime(coalesce(newer.getEndTime(), older.getEndTime()));
coalesce(newer.durationMs(), older.durationMs()), merged.setDurationMs(coalesce(newer.getDurationMs(), older.getDurationMs()));
coalesce(newer.engineLevel(), older.engineLevel()), merged.setEngineLevel(coalesce(newer.getEngineLevel(), older.getEngineLevel()));
coalesce(newer.errorMessage(), older.errorMessage()), merged.setErrorMessage(coalesce(newer.getErrorMessage(), older.getErrorMessage()));
coalesce(newer.errorStackTrace(), older.errorStackTrace()), merged.setErrorStackTrace(coalesce(newer.getErrorStackTrace(), older.getErrorStackTrace()));
coalesce(newer.errorType(), older.errorType()), merged.setErrorType(coalesce(newer.getErrorType(), older.getErrorType()));
coalesce(newer.errorCategory(), older.errorCategory()), merged.setErrorCategory(coalesce(newer.getErrorCategory(), older.getErrorCategory()));
coalesce(newer.rootCauseType(), older.rootCauseType()), merged.setRootCauseType(coalesce(newer.getRootCauseType(), older.getRootCauseType()));
coalesce(newer.rootCauseMessage(), older.rootCauseMessage()), merged.setRootCauseMessage(coalesce(newer.getRootCauseMessage(), older.getRootCauseMessage()));
coalesce(newer.attributes(), older.attributes()), merged.setAttributes(coalesce(newer.getAttributes(), older.getAttributes()));
coalesce(newer.traceId(), older.traceId()), merged.setTraceId(coalesce(newer.getTraceId(), older.getTraceId()));
coalesce(newer.spanId(), older.spanId()), merged.setSpanId(coalesce(newer.getSpanId(), older.getSpanId()));
coalesce(newer.originalExchangeId(), older.originalExchangeId()), merged.setOriginalExchangeId(coalesce(newer.getOriginalExchangeId(), older.getOriginalExchangeId()));
coalesce(newer.replayExchangeId(), older.replayExchangeId()), merged.setReplayExchangeId(coalesce(newer.getReplayExchangeId(), older.getReplayExchangeId()));
Math.max(newer.chunkSeq(), older.chunkSeq()), merged.setChunkSeq(Math.max(newer.getChunkSeq(), older.getChunkSeq()));
newer.isFinal() || older.isFinal(), merged.setFinal(newer.isFinal() || older.isFinal());
List.of() // processors are handled separately merged.setProcessors(List.of()); // processors are handled separately
); return merged;
} }
private static <T> T coalesce(T a, T b) { private static <T> T coalesce(T a, T b) {
@@ -142,33 +142,33 @@ public class ChunkAccumulator {
return new MergedExecution( return new MergedExecution(
DEFAULT_TENANT, DEFAULT_TENANT,
1L, 1L,
envelope.exchangeId(), envelope.getExchangeId(),
envelope.routeId(), envelope.getRouteId(),
envelope.agentId(), envelope.getAgentId(),
envelope.applicationName(), envelope.getApplicationName(),
envelope.status(), envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING",
envelope.correlationId(), envelope.getCorrelationId(),
envelope.exchangeId(), envelope.getExchangeId(),
envelope.startTime(), envelope.getStartTime(),
envelope.endTime(), envelope.getEndTime(),
envelope.durationMs(), envelope.getDurationMs(),
envelope.errorMessage(), envelope.getErrorMessage(),
envelope.errorStackTrace(), envelope.getErrorStackTrace(),
envelope.errorType(), envelope.getErrorType(),
envelope.errorCategory(), envelope.getErrorCategory(),
envelope.rootCauseType(), envelope.getRootCauseType(),
envelope.rootCauseMessage(), envelope.getRootCauseMessage(),
"", // diagramContentHash — server-side lookup, not in chunk "", // diagramContentHash — server-side lookup, not in chunk
envelope.engineLevel(), envelope.getEngineLevel(),
"", // inputBody — on processor records now "", // inputBody — on processor records now
"", // outputBody "", // outputBody
"", // inputHeaders "", // inputHeaders
"", // outputHeaders "", // outputHeaders
serializeAttributes(envelope.attributes()), serializeAttributes(envelope.getAttributes()),
envelope.traceId(), envelope.getTraceId(),
envelope.spanId(), envelope.getSpanId(),
false, // hasTraceData — not tracked at envelope level false, // hasTraceData — not tracked at envelope level
envelope.replayExchangeId() != null // isReplay envelope.getReplayExchangeId() != null // isReplay
); );
} }

View File

@@ -1,42 +0,0 @@
package com.cameleer3.server.core.storage.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.List;
import java.util.Map;
/**
 * Chunk document: one exchange envelope plus the flat processor records reported
 * in this chunk.
 *
 * <p>Mirrors cameleer3-common {@code ExecutionChunk} — replace with the common
 * lib import when available.
 *
 * <p>Chunks belonging to the same exchange share {@code exchangeId} and are
 * ordered by {@code chunkSeq}; the terminal chunk is flagged via the JSON
 * property {@code "final"} (mapped to {@code isFinal}). Unknown JSON properties
 * are ignored on read and {@code null} fields are omitted on write.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ExecutionChunk(
        // --- identity / routing ---
        String exchangeId,
        String applicationName,
        String agentId,
        String routeId,
        String correlationId,
        // --- lifecycle ---
        String status,
        Instant startTime,
        Instant endTime,
        Long durationMs,
        String engineLevel,
        // --- error details (null when the exchange has not failed) ---
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        // --- metadata / tracing ---
        Map<String, String> attributes,
        String traceId,
        String spanId,
        // originalExchangeId/replayExchangeId: presumably link replays to the
        // original run — TODO confirm against the producing agent
        String originalExchangeId,
        String replayExchangeId,
        // --- chunking: sequence within the exchange; "final" marks the last chunk ---
        int chunkSeq,
        @JsonProperty("final") boolean isFinal,
        // Processor records carried by this chunk (may be empty).
        List<FlatProcessorRecord> processors
) {}

View File

@@ -1,42 +0,0 @@
package com.cameleer3.server.core.storage.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.Instant;
import java.util.Map;
/**
 * Flat (non-nested) processor execution record; {@code seq}/{@code parentSeq}
 * allow the processor tree to be reconstructed from a flat list.
 *
 * <p>Mirrors cameleer3-common {@code FlatProcessorRecord} — replace with the
 * common lib import when available.
 *
 * <p>Unknown JSON properties are ignored on read and {@code null} fields are
 * omitted on write.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record FlatProcessorRecord(
        // --- tree position: seq is unique per execution; parentSeq/parentProcessorId
        // point at the enclosing processor (null for top-level records) ---
        int seq,
        Integer parentSeq,
        String parentProcessorId,
        // --- processor identity ---
        String processorId,
        String processorType,
        // --- iteration info for children of splitting/looping processors
        // (e.g. iteration index within a split; null when not applicable) ---
        Integer iteration,
        Integer iterationSize,
        // --- lifecycle ---
        String status,
        Instant startTime,
        long durationMs,
        String resolvedEndpointUri,
        // --- payloads and headers captured around this processor ---
        String inputBody,
        String outputBody,
        Map<String, String> inputHeaders,
        Map<String, String> outputHeaders,
        // --- error details (null when this processor did not fail) ---
        String errorMessage,
        String errorStackTrace,
        String errorType,
        String errorCategory,
        String rootCauseType,
        String rootCauseMessage,
        // --- metadata ---
        Map<String, String> attributes,
        // --- resilience / routing flags (null when the feature is not in play) ---
        String circuitBreakerState,
        Boolean fallbackTriggered,
        Boolean filterMatched,
        Boolean duplicateMessage
) {}

View File

@@ -1,7 +1,8 @@
package com.cameleer3.server.core.ingestion; package com.cameleer3.server.core.ingestion;
import com.cameleer3.server.core.storage.model.ExecutionChunk; import com.cameleer3.common.model.ExecutionChunk;
import com.cameleer3.server.core.storage.model.FlatProcessorRecord; import com.cameleer3.common.model.ExecutionStatus;
import com.cameleer3.common.model.FlatProcessorRecord;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -29,17 +30,15 @@ class ChunkAccumulatorTest {
@Test @Test
void singleFinalChunk_producesExecutionAndProcessors() { void singleFinalChunk_producesExecutionAndProcessors() {
ExecutionChunk chunk = new ExecutionChunk( ExecutionChunk chunk = chunk("ex-1", "COMPLETED",
"ex-1", "order-service", "agent-1", "route-1",
"corr-1", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"REGULAR",
null, null, null, null, null, null,
Map.of("orderId", "ORD-1"),
"trace-1", "span-1", null, null,
0, true, 0, true,
List.of(proc(1, null, "log1", "log", "COMPLETED", 5L))); List.of(proc(1, null, "log1", "log", "COMPLETED", 5L)));
chunk.setCorrelationId("corr-1");
chunk.setAttributes(Map.of("orderId", "ORD-1"));
chunk.setTraceId("trace-1");
chunk.setSpanId("span-1");
accumulator.onChunk(chunk); accumulator.onChunk(chunk);
@@ -69,17 +68,14 @@ class ChunkAccumulatorTest {
@Test @Test
void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() {
ExecutionChunk chunk0 = new ExecutionChunk( ExecutionChunk chunk0 = chunk("ex-2", "RUNNING",
"ex-2", "app", "agent-1", "route-1",
"ex-2", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR", null, null,
null, null, null, null, null, null,
null, null, null, null, null,
0, false, 0, false,
List.of( List.of(
proc(1, null, "log1", "log", "COMPLETED", 5L), proc(1, null, "log1", "log", "COMPLETED", 5L),
proc(2, null, "log2", "log", "COMPLETED", 3L))); proc(2, null, "log2", "log", "COMPLETED", 3L)));
chunk0.setCorrelationId("ex-2");
accumulator.onChunk(chunk0); accumulator.onChunk(chunk0);
@@ -90,16 +86,12 @@ class ChunkAccumulatorTest {
// No execution yet (not final) // No execution yet (not final)
assertThat(executionSink).isEmpty(); assertThat(executionSink).isEmpty();
ExecutionChunk chunk1 = new ExecutionChunk( ExecutionChunk chunk1 = chunk("ex-2", "COMPLETED",
"ex-2", "app", "agent-1", "route-1",
"ex-2", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:02Z"), 2000L, Instant.parse("2026-03-31T10:00:02Z"), 2000L,
"REGULAR",
null, null, null, null, null, null,
null, null, null, null, null,
1, true, 1, true,
List.of(proc(3, null, "log3", "log", "COMPLETED", 7L))); List.of(proc(3, null, "log3", "log", "COMPLETED", 7L)));
chunk1.setCorrelationId("ex-2");
accumulator.onChunk(chunk1); accumulator.onChunk(chunk1);
@@ -119,17 +111,14 @@ class ChunkAccumulatorTest {
ChunkAccumulator staleAccumulator = new ChunkAccumulator( ChunkAccumulator staleAccumulator = new ChunkAccumulator(
executionSink::add, processorSink::add, Duration.ofMillis(1)); executionSink::add, processorSink::add, Duration.ofMillis(1));
ExecutionChunk chunk = new ExecutionChunk( ExecutionChunk c = chunk("ex-3", "RUNNING",
"ex-3", "app", "agent-1", "route-1",
"ex-3", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR", null, null,
null, null, null, null, null, null,
null, null, null, null, null,
0, false, 0, false,
List.of()); List.of());
c.setCorrelationId("ex-3");
staleAccumulator.onChunk(chunk); staleAccumulator.onChunk(c);
assertThat(executionSink).isEmpty(); assertThat(executionSink).isEmpty();
Thread.sleep(5); Thread.sleep(5);
@@ -143,20 +132,20 @@ class ChunkAccumulatorTest {
@Test @Test
void finalChunkWithErrors_populatesErrorFields() { void finalChunkWithErrors_populatesErrorFields() {
ExecutionChunk chunk = new ExecutionChunk( ExecutionChunk c = chunk("ex-4", "FAILED",
"ex-4", "app", "agent-1", "route-1",
"ex-4", "FAILED",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"REGULAR",
"NullPointerException", "at com.foo.Bar.baz(Bar.java:42)",
"NullPointerException", "RUNTIME",
"NullPointerException", "null value at index 0",
null, null, null, null, null,
0, true, 0, true,
List.of()); List.of());
c.setCorrelationId("ex-4");
c.setErrorMessage("NullPointerException");
c.setErrorStackTrace("at com.foo.Bar.baz(Bar.java:42)");
c.setErrorType("NullPointerException");
c.setErrorCategory("RUNTIME");
c.setRootCauseType("NullPointerException");
c.setRootCauseMessage("null value at index 0");
accumulator.onChunk(chunk); accumulator.onChunk(c);
assertThat(executionSink).hasSize(1); assertThat(executionSink).hasSize(1);
MergedExecution exec = executionSink.get(0); MergedExecution exec = executionSink.get(0);
@@ -171,56 +160,66 @@ class ChunkAccumulatorTest {
@Test @Test
void getPendingCount_tracksBufferedExchanges() { void getPendingCount_tracksBufferedExchanges() {
ExecutionChunk running1 = new ExecutionChunk( ExecutionChunk running1 = chunk("ex-5", "RUNNING",
"ex-5", "app", "agent-1", "route-1",
"ex-5", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR", null, null,
null, null, null, null, null, null,
null, null, null, null, null,
0, false, 0, false,
List.of()); List.of());
running1.setCorrelationId("ex-5");
ExecutionChunk running2 = new ExecutionChunk( ExecutionChunk running2 = chunk("ex-6", "RUNNING",
"ex-6", "app", "agent-1", "route-2",
"ex-6", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR", null, null,
null, null, null, null, null, null,
null, null, null, null, null,
0, false, 0, false,
List.of()); List.of());
running2.setCorrelationId("ex-6");
running2.setRouteId("route-2");
accumulator.onChunk(running1); accumulator.onChunk(running1);
accumulator.onChunk(running2); accumulator.onChunk(running2);
assertThat(accumulator.getPendingCount()).isEqualTo(2); assertThat(accumulator.getPendingCount()).isEqualTo(2);
// Send final for ex-5 // Send final for ex-5
ExecutionChunk final5 = new ExecutionChunk( ExecutionChunk final5 = chunk("ex-5", "COMPLETED",
"ex-5", "app", "agent-1", "route-1",
"ex-5", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L, Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"REGULAR",
null, null, null, null, null, null,
null, null, null, null, null,
1, true, 1, true,
List.of()); List.of());
final5.setCorrelationId("ex-5");
accumulator.onChunk(final5); accumulator.onChunk(final5);
assertThat(accumulator.getPendingCount()).isEqualTo(1); assertThat(accumulator.getPendingCount()).isEqualTo(1);
} }
/** Helper to create an ExecutionChunk with common fields. */
private static ExecutionChunk chunk(String exchangeId, String status, Instant start, Instant end, Long duration,
int chunkSeq, boolean isFinal, List<FlatProcessorRecord> processors) {
ExecutionChunk c = new ExecutionChunk();
c.setExchangeId(exchangeId);
c.setApplicationName(exchangeId.equals("ex-1") ? "order-service" : "app");
c.setAgentId("agent-1");
c.setRouteId("route-1");
c.setCorrelationId(null);
c.setStatus(ExecutionStatus.valueOf(status));
c.setStartTime(start);
c.setEndTime(end);
c.setDurationMs(duration);
c.setEngineLevel("REGULAR");
c.setChunkSeq(chunkSeq);
c.setFinal(isFinal);
c.setProcessors(processors);
return c;
}
/** Helper to create a FlatProcessorRecord with minimal fields. */ /** Helper to create a FlatProcessorRecord with minimal fields. */
private static FlatProcessorRecord proc(int seq, Integer parentSeq, private static FlatProcessorRecord proc(int seq, Integer parentSeq,
String processorId, String processorType, String processorId, String processorType,
String status, long durationMs) { String status, long durationMs) {
return new FlatProcessorRecord( FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType);
seq, parentSeq, null, processorId, processorType, p.setParentSeq(parentSeq);
null, null, status, p.setStatus(ExecutionStatus.valueOf(status));
Instant.parse("2026-03-31T10:00:00.100Z"), durationMs, p.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z"));
null, null, null, null, null, p.setDurationMs(durationMs);
null, null, null, null, null, null, return p;
null, null, null, null, null);
} }
} }

View File

@@ -1,109 +0,0 @@
package com.cameleer3.server.core.storage.model;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class ExecutionChunkDeserializationTest {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
@Test
void roundTrip_fullChunk() throws Exception {
ExecutionChunk chunk = new ExecutionChunk(
"ex-1", "order-service", "pod-1", "order-route",
"corr-1", "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), 1000L,
"REGULAR",
null, null, null, null, null, null,
Map.of("orderId", "ORD-1"),
"trace-1", "span-1", null, null,
2, true,
List.of(new FlatProcessorRecord(
1, null, null, "log1", "log",
null, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
null, "body", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
String json = MAPPER.writeValueAsString(chunk);
ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(deserialized.exchangeId()).isEqualTo("ex-1");
assertThat(deserialized.isFinal()).isTrue();
assertThat(deserialized.chunkSeq()).isEqualTo(2);
assertThat(deserialized.processors()).hasSize(1);
assertThat(deserialized.processors().get(0).seq()).isEqualTo(1);
assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1");
assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1");
}
@Test
void roundTrip_runningChunkWithIterations() throws Exception {
ExecutionChunk chunk = new ExecutionChunk(
"ex-2", "app", "agent-1", "route-1",
"ex-2", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"),
null, null, "REGULAR",
null, null, null, null, null, null,
null, null, null, null, null,
0, false,
List.of(
new FlatProcessorRecord(
1, null, null, "split1", "split",
null, 3, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 100L,
null, null, null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(
2, 1, "split1", "log1", "log",
0, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-0", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null),
new FlatProcessorRecord(
3, 1, "split1", "log1", "log",
1, null, "COMPLETED",
Instant.parse("2026-03-31T10:00:00Z"), 5L,
null, "item-1", null, null, null,
null, null, null, null, null, null,
null, null, null, null, null)));
String json = MAPPER.writeValueAsString(chunk);
ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(deserialized.isFinal()).isFalse();
assertThat(deserialized.processors()).hasSize(3);
FlatProcessorRecord split = deserialized.processors().get(0);
assertThat(split.iterationSize()).isEqualTo(3);
assertThat(split.parentSeq()).isNull();
FlatProcessorRecord child0 = deserialized.processors().get(1);
assertThat(child0.parentSeq()).isEqualTo(1);
assertThat(child0.parentProcessorId()).isEqualTo("split1");
assertThat(child0.iteration()).isEqualTo(0);
}
@Test
void deserialize_unknownFieldsIgnored() throws Exception {
String json = """
{"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED",
"startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true,
"futureField":"ignored","processors":[]}
""";
ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class);
assertThat(chunk.exchangeId()).isEqualTo("ex-1");
assertThat(chunk.isFinal()).isTrue();
}
}