refactor(ingestion): drop dead legacy execution-ingestion path
ExecutionController was @ConditionalOnMissingBean(ChunkAccumulator.class), and ChunkAccumulator is registered unconditionally — the legacy controller never bound in any profile. Even if it had, IngestionService.ingestExecution called executionStore.upsert(), and the only ExecutionStore impl (ClickHouseExecutionStore) threw UnsupportedOperationException from upsert and upsertProcessors. The entire RouteExecution → upsert path was dead code carrying four transitive dependencies (RouteExecution import, eventPublisher wiring, body-size-limit config, searchIndexer::onExecutionUpdated hook). Removed: - cameleer-server-app/.../controller/ExecutionController.java (whole file) - ExecutionStore.upsert + upsertProcessors (interface methods) - ClickHouseExecutionStore.upsert + upsertProcessors (thrower overrides) - IngestionService.ingestExecution + toExecutionRecord + flattenProcessors + hasAnyTraceData + truncateBody + toJson/toJsonObject helpers - IngestionService constructor now takes (DiagramStore, WriteBuffer<Metrics>); dropped ExecutionStore + Consumer<ExecutionUpdatedEvent> + bodySizeLimit - StorageBeanConfig.ingestionService(...) simplified accordingly Untouched because still in use: - ExecutionRecord / ProcessorRecord records (findById / findProcessors / SearchIndexer / DetailController) - SearchIndexer (its onExecutionUpdated never fires now since no-one publishes ExecutionUpdatedEvent, but SearchIndexerStats is still referenced by ClickHouseAdminController — separate cleanup) - TaggedExecution record has no remaining callers after this change — flagged in core-classes.md as a leftover; separate cleanup. Rule docs updated: - .claude/rules/app-classes.md: retired ExecutionController bullet, fixed stale URL for ChunkIngestionController (it owns /api/v1/data/executions, not /api/v1/ingestion/chunk/executions). - .claude/rules/core-classes.md: IngestionService surface + note the dead TaggedExecution. Full IT suite post-removal: 560 tests run, 11 F + 1 E — same 12 failures in the same 3 previously-parked classes (AgentSseControllerIT / SseSigningIT SSE-timing + ClickHouseStatsStoreIT timezone bug). No regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,63 +1,28 @@
|
||||
package com.cameleer.server.core.ingestion;
|
||||
|
||||
import com.cameleer.common.model.ExchangeSnapshot;
|
||||
import com.cameleer.common.model.ProcessorExecution;
|
||||
import com.cameleer.common.model.RouteExecution;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
|
||||
import com.cameleer.server.core.storage.DiagramStore;
|
||||
import com.cameleer.server.core.storage.ExecutionStore;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||
import com.cameleer.server.core.storage.model.MetricsSnapshot;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Diagram + metrics ingestion facade.
|
||||
*
|
||||
* <p>Execution ingestion went through this class via the {@code RouteExecution}
|
||||
* shape until the ClickHouse chunked pipeline took over — {@code ChunkAccumulator}
|
||||
* now writes executions directly from the {@code /api/v1/data/executions}
|
||||
* controller, so this class no longer needs an ExecutionStore or event-publisher
|
||||
* dependency.
|
||||
*/
|
||||
public class IngestionService {
|
||||
|
||||
private static final ObjectMapper JSON = new ObjectMapper()
|
||||
.findAndRegisterModules()
|
||||
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||
|
||||
private final ExecutionStore executionStore;
|
||||
private final DiagramStore diagramStore;
|
||||
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
|
||||
private final Consumer<ExecutionUpdatedEvent> eventPublisher;
|
||||
private final int bodySizeLimit;
|
||||
|
||||
public IngestionService(ExecutionStore executionStore,
|
||||
DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer,
|
||||
Consumer<ExecutionUpdatedEvent> eventPublisher,
|
||||
int bodySizeLimit) {
|
||||
this.executionStore = executionStore;
|
||||
public IngestionService(DiagramStore diagramStore,
|
||||
WriteBuffer<MetricsSnapshot> metricsBuffer) {
|
||||
this.diagramStore = diagramStore;
|
||||
this.metricsBuffer = metricsBuffer;
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.bodySizeLimit = bodySizeLimit;
|
||||
}
|
||||
|
||||
public void ingestExecution(String instanceId, String applicationId, RouteExecution execution) {
|
||||
ExecutionRecord record = toExecutionRecord(instanceId, applicationId, execution);
|
||||
executionStore.upsert(record);
|
||||
|
||||
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
|
||||
List<ProcessorRecord> processors = flattenProcessors(
|
||||
execution.getProcessors(), record.executionId(),
|
||||
record.startTime(), applicationId, execution.getRouteId(),
|
||||
null, 0);
|
||||
executionStore.upsertProcessors(
|
||||
record.executionId(), record.startTime(),
|
||||
applicationId, execution.getRouteId(), processors);
|
||||
}
|
||||
|
||||
eventPublisher.accept(new ExecutionUpdatedEvent(
|
||||
record.executionId(), record.startTime()));
|
||||
}
|
||||
|
||||
public void ingestDiagram(TaggedDiagram diagram) {
|
||||
@@ -75,127 +40,4 @@ public class IngestionService {
|
||||
public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
|
||||
return metricsBuffer;
|
||||
}
|
||||
|
||||
private ExecutionRecord toExecutionRecord(String instanceId, String applicationId,
|
||||
RouteExecution exec) {
|
||||
String diagramHash = diagramStore
|
||||
.findContentHashForRoute(exec.getRouteId(), instanceId)
|
||||
.orElse("");
|
||||
|
||||
// Extract route-level snapshots (critical for REGULAR mode where no processors are recorded)
|
||||
String inputBody = null;
|
||||
String outputBody = null;
|
||||
String inputHeaders = null;
|
||||
String outputHeaders = null;
|
||||
String inputProperties = null;
|
||||
String outputProperties = null;
|
||||
|
||||
ExchangeSnapshot inputSnapshot = exec.getInputSnapshot();
|
||||
if (inputSnapshot != null) {
|
||||
inputBody = truncateBody(inputSnapshot.getBody());
|
||||
inputHeaders = toJson(inputSnapshot.getHeaders());
|
||||
inputProperties = toJson(inputSnapshot.getProperties());
|
||||
}
|
||||
|
||||
ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot();
|
||||
if (outputSnapshot != null) {
|
||||
outputBody = truncateBody(outputSnapshot.getBody());
|
||||
outputHeaders = toJson(outputSnapshot.getHeaders());
|
||||
outputProperties = toJson(outputSnapshot.getProperties());
|
||||
}
|
||||
|
||||
boolean hasTraceData = hasAnyTraceData(exec.getProcessors());
|
||||
|
||||
boolean isReplay = exec.getReplayExchangeId() != null;
|
||||
if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) {
|
||||
isReplay = "true".equalsIgnoreCase(
|
||||
String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay")));
|
||||
}
|
||||
|
||||
return new ExecutionRecord(
|
||||
exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId,
|
||||
null, // environment: legacy PG path; ClickHouse path uses MergedExecution with env resolved from registry
|
||||
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
|
||||
exec.getCorrelationId(), exec.getExchangeId(),
|
||||
exec.getStartTime(), exec.getEndTime(),
|
||||
exec.getDurationMs(),
|
||||
exec.getErrorMessage(), exec.getErrorStackTrace(),
|
||||
diagramHash,
|
||||
exec.getEngineLevel(),
|
||||
inputBody, outputBody, inputHeaders, outputHeaders,
|
||||
inputProperties, outputProperties,
|
||||
toJson(exec.getAttributes()),
|
||||
exec.getErrorType(), exec.getErrorCategory(),
|
||||
exec.getRootCauseType(), exec.getRootCauseMessage(),
|
||||
exec.getTraceId(), exec.getSpanId(),
|
||||
toJsonObject(exec.getProcessors()),
|
||||
hasTraceData,
|
||||
isReplay
|
||||
);
|
||||
}
|
||||
|
||||
private static boolean hasAnyTraceData(List<ProcessorExecution> processors) {
|
||||
if (processors == null) return false;
|
||||
for (ProcessorExecution p : processors) {
|
||||
if (p.getInputBody() != null || p.getOutputBody() != null
|
||||
|| p.getInputHeaders() != null || p.getOutputHeaders() != null
|
||||
|| p.getInputProperties() != null || p.getOutputProperties() != null) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private List<ProcessorRecord> flattenProcessors(
|
||||
List<ProcessorExecution> processors, String executionId,
|
||||
java.time.Instant execStartTime, String applicationId, String routeId,
|
||||
String parentProcessorId, int depth) {
|
||||
List<ProcessorRecord> flat = new ArrayList<>();
|
||||
for (ProcessorExecution p : processors) {
|
||||
flat.add(new ProcessorRecord(
|
||||
executionId, p.getProcessorId(), p.getProcessorType(),
|
||||
applicationId, routeId,
|
||||
depth, parentProcessorId,
|
||||
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
|
||||
p.getStartTime() != null ? p.getStartTime() : execStartTime,
|
||||
p.getEndTime(),
|
||||
p.getDurationMs(),
|
||||
p.getErrorMessage(), p.getErrorStackTrace(),
|
||||
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
||||
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()),
|
||||
null, null, // inputProperties, outputProperties (not on ProcessorExecution)
|
||||
toJson(p.getAttributes()),
|
||||
null, null, null, null, null,
|
||||
p.getResolvedEndpointUri(),
|
||||
p.getErrorType(), p.getErrorCategory(),
|
||||
p.getRootCauseType(), p.getRootCauseMessage(),
|
||||
p.getErrorHandlerType(), p.getCircuitBreakerState(),
|
||||
p.getFallbackTriggered(),
|
||||
null, null, null, null, null, null
|
||||
));
|
||||
}
|
||||
return flat;
|
||||
}
|
||||
|
||||
private String truncateBody(String body) {
|
||||
if (body == null) return null;
|
||||
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
|
||||
return body;
|
||||
}
|
||||
|
||||
private static String toJson(Map<String, String> headers) {
|
||||
if (headers == null) return null;
|
||||
try {
|
||||
return JSON.writeValueAsString(headers);
|
||||
} catch (JsonProcessingException e) {
|
||||
return "{}";
|
||||
}
|
||||
}
|
||||
|
||||
private static String toJsonObject(Object obj) {
|
||||
if (obj == null) return null;
|
||||
try {
|
||||
return JSON.writeValueAsString(obj);
|
||||
} catch (JsonProcessingException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,12 +6,6 @@ import java.util.Optional;
|
||||
|
||||
public interface ExecutionStore {
|
||||
|
||||
void upsert(ExecutionRecord execution);
|
||||
|
||||
void upsertProcessors(String executionId, Instant startTime,
|
||||
String applicationId, String routeId,
|
||||
List<ProcessorRecord> processors);
|
||||
|
||||
Optional<ExecutionRecord> findById(String executionId);
|
||||
|
||||
List<ProcessorRecord> findProcessors(String executionId);
|
||||
|
||||
Reference in New Issue
Block a user