refactor(ingestion): drop dead legacy execution-ingestion path

ExecutionController was @ConditionalOnMissingBean(ChunkAccumulator.class),
and ChunkAccumulator is registered unconditionally — the legacy controller
never bound in any profile. Even if it had, IngestionService.ingestExecution
called executionStore.upsert(), and the only ExecutionStore impl
(ClickHouseExecutionStore) threw UnsupportedOperationException from upsert
and upsertProcessors. The entire RouteExecution → upsert path was dead code
carrying four transitive dependencies (RouteExecution import, eventPublisher
wiring, body-size-limit config, searchIndexer::onExecutionUpdated hook).

Removed:
- cameleer-server-app/.../controller/ExecutionController.java (whole file)
- ExecutionStore.upsert + upsertProcessors (interface methods)
- ClickHouseExecutionStore.upsert + upsertProcessors (thrower overrides)
- IngestionService.ingestExecution + toExecutionRecord + flattenProcessors
  + hasAnyTraceData + truncateBody + toJson/toJsonObject helpers
- IngestionService constructor now takes (DiagramStore, WriteBuffer<Metrics>);
  dropped ExecutionStore + Consumer<ExecutionUpdatedEvent> + bodySizeLimit
- StorageBeanConfig.ingestionService(...) simplified accordingly

Untouched because still in use:
- ExecutionRecord / ProcessorRecord records (findById / findProcessors /
  SearchIndexer / DetailController)
- SearchIndexer (its onExecutionUpdated never fires now since no-one
  publishes ExecutionUpdatedEvent, but SearchIndexerStats is still
  referenced by ClickHouseAdminController — separate cleanup)
- TaggedExecution record has no remaining callers after this change —
  flagged in core-classes.md as a leftover; separate cleanup.

Rule docs updated:
- .claude/rules/app-classes.md: retired ExecutionController bullet, fixed
  stale URL for ChunkIngestionController (it owns /api/v1/data/executions,
  not /api/v1/ingestion/chunk/executions).
- .claude/rules/core-classes.md: IngestionService surface + note the dead
  TaggedExecution.

Full IT suite post-removal: 560 tests run, 11 F + 1 E — same 12 failures
in the same 3 previously-parked classes (AgentSseControllerIT / SseSigningIT
SSE-timing + ClickHouseStatsStoreIT timezone bug). No regression.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-21 22:50:51 +02:00
parent 56faabcdf1
commit 0f635576a3
7 changed files with 18 additions and 288 deletions

View File

@@ -85,8 +85,7 @@ Env-scoped read-path controllers (`AlertController`, `AlertRuleController`, `Ale
- `LogIngestionController` — POST `/api/v1/data/logs` (accepts `List<LogEntry>`; WARNs on missing identity, unregistered agents, empty payloads, buffer-full drops).
- `EventIngestionController` — POST `/api/v1/data/events`.
- `ChunkIngestionController` — POST `/api/v1/ingestion/chunk/{executions|metrics|diagrams}`.
- `ExecutionController` — POST `/api/v1/data/executions` (legacy ingestion path when ClickHouse disabled).
- `ChunkIngestionController` — POST `/api/v1/data/executions`. Accepts a single `ExecutionChunk` or an array (fields include `exchangeId`, `applicationId`, `instanceId`, `routeId`, `status`, `startTime`, `endTime`, `durationMs`, `chunkSeq`, `final`, `processors: FlatProcessorRecord[]`). The accumulator merges non-final chunks by exchangeId and emits the merged envelope on the final chunk or on stale timeout. Legacy `ExecutionController` / `RouteExecution` shape is retired.
- `MetricsController` — POST `/api/v1/data/metrics`.
- `DiagramController` — POST `/api/v1/data/diagrams` (resolves applicationId + environment from the agent registry keyed on JWT subject; stamps both on the stored `TaggedDiagram`).

View File

@@ -107,8 +107,8 @@ paths:
## ingestion/ — Buffered data pipeline
- `IngestionService`ingestExecution, ingestMetric, ingestLog, ingestDiagram
- `ChunkAccumulator` — batches data for efficient flush
- `IngestionService`diagram + metrics facade (`ingestDiagram`, `acceptMetrics`, `getMetricsBuffer`). Execution ingestion went through here via the legacy `RouteExecution` shape until `ChunkAccumulator` took over writes from the chunked pipeline — the `ingestExecution` path plus its `ExecutionStore.upsert` / `upsertProcessors` dependencies were removed.
- `ChunkAccumulator` — batches data for efficient flush; owns the execution write path (chunks → buffers → flush scheduler → `ClickHouseExecutionStore.insertExecutionBatch`).
- `WriteBuffer` — bounded ring buffer for async flush
- `BufferedLogEntry` — log entry wrapper with metadata
- `MergedExecution`, `TaggedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row.
- `MergedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row. (`TaggedExecution` still lives in the package as a leftover but has no callers since the legacy PG ingest path was retired.)

View File

@@ -56,13 +56,9 @@ public class StorageBeanConfig {
}
@Bean
public IngestionService ingestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
SearchIndexer searchIndexer,
@Value("${cameleer.server.ingestion.bodysizelimit:16384}") int bodySizeLimit) {
return new IngestionService(executionStore, diagramStore, metricsBuffer,
searchIndexer::onExecutionUpdated, bodySizeLimit);
public IngestionService ingestionService(DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer) {
return new IngestionService(diagramStore, metricsBuffer);
}
@Bean

View File

@@ -1,87 +0,0 @@
package com.cameleer.server.app.controller;
import com.cameleer.common.model.RouteExecution;
import com.cameleer.server.core.agent.AgentInfo;
import com.cameleer.server.core.agent.AgentRegistryService;
import com.cameleer.server.core.ingestion.ChunkAccumulator;
import com.cameleer.server.core.ingestion.IngestionService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* Legacy ingestion endpoint for route execution data (PostgreSQL path).
* <p>
* Accepts both single {@link RouteExecution} and arrays. Data is written
* synchronously to PostgreSQL via {@link IngestionService}.
* <p>
* Only active when ClickHouse is disabled — when ClickHouse is enabled,
* {@link ChunkIngestionController} takes over the {@code /executions} mapping.
*/
@RestController
@RequestMapping("/api/v1/data")
@ConditionalOnMissingBean(ChunkAccumulator.class)
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
public class ExecutionController {
private final IngestionService ingestionService;
private final AgentRegistryService registryService;
private final ObjectMapper objectMapper;
public ExecutionController(IngestionService ingestionService,
AgentRegistryService registryService,
ObjectMapper objectMapper) {
this.ingestionService = ingestionService;
this.registryService = registryService;
this.objectMapper = objectMapper;
}
@PostMapping("/executions")
@Operation(summary = "Ingest route execution data",
description = "Accepts a single RouteExecution or an array of RouteExecutions")
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
public ResponseEntity<Void> ingestExecutions(@RequestBody String body) throws JsonProcessingException {
String instanceId = extractAgentId();
String applicationId = resolveApplicationId(instanceId);
List<RouteExecution> executions = parsePayload(body);
for (RouteExecution execution : executions) {
ingestionService.ingestExecution(instanceId, applicationId, execution);
}
return ResponseEntity.accepted().build();
}
private String extractAgentId() {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
return auth != null ? auth.getName() : "";
}
private String resolveApplicationId(String instanceId) {
AgentInfo agent = registryService.findById(instanceId);
return agent != null ? agent.applicationId() : "";
}
private List<RouteExecution> parsePayload(String body) throws JsonProcessingException {
String trimmed = body.strip();
if (trimmed.startsWith("[")) {
return objectMapper.readValue(trimmed, new TypeReference<>() {});
} else {
RouteExecution single = objectMapper.readValue(trimmed, RouteExecution.class);
return List.of(single);
}
}
}

View File

@@ -282,20 +282,6 @@ public class ClickHouseExecutionStore implements ExecutionStore {
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
// --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) ---
@Override
public void upsert(ExecutionRecord execution) {
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
@Override
public void upsertProcessors(String executionId, Instant startTime,
String applicationId, String routeId,
List<ProcessorRecord> processors) {
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
}
// --- Row mappers ---
private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException {

View File

@@ -1,63 +1,28 @@
package com.cameleer.server.core.ingestion;
import com.cameleer.common.model.ExchangeSnapshot;
import com.cameleer.common.model.ProcessorExecution;
import com.cameleer.common.model.RouteExecution;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer.server.core.storage.DiagramStore;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
/**
* Diagram + metrics ingestion facade.
*
* <p>Execution ingestion went through this class via the {@code RouteExecution}
* shape until the ClickHouse chunked pipeline took over — {@code ChunkAccumulator}
* now writes executions directly from the {@code /api/v1/data/executions}
* controller, so this class no longer needs an ExecutionStore or event-publisher
* dependency.
*/
public class IngestionService {
private static final ObjectMapper JSON = new ObjectMapper()
.findAndRegisterModules()
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private final ExecutionStore executionStore;
private final DiagramStore diagramStore;
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
private final Consumer<ExecutionUpdatedEvent> eventPublisher;
private final int bodySizeLimit;
public IngestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
Consumer<ExecutionUpdatedEvent> eventPublisher,
int bodySizeLimit) {
this.executionStore = executionStore;
public IngestionService(DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer) {
this.diagramStore = diagramStore;
this.metricsBuffer = metricsBuffer;
this.eventPublisher = eventPublisher;
this.bodySizeLimit = bodySizeLimit;
}
public void ingestExecution(String instanceId, String applicationId, RouteExecution execution) {
ExecutionRecord record = toExecutionRecord(instanceId, applicationId, execution);
executionStore.upsert(record);
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
List<ProcessorRecord> processors = flattenProcessors(
execution.getProcessors(), record.executionId(),
record.startTime(), applicationId, execution.getRouteId(),
null, 0);
executionStore.upsertProcessors(
record.executionId(), record.startTime(),
applicationId, execution.getRouteId(), processors);
}
eventPublisher.accept(new ExecutionUpdatedEvent(
record.executionId(), record.startTime()));
}
public void ingestDiagram(TaggedDiagram diagram) {
@@ -75,127 +40,4 @@ public class IngestionService {
public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
return metricsBuffer;
}
private ExecutionRecord toExecutionRecord(String instanceId, String applicationId,
RouteExecution exec) {
String diagramHash = diagramStore
.findContentHashForRoute(exec.getRouteId(), instanceId)
.orElse("");
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
String inputBody = null;
String outputBody = null;
String inputHeaders = null;
String outputHeaders = null;
String inputProperties = null;
String outputProperties = null;
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
if (inputSnapshot != null) {
inputBody = truncateBody(inputSnapshot.getBody());
inputHeaders = toJson(inputSnapshot.getHeaders());
inputProperties = toJson(inputSnapshot.getProperties());
}
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
if (outputSnapshot != null) {
outputBody = truncateBody(outputSnapshot.getBody());
outputHeaders = toJson(outputSnapshot.getHeaders());
outputProperties = toJson(outputSnapshot.getProperties());
}
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
boolean isReplay = exec.getReplayExchangeId() != null;
if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) {
isReplay = "true".equalsIgnoreCase(
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
}
return new ExecutionRecord(
exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId,
null, // environment: legacy PG path; ClickHouse path uses MergedExecution with env resolved from registry
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
exec.getCorrelationId(), exec.getExchangeId(),
exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(),
exec.getErrorMessage(), exec.getErrorStackTrace(),
diagramHash,
exec.getEngineLevel(),
inputBody, outputBody, inputHeaders, outputHeaders,
inputProperties, outputProperties,
toJson(exec.getAttributes()),
exec.getErrorType(), exec.getErrorCategory(),
exec.getRootCauseType(), exec.getRootCauseMessage(),
exec.getTraceId(), exec.getSpanId(),
toJsonObject(exec.getProcessors()),
hasTraceData,
isReplay
);
}
private static boolean hasAnyTraceData(List<ProcessorExecution> processors) {
if (processors == null) return false;
for (ProcessorExecution p : processors) {
if (p.getInputBody() != null || p.getOutputBody() != null
|| p.getInputHeaders() != null || p.getOutputHeaders() != null
|| p.getInputProperties() != null || p.getOutputProperties() != null) return true;
}
return false;
}
private List<ProcessorRecord> flattenProcessors(
List<ProcessorExecution> processors, String executionId,
java.time.Instant execStartTime, String applicationId, String routeId,
String parentProcessorId, int depth) {
List<ProcessorRecord> flat = new ArrayList<>();
for (ProcessorExecution p : processors) {
flat.add(new ProcessorRecord(
executionId, p.getProcessorId(), p.getProcessorType(),
applicationId, routeId,
depth, parentProcessorId,
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
p.getStartTime() != null ? p.getStartTime() : execStartTime,
p.getEndTime(),
p.getDurationMs(),
p.getErrorMessage(), p.getErrorStackTrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
null, null, // inputProperties, outputProperties (not on ProcessorExecution)
toJson(p.getAttributes()),
null, null, null, null, null,
p.getResolvedEndpointUri(),
p.getErrorType(), p.getErrorCategory(),
p.getRootCauseType(), p.getRootCauseMessage(),
p.getErrorHandlerType(), p.getCircuitBreakerState(),
p.getFallbackTriggered(),
null, null, null, null, null, null
));
}
return flat;
}
private String truncateBody(String body) {
if (body == null) return null;
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
return body;
}
private static String toJson(Map<String, String> headers) {
if (headers == null) return null;
try {
return JSON.writeValueAsString(headers);
} catch (JsonProcessingException e) {
return "{}";
}
}
private static String toJsonObject(Object obj) {
if (obj == null) return null;
try {
return JSON.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return null;
}
}
}

View File

@@ -6,12 +6,6 @@ import java.util.Optional;
public interface ExecutionStore {
void upsert(ExecutionRecord execution);
void upsertProcessors(String executionId, Instant startTime,
String applicationId, String routeId,
List<ProcessorRecord> processors);
Optional<ExecutionRecord> findById(String executionId);
List<ProcessorRecord> findProcessors(String executionId);