diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java index 24cace8a..bf3085ec 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java @@ -42,7 +42,7 @@ public class ChunkIngestionController { this.objectMapper.registerModule(new JavaTimeModule()); } - @PostMapping("/chunks") + @PostMapping("/executions") @Operation(summary = "Ingest execution chunk") public ResponseEntity ingestChunks(@RequestBody String body) { try { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java index a37ea643..c10d478c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java @@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.RouteExecution; import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; +import com.cameleer3.server.core.ingestion.ChunkAccumulator; import com.cameleer3.server.core.ingestion.IngestionService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -12,6 +13,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.http.ResponseEntity; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; @@ -23,13 +25,17 @@ import org.springframework.web.bind.annotation.RestController; import java.util.List; /** - * Ingestion endpoint for route execution data. + * Legacy ingestion endpoint for route execution data (PostgreSQL path). *

* Accepts both single {@link RouteExecution} and arrays. Data is written * synchronously to PostgreSQL via {@link IngestionService}. + *

+ * Only active when ClickHouse is disabled — when ClickHouse is enabled, + * {@link ChunkIngestionController} takes over the {@code /executions} mapping. */ @RestController @RequestMapping("/api/v1/data") +@ConditionalOnMissingBean(ChunkAccumulator.class) @Tag(name = "Ingestion", description = "Data ingestion endpoints") public class ExecutionController { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index dc64b148..76ea8d80 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -35,9 +35,10 @@ public class ClickHouseExecutionStore { error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, diagram_content_hash, engine_level, input_body, output_body, input_headers, output_headers, attributes, - trace_id, span_id, has_trace_data, is_replay + trace_id, span_id, has_trace_data, is_replay, + original_exchange_id, replay_exchange_id ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, executions.stream().map(e -> new Object[]{ nullToEmpty(e.tenantId()), @@ -68,7 +69,9 @@ public class ClickHouseExecutionStore { nullToEmpty(e.traceId()), nullToEmpty(e.spanId()), e.hasTraceData(), - e.isReplay() + e.isReplay(), + nullToEmpty(e.originalExchangeId()), + nullToEmpty(e.replayExchangeId()) }).toList()); } diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql b/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql new file mode 100644 index 00000000..0c1d8b8c --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V5__replay_fields.sql @@ -0,0 +1,2 @@ +ALTER TABLE executions ADD COLUMN IF NOT EXISTS original_exchange_id String DEFAULT ''; +ALTER TABLE executions ADD COLUMN IF NOT EXISTS replay_exchange_id String DEFAULT ''; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java index 4cb2de53..a0f9382c 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -71,7 +71,8 @@ class ClickHouseSearchIndexIT { "hash-abc", "FULL", "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", "", "", - false, false + false, false, + null, null ); // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error @@ -87,7 +88,8 @@ class ClickHouseSearchIndexIT { "", "FULL", "", "", "", "", "", "", "", - false, false + false, false, + null, null ); // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error @@ -101,7 +103,8 @@ class ClickHouseSearchIndexIT { "", "FULL", "", "", "", "", "", "", "", - false, false + false, false, + null, null ); store.insertExecutionBatch(List.of(exec1, exec2, exec3)); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java index 8b8ede77..6f2a5a24 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -68,7 +68,8 @@ class ClickHouseExecutionStoreIT { "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", "{\"attr\":\"val\"}", "trace-123", "span-456", - true, false + true, false, + null, null ); store.insertExecutionBatch(List.of(exec)); @@ -195,7 +196,8 @@ class ClickHouseExecutionStoreIT { "", "FULL", "", "", "", "", "", "", "", - false, false + false, false, + null, null ); MergedExecution v2 = new MergedExecution( @@ -208,7 +210,8 @@ class ClickHouseExecutionStoreIT { "", "FULL", "", "", "", "", "", "", "", - false, false + false, false, + null, null ); store.insertExecutionBatch(List.of(v1)); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index dcc7d486..9d6ed225 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -168,7 +168,9 @@ public class ChunkAccumulator { envelope.getTraceId(), envelope.getSpanId(), false, // hasTraceData — not tracked at envelope level - envelope.getReplayExchangeId() != null // isReplay + envelope.getReplayExchangeId() != null, // isReplay + envelope.getOriginalExchangeId(), + envelope.getReplayExchangeId() ); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java index d5227ab8..1e002f8e 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -35,5 +35,7 @@ public record MergedExecution( String traceId, String spanId, boolean hasTraceData, - boolean isReplay + boolean isReplay, + String originalExchangeId, + String replayExchangeId ) {}