diff --git a/.claude/rules/app-classes.md b/.claude/rules/app-classes.md index 95cbd870..49b613a4 100644 --- a/.claude/rules/app-classes.md +++ b/.claude/rules/app-classes.md @@ -85,8 +85,7 @@ Env-scoped read-path controllers (`AlertController`, `AlertRuleController`, `Ale - `LogIngestionController` — POST `/api/v1/data/logs` (accepts `List`; WARNs on missing identity, unregistered agents, empty payloads, buffer-full drops). - `EventIngestionController` — POST `/api/v1/data/events`. -- `ChunkIngestionController` — POST `/api/v1/ingestion/chunk/{executions|metrics|diagrams}`. -- `ExecutionController` — POST `/api/v1/data/executions` (legacy ingestion path when ClickHouse disabled). +- `ChunkIngestionController` — POST `/api/v1/data/executions`. Accepts a single `ExecutionChunk` or an array (fields include `exchangeId`, `applicationId`, `instanceId`, `routeId`, `status`, `startTime`, `endTime`, `durationMs`, `chunkSeq`, `final`, `processors: FlatProcessorRecord[]`). The accumulator merges non-final chunks by exchangeId and emits the merged envelope on the final chunk or on stale timeout. Legacy `ExecutionController` / `RouteExecution` shape is retired. - `MetricsController` — POST `/api/v1/data/metrics`. - `DiagramController` — POST `/api/v1/data/diagrams` (resolves applicationId + environment from the agent registry keyed on JWT subject; stamps both on the stored `TaggedDiagram`). diff --git a/.claude/rules/core-classes.md b/.claude/rules/core-classes.md index 193e6832..b43b99f0 100644 --- a/.claude/rules/core-classes.md +++ b/.claude/rules/core-classes.md @@ -107,8 +107,8 @@ paths: ## ingestion/ — Buffered data pipeline -- `IngestionService` — ingestExecution, ingestMetric, ingestLog, ingestDiagram -- `ChunkAccumulator` — batches data for efficient flush +- `IngestionService` — diagram + metrics facade (`ingestDiagram`, `acceptMetrics`, `getMetricsBuffer`). 
Execution ingestion went through here via the legacy `RouteExecution` shape until `ChunkAccumulator` took over execution writes via the chunked pipeline — the `ingestExecution` path plus its `ExecutionStore.upsert` / `upsertProcessors` dependencies were removed. +- `ChunkAccumulator` — batches data for efficient flush; owns the execution write path (chunks → buffers → flush scheduler → `ClickHouseExecutionStore.insertExecutionBatch`). - `WriteBuffer` — bounded ring buffer for async flush - `BufferedLogEntry` — log entry wrapper with metadata -- `MergedExecution`, `TaggedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row. +- `MergedExecution`, `TaggedDiagram` — tagged ingestion records. `TaggedDiagram` carries `(instanceId, applicationId, environment, graph)` — env is resolved from the agent registry in the controller and stamped on the ClickHouse `route_diagrams` row. (`TaggedExecution` still lives in the package as a leftover but has no callers since the legacy PG ingest path was retired.) 
diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java index 57fbc9be..b1da5677 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java @@ -56,13 +56,9 @@ public class StorageBeanConfig { } @Bean - public IngestionService ingestionService(ExecutionStore executionStore, - DiagramStore diagramStore, - WriteBuffer metricsBuffer, - SearchIndexer searchIndexer, - @Value("${cameleer.server.ingestion.bodysizelimit:16384}") int bodySizeLimit) { - return new IngestionService(executionStore, diagramStore, metricsBuffer, - searchIndexer::onExecutionUpdated, bodySizeLimit); + public IngestionService ingestionService(DiagramStore diagramStore, + WriteBuffer metricsBuffer) { + return new IngestionService(diagramStore, metricsBuffer); } @Bean diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java deleted file mode 100644 index 64294981..00000000 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.cameleer.server.app.controller; - -import com.cameleer.common.model.RouteExecution; -import com.cameleer.server.core.agent.AgentInfo; -import com.cameleer.server.core.agent.AgentRegistryService; -import com.cameleer.server.core.ingestion.ChunkAccumulator; -import com.cameleer.server.core.ingestion.IngestionService; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.swagger.v3.oas.annotations.Operation; -import 
io.swagger.v3.oas.annotations.responses.ApiResponse; -import io.swagger.v3.oas.annotations.tags.Tag; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.http.ResponseEntity; -import org.springframework.security.core.Authentication; -import org.springframework.security.core.context.SecurityContextHolder; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.List; - -/** - * Legacy ingestion endpoint for route execution data (PostgreSQL path). - *

- * Accepts both single {@link RouteExecution} and arrays. Data is written - * synchronously to PostgreSQL via {@link IngestionService}. - *

- * Only active when ClickHouse is disabled — when ClickHouse is enabled, - * {@link ChunkIngestionController} takes over the {@code /executions} mapping. - */ -@RestController -@RequestMapping("/api/v1/data") -@ConditionalOnMissingBean(ChunkAccumulator.class) -@Tag(name = "Ingestion", description = "Data ingestion endpoints") -public class ExecutionController { - - private final IngestionService ingestionService; - private final AgentRegistryService registryService; - private final ObjectMapper objectMapper; - - public ExecutionController(IngestionService ingestionService, - AgentRegistryService registryService, - ObjectMapper objectMapper) { - this.ingestionService = ingestionService; - this.registryService = registryService; - this.objectMapper = objectMapper; - } - - @PostMapping("/executions") - @Operation(summary = "Ingest route execution data", - description = "Accepts a single RouteExecution or an array of RouteExecutions") - @ApiResponse(responseCode = "202", description = "Data accepted for processing") - public ResponseEntity ingestExecutions(@RequestBody String body) throws JsonProcessingException { - String instanceId = extractAgentId(); - String applicationId = resolveApplicationId(instanceId); - List executions = parsePayload(body); - - for (RouteExecution execution : executions) { - ingestionService.ingestExecution(instanceId, applicationId, execution); - } - - return ResponseEntity.accepted().build(); - } - - private String extractAgentId() { - Authentication auth = SecurityContextHolder.getContext().getAuthentication(); - return auth != null ? auth.getName() : ""; - } - - private String resolveApplicationId(String instanceId) { - AgentInfo agent = registryService.findById(instanceId); - return agent != null ? 
agent.applicationId() : ""; - } - - private List parsePayload(String body) throws JsonProcessingException { - String trimmed = body.strip(); - if (trimmed.startsWith("[")) { - return objectMapper.readValue(trimmed, new TypeReference<>() {}); - } else { - RouteExecution single = objectMapper.readValue(trimmed, RouteExecution.class); - return List.of(single); - } - } -} diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java index 2e21cacb..496abe88 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionStore.java @@ -282,20 +282,6 @@ public class ClickHouseExecutionStore implements ExecutionStore { return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } - // --- ExecutionStore interface: write methods (unsupported, use chunked pipeline) --- - - @Override - public void upsert(ExecutionRecord execution) { - throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline"); - } - - @Override - public void upsertProcessors(String executionId, Instant startTime, - String applicationId, String routeId, - List processors) { - throw new UnsupportedOperationException("ClickHouse writes use the chunked pipeline"); - } - // --- Row mappers --- private static ExecutionRecord mapExecutionRecord(ResultSet rs) throws SQLException { diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java index bc925a09..dfd31697 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java 
@@ -1,63 +1,28 @@ package com.cameleer.server.core.ingestion; -import com.cameleer.common.model.ExchangeSnapshot; -import com.cameleer.common.model.ProcessorExecution; -import com.cameleer.common.model.RouteExecution; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.cameleer.server.core.indexing.ExecutionUpdatedEvent; import com.cameleer.server.core.storage.DiagramStore; -import com.cameleer.server.core.storage.ExecutionStore; -import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord; -import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord; import com.cameleer.server.core.storage.model.MetricsSnapshot; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.function.Consumer; +/** + * Diagram + metrics ingestion facade. + * + *

Execution ingestion went through this class via the {@code RouteExecution} + * shape until the ClickHouse chunked pipeline took over — {@code ChunkAccumulator} + * now writes executions directly from the {@code /api/v1/data/executions} + * controller, so this class no longer needs an ExecutionStore or event-publisher + * dependency. + */ public class IngestionService { - private static final ObjectMapper JSON = new ObjectMapper() - .findAndRegisterModules() - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - - private final ExecutionStore executionStore; private final DiagramStore diagramStore; private final WriteBuffer metricsBuffer; - private final Consumer eventPublisher; - private final int bodySizeLimit; - public IngestionService(ExecutionStore executionStore, - DiagramStore diagramStore, - WriteBuffer metricsBuffer, - Consumer eventPublisher, - int bodySizeLimit) { - this.executionStore = executionStore; + public IngestionService(DiagramStore diagramStore, + WriteBuffer metricsBuffer) { this.diagramStore = diagramStore; this.metricsBuffer = metricsBuffer; - this.eventPublisher = eventPublisher; - this.bodySizeLimit = bodySizeLimit; - } - - public void ingestExecution(String instanceId, String applicationId, RouteExecution execution) { - ExecutionRecord record = toExecutionRecord(instanceId, applicationId, execution); - executionStore.upsert(record); - - if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) { - List processors = flattenProcessors( - execution.getProcessors(), record.executionId(), - record.startTime(), applicationId, execution.getRouteId(), - null, 0); - executionStore.upsertProcessors( - record.executionId(), record.startTime(), - applicationId, execution.getRouteId(), processors); - } - - eventPublisher.accept(new ExecutionUpdatedEvent( - record.executionId(), record.startTime())); } public void ingestDiagram(TaggedDiagram diagram) { @@ -75,127 +40,4 @@ public class IngestionService { public WriteBuffer 
getMetricsBuffer() { return metricsBuffer; } - - private ExecutionRecord toExecutionRecord(String instanceId, String applicationId, - RouteExecution exec) { - String diagramHash = diagramStore - .findContentHashForRoute(exec.getRouteId(), instanceId) - .orElse(""); - - // Extract route-level snapshots (critical for REGULAR mode where no processors are recorded) - String inputBody = null; - String outputBody = null; - String inputHeaders = null; - String outputHeaders = null; - String inputProperties = null; - String outputProperties = null; - - ExchangeSnapshot inputSnapshot = exec.getInputSnapshot(); - if (inputSnapshot != null) { - inputBody = truncateBody(inputSnapshot.getBody()); - inputHeaders = toJson(inputSnapshot.getHeaders()); - inputProperties = toJson(inputSnapshot.getProperties()); - } - - ExchangeSnapshot outputSnapshot = exec.getOutputSnapshot(); - if (outputSnapshot != null) { - outputBody = truncateBody(outputSnapshot.getBody()); - outputHeaders = toJson(outputSnapshot.getHeaders()); - outputProperties = toJson(outputSnapshot.getProperties()); - } - - boolean hasTraceData = hasAnyTraceData(exec.getProcessors()); - - boolean isReplay = exec.getReplayExchangeId() != null; - if (!isReplay && inputSnapshot != null && inputSnapshot.getHeaders() != null) { - isReplay = "true".equalsIgnoreCase( - String.valueOf(inputSnapshot.getHeaders().get("X-Cameleer-Replay"))); - } - - return new ExecutionRecord( - exec.getExchangeId(), exec.getRouteId(), instanceId, applicationId, - null, // environment: legacy PG path; ClickHouse path uses MergedExecution with env resolved from registry - exec.getStatus() != null ? 
exec.getStatus().name() : "RUNNING", - exec.getCorrelationId(), exec.getExchangeId(), - exec.getStartTime(), exec.getEndTime(), - exec.getDurationMs(), - exec.getErrorMessage(), exec.getErrorStackTrace(), - diagramHash, - exec.getEngineLevel(), - inputBody, outputBody, inputHeaders, outputHeaders, - inputProperties, outputProperties, - toJson(exec.getAttributes()), - exec.getErrorType(), exec.getErrorCategory(), - exec.getRootCauseType(), exec.getRootCauseMessage(), - exec.getTraceId(), exec.getSpanId(), - toJsonObject(exec.getProcessors()), - hasTraceData, - isReplay - ); - } - - private static boolean hasAnyTraceData(List processors) { - if (processors == null) return false; - for (ProcessorExecution p : processors) { - if (p.getInputBody() != null || p.getOutputBody() != null - || p.getInputHeaders() != null || p.getOutputHeaders() != null - || p.getInputProperties() != null || p.getOutputProperties() != null) return true; - } - return false; - } - - private List flattenProcessors( - List processors, String executionId, - java.time.Instant execStartTime, String applicationId, String routeId, - String parentProcessorId, int depth) { - List flat = new ArrayList<>(); - for (ProcessorExecution p : processors) { - flat.add(new ProcessorRecord( - executionId, p.getProcessorId(), p.getProcessorType(), - applicationId, routeId, - depth, parentProcessorId, - p.getStatus() != null ? p.getStatus().name() : "RUNNING", - p.getStartTime() != null ? 
p.getStartTime() : execStartTime, - p.getEndTime(), - p.getDurationMs(), - p.getErrorMessage(), p.getErrorStackTrace(), - truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), - toJson(p.getInputHeaders()), toJson(p.getOutputHeaders()), - null, null, // inputProperties, outputProperties (not on ProcessorExecution) - toJson(p.getAttributes()), - null, null, null, null, null, - p.getResolvedEndpointUri(), - p.getErrorType(), p.getErrorCategory(), - p.getRootCauseType(), p.getRootCauseMessage(), - p.getErrorHandlerType(), p.getCircuitBreakerState(), - p.getFallbackTriggered(), - null, null, null, null, null, null - )); - } - return flat; - } - - private String truncateBody(String body) { - if (body == null) return null; - if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); - return body; - } - - private static String toJson(Map headers) { - if (headers == null) return null; - try { - return JSON.writeValueAsString(headers); - } catch (JsonProcessingException e) { - return "{}"; - } - } - - private static String toJsonObject(Object obj) { - if (obj == null) return null; - try { - return JSON.writeValueAsString(obj); - } catch (JsonProcessingException e) { - return null; - } - } } diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java index 9c227128..96e9739b 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java @@ -6,12 +6,6 @@ import java.util.Optional; public interface ExecutionStore { - void upsert(ExecutionRecord execution); - - void upsertProcessors(String executionId, Instant startTime, - String applicationId, String routeId, - List processors); - Optional findById(String executionId); List findProcessors(String executionId);