fix: align server with protocol v2 chunked transport spec
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m45s
CI / docker (push) Successful in 59s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 46s

- ChunkIngestionController: /data/chunks → /data/executions (matches
  PROTOCOL.md endpoint the agent actually posts to)
- ExecutionController: conditional on ClickHouse being disabled to
  avoid mapping conflict
- Persist originalExchangeId and replayExchangeId from ExecutionChunk
  envelope through to ClickHouse (was silently dropped)
- V5 migration adds the two new columns to executions table

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-31 23:18:35 +02:00
parent 154bce366a
commit 606f81a970
8 changed files with 34 additions and 13 deletions

View File

@@ -42,7 +42,7 @@ public class ChunkIngestionController {
this.objectMapper.registerModule(new JavaTimeModule()); this.objectMapper.registerModule(new JavaTimeModule());
} }
@PostMapping("/chunks") @PostMapping("/executions")
@Operation(summary = "Ingest execution chunk") @Operation(summary = "Ingest execution chunk")
public ResponseEntity<Void> ingestChunks(@RequestBody String body) { public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
try { try {

View File

@@ -3,6 +3,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.RouteExecution; import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.ingestion.ChunkAccumulator;
import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.IngestionService;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
@@ -12,6 +13,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication; import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextHolder;
@@ -23,13 +25,17 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
/** /**
* Ingestion endpoint for route execution data. * Legacy ingestion endpoint for route execution data (PostgreSQL path).
* <p> * <p>
* Accepts both single {@link RouteExecution} and arrays. Data is written * Accepts both single {@link RouteExecution} and arrays. Data is written
* synchronously to PostgreSQL via {@link IngestionService}. * synchronously to PostgreSQL via {@link IngestionService}.
* <p>
* Only active when ClickHouse is disabled — when ClickHouse is enabled,
* {@link ChunkIngestionController} takes over the {@code /executions} mapping.
*/ */
@RestController @RestController
@RequestMapping("/api/v1/data") @RequestMapping("/api/v1/data")
@ConditionalOnMissingBean(ChunkAccumulator.class)
@Tag(name = "Ingestion", description = "Data ingestion endpoints") @Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ExecutionController { public class ExecutionController {

View File

@@ -35,9 +35,10 @@ public class ClickHouseExecutionStore {
error_message, error_stacktrace, error_type, error_category, error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message, diagram_content_hash, engine_level, root_cause_type, root_cause_message, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes, input_body, output_body, input_headers, output_headers, attributes,
trace_id, span_id, has_trace_data, is_replay trace_id, span_id, has_trace_data, is_replay,
original_exchange_id, replay_exchange_id
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
executions.stream().map(e -> new Object[]{ executions.stream().map(e -> new Object[]{
nullToEmpty(e.tenantId()), nullToEmpty(e.tenantId()),
@@ -68,7 +69,9 @@ public class ClickHouseExecutionStore {
nullToEmpty(e.traceId()), nullToEmpty(e.traceId()),
nullToEmpty(e.spanId()), nullToEmpty(e.spanId()),
e.hasTraceData(), e.hasTraceData(),
e.isReplay() e.isReplay(),
nullToEmpty(e.originalExchangeId()),
nullToEmpty(e.replayExchangeId())
}).toList()); }).toList());
} }

View File

@@ -0,0 +1,2 @@
ALTER TABLE executions ADD COLUMN IF NOT EXISTS original_exchange_id String DEFAULT '';
ALTER TABLE executions ADD COLUMN IF NOT EXISTS replay_exchange_id String DEFAULT '';

View File

@@ -71,7 +71,8 @@ class ClickHouseSearchIndexIT {
"hash-abc", "FULL", "hash-abc", "FULL",
"{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}", "{\"order\":\"12345\"}", "", "", "", "{\"env\":\"prod\"}",
"", "", "", "",
false, false false, false,
null, null
); );
// exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error
@@ -87,7 +88,8 @@ class ClickHouseSearchIndexIT {
"", "FULL", "", "FULL",
"", "", "", "", "", "", "", "", "", "",
"", "", "", "",
false, false false, false,
null, null
); );
// exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error
@@ -101,7 +103,8 @@ class ClickHouseSearchIndexIT {
"", "FULL", "", "FULL",
"", "", "", "", "", "", "", "", "", "",
"", "", "", "",
false, false false, false,
null, null
); );
store.insertExecutionBatch(List.of(exec1, exec2, exec3)); store.insertExecutionBatch(List.of(exec1, exec2, exec3));

View File

@@ -68,7 +68,8 @@ class ClickHouseExecutionStoreIT {
"{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}", "{\"h1\":\"v1\"}", "{\"h2\":\"v2\"}",
"{\"attr\":\"val\"}", "{\"attr\":\"val\"}",
"trace-123", "span-456", "trace-123", "span-456",
true, false true, false,
null, null
); );
store.insertExecutionBatch(List.of(exec)); store.insertExecutionBatch(List.of(exec));
@@ -195,7 +196,8 @@ class ClickHouseExecutionStoreIT {
"", "FULL", "", "FULL",
"", "", "", "", "", "", "", "", "", "",
"", "", "", "",
false, false false, false,
null, null
); );
MergedExecution v2 = new MergedExecution( MergedExecution v2 = new MergedExecution(
@@ -208,7 +210,8 @@ class ClickHouseExecutionStoreIT {
"", "FULL", "", "FULL",
"", "", "", "", "", "", "", "", "", "",
"", "", "", "",
false, false false, false,
null, null
); );
store.insertExecutionBatch(List.of(v1)); store.insertExecutionBatch(List.of(v1));

View File

@@ -168,7 +168,9 @@ public class ChunkAccumulator {
envelope.getTraceId(), envelope.getTraceId(),
envelope.getSpanId(), envelope.getSpanId(),
false, // hasTraceData — not tracked at envelope level false, // hasTraceData — not tracked at envelope level
envelope.getReplayExchangeId() != null // isReplay envelope.getReplayExchangeId() != null, // isReplay
envelope.getOriginalExchangeId(),
envelope.getReplayExchangeId()
); );
} }

View File

@@ -35,5 +35,7 @@ public record MergedExecution(
String traceId, String traceId,
String spanId, String spanId,
boolean hasTraceData, boolean hasTraceData,
boolean isReplay boolean isReplay,
String originalExchangeId,
String replayExchangeId
) {} ) {}