From 07f215b0fdb093b7afa741d9dd6832c08fd82692 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 19:33:49 +0200 Subject: [PATCH] refactor: replace server-side DTOs with cameleer3-common ExecutionChunk and FlatProcessorRecord Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controller/ChunkIngestionController.java | 2 +- .../app/storage/ClickHouseExecutionStore.java | 56 ++++---- .../app/search/ClickHouseSearchIndexIT.java | 21 ++- .../storage/ClickHouseChunkPipelineIT.java | 110 ++++++++++------ .../storage/ClickHouseExecutionStoreIT.java | 99 +++++++------- .../core/ingestion/ChunkAccumulator.java | 114 ++++++++-------- .../core/storage/model/ExecutionChunk.java | 42 ------ .../storage/model/FlatProcessorRecord.java | 42 ------ .../core/ingestion/ChunkAccumulatorTest.java | 123 +++++++++--------- .../ExecutionChunkDeserializationTest.java | 109 ---------------- 10 files changed, 270 insertions(+), 448 deletions(-) delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java delete mode 100644 cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java index 21ed7602..24cace8a 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java @@ -1,7 +1,7 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.core.ingestion.ChunkAccumulator; -import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionChunk; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index 287bd18c..dc64b148 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -1,7 +1,7 @@ package com.cameleer3.server.app.storage; import com.cameleer3.server.core.ingestion.MergedExecution; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.FlatProcessorRecord; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.jdbc.core.JdbcTemplate; @@ -93,35 +93,35 @@ public class ClickHouseExecutionStore { processors.stream().map(p -> new Object[]{ nullToEmpty(tenantId), nullToEmpty(executionId), - p.seq(), - p.parentSeq(), - nullToEmpty(p.parentProcessorId()), - nullToEmpty(p.processorId()), - nullToEmpty(p.processorType()), - Timestamp.from(p.startTime() != null ? p.startTime() : execStartTime), + p.getSeq(), + p.getParentSeq(), + nullToEmpty(p.getParentProcessorId()), + nullToEmpty(p.getProcessorId()), + nullToEmpty(p.getProcessorType()), + Timestamp.from(p.getStartTime() != null ? p.getStartTime() : execStartTime), nullToEmpty(routeId), nullToEmpty(applicationName), - p.iteration(), - p.iterationSize(), - nullToEmpty(p.status()), - computeEndTime(p.startTime(), p.durationMs()), - p.durationMs(), - nullToEmpty(p.errorMessage()), - nullToEmpty(p.errorStackTrace()), - nullToEmpty(p.errorType()), - nullToEmpty(p.errorCategory()), - nullToEmpty(p.rootCauseType()), - nullToEmpty(p.rootCauseMessage()), - nullToEmpty(p.inputBody()), - nullToEmpty(p.outputBody()), - mapToJson(p.inputHeaders()), - mapToJson(p.outputHeaders()), - mapToJson(p.attributes()), - nullToEmpty(p.resolvedEndpointUri()), - nullToEmpty(p.circuitBreakerState()), - boolOrFalse(p.fallbackTriggered()), - boolOrFalse(p.filterMatched()), - boolOrFalse(p.duplicateMessage()) + p.getIteration(), + p.getIterationSize(), + p.getStatus() != null ? p.getStatus().name() : "", + computeEndTime(p.getStartTime(), p.getDurationMs()), + p.getDurationMs(), + nullToEmpty(p.getErrorMessage()), + nullToEmpty(p.getErrorStackTrace()), + nullToEmpty(p.getErrorType()), + nullToEmpty(p.getErrorCategory()), + nullToEmpty(p.getRootCauseType()), + nullToEmpty(p.getRootCauseMessage()), + nullToEmpty(p.getInputBody()), + nullToEmpty(p.getOutputBody()), + mapToJson(p.getInputHeaders()), + mapToJson(p.getOutputHeaders()), + mapToJson(p.getAttributes()), + nullToEmpty(p.getResolvedEndpointUri()), + nullToEmpty(p.getCircuitBreakerState()), + boolOrFalse(p.getFallbackTriggered()), + boolOrFalse(p.getFilterMatched()), + boolOrFalse(p.getDuplicateMessage()) }).toList()); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java index 31a9580c..4cb2de53 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -5,7 +5,8 @@ import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,17 +107,13 @@ class ClickHouseSearchIndexIT { store.insertExecutionBatch(List.of(exec1, exec2, exec3)); // Processor for exec-1: seq=1, to, inputBody with "Hello World", inputHeaders with secret-token - FlatProcessorRecord proc1 = new FlatProcessorRecord( - 1, null, null, - "proc-1", "to", null, null, - "COMPLETED", - baseTime, 50L, - null, - "Hello World request body", "", - Map.of("Authorization", "Bearer secret-token"), null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord proc1 = new FlatProcessorRecord(1, "proc-1", "to"); + proc1.setStatus(ExecutionStatus.COMPLETED); + proc1.setStartTime(baseTime); + proc1.setDurationMs(50L); + proc1.setInputBody("Hello World request body"); + proc1.setOutputBody(""); + proc1.setInputHeaders(Map.of("Authorization", "Bearer secret-token")); store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", baseTime, List.of(proc1)); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java index 219f836b..2227ad7f 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -6,8 +6,9 @@ import com.cameleer3.server.core.ingestion.MergedExecution; import com.cameleer3.server.core.search.ExecutionSummary; import com.cameleer3.server.core.search.SearchRequest; import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,53 +71,76 @@ class ClickHouseChunkPipelineIT { Instant start = Instant.parse("2026-03-31T12:00:00Z"); // Chunk 0: RUNNING with initial processors - accumulator.onChunk(new ExecutionChunk( - "pipeline-1", "order-service", "pod-1", "order-route", - "corr-1", "RUNNING", - start, null, null, "DEEP", - null, null, null, null, null, null, - Map.of("orderId", "ORD-123"), - null, null, null, null, - 0, false, - List.of( - new FlatProcessorRecord(1, null, null, "log1", "log", - null, null, "COMPLETED", start, 2L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord(2, null, null, "split1", "split", - null, 3, "COMPLETED", start.plusMillis(2), 100L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord(3, 2, "split1", "to1", "to", - 0, null, "COMPLETED", start.plusMillis(5), 30L, - "http://inventory/api", - "order ABC-123 check stock", "stock available", - null, null, - null, null, null, null, null, null, - null, null, null, null, null)))); + ExecutionChunk chunk0 = new ExecutionChunk(); + chunk0.setExchangeId("pipeline-1"); + chunk0.setApplicationName("order-service"); + chunk0.setAgentId("pod-1"); + chunk0.setRouteId("order-route"); + chunk0.setCorrelationId("corr-1"); + chunk0.setStatus(ExecutionStatus.RUNNING); + chunk0.setStartTime(start); + chunk0.setEngineLevel("DEEP"); + chunk0.setAttributes(Map.of("orderId", "ORD-123")); + chunk0.setChunkSeq(0); + chunk0.setFinal(false); + + FlatProcessorRecord p1 = new FlatProcessorRecord(1, "log1", "log"); + p1.setStatus(ExecutionStatus.COMPLETED); + p1.setStartTime(start); + p1.setDurationMs(2L); + + FlatProcessorRecord p2 = new FlatProcessorRecord(2, "split1", "split"); + p2.setIterationSize(3); + p2.setStatus(ExecutionStatus.COMPLETED); + p2.setStartTime(start.plusMillis(2)); + p2.setDurationMs(100L); + + FlatProcessorRecord p3 = new FlatProcessorRecord(3, "to1", "to"); + p3.setParentSeq(2); + p3.setParentProcessorId("split1"); + p3.setIteration(0); + p3.setStatus(ExecutionStatus.COMPLETED); + p3.setStartTime(start.plusMillis(5)); + p3.setDurationMs(30L); + p3.setResolvedEndpointUri("http://inventory/api"); + p3.setInputBody("order ABC-123 check stock"); + p3.setOutputBody("stock available"); + + chunk0.setProcessors(List.of(p1, p2, p3)); + accumulator.onChunk(chunk0); // Processors should be buffered immediately assertThat(processorBuffer).hasSize(1); assertThat(executionBuffer).isEmpty(); // Chunk 1: COMPLETED (final) - accumulator.onChunk(new ExecutionChunk( - "pipeline-1", "order-service", "pod-1", "order-route", - "corr-1", "COMPLETED", - start, start.plusMillis(750), 750L, "DEEP", - null, null, null, null, null, null, - null, null, null, null, null, - 1, true, - List.of( - new FlatProcessorRecord(4, 2, "split1", "to1", "to", - 1, null, "COMPLETED", start.plusMillis(40), 25L, - "http://inventory/api", - "order DEF-456 check stock", "stock available", - null, null, - null, null, null, null, null, null, - null, null, null, null, null)))); + ExecutionChunk chunk1 = new ExecutionChunk(); + chunk1.setExchangeId("pipeline-1"); + chunk1.setApplicationName("order-service"); + chunk1.setAgentId("pod-1"); + chunk1.setRouteId("order-route"); + chunk1.setCorrelationId("corr-1"); + chunk1.setStatus(ExecutionStatus.COMPLETED); + chunk1.setStartTime(start); + chunk1.setEndTime(start.plusMillis(750)); + chunk1.setDurationMs(750L); + chunk1.setEngineLevel("DEEP"); + chunk1.setChunkSeq(1); + chunk1.setFinal(true); + + FlatProcessorRecord p4 = new FlatProcessorRecord(4, "to1", "to"); + p4.setParentSeq(2); + p4.setParentProcessorId("split1"); + p4.setIteration(1); + p4.setStatus(ExecutionStatus.COMPLETED); + p4.setStartTime(start.plusMillis(40)); + p4.setDurationMs(25L); + p4.setResolvedEndpointUri("http://inventory/api"); + p4.setInputBody("order DEF-456 check stock"); + p4.setOutputBody("stock available"); + + chunk1.setProcessors(List.of(p4)); + accumulator.onChunk(chunk1); assertThat(executionBuffer).hasSize(1); assertThat(processorBuffer).hasSize(2); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java index 0904507e..8b8ede77 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -1,7 +1,8 @@ package com.cameleer3.server.app.storage; import com.cameleer3.server.core.ingestion.MergedExecution; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,18 +81,16 @@ class ClickHouseExecutionStoreIT { @Test void insertProcessorBatch_writesToClickHouse() { - FlatProcessorRecord proc = new FlatProcessorRecord( - 1, null, null, - "proc-1", "to", null, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 50L, - "http://example.com", - "input body", "output body", - Map.of("h1", "v1"), Map.of("h2", "v2"), - null, null, null, null, null, null, - Map.of("a1", "v1"), - null, null, null, null - ); + FlatProcessorRecord proc = new FlatProcessorRecord(1, "proc-1", "to"); + proc.setStatus(ExecutionStatus.COMPLETED); + proc.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + proc.setDurationMs(50L); + proc.setResolvedEndpointUri("http://example.com"); + proc.setInputBody("input body"); + proc.setOutputBody("output body"); + proc.setInputHeaders(Map.of("h1", "v1")); + proc.setOutputHeaders(Map.of("h2", "v2")); + proc.setAttributes(Map.of("a1", "v1")); store.insertProcessorBatch( "default", "exec-1", "route-a", "my-app", @@ -112,48 +111,44 @@ class ClickHouseExecutionStoreIT { @Test void insertProcessorBatch_withIterations() { - FlatProcessorRecord splitContainer = new FlatProcessorRecord( - 1, null, null, - "split-1", "split", null, 3, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 300L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord splitContainer = new FlatProcessorRecord(1, "split-1", "split"); + splitContainer.setIterationSize(3); + splitContainer.setStatus(ExecutionStatus.COMPLETED); + splitContainer.setStartTime(Instant.parse("2026-03-31T10:00:00Z")); + splitContainer.setDurationMs(300L); - FlatProcessorRecord child0 = new FlatProcessorRecord( - 2, 1, "split-1", - "child-proc", "to", 0, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.100Z"), 80L, - "http://svc-a", "body0", "out0", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child0 = new FlatProcessorRecord(2, "child-proc", "to"); + child0.setParentSeq(1); + child0.setParentProcessorId("split-1"); + child0.setIteration(0); + child0.setStatus(ExecutionStatus.COMPLETED); + child0.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + child0.setDurationMs(80L); + child0.setResolvedEndpointUri("http://svc-a"); + child0.setInputBody("body0"); + child0.setOutputBody("out0"); - FlatProcessorRecord child1 = new FlatProcessorRecord( - 3, 1, "split-1", - "child-proc", "to", 1, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.200Z"), 90L, - "http://svc-a", "body1", "out1", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child1 = new FlatProcessorRecord(3, "child-proc", "to"); + child1.setParentSeq(1); + child1.setParentProcessorId("split-1"); + child1.setIteration(1); + child1.setStatus(ExecutionStatus.COMPLETED); + child1.setStartTime(Instant.parse("2026-03-31T10:00:00.200Z")); + child1.setDurationMs(90L); + child1.setResolvedEndpointUri("http://svc-a"); + child1.setInputBody("body1"); + child1.setOutputBody("out1"); - FlatProcessorRecord child2 = new FlatProcessorRecord( - 4, 1, "split-1", - "child-proc", "to", 2, null, - "COMPLETED", - Instant.parse("2026-03-31T10:00:00.300Z"), 100L, - "http://svc-a", "body2", "out2", - null, null, - null, null, null, null, null, null, - null, null, null, null, null - ); + FlatProcessorRecord child2 = new FlatProcessorRecord(4, "child-proc", "to"); + child2.setParentSeq(1); + child2.setParentProcessorId("split-1"); + child2.setIteration(2); + child2.setStatus(ExecutionStatus.COMPLETED); + child2.setStartTime(Instant.parse("2026-03-31T10:00:00.300Z")); + child2.setDurationMs(100L); + child2.setResolvedEndpointUri("http://svc-a"); + child2.setInputBody("body2"); + child2.setOutputBody("out2"); store.insertProcessorBatch( "default", "exec-2", "route-b", "my-app", diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index 35eccbb9..dcc7d486 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -1,7 +1,7 @@ package com.cameleer3.server.core.ingestion; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.FlatProcessorRecord; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -46,27 +46,27 @@ public class ChunkAccumulator { */ public void onChunk(ExecutionChunk chunk) { // 1. Push processor records immediately (append-only) - if (chunk.processors() != null && !chunk.processors().isEmpty()) { + if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { processorSink.accept(new ProcessorBatch( DEFAULT_TENANT, - chunk.exchangeId(), - chunk.routeId(), - chunk.applicationName(), - chunk.startTime(), - chunk.processors())); + chunk.getExchangeId(), + chunk.getRouteId(), + chunk.getApplicationName(), + chunk.getStartTime(), + chunk.getProcessors())); } // 2. Buffer/merge the exchange envelope if (chunk.isFinal()) { // Merge with any pending envelope, then emit - PendingExchange existing = pending.remove(chunk.exchangeId()); + PendingExchange existing = pending.remove(chunk.getExchangeId()); ExecutionChunk merged = existing != null ? mergeEnvelopes(existing.envelope(), chunk) : chunk; executionSink.accept(toMergedExecution(merged)); } else { // Buffer the envelope for later merging - pending.merge(chunk.exchangeId(), + pending.merge(chunk.getExchangeId(), new PendingExchange(chunk, Instant.now()), (old, incoming) -> new PendingExchange( mergeEnvelopes(old.envelope(), incoming.envelope()), @@ -104,32 +104,32 @@ public class ChunkAccumulator { * The newer chunk (higher chunkSeq) takes precedence for status, endTime, durationMs. */ private static ExecutionChunk mergeEnvelopes(ExecutionChunk older, ExecutionChunk newer) { - return new ExecutionChunk( - coalesce(newer.exchangeId(), older.exchangeId()), - coalesce(newer.applicationName(), older.applicationName()), - coalesce(newer.agentId(), older.agentId()), - coalesce(newer.routeId(), older.routeId()), - coalesce(newer.correlationId(), older.correlationId()), - coalesce(newer.status(), older.status()), - coalesce(older.startTime(), newer.startTime()), // prefer earliest startTime - coalesce(newer.endTime(), older.endTime()), - coalesce(newer.durationMs(), older.durationMs()), - coalesce(newer.engineLevel(), older.engineLevel()), - coalesce(newer.errorMessage(), older.errorMessage()), - coalesce(newer.errorStackTrace(), older.errorStackTrace()), - coalesce(newer.errorType(), older.errorType()), - coalesce(newer.errorCategory(), older.errorCategory()), - coalesce(newer.rootCauseType(), older.rootCauseType()), - coalesce(newer.rootCauseMessage(), older.rootCauseMessage()), - coalesce(newer.attributes(), older.attributes()), - coalesce(newer.traceId(), older.traceId()), - coalesce(newer.spanId(), older.spanId()), - coalesce(newer.originalExchangeId(), older.originalExchangeId()), - coalesce(newer.replayExchangeId(), older.replayExchangeId()), - Math.max(newer.chunkSeq(), older.chunkSeq()), - newer.isFinal() || older.isFinal(), - List.of() // processors are handled separately - ); + ExecutionChunk merged = new ExecutionChunk(); + merged.setExchangeId(coalesce(newer.getExchangeId(), older.getExchangeId())); + merged.setApplicationName(coalesce(newer.getApplicationName(), older.getApplicationName())); + merged.setAgentId(coalesce(newer.getAgentId(), older.getAgentId())); + merged.setRouteId(coalesce(newer.getRouteId(), older.getRouteId())); + merged.setCorrelationId(coalesce(newer.getCorrelationId(), older.getCorrelationId())); + merged.setStatus(coalesce(newer.getStatus(), older.getStatus())); + merged.setStartTime(coalesce(older.getStartTime(), newer.getStartTime())); // prefer earliest startTime + merged.setEndTime(coalesce(newer.getEndTime(), older.getEndTime())); + merged.setDurationMs(coalesce(newer.getDurationMs(), older.getDurationMs())); + merged.setEngineLevel(coalesce(newer.getEngineLevel(), older.getEngineLevel())); + merged.setErrorMessage(coalesce(newer.getErrorMessage(), older.getErrorMessage())); + merged.setErrorStackTrace(coalesce(newer.getErrorStackTrace(), older.getErrorStackTrace())); + merged.setErrorType(coalesce(newer.getErrorType(), older.getErrorType())); + merged.setErrorCategory(coalesce(newer.getErrorCategory(), older.getErrorCategory())); + merged.setRootCauseType(coalesce(newer.getRootCauseType(), older.getRootCauseType())); + merged.setRootCauseMessage(coalesce(newer.getRootCauseMessage(), older.getRootCauseMessage())); + merged.setAttributes(coalesce(newer.getAttributes(), older.getAttributes())); + merged.setTraceId(coalesce(newer.getTraceId(), older.getTraceId())); + merged.setSpanId(coalesce(newer.getSpanId(), older.getSpanId())); + merged.setOriginalExchangeId(coalesce(newer.getOriginalExchangeId(), older.getOriginalExchangeId())); + merged.setReplayExchangeId(coalesce(newer.getReplayExchangeId(), older.getReplayExchangeId())); + merged.setChunkSeq(Math.max(newer.getChunkSeq(), older.getChunkSeq())); + merged.setFinal(newer.isFinal() || older.isFinal()); + merged.setProcessors(List.of()); // processors are handled separately + return merged; } private static T coalesce(T a, T b) { @@ -142,33 +142,33 @@ public class ChunkAccumulator { return new MergedExecution( DEFAULT_TENANT, 1L, - envelope.exchangeId(), - envelope.routeId(), - envelope.agentId(), - envelope.applicationName(), - envelope.status(), - envelope.correlationId(), - envelope.exchangeId(), - envelope.startTime(), - envelope.endTime(), - envelope.durationMs(), - envelope.errorMessage(), - envelope.errorStackTrace(), - envelope.errorType(), - envelope.errorCategory(), - envelope.rootCauseType(), - envelope.rootCauseMessage(), + envelope.getExchangeId(), + envelope.getRouteId(), + envelope.getAgentId(), + envelope.getApplicationName(), + envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING", + envelope.getCorrelationId(), + envelope.getExchangeId(), + envelope.getStartTime(), + envelope.getEndTime(), + envelope.getDurationMs(), + envelope.getErrorMessage(), + envelope.getErrorStackTrace(), + envelope.getErrorType(), + envelope.getErrorCategory(), + envelope.getRootCauseType(), + envelope.getRootCauseMessage(), "", // diagramContentHash — server-side lookup, not in chunk - envelope.engineLevel(), + envelope.getEngineLevel(), "", // inputBody — on processor records now "", // outputBody "", // inputHeaders "", // outputHeaders - serializeAttributes(envelope.attributes()), - envelope.traceId(), - envelope.spanId(), + serializeAttributes(envelope.getAttributes()), + envelope.getTraceId(), + envelope.getSpanId(), false, // hasTraceData — not tracked at envelope level - envelope.replayExchangeId() != null // isReplay + envelope.getReplayExchangeId() != null // isReplay ); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java deleted file mode 100644 index 20066f09..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.time.Instant; -import java.util.List; -import java.util.Map; - -/** - * Chunk document: exchange envelope + list of FlatProcessorRecords. - * Mirrors cameleer3-common ExecutionChunk — replace with common lib import when available. - */ -@JsonIgnoreProperties(ignoreUnknown = true) -@JsonInclude(JsonInclude.Include.NON_NULL) -public record ExecutionChunk( - String exchangeId, - String applicationName, - String agentId, - String routeId, - String correlationId, - String status, - Instant startTime, - Instant endTime, - Long durationMs, - String engineLevel, - String errorMessage, - String errorStackTrace, - String errorType, - String errorCategory, - String rootCauseType, - String rootCauseMessage, - Map attributes, - String traceId, - String spanId, - String originalExchangeId, - String replayExchangeId, - int chunkSeq, - @JsonProperty("final") boolean isFinal, - List processors -) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java deleted file mode 100644 index deb89221..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; - -import java.time.Instant; -import java.util.Map; - -/** - * Flat processor execution record with seq/parentSeq for tree reconstruction. - * Mirrors cameleer3-common FlatProcessorRecord — replace with common lib import when available. - */ -@JsonIgnoreProperties(ignoreUnknown = true) -@JsonInclude(JsonInclude.Include.NON_NULL) -public record FlatProcessorRecord( - int seq, - Integer parentSeq, - String parentProcessorId, - String processorId, - String processorType, - Integer iteration, - Integer iterationSize, - String status, - Instant startTime, - long durationMs, - String resolvedEndpointUri, - String inputBody, - String outputBody, - Map inputHeaders, - Map outputHeaders, - String errorMessage, - String errorStackTrace, - String errorType, - String errorCategory, - String rootCauseType, - String rootCauseMessage, - Map attributes, - String circuitBreakerState, - Boolean fallbackTriggered, - Boolean filterMatched, - Boolean duplicateMessage -) {} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java index fd9cdca8..75771697 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java @@ -1,7 +1,8 @@ package com.cameleer3.server.core.ingestion; -import com.cameleer3.server.core.storage.model.ExecutionChunk; -import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.cameleer3.common.model.ExecutionChunk; +import com.cameleer3.common.model.ExecutionStatus; +import com.cameleer3.common.model.FlatProcessorRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,17 +30,15 @@ class ChunkAccumulatorTest { @Test void singleFinalChunk_producesExecutionAndProcessors() { - ExecutionChunk chunk = new ExecutionChunk( - "ex-1", "order-service", "agent-1", "route-1", - "corr-1", "COMPLETED", + ExecutionChunk chunk = chunk("ex-1", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, null, null, null, null, null, - Map.of("orderId", "ORD-1"), - "trace-1", "span-1", null, null, 0, true, List.of(proc(1, null, "log1", "log", "COMPLETED", 5L))); + chunk.setCorrelationId("corr-1"); + chunk.setAttributes(Map.of("orderId", "ORD-1")); + chunk.setTraceId("trace-1"); + chunk.setSpanId("span-1"); accumulator.onChunk(chunk); @@ -69,17 +68,14 @@ class ChunkAccumulatorTest { @Test void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { - ExecutionChunk chunk0 = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "RUNNING", + ExecutionChunk chunk0 = chunk("ex-2", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of( proc(1, null, "log1", "log", "COMPLETED", 5L), proc(2, null, "log2", "log", "COMPLETED", 3L))); + chunk0.setCorrelationId("ex-2"); accumulator.onChunk(chunk0); @@ -90,16 +86,12 @@ class ChunkAccumulatorTest { // No execution yet (not final) assertThat(executionSink).isEmpty(); - ExecutionChunk chunk1 = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "COMPLETED", + ExecutionChunk chunk1 = chunk("ex-2", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:02Z"), 2000L, - "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, 1, true, List.of(proc(3, null, "log3", "log", "COMPLETED", 7L))); + chunk1.setCorrelationId("ex-2"); accumulator.onChunk(chunk1); @@ -119,17 +111,14 @@ class ChunkAccumulatorTest { ChunkAccumulator staleAccumulator = new ChunkAccumulator( executionSink::add, processorSink::add, Duration.ofMillis(1)); - ExecutionChunk chunk = new ExecutionChunk( - "ex-3", "app", "agent-1", "route-1", - "ex-3", "RUNNING", + ExecutionChunk c = chunk("ex-3", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of()); + c.setCorrelationId("ex-3"); - staleAccumulator.onChunk(chunk); + staleAccumulator.onChunk(c); assertThat(executionSink).isEmpty(); Thread.sleep(5); @@ -143,20 +132,20 @@ class ChunkAccumulatorTest { @Test void finalChunkWithErrors_populatesErrorFields() { - ExecutionChunk chunk = new ExecutionChunk( - "ex-4", "app", "agent-1", "route-1", - "ex-4", "FAILED", + ExecutionChunk c = chunk("ex-4", "FAILED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - "NullPointerException", "at com.foo.Bar.baz(Bar.java:42)", - "NullPointerException", "RUNTIME", - "NullPointerException", "null value at index 0", - null, null, null, null, null, 0, true, List.of()); + c.setCorrelationId("ex-4"); + c.setErrorMessage("NullPointerException"); + c.setErrorStackTrace("at com.foo.Bar.baz(Bar.java:42)"); + c.setErrorType("NullPointerException"); + c.setErrorCategory("RUNTIME"); + c.setRootCauseType("NullPointerException"); + c.setRootCauseMessage("null value at index 0"); - accumulator.onChunk(chunk); + accumulator.onChunk(c); assertThat(executionSink).hasSize(1); MergedExecution exec = executionSink.get(0); @@ -171,56 +160,66 @@ class ChunkAccumulatorTest { @Test void getPendingCount_tracksBufferedExchanges() { - ExecutionChunk running1 = new ExecutionChunk( - "ex-5", "app", "agent-1", "route-1", - "ex-5", "RUNNING", + ExecutionChunk running1 = chunk("ex-5", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of()); + running1.setCorrelationId("ex-5"); - ExecutionChunk running2 = new ExecutionChunk( - "ex-6", "app", "agent-1", "route-2", - "ex-6", "RUNNING", + ExecutionChunk running2 = chunk("ex-6", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, + null, null, 0, false, List.of()); + running2.setCorrelationId("ex-6"); + running2.setRouteId("route-2"); accumulator.onChunk(running1); accumulator.onChunk(running2); assertThat(accumulator.getPendingCount()).isEqualTo(2); // Send final for ex-5 - ExecutionChunk final5 = new ExecutionChunk( - "ex-5", "app", "agent-1", "route-1", - "ex-5", "COMPLETED", + ExecutionChunk final5 = chunk("ex-5", "COMPLETED", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, 1, true, List.of()); + final5.setCorrelationId("ex-5"); accumulator.onChunk(final5); assertThat(accumulator.getPendingCount()).isEqualTo(1); } + /** Helper to create an ExecutionChunk with common fields. */ + private static ExecutionChunk chunk(String exchangeId, String status, Instant start, Instant end, Long duration, + int chunkSeq, boolean isFinal, List processors) { + ExecutionChunk c = new ExecutionChunk(); + c.setExchangeId(exchangeId); + c.setApplicationName(exchangeId.equals("ex-1") ? "order-service" : "app"); + c.setAgentId("agent-1"); + c.setRouteId("route-1"); + c.setCorrelationId(null); + c.setStatus(ExecutionStatus.valueOf(status)); + c.setStartTime(start); + c.setEndTime(end); + c.setDurationMs(duration); + c.setEngineLevel("REGULAR"); + c.setChunkSeq(chunkSeq); + c.setFinal(isFinal); + c.setProcessors(processors); + return c; + } + /** Helper to create a FlatProcessorRecord with minimal fields. */ private static FlatProcessorRecord proc(int seq, Integer parentSeq, String processorId, String processorType, String status, long durationMs) { - return new FlatProcessorRecord( - seq, parentSeq, null, processorId, processorType, - null, null, status, - Instant.parse("2026-03-31T10:00:00.100Z"), durationMs, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null); + FlatProcessorRecord p = new FlatProcessorRecord(seq, processorId, processorType); + p.setParentSeq(parentSeq); + p.setStatus(ExecutionStatus.valueOf(status)); + p.setStartTime(Instant.parse("2026-03-31T10:00:00.100Z")); + p.setDurationMs(durationMs); + return p; } } diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java deleted file mode 100644 index de0a50cf..00000000 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.cameleer3.server.core.storage.model; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.Test; - -import java.time.Instant; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -class ExecutionChunkDeserializationTest { - - private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); - - @Test - void roundTrip_fullChunk() throws Exception { - ExecutionChunk chunk = new ExecutionChunk( - "ex-1", "order-service", "pod-1", "order-route", - "corr-1", "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), - Instant.parse("2026-03-31T10:00:01Z"), 1000L, - "REGULAR", - null, null, null, null, null, null, - Map.of("orderId", "ORD-1"), - "trace-1", "span-1", null, null, - 2, true, - List.of(new FlatProcessorRecord( - 1, null, null, "log1", "log", - null, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00.100Z"), 5L, - null, "body", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null))); - - String json = MAPPER.writeValueAsString(chunk); - ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); - - assertThat(deserialized.exchangeId()).isEqualTo("ex-1"); - assertThat(deserialized.isFinal()).isTrue(); - assertThat(deserialized.chunkSeq()).isEqualTo(2); - assertThat(deserialized.processors()).hasSize(1); - assertThat(deserialized.processors().get(0).seq()).isEqualTo(1); - assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1"); - assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1"); - } - - @Test - void roundTrip_runningChunkWithIterations() throws Exception { - ExecutionChunk chunk = new ExecutionChunk( - "ex-2", "app", "agent-1", "route-1", - "ex-2", "RUNNING", - Instant.parse("2026-03-31T10:00:00Z"), - null, null, "REGULAR", - null, null, null, null, null, null, - null, null, null, null, null, - 0, false, - List.of( - new FlatProcessorRecord( - 1, null, null, "split1", "split", - null, 3, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 100L, - null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord( - 2, 1, "split1", "log1", "log", - 0, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 5L, - null, "item-0", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null), - new FlatProcessorRecord( - 3, 1, "split1", "log1", "log", - 1, null, "COMPLETED", - Instant.parse("2026-03-31T10:00:00Z"), 5L, - null, "item-1", null, null, null, - null, null, null, null, null, null, - null, null, null, null, null))); - - String json = MAPPER.writeValueAsString(chunk); - ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); - - assertThat(deserialized.isFinal()).isFalse(); - assertThat(deserialized.processors()).hasSize(3); - - FlatProcessorRecord split = deserialized.processors().get(0); - assertThat(split.iterationSize()).isEqualTo(3); - assertThat(split.parentSeq()).isNull(); - - FlatProcessorRecord child0 = deserialized.processors().get(1); - assertThat(child0.parentSeq()).isEqualTo(1); - assertThat(child0.parentProcessorId()).isEqualTo("split1"); - assertThat(child0.iteration()).isEqualTo(0); - } - - @Test - void deserialize_unknownFieldsIgnored() throws Exception { - String json = """ - {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED", - "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true, - "futureField":"ignored","processors":[]} - """; - ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class); - assertThat(chunk.exchangeId()).isEqualTo("ex-1"); - assertThat(chunk.isFinal()).isTrue(); - } -}