refactor(ingestion): drop dead legacy execution-ingestion path
ExecutionController was @ConditionalOnMissingBean(ChunkAccumulator.class), and ChunkAccumulator is registered unconditionally — the legacy controller never bound in any profile. Even if it had, IngestionService.ingestExecution called executionStore.upsert(), and the only ExecutionStore impl (ClickHouseExecutionStore) threw UnsupportedOperationException from upsert and upsertProcessors. The entire RouteExecution → upsert path was dead code carrying four transitive dependencies (RouteExecution import, eventPublisher wiring, body-size-limit config, searchIndexer::onExecutionUpdated hook). Removed: - cameleer-server-app/.../controller/ExecutionController.java (whole file) - ExecutionStore.upsert + upsertProcessors (interface methods) - ClickHouseExecutionStore.upsert + upsertProcessors (thrower overrides) - IngestionService.ingestExecution + toExecutionRecord + flattenProcessors + hasAnyTraceData + truncateBody + toJson/toJsonObject helpers - IngestionService constructor now takes (DiagramStore, WriteBuffer<Metrics>); dropped ExecutionStore + Consumer<ExecutionUpdatedEvent> + bodySizeLimit - StorageBeanConfig.ingestionService(...) simplified accordingly Untouched because still in use: - ExecutionRecord / ProcessorRecord records (findById / findProcessors / SearchIndexer / DetailController) - SearchIndexer (its onExecutionUpdated never fires now since no-one publishes ExecutionUpdatedEvent, but SearchIndexerStats is still referenced by ClickHouseAdminController — separate cleanup) - TaggedExecution record has no remaining callers after this change — flagged in core-classes.md as a leftover; separate cleanup. Rule docs updated: - .claude/rules/app-classes.md: retired ExecutionController bullet, fixed stale URL for ChunkIngestionController (it owns /api/v1/data/executions, not /api/v1/ingestion/chunk/executions). - .claude/rules/core-classes.md: IngestionService surface + note the dead TaggedExecution. Full IT suite post-removal: 560 tests run, 11 F + 1 E — same 12 failures in the same 3 previously-parked classes (AgentSseControllerIT / SseSigningIT SSE-timing + ClickHouseStatsStoreIT timezone bug). No regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -85,8 +85,7 @@ Env-scoped read-path controllers (`AlertController`, `AlertRuleController`, `Ale
|
||||
|
||||
- `LogIngestionController` — POST `/api/v1/data/logs` (accepts `List<LogEntry>`; WARNs on missing identity, unregistered agents, empty payloads, buffer-full drops).
|
||||
- `EventIngestionController` — POST `/api/v1/data/events`.
|
||||
- `ChunkIngestionController` — POST `/api/v1/ingestion/chunk/{executions|metrics|diagrams}`.
|
||||
- `ExecutionController` — POST `/api/v1/data/executions` (legacy ingestion path when ClickHouse disabled).
|
||||
- `ChunkIngestionController` — POST `/api/v1/data/executions`. Accepts a single `ExecutionChunk` or an array (fields include `exchangeId`, `applicationId`, `instanceId`, `routeId`, `status`, `startTime`, `endTime`, `durationMs`, `chunkSeq`, `final`, `processors: FlatProcessorRecord[]`). The accumulator merges non-final chunks by exchangeId and emits the merged envelope on the final chunk or on stale timeout. Legacy `ExecutionController` / `RouteExecution` shape is retired.
|
||||
- `MetricsController` — POST `/api/v1/data/metrics`.
|
||||
- `DiagramController` — POST `/api/v1/data/diagrams` (resolves applicationId + environment from the agent registry keyed on JWT subject; stamps both on the stored `TaggedDiagram`).
|
||||
|
||||
|
||||
@@ -107,8 +107,8 @@ paths:
|
||||
|
||||
## ingestion/ — Buffered data pipeline
|
||||
|
||||
- `IngestionService` — ingestExecution, ingestMetric, ingestLog, ingestDiagram
|
||||
- `ChunkAccumulator` — batches data for efficient flush
|
||||
- `IngestionService` — diagram + metrics facade (`ingestDiagram`, `acceptMetrics`, `getMetricsBuffer`). Execution ingestion went through here via the legacy `RouteExecution` shape until `ChunkAccumulator` took over writes from the chunked pipeline — the `ingestExecution` path plus its `ExecutionStore.upsert` / `upsertProcessors` dependencies were removed.
|
||||
- `ChunkAccumulator` — batches data for efficient flush; owns the execution write path (chunks → buffers → flush scheduler → `ClickHouseExecutionStore.insertExecutionBatch`).
|
||||
- `WriteBuffer` — bounded ring buffer for async flush
|
||||
- `BufferedLogEntry` — log entry wrapper with metadata
|
||||
- `MergedExecution`, `TaggedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row.
|
||||
- `MergedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row. (`TaggedExecution` still lives in the package as a leftover but has no callers since the legacy PG ingest path was retired.)
|
||||
|
||||
@@ -56,13 +56,9 @@ public class StorageBeanConfig {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IngestionService ingestionService(ExecutionStore executionStore,
|
||||
DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer,
|
||||
SearchIndexer searchIndexer,
|
||||
@Value("${cameleer.server.ingestion.bodysizelimit:16384}") int bodySizeLimit) {
|
||||
return new IngestionService(executionStore, diagramStore, metricsBuffer,
|
||||
searchIndexer::onExecutionUpdated, bodySizeLimit);
|
||||
public IngestionService ingestionService(DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer) {
|
||||
return new IngestionService(diagramStore, metricsBuffer);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
package com.cameleer.server.app.controller;
|
||||
|
||||
import com.cameleer.common.model.RouteExecution;
|
||||
import com.cameleer.server.core.agent.AgentInfo;
|
||||
import com.cameleer.server.core.agent.AgentRegistryService;
|
||||
import com.cameleer.server.core.ingestion.ChunkAccumulator;
|
||||
import com.cameleer.server.core.ingestion.IngestionService;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Legacy ingestion endpoint for route execution data (PostgreSQL path).
|
||||
* <p>
|
||||
* Accepts both single {@link RouteExecution} and arrays. Data is written
|
||||
* synchronously to PostgreSQL via {@link IngestionService}.
|
||||
* <p>
|
||||
* Only active when ClickHouse is disabled — when ClickHouse is enabled,
|
||||
* {@link ChunkIngestionController} takes over the {@code /executions} mapping.
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/data")
|
||||
@ConditionalOnMissingBean(ChunkAccumulator.class)
|
||||
@Tag(name = "Ingestion", description = "Data ingestion endpoints")
|
||||
public class ExecutionController {
|
||||
|
||||
private final IngestionService ingestionService;
|
||||
private final AgentRegistryService registryService;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public ExecutionController(IngestionService ingestionService,
|
||||
AgentRegistryService registryService,
|
||||
ObjectMapper objectMapper) {
|
||||
this.ingestionService = ingestionService;
|
||||
this.registryService = registryService;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@PostMapping("/executions")
|
||||
@Operation(summary = "Ingest route execution data",
|
||||
description = "Accepts a single RouteExecution or an array of RouteExecutions")
|
||||
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
|
||||
public ResponseEntity<Void> ingestExecutions(@RequestBody String body) throws JsonProcessingException {
|
||||
String instanceId = extractAgentId();
|
||||
String applicationId = resolveApplicationId(instanceId);
|
||||
List<RouteExecution> executions = parsePayload(body);
|
||||
|
||||
for (RouteExecution execution : executions) {
|
||||
ingestionService.ingestExecution(instanceId, applicationId, execution);
|
||||
}
|
||||
|
||||
return ResponseEntity.accepted().build();
|
||||
}
|
||||
|
||||
private String extractAgentId() {
|
||||
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
|
||||
return auth != null ? auth.getName() : "";
|
||||
}
|
||||
|
||||
private String resolveApplicationId(String instanceId) {
|
||||
AgentInfo agent = registryService.findById(instanceId);
|
||||
return agent != null ? agent.applicationId() : "";
|
||||
}
|
||||
|
||||
private List<RouteExecution> parsePayload(String body) throws JsonProcessingException {
|
||||
String trimmed = body.strip();
|
||||
if (trimmed.startsWith("[")) {
|
||||
return objectMapper.readValue(trimmed, new TypeReference<>() {});
|
||||
} else {
|
||||
RouteExecution single = objectMapper.readValue(trimmed, RouteExecution.class);
|
||||
return List.of(single);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -282,20 +282,6 @@ public class ClickHouseExecutionStore implements ExecutionStore {
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
// --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) ---
|
||||
|
||||
@Override
|
||||
public void upsert(ExecutionRecord execution) {
|
||||
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsertProcessors(String executionId, Instant startTime,
|
||||
String applicationId, String routeId,
|
||||
List<ProcessorRecord> processors) {
|
||||
throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline");
|
||||
}
|
||||
|
||||
// --- Row mappers ---
|
||||
|
||||
private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException {
|
||||
|
||||
@@ -1,63 +1,28 @@
|
||||
package com.cameleer.server.core.ingestion;
|
||||
|
||||
import com.cameleer.common.model.ExchangeSnapshot;
|
||||
import com.cameleer.common.model.ProcessorExecution;
|
||||
import com.cameleer.common.model.RouteExecution;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
|
||||
import com.cameleer.server.core.storage.DiagramStore;
|
||||
import com.cameleer.server.core.storage.ExecutionStore;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.cameleer.server.core.storage.model.MetricsSnapshot;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Diagram + metrics ingestion facade.
|
||||
*
|
||||
* <p>Execution ingestion went through this class via the {@code RouteExecution}
|
||||
* shape until the ClickHouse chunked pipeline took over — {@code ChunkAccumulator}
|
||||
* now writes executions directly from the {@code /api/v1/data/executions}
|
||||
* controller, so this class no longer needs an ExecutionStore or event-publisher
|
||||
* dependency.
|
||||
*/
|
||||
public class IngestionService {
|
||||
|
||||
private static final ObjectMapper JSON = new ObjectMapper()
|
||||
.findAndRegisterModules()
|
||||
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||
|
||||
private final ExecutionStore executionStore;
|
||||
private final DiagramStore diagramStore;
|
||||
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
|
||||
private final Consumer<ExecutionUpdatedEvent> eventPublisher;
|
||||
private final int bodySizeLimit;
|
||||
|
||||
public IngestionService(ExecutionStore executionStore,
|
||||
DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer,
|
||||
Consumer<ExecutionUpdatedEvent> eventPublisher,
|
||||
int bodySizeLimit) {
|
||||
this.executionStore = executionStore;
|
||||
public IngestionService(DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer) {
|
||||
this.diagramStore = diagramStore;
|
||||
this.metricsBuffer = metricsBuffer;
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.bodySizeLimit = bodySizeLimit;
|
||||
}
|
||||
|
||||
public void ingestExecution(String instanceId, String applicationId, RouteExecution execution) {
|
||||
ExecutionRecord record = toExecutionRecord(instanceId, applicationId, execution);
|
||||
executionStore.upsert(record);
|
||||
|
||||
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
|
||||
List<ProcessorRecord> processors = flattenProcessors(
|
||||
execution.getProcessors(), record.executionId(),
|
||||
record.startTime(), applicationId, execution.getRouteId(),
|
||||
null, 0);
|
||||
executionStore.upsertProcessors(
|
||||
record.executionId(), record.startTime(),
|
||||
applicationId, execution.getRouteId(), processors);
|
||||
}
|
||||
|
||||
eventPublisher.accept(new ExecutionUpdatedEvent(
|
||||
record.executionId(), record.startTime()));
|
||||
}
|
||||
|
||||
public void ingestDiagram(TaggedDiagram diagram) {
|
||||
@@ -75,127 +40,4 @@ public class IngestionService {
|
||||
public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
|
||||
return metricsBuffer;
|
||||
}
|
||||
|
||||
private ExecutionRecord toExecutionRecord(String instanceId, String applicationId,
|
||||
RouteExecution exec) {
|
||||
String diagramHash = diagramStore
|
||||
.findContentHashForRoute(exec.getRouteId(), instanceId)
|
||||
.orElse("");
|
||||
|
||||
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
|
||||
String inputBody = null;
|
||||
String outputBody = null;
|
||||
String inputHeaders = null;
|
||||
String outputHeaders = null;
|
||||
String inputProperties = null;
|
||||
String outputProperties = null;
|
||||
|
||||
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
|
||||
if (inputSnapshot != null) {
|
||||
inputBody = truncateBody(inputSnapshot.getBody());
|
||||
inputHeaders = toJson(inputSnapshot.getHeaders());
|
||||
inputProperties = toJson(inputSnapshot.getProperties());
|
||||
}
|
||||
|
||||
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
|
||||
if (outputSnapshot != null) {
|
||||
outputBody = truncateBody(outputSnapshot.getBody());
|
||||
outputHeaders = toJson(outputSnapshot.getHeaders());
|
||||
outputProperties = toJson(outputSnapshot.getProperties());
|
||||
}
|
||||
|
||||
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
|
||||
|
||||
boolean isReplay = exec.getReplayExchangeId() != null;
|
||||
if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) {
|
||||
isReplay = "true".equalsIgnoreCase(
|
||||
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
|
||||
}
|
||||
|
||||
return new ExecutionRecord(
|
||||
exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId,
|
||||
null, // environment: legacy PG path; ClickHouse path uses MergedExecution with env resolved from registry
|
||||
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
|
||||
exec.getCorrelationId(), exec.getExchangeId(),
|
||||
exec.getStartTime(), exec.getEndTime(),
|
||||
exec.getDurationMs(),
|
||||
exec.getErrorMessage(), exec.getErrorStackTrace(),
|
||||
diagramHash,
|
||||
exec.getEngineLevel(),
|
||||
inputBody, outputBody, inputHeaders, outputHeaders,
|
||||
inputProperties, outputProperties,
|
||||
toJson(exec.getAttributes()),
|
||||
exec.getErrorType(), exec.getErrorCategory(),
|
||||
exec.getRootCauseType(), exec.getRootCauseMessage(),
|
||||
exec.getTraceId(), exec.getSpanId(),
|
||||
toJsonObject(exec.getProcessors()),
|
||||
hasTraceData,
|
||||
isReplay
|
||||
);
|
||||
}
|
||||
|
||||
private static boolean hasAnyTraceData(List<ProcessorExecution> processors) {
|
||||
if (processors == null) return false;
|
||||
for (ProcessorExecution p : processors) {
|
||||
if (p.getInputBody() != null || p.getOutputBody() != null
|
||||
|| p.getInputHeaders() != null || p.getOutputHeaders() != null
|
||||
|| p.getInputProperties() != null || p.getOutputProperties() != null) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private List<ProcessorRecord> flattenProcessors(
|
||||
List<ProcessorExecution> processors, String executionId,
|
||||
java.time.Instant execStartTime, String applicationId, String routeId,
|
||||
String parentProcessorId, int depth) {
|
||||
List<ProcessorRecord> flat = new ArrayList<>();
|
||||
for (ProcessorExecution p : processors) {
|
||||
flat.add(new ProcessorRecord(
|
||||
executionId, p.getProcessorId(), p.getProcessorType(),
|
||||
applicationId, routeId,
|
||||
depth, parentProcessorId,
|
||||
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
|
||||
p.getStartTime() != null ? p.getStartTime() : execStartTime,
|
||||
p.getEndTime(),
|
||||
p.getDurationMs(),
|
||||
p.getErrorMessage(), p.getErrorStackTrace(),
|
||||
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
||||
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
|
||||
null, null, // inputProperties, outputProperties (not on ProcessorExecution)
|
||||
toJson(p.getAttributes()),
|
||||
null, null, null, null, null,
|
||||
p.getResolvedEndpointUri(),
|
||||
p.getErrorType(), p.getErrorCategory(),
|
||||
p.getRootCauseType(), p.getRootCauseMessage(),
|
||||
p.getErrorHandlerType(), p.getCircuitBreakerState(),
|
||||
p.getFallbackTriggered(),
|
||||
null, null, null, null, null, null
|
||||
));
|
||||
}
|
||||
return flat;
|
||||
}
|
||||
|
||||
private String truncateBody(String body) {
|
||||
if (body == null) return null;
|
||||
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
|
||||
return body;
|
||||
}
|
||||
|
||||
private static String toJson(Map<String, String> headers) {
|
||||
if (headers == null) return null;
|
||||
try {
|
||||
return JSON.writeValueAsString(headers);
|
||||
} catch (JsonProcessingException e) {
|
||||
return "{}";
|
||||
}
|
||||
}
|
||||
|
||||
private static String toJsonObject(Object obj) {
|
||||
if (obj == null) return null;
|
||||
try {
|
||||
return JSON.writeValueAsString(obj);
|
||||
} catch (JsonProcessingException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,12 +6,6 @@ import java.util.Optional;
|
||||
|
||||
public interface ExecutionStore {
|
||||
|
||||
void upsert(ExecutionRecord execution);
|
||||
|
||||
void upsertProcessors(String executionId, Instant startTime,
|
||||
String applicationId, String routeId,
|
||||
List<ProcessorRecord> processors);
|
||||
|
||||
Optional<ExecutionRecord> findById(String executionId);
|
||||
|
||||
List<ProcessorRecord> findProcessors(String executionId);
|
||||
|
||||
Reference in New Issue
Block a user