From 565b548ac127e0d9b112b8b6f35efb7baf371f54 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:56:13 +0100 Subject: [PATCH] refactor: remove all ClickHouse code, old interfaces, and SQL migrations - Delete all ClickHouse storage implementations and config - Delete old core interfaces (ExecutionRepository, DiagramRepository, MetricsRepository, SearchEngine, RawExecutionRow) - Delete ClickHouse SQL migration files - Delete AbstractClickHouseIT - Update controllers to use new store interfaces (DiagramStore, ExecutionStore) - Fix IngestionService calls in controllers for new synchronous API - Migrate all ITs from AbstractClickHouseIT to AbstractPostgresIT - Fix count() syntax and remove ClickHouse-specific test assertions - Update TreeReconstructionTest for new buildTree() method Co-Authored-By: Claude Opus 4.6 (1M context) --- cameleer3-server-app/pom.xml | 7 +- .../server/app/config/ClickHouseConfig.java | 80 ---- .../server/app/config/OpenSearchConfig.java | 17 +- .../app/controller/DetailController.java | 31 +- .../app/controller/DiagramController.java | 23 +- .../controller/DiagramRenderController.java | 14 +- .../app/controller/ExecutionController.java | 39 +- .../app/controller/MetricsController.java | 2 +- .../ingestion/ClickHouseFlushScheduler.java | 159 ------- .../app/search/ClickHouseSearchEngine.java | 357 --------------- .../server/app/search/OpenSearchIndex.java | 62 +-- .../app/security/SecurityBeanConfig.java | 2 +- .../storage/ClickHouseDiagramRepository.java | 127 ------ .../ClickHouseExecutionRepository.java | 418 ------------------ .../storage/ClickHouseMetricsRepository.java | 67 --- .../ClickHouseOidcConfigRepository.java | 71 --- .../app/storage/ClickHouseUserRepository.java | 112 ----- .../app/storage/PostgresUserRepository.java | 4 +- .../main/resources/clickhouse/01-schema.sql | 57 --- .../clickhouse/02-search-columns.sql | 25 -- .../main/resources/clickhouse/03-users.sql | 10 - .../resources/clickhouse/04-oidc-config.sql | 13 - .../clickhouse/05-oidc-auto-signup.sql | 1 - .../clickhouse/06-oidc-display-name-claim.sql | 1 - .../resources/clickhouse/07-stats-rollup.sql | 35 -- .../clickhouse/08-stats-rollup-backfill.sql | 16 - .../server/app/AbstractClickHouseIT.java | 82 ---- .../server/app/AbstractPostgresIT.java | 5 + .../controller/AgentCommandControllerIT.java | 4 +- .../AgentRegistrationControllerIT.java | 4 +- .../app/controller/AgentSseControllerIT.java | 4 +- .../server/app/controller/BackpressureIT.java | 64 ++- .../app/controller/DetailControllerIT.java | 6 +- .../app/controller/DiagramControllerIT.java | 8 +- .../controller/DiagramRenderControllerIT.java | 6 +- .../app/controller/ExecutionControllerIT.java | 8 +- .../app/controller/ForwardCompatIT.java | 4 +- .../app/controller/HealthControllerIT.java | 22 +- .../app/controller/MetricsControllerIT.java | 8 +- .../server/app/controller/OpenApiIT.java | 4 +- .../app/controller/SearchControllerIT.java | 6 +- .../app/interceptor/ProtocolVersionIT.java | 4 +- .../server/app/search/OpenSearchIndexIT.java | 5 +- .../server/app/security/BootstrapTokenIT.java | 4 +- .../server/app/security/JwtRefreshIT.java | 4 +- .../app/security/RegistrationSecurityIT.java | 4 +- .../server/app/security/SecurityFilterIT.java | 4 +- .../server/app/security/SseSigningIT.java | 4 +- .../server/app/storage/DiagramLinkingIT.java | 4 +- .../server/app/storage/IngestionSchemaIT.java | 4 +- .../server/core/detail/ExecutionDetail.java | 2 +- .../server/core/detail/ProcessorNode.java | 2 +- .../server/core/detail/RawExecutionRow.java | 59 --- .../core/ingestion/IngestionService.java | 6 +- .../server/core/ingestion/WriteBuffer.java | 2 +- .../server/core/search/SearchEngine.java | 72 --- .../server/core/search/SearchRequest.java | 2 +- .../core/storage/DiagramRepository.java | 35 -- .../core/storage/ExecutionRepository.java | 28 -- .../core/storage/MetricsRepository.java | 17 - .../core/detail/TreeReconstructionTest.java | 103 ++--- clickhouse/init/01-schema.sql | 57 --- clickhouse/init/02-search-columns.sql | 25 -- clickhouse/init/03-users.sql | 10 - clickhouse/init/04-oidc-config.sql | 13 - clickhouse/init/05-oidc-auto-signup.sql | 1 - .../init/06-oidc-display-name-claim.sql | 1 - pom.xml | 7 + 68 files changed, 226 insertions(+), 2238 deletions(-) delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/03-users.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql delete mode 100644 cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql delete mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java delete mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java delete mode 100644 clickhouse/init/01-schema.sql delete mode 100644 clickhouse/init/02-search-columns.sql delete mode 100644 clickhouse/init/03-users.sql delete mode 100644 clickhouse/init/04-oidc-config.sql delete mode 100644 clickhouse/init/05-oidc-auto-signup.sql delete mode 100644 clickhouse/init/06-oidc-display-name-claim.sql diff --git a/cameleer3-server-app/pom.xml b/cameleer3-server-app/pom.xml index 9194dc02..81e21c59 100644 --- a/cameleer3-server-app/pom.xml +++ b/cameleer3-server-app/pom.xml @@ -112,7 +112,12 @@ org.testcontainers - postgresql + testcontainers-postgresql + test + + + org.testcontainers + testcontainers-junit-jupiter test diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java deleted file mode 100644 index ee5d8db5..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.cameleer3.server.app.config; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.io.Resource; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; -import org.springframework.jdbc.core.JdbcTemplate; - -import jakarta.annotation.PostConstruct; -import javax.sql.DataSource; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Comparator; -import java.util.stream.Collectors; - -/** - * ClickHouse configuration. - *

- * Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties. - * This class exposes a JdbcTemplate bean and initializes the schema on startup. - *

- * The ClickHouse container's {@code CLICKHOUSE_DB} env var creates the database; - * this class creates the tables within it. - *

- * Migration files are discovered automatically from {@code classpath:clickhouse/*.sql} - * and executed in filename order (numeric prefix sort). - */ -@Configuration -public class ClickHouseConfig { - - private static final Logger log = LoggerFactory.getLogger(ClickHouseConfig.class); - private static final String MIGRATION_PATTERN = "classpath:clickhouse/*.sql"; - - private final DataSource dataSource; - - public ClickHouseConfig(DataSource dataSource) { - this.dataSource = dataSource; - } - - @Bean - public JdbcTemplate jdbcTemplate() { - return new JdbcTemplate(dataSource); - } - - @PostConstruct - void initSchema() { - var jdbc = new JdbcTemplate(dataSource); - try { - Resource[] resources = new PathMatchingResourcePatternResolver() - .getResources(MIGRATION_PATTERN); - Arrays.sort(resources, Comparator.comparing(Resource::getFilename)); - - for (Resource resource : resources) { - String filename = resource.getFilename(); - try { - String sql = resource.getContentAsString(StandardCharsets.UTF_8); - String stripped = sql.lines() - .filter(line -> !line.trim().startsWith("--")) - .collect(Collectors.joining("\n")); - for (String statement : stripped.split(";")) { - String trimmed = statement.trim(); - if (!trimmed.isEmpty()) { - jdbc.execute(trimmed); - } - } - log.info("Applied schema: {}", filename); - } catch (Exception e) { - log.error("Failed to apply schema: {}", filename, e); - throw new RuntimeException("Schema initialization failed: " + filename, e); - } - } - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException("Failed to discover migration files", e); - } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java index 0ed581ad..3ff7edea 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java @@ -1,9 +1,10 @@ package com.cameleer3.server.app.config; -import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; -import org.apache.hc.core5.http.HttpHost; +import org.apache.http.HttpHost; +import org.opensearch.client.RestClient; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import org.opensearch.client.transport.rest_client.RestClientTransport; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,10 +15,14 @@ public class OpenSearchConfig { @Value("${opensearch.url:http://localhost:9200}") private String opensearchUrl; + @Bean(destroyMethod = "close") + public RestClient opensearchRestClient() { + return RestClient.builder(HttpHost.create(opensearchUrl)).build(); + } + @Bean - public OpenSearchClient openSearchClient() { - HttpHost host = HttpHost.create(opensearchUrl); - var transport = ApacheHttpClient5TransportBuilder.builder(host).build(); + public OpenSearchClient openSearchClient(RestClient restClient) { + var transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); return new OpenSearchClient(transport); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java index 3e0ca0c4..2bd6ea55 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java @@ -1,8 +1,9 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.storage.ClickHouseExecutionRepository; import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.detail.ExecutionDetail; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; @@ -12,14 +13,16 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** * Endpoints for retrieving execution details and processor snapshots. *

* The detail endpoint returns a nested processor tree reconstructed from - * flat parallel arrays stored in ClickHouse. The snapshot endpoint returns - * per-processor exchange data (bodies and headers). + * individual processor records stored in PostgreSQL. The snapshot endpoint + * returns per-processor exchange data (bodies and headers). */ @RestController @RequestMapping("/api/v1/executions") @@ -27,12 +30,12 @@ import java.util.Map; public class DetailController { private final DetailService detailService; - private final ClickHouseExecutionRepository executionRepository; + private final ExecutionStore executionStore; public DetailController(DetailService detailService, - ClickHouseExecutionRepository executionRepository) { + ExecutionStore executionStore) { this.detailService = detailService; - this.executionRepository = executionRepository; + this.executionStore = executionStore; } @GetMapping("/{executionId}") @@ -52,8 +55,18 @@ public class DetailController { public ResponseEntity> getProcessorSnapshot( @PathVariable String executionId, @PathVariable int index) { - return executionRepository.findProcessorSnapshot(executionId, index) - .map(ResponseEntity::ok) - .orElse(ResponseEntity.notFound().build()); + List processors = executionStore.findProcessors(executionId); + if (index < 0 || index >= processors.size()) { + return ResponseEntity.notFound().build(); + } + + ProcessorRecord p = processors.get(index); + Map snapshot = new LinkedHashMap<>(); + if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody()); + if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody()); + if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders()); + if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders()); + + return ResponseEntity.ok(snapshot); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java index d4359968..5cdaf176 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java @@ -11,7 +11,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; @@ -25,8 +24,8 @@ import java.util.List; /** * Ingestion endpoint for route diagrams. *

- * Accepts both single {@link RouteGraph} and arrays. Data is buffered - * and flushed to ClickHouse by the flush scheduler. + * Accepts both single {@link RouteGraph} and arrays. Data is written + * synchronously to PostgreSQL via {@link IngestionService}. */ @RestController @RequestMapping("/api/v1/data") @@ -47,26 +46,12 @@ public class DiagramController { @Operation(summary = "Ingest route diagram data", description = "Accepts a single RouteGraph or an array of RouteGraphs") @ApiResponse(responseCode = "202", description = "Data accepted for processing") - @ApiResponse(responseCode = "503", description = "Buffer full, retry later") public ResponseEntity ingestDiagrams(@RequestBody String body) throws JsonProcessingException { String agentId = extractAgentId(); List graphs = parsePayload(body); - List tagged = graphs.stream() - .map(graph -> new TaggedDiagram(agentId, graph)) - .toList(); - boolean accepted; - if (tagged.size() == 1) { - accepted = ingestionService.acceptDiagram(tagged.get(0)); - } else { - accepted = ingestionService.acceptDiagrams(tagged); - } - - if (!accepted) { - log.warn("Diagram buffer full, returning 503"); - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) - .header("Retry-After", "5") - .build(); + for (RouteGraph graph : graphs) { + ingestionService.ingestDiagram(new TaggedDiagram(agentId, graph)); } return ResponseEntity.accepted().build(); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java index b1ca3775..d8f722e7 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java @@ -5,7 +5,7 @@ import com.cameleer3.server.core.agent.AgentInfo; import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.diagram.DiagramLayout; import com.cameleer3.server.core.diagram.DiagramRenderer; -import com.cameleer3.server.core.storage.DiagramRepository; +import com.cameleer3.server.core.storage.DiagramStore; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; @@ -39,14 +39,14 @@ public class DiagramRenderController { private static final MediaType SVG_MEDIA_TYPE = MediaType.valueOf("image/svg+xml"); - private final DiagramRepository diagramRepository; + private final DiagramStore diagramStore; private final DiagramRenderer diagramRenderer; private final AgentRegistryService registryService; - public DiagramRenderController(DiagramRepository diagramRepository, + public DiagramRenderController(DiagramStore diagramStore, DiagramRenderer diagramRenderer, AgentRegistryService registryService) { - this.diagramRepository = diagramRepository; + this.diagramStore = diagramStore; this.diagramRenderer = diagramRenderer; this.registryService = registryService; } @@ -64,7 +64,7 @@ public class DiagramRenderController { @PathVariable String contentHash, HttpServletRequest request) { - Optional graphOpt = diagramRepository.findByContentHash(contentHash); + Optional graphOpt = diagramStore.findByContentHash(contentHash); if (graphOpt.isEmpty()) { return ResponseEntity.notFound().build(); } @@ -105,12 +105,12 @@ public class DiagramRenderController { return ResponseEntity.notFound().build(); } - Optional contentHash = diagramRepository.findContentHashForRouteByAgents(routeId, agentIds); + Optional contentHash = diagramStore.findContentHashForRouteByAgents(routeId, agentIds); if (contentHash.isEmpty()) { return ResponseEntity.notFound().build(); } - Optional graphOpt = diagramRepository.findByContentHash(contentHash.get()); + Optional graphOpt = diagramStore.findByContentHash(contentHash.get()); if (graphOpt.isEmpty()) { return ResponseEntity.notFound().build(); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java index e44f2645..bea76037 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java @@ -1,8 +1,9 @@ package com.cameleer3.server.app.controller; import com.cameleer3.common.model.RouteExecution; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.ingestion.IngestionService; -import com.cameleer3.server.core.ingestion.TaggedExecution; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -11,7 +12,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; @@ -25,9 +25,8 @@ import java.util.List; /** * Ingestion endpoint for route execution data. *

- * Accepts both single {@link RouteExecution} and arrays. Data is buffered - * in a {@link com.cameleer3.server.core.ingestion.WriteBuffer} and flushed - * to ClickHouse by the flush scheduler. + * Accepts both single {@link RouteExecution} and arrays. Data is written + * synchronously to PostgreSQL via {@link IngestionService}. */ @RestController @RequestMapping("/api/v1/data") @@ -37,10 +36,14 @@ public class ExecutionController { private static final Logger log = LoggerFactory.getLogger(ExecutionController.class); private final IngestionService ingestionService; + private final AgentRegistryService registryService; private final ObjectMapper objectMapper; - public ExecutionController(IngestionService ingestionService, ObjectMapper objectMapper) { + public ExecutionController(IngestionService ingestionService, + AgentRegistryService registryService, + ObjectMapper objectMapper) { this.ingestionService = ingestionService; + this.registryService = registryService; this.objectMapper = objectMapper; } @@ -48,26 +51,13 @@ public class ExecutionController { @Operation(summary = "Ingest route execution data", description = "Accepts a single RouteExecution or an array of RouteExecutions") @ApiResponse(responseCode = "202", description = "Data accepted for processing") - @ApiResponse(responseCode = "503", description = "Buffer full, retry later") public ResponseEntity ingestExecutions(@RequestBody String body) throws JsonProcessingException { String agentId = extractAgentId(); + String groupName = resolveGroupName(agentId); List executions = parsePayload(body); - List tagged = executions.stream() - .map(exec -> new TaggedExecution(agentId, exec)) - .toList(); - boolean accepted; - if (tagged.size() == 1) { - accepted = ingestionService.acceptExecution(tagged.get(0)); - } else { - accepted = ingestionService.acceptExecutions(tagged); - } - - if (!accepted) { - log.warn("Execution buffer full, returning 503"); - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) - .header("Retry-After", "5") - .build(); + for (RouteExecution execution : executions) { + ingestionService.ingestExecution(agentId, groupName, execution); } return ResponseEntity.accepted().build(); @@ -78,6 +68,11 @@ public class ExecutionController { return auth != null ? auth.getName() : ""; } + private String resolveGroupName(String agentId) { + AgentInfo agent = registryService.findById(agentId); + return agent != null ? agent.group() : ""; + } + private List parsePayload(String body) throws JsonProcessingException { String trimmed = body.strip(); if (trimmed.startsWith("[")) { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java index e947942d..a7ee03d2 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java @@ -23,7 +23,7 @@ import java.util.List; * Ingestion endpoint for agent metrics. *

* Accepts an array of {@link MetricsSnapshot}. Data is buffered - * and flushed to ClickHouse by the flush scheduler. + * and flushed to PostgreSQL by the flush scheduler. */ @RestController @RequestMapping("/api/v1/data") diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java deleted file mode 100644 index e48a2a92..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.cameleer3.server.app.ingestion; - -import com.cameleer3.server.app.config.IngestionConfig; -import com.cameleer3.server.core.ingestion.TaggedDiagram; -import com.cameleer3.server.core.ingestion.TaggedExecution; -import com.cameleer3.server.core.ingestion.WriteBuffer; -import com.cameleer3.server.core.storage.DiagramRepository; -import com.cameleer3.server.core.storage.ExecutionRepository; -import com.cameleer3.server.core.storage.MetricsRepository; -import com.cameleer3.server.core.storage.model.MetricsSnapshot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.SmartLifecycle; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; - -/** - * Scheduled task that drains the write buffers and batch-inserts into ClickHouse. - *

- * Implements {@link SmartLifecycle} to ensure all remaining buffered data is - * flushed on application shutdown. - */ -@Component -public class ClickHouseFlushScheduler implements SmartLifecycle { - - private static final Logger log = LoggerFactory.getLogger(ClickHouseFlushScheduler.class); - - private final WriteBuffer executionBuffer; - private final WriteBuffer diagramBuffer; - private final WriteBuffer metricsBuffer; - private final ExecutionRepository executionRepository; - private final DiagramRepository diagramRepository; - private final MetricsRepository metricsRepository; - private final int batchSize; - - private volatile boolean running = false; - - public ClickHouseFlushScheduler(WriteBuffer executionBuffer, - WriteBuffer diagramBuffer, - WriteBuffer metricsBuffer, - ExecutionRepository executionRepository, - DiagramRepository diagramRepository, - MetricsRepository metricsRepository, - IngestionConfig config) { - this.executionBuffer = executionBuffer; - this.diagramBuffer = diagramBuffer; - this.metricsBuffer = metricsBuffer; - this.executionRepository = executionRepository; - this.diagramRepository = diagramRepository; - this.metricsRepository = metricsRepository; - this.batchSize = config.getBatchSize(); - } - - @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}") - public void flushAll() { - flushExecutions(); - flushDiagrams(); - flushMetrics(); - } - - private void flushExecutions() { - try { - List batch = executionBuffer.drain(batchSize); - if (!batch.isEmpty()) { - executionRepository.insertBatch(batch); - log.debug("Flushed {} executions to ClickHouse", batch.size()); - } - } catch (Exception e) { - log.error("Failed to flush executions to ClickHouse", e); - } - } - - private void flushDiagrams() { - try { - List batch = diagramBuffer.drain(batchSize); - for (TaggedDiagram diagram : batch) { - diagramRepository.store(diagram); - } - if (!batch.isEmpty()) { - log.debug("Flushed {} diagrams to ClickHouse", batch.size()); - } - } catch (Exception e) { - log.error("Failed to flush diagrams to ClickHouse", e); - } - } - - private void flushMetrics() { - try { - List batch = metricsBuffer.drain(batchSize); - if (!batch.isEmpty()) { - metricsRepository.insertBatch(batch); - log.debug("Flushed {} metrics to ClickHouse", batch.size()); - } - } catch (Exception e) { - log.error("Failed to flush metrics to ClickHouse", e); - } - } - - // SmartLifecycle -- flush remaining data on shutdown - - @Override - public void start() { - running = true; - log.info("ClickHouseFlushScheduler started"); - } - - @Override - public void stop() { - log.info("ClickHouseFlushScheduler stopping -- flushing remaining data"); - drainAll(); - running = false; - } - - @Override - public boolean isRunning() { - return running; - } - - @Override - public int getPhase() { - // Run after most beans but before DataSource shutdown - return Integer.MAX_VALUE - 1; - } - - /** - * Drain all buffers completely (loop until empty). - */ - private void drainAll() { - drainBufferCompletely("executions", executionBuffer, batch -> executionRepository.insertBatch(batch)); - drainBufferCompletely("diagrams", diagramBuffer, batch -> { - for (TaggedDiagram d : batch) { - diagramRepository.store(d); - } - }); - drainBufferCompletely("metrics", metricsBuffer, batch -> metricsRepository.insertBatch(batch)); - } - - private void drainBufferCompletely(String name, WriteBuffer buffer, java.util.function.Consumer> inserter) { - int total = 0; - while (buffer.size() > 0) { - List batch = buffer.drain(batchSize); - if (batch.isEmpty()) { - break; - } - try { - inserter.accept(batch); - total += batch.size(); - } catch (Exception e) { - log.error("Failed to flush remaining {} during shutdown", name, e); - break; - } - } - if (total > 0) { - log.info("Flushed {} remaining {} during shutdown", total, name); - } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java deleted file mode 100644 index ed6a0b13..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java +++ /dev/null @@ -1,357 +0,0 @@ -package com.cameleer3.server.app.search; - -import com.cameleer3.server.core.search.ExecutionStats; -import com.cameleer3.server.core.search.ExecutionSummary; -import com.cameleer3.server.core.search.SearchEngine; -import com.cameleer3.server.core.search.SearchRequest; -import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.search.StatsTimeseries; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.sql.Timestamp; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * ClickHouse implementation of {@link SearchEngine}. - *

- * Builds dynamic WHERE clauses from non-null {@link SearchRequest} fields - * and queries the {@code route_executions} table. LIKE patterns are properly - * escaped to prevent injection. - */ -public class ClickHouseSearchEngine implements SearchEngine { - - /** Per-query memory cap (1 GiB) — prevents a single query from OOMing ClickHouse. */ - private static final String SETTINGS = " SETTINGS max_memory_usage = 1000000000"; - - private final JdbcTemplate jdbcTemplate; - - public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - } - - @Override - public SearchResult search(SearchRequest request) { - var conditions = new ArrayList(); - var params = new ArrayList(); - - buildWhereClause(request, conditions, params); - - String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); - - // Count query - var countParams = params.toArray(); - Long total = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, countParams); - if (total == null) total = 0L; - - if (total == 0) { - return SearchResult.empty(request.offset(), request.limit()); - } - - // Data query - params.add(request.limit()); - params.add(request.offset()); - String orderDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC"; - String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " + - "duration_ms, correlation_id, error_message, diagram_content_hash " + - "FROM route_executions" + where + - " ORDER BY " + request.sortColumn() + " " + orderDir + " LIMIT ? OFFSET ?" + SETTINGS; - - List data = jdbcTemplate.query(dataSql, (rs, rowNum) -> { - Timestamp endTs = rs.getTimestamp("end_time"); - return new ExecutionSummary( - rs.getString("execution_id"), - rs.getString("route_id"), - rs.getString("agent_id"), - rs.getString("status"), - rs.getTimestamp("start_time").toInstant(), - endTs != null ? endTs.toInstant() : null, - rs.getLong("duration_ms"), - rs.getString("correlation_id"), - rs.getString("error_message"), - rs.getString("diagram_content_hash") - ); - }, params.toArray()); - - return new SearchResult<>(data, total, request.offset(), request.limit()); - } - - @Override - public long count(SearchRequest request) { - var conditions = new ArrayList(); - var params = new ArrayList(); - buildWhereClause(request, conditions, params); - - String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); - Long result = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, params.toArray()); - return result != null ? result : 0L; - } - - @Override - public ExecutionStats stats(Instant from, Instant to) { - return stats(from, to, null, null); - } - - @Override - public ExecutionStats stats(Instant from, Instant to, String routeId, List agentIds) { - // Current period — read from rollup - var conditions = new ArrayList(); - var params = new ArrayList(); - conditions.add("bucket >= ?"); - params.add(bucketTimestamp(floorToFiveMinutes(from))); - conditions.add("bucket <= ?"); - params.add(bucketTimestamp(to)); - addScopeFilters(routeId, agentIds, conditions, params); - - String where = " WHERE " + String.join(" AND ", conditions); - - String rollupSql = "SELECT " + - "countMerge(total_count) AS cnt, " + - "countIfMerge(failed_count) AS failed, " + - "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + - "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + - "FROM route_execution_stats_5m" + where + SETTINGS; - - record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {} - PeriodStats current = jdbcTemplate.queryForObject(rollupSql, - (rs, rowNum) -> new PeriodStats( - rs.getLong("cnt"), - rs.getLong("failed"), - rs.getLong("avg_ms"), - rs.getLong("p99_ms")), - params.toArray()); - - // Active count — PREWHERE reads only the status column before touching wide rows - var scopeConditions = new ArrayList(); - var activeParams = new ArrayList(); - addScopeFilters(routeId, agentIds, scopeConditions, activeParams); - String scopeWhere = scopeConditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", scopeConditions); - Long activeCount = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions PREWHERE status = 'RUNNING'" + scopeWhere + SETTINGS, - Long.class, activeParams.toArray()); - - // Previous period (same window shifted back 24h) — read from rollup - Duration window = Duration.between(from, to); - Instant prevFrom = from.minus(Duration.ofHours(24)); - Instant prevTo = prevFrom.plus(window); - var prevConditions = new ArrayList(); - var prevParams = new ArrayList(); - prevConditions.add("bucket >= ?"); - prevParams.add(bucketTimestamp(floorToFiveMinutes(prevFrom))); - prevConditions.add("bucket <= ?"); - prevParams.add(bucketTimestamp(prevTo)); - addScopeFilters(routeId, agentIds, prevConditions, prevParams); - String prevWhere = " WHERE " + String.join(" AND ", prevConditions); - - String prevRollupSql = "SELECT " + - "countMerge(total_count) AS cnt, " + - "countIfMerge(failed_count) AS failed, " + - "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + - "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + - "FROM route_execution_stats_5m" + prevWhere + SETTINGS; - - PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql, - (rs, rowNum) -> new PeriodStats( - rs.getLong("cnt"), - rs.getLong("failed"), - rs.getLong("avg_ms"), - rs.getLong("p99_ms")), - prevParams.toArray()); - - // Today total (midnight UTC to now) — read from rollup with same scope - Instant todayStart = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS); - var todayConditions = new ArrayList(); - var todayParams = new ArrayList(); - todayConditions.add("bucket >= ?"); - todayParams.add(bucketTimestamp(floorToFiveMinutes(todayStart))); - addScopeFilters(routeId, agentIds, todayConditions, todayParams); - String todayWhere = " WHERE " + String.join(" AND ", todayConditions); - - Long totalToday = jdbcTemplate.queryForObject( - "SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere + SETTINGS, - Long.class, todayParams.toArray()); - - return new ExecutionStats( - current.totalCount, current.failedCount, current.avgDurationMs, - current.p99LatencyMs, activeCount != null ? activeCount : 0L, - totalToday != null ? totalToday : 0L, - prev.totalCount, prev.failedCount, prev.avgDurationMs, prev.p99LatencyMs); - } - - @Override - public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { - return timeseries(from, to, bucketCount, null, null); - } - - @Override - public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount, - String routeId, List agentIds) { - long intervalSeconds = Duration.between(from, to).getSeconds() / bucketCount; - if (intervalSeconds < 1) intervalSeconds = 1; - - var conditions = new ArrayList(); - var params = new ArrayList(); - conditions.add("bucket >= ?"); - params.add(bucketTimestamp(floorToFiveMinutes(from))); - conditions.add("bucket <= ?"); - params.add(bucketTimestamp(to)); - addScopeFilters(routeId, agentIds, conditions, params); - - String where = " WHERE " + String.join(" AND ", conditions); - - // Re-aggregate 5-minute rollup buckets into the requested interval - String sql = "SELECT " + - "toDateTime(intDiv(toUInt32(bucket), " + intervalSeconds + ") * " + intervalSeconds + ") AS ts_bucket, " + - "countMerge(total_count) AS cnt, " + - "countIfMerge(failed_count) AS failed, " + - "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " + - "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " + - "FROM route_execution_stats_5m" + where + - " GROUP BY ts_bucket ORDER BY ts_bucket" + SETTINGS; - - List buckets = jdbcTemplate.query(sql, (rs, rowNum) -> - new StatsTimeseries.TimeseriesBucket( - rs.getTimestamp("ts_bucket").toInstant(), - rs.getLong("cnt"), - rs.getLong("failed"), - rs.getLong("avg_ms"), - rs.getLong("p99_ms"), - 0L - ), - params.toArray()); - - return new StatsTimeseries(buckets); - } - - private void buildWhereClause(SearchRequest req, List conditions, List params) { - if (req.status() != null && !req.status().isBlank()) { - String[] statuses = req.status().split(","); - if (statuses.length == 1) { - conditions.add("status = ?"); - params.add(statuses[0].trim()); - } else { - String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?")); - conditions.add("status IN (" + placeholders + ")"); - for (String s : statuses) { - params.add(s.trim()); - } - } - } - if (req.timeFrom() != null) { - conditions.add("start_time >= ?"); - params.add(Timestamp.from(req.timeFrom())); - } - if (req.timeTo() != null) { - conditions.add("start_time <= ?"); - params.add(Timestamp.from(req.timeTo())); - } - if (req.durationMin() != null) { - conditions.add("duration_ms >= ?"); - params.add(req.durationMin()); - } - if (req.durationMax() != null) { - conditions.add("duration_ms <= ?"); - params.add(req.durationMax()); - } - if (req.correlationId() != null && !req.correlationId().isBlank()) { - conditions.add("correlation_id = ?"); - params.add(req.correlationId()); - } - if (req.routeId() != null && !req.routeId().isBlank()) { - conditions.add("route_id = ?"); - params.add(req.routeId()); - } - if (req.agentId() != null && !req.agentId().isBlank()) { - conditions.add("agent_id = ?"); - params.add(req.agentId()); - } - // agentIds from group resolution (takes precedence when agentId is not set) - if ((req.agentId() == null || req.agentId().isBlank()) - && req.agentIds() != null && !req.agentIds().isEmpty()) { - String placeholders = String.join(", ", Collections.nCopies(req.agentIds().size(), "?")); - conditions.add("agent_id IN (" + placeholders + ")"); - params.addAll(req.agentIds()); - } - if (req.processorType() != null && !req.processorType().isBlank()) { - conditions.add("has(processor_types, ?)"); - params.add(req.processorType()); - } - if (req.text() != null && !req.text().isBlank()) { - String pattern = "%" + escapeLike(req.text()) + "%"; - String[] textColumns = { - "execution_id", "route_id", "agent_id", - "error_message", "error_stacktrace", - "exchange_bodies", "exchange_headers" - }; - var likeClauses = java.util.Arrays.stream(textColumns) - .map(col -> col + " LIKE ?") - .toList(); - conditions.add("(" + String.join(" OR ", likeClauses) + ")"); - for (int i = 0; i < textColumns.length; i++) { - params.add(pattern); - } - } - if (req.textInBody() != null && !req.textInBody().isBlank()) { - conditions.add("exchange_bodies LIKE ?"); - params.add("%" + escapeLike(req.textInBody()) + "%"); - } - if (req.textInHeaders() != null && !req.textInHeaders().isBlank()) { - conditions.add("exchange_headers LIKE ?"); - params.add("%" + escapeLike(req.textInHeaders()) + "%"); - } - if (req.textInErrors() != null && !req.textInErrors().isBlank()) { - String pattern = "%" + escapeLike(req.textInErrors()) + "%"; - conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ?)"); - params.add(pattern); - params.add(pattern); - } - } - - /** - * Add route ID and agent IDs scope filters to conditions/params. - */ - private void addScopeFilters(String routeId, List agentIds, - List conditions, List params) { - if (routeId != null && !routeId.isBlank()) { - conditions.add("route_id = ?"); - params.add(routeId); - } - if (agentIds != null && !agentIds.isEmpty()) { - String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); - conditions.add("agent_id IN (" + placeholders + ")"); - params.addAll(agentIds); - } - } - - /** - * Floor an Instant to the start of its 5-minute bucket. - */ - private static Instant floorToFiveMinutes(Instant instant) { - long epochSecond = instant.getEpochSecond(); - return Instant.ofEpochSecond(epochSecond - (epochSecond % 300)); - } - - /** - * Create a second-precision Timestamp for rollup bucket comparisons. - * The bucket column is DateTime('UTC') (second precision); the JDBC driver - * sends java.sql.Timestamp with nanoseconds which ClickHouse rejects. - */ - private static Timestamp bucketTimestamp(Instant instant) { - return Timestamp.from(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS)); - } - - /** - * Escape special LIKE characters to prevent LIKE injection. - */ - static String escapeLike(String input) { - return input - .replace("\\", "\\\\") - .replace("%", "\\%") - .replace("_", "\\_"); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index d130b1f6..892792fc 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -7,6 +7,7 @@ import com.cameleer3.server.core.storage.SearchIndex; import com.cameleer3.server.core.storage.model.ExecutionDocument; import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import jakarta.annotation.PostConstruct; +import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.FieldValue; import org.opensearch.client.opensearch._types.SortOrder; @@ -41,8 +42,6 @@ public class OpenSearchIndex implements SearchIndex { @PostConstruct void ensureIndexTemplate() { - // Full template with ngram analyzer for infix wildcard search. - // The template JSON matches the spec's OpenSearch index template definition. try { boolean exists = client.indices().existsIndexTemplate( ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value(); @@ -53,22 +52,8 @@ public class OpenSearchIndex implements SearchIndex { .template(t -> t .settings(s -> s .numberOfShards("3") - .numberOfReplicas("1") - .analysis(a -> a - .analyzer("ngram_analyzer", an -> an - .custom(c -> c - .tokenizer("ngram_tokenizer") - .filter("lowercase"))) - .tokenizer("ngram_tokenizer", tk -> tk - .definition(d -> d - .ngram(ng -> ng - .minGram(3) - .maxGram(4) - .tokenChars(TokenChar.Letter, - TokenChar.Digit, - TokenChar.Punctuation, - TokenChar.Symbol))))))))); - log.info("OpenSearch index template created with ngram analyzer"); + .numberOfReplicas("1"))))); + log.info("OpenSearch index template created"); } } catch (IOException e) { log.error("Failed to create index template", e); @@ -99,10 +84,10 @@ public class OpenSearchIndex implements SearchIndex { .collect(Collectors.toList()); long total = response.hits().total() != null ? response.hits().total().value() : 0; - return new SearchResult<>(items, total); + return new SearchResult<>(items, total, request.offset(), request.limit()); } catch (IOException e) { log.error("Search failed", e); - return new SearchResult<>(List.of(), 0); + return SearchResult.empty(request.offset(), request.limit()); } } @@ -125,7 +110,8 @@ public class OpenSearchIndex implements SearchIndex { client.deleteByQuery(DeleteByQueryRequest.of(b -> b .index(List.of(INDEX_PREFIX + "*")) .query(Query.of(q -> q.term(t -> t - .field("execution_id").value(executionId)))))); + .field("execution_id") + .value(FieldValue.of(executionId))))))); } catch (IOException e) { log.error("Failed to delete execution {}", executionId, e); } @@ -155,9 +141,9 @@ public class OpenSearchIndex implements SearchIndex { filter.add(Query.of(q -> q.range(r -> { r.field("start_time"); if (request.timeFrom() != null) - r.gte(jakarta.json.Json.createValue(request.timeFrom().toString())); + r.gte(JsonData.of(request.timeFrom().toString())); if (request.timeTo() != null) - r.lte(jakarta.json.Json.createValue(request.timeTo().toString())); + r.lte(JsonData.of(request.timeTo().toString())); return r; }))); } @@ -180,8 +166,7 @@ public class OpenSearchIndex implements SearchIndex { // Search top-level text fields textQueries.add(Query.of(q -> q.multiMatch(m -> m .query(text) - .fields("error_message", "error_stacktrace", - "error_message.ngram", "error_stacktrace.ngram")))); + .fields("error_message", "error_stacktrace")))); // Search nested processor fields textQueries.add(Query.of(q -> q.nested(n -> n @@ -190,10 +175,7 @@ public class OpenSearchIndex implements SearchIndex { .query(text) .fields("processors.input_body", "processors.output_body", "processors.input_headers", "processors.output_headers", - "processors.error_message", "processors.error_stacktrace", - "processors.input_body.ngram", "processors.output_body.ngram", - "processors.input_headers.ngram", "processors.output_headers.ngram", - "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))); + "processors.error_message", "processors.error_stacktrace")))))); // Also try keyword fields for exact matches textQueries.add(Query.of(q -> q.multiMatch(m -> m @@ -209,30 +191,26 @@ public class OpenSearchIndex implements SearchIndex { .path("processors") .query(nq -> nq.multiMatch(m -> m .query(request.textInBody()) - .fields("processors.input_body", "processors.output_body", - "processors.input_body.ngram", "processors.output_body.ngram")))))); + .fields("processors.input_body", "processors.output_body")))))); } if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { must.add(Query.of(q -> q.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(request.textInHeaders()) - .fields("processors.input_headers", "processors.output_headers", - "processors.input_headers.ngram", "processors.output_headers.ngram")))))); + .fields("processors.input_headers", "processors.output_headers")))))); } if (request.textInErrors() != null && !request.textInErrors().isBlank()) { String errText = request.textInErrors(); must.add(Query.of(q -> q.bool(b -> b.should( Query.of(sq -> sq.multiMatch(m -> m .query(errText) - .fields("error_message", "error_stacktrace", - "error_message.ngram", "error_stacktrace.ngram"))), + .fields("error_message", "error_stacktrace"))), Query.of(sq -> sq.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(errText) - .fields("processors.error_message", "processors.error_stacktrace", - "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))) + .fields("processors.error_message", "processors.error_stacktrace"))))) ).minimumShouldMatch("1")))); } @@ -241,9 +219,9 @@ public class OpenSearchIndex implements SearchIndex { filter.add(Query.of(q -> q.range(r -> { r.field("duration_ms"); if (request.durationMin() != null) - r.gte(jakarta.json.Json.createValue(request.durationMin())); + r.gte(JsonData.of(request.durationMin())); if (request.durationMax() != null) - r.lte(jakarta.json.Json.createValue(request.durationMax())); + r.lte(JsonData.of(request.durationMax())); return r; }))); } @@ -257,7 +235,7 @@ public class OpenSearchIndex implements SearchIndex { } private Query termQuery(String field, String value) { - return Query.of(q -> q.term(t -> t.field(field).value(value))); + return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value)))); } private Map toMap(ExecutionDocument doc) { @@ -305,6 +283,8 @@ public class OpenSearchIndex implements SearchIndex { src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null, src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L, (String) src.get("correlation_id"), - (String) src.get("error_message")); + (String) src.get("error_message"), + null // diagramContentHash not stored in index + ); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java index ad48c345..5c0bdff5 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java @@ -16,7 +16,7 @@ import java.util.List; * that required security properties are set. *

* Fails fast on startup if {@code CAMELEER_AUTH_TOKEN} is not set. - * Seeds OIDC config from env vars into ClickHouse if DB is empty. + * Seeds OIDC config from env vars into the database if DB is empty. */ @Configuration @EnableConfigurationProperties(SecurityProperties.class) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java deleted file mode 100644 index 11a0ed4f..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.common.graph.RouteGraph; -import com.cameleer3.server.core.ingestion.TaggedDiagram; -import com.cameleer3.server.core.storage.DiagramRepository; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HexFormat; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -/** - * ClickHouse implementation of {@link DiagramRepository}. - *

- * Stores route graphs as JSON with SHA-256 content-hash deduplication. - * The underlying table uses ReplacingMergeTree keyed on content_hash. - */ -@Repository -public class ClickHouseDiagramRepository implements DiagramRepository { - - private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramRepository.class); - - private static final String INSERT_SQL = """ - INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition) - VALUES (?, ?, ?, ?) - """; - - private static final String SELECT_BY_HASH = """ - SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1 - """; - - private static final String SELECT_HASH_FOR_ROUTE = """ - SELECT content_hash FROM route_diagrams - WHERE route_id = ? AND agent_id = ? - ORDER BY created_at DESC LIMIT 1 - """; - - private final JdbcTemplate jdbcTemplate; - private final ObjectMapper objectMapper; - - public ClickHouseDiagramRepository(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - this.objectMapper = new ObjectMapper(); - this.objectMapper.registerModule(new JavaTimeModule()); - } - - @Override - public void store(TaggedDiagram diagram) { - try { - RouteGraph graph = diagram.graph(); - String agentId = diagram.agentId() != null ? diagram.agentId() : ""; - String json = objectMapper.writeValueAsString(graph); - String contentHash = sha256Hex(json); - String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; - - jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json); - log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); - } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); - } - } - - @Override - public Optional findByContentHash(String contentHash) { - List> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash); - if (rows.isEmpty()) { - return Optional.empty(); - } - String json = (String) rows.get(0).get("definition"); - try { - return Optional.of(objectMapper.readValue(json, RouteGraph.class)); - } catch (JsonProcessingException e) { - log.error("Failed to deserialize RouteGraph from ClickHouse", e); - return Optional.empty(); - } - } - - @Override - public Optional findContentHashForRoute(String routeId, String agentId) { - List> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId); - if (rows.isEmpty()) { - return Optional.empty(); - } - return Optional.of((String) rows.get(0).get("content_hash")); - } - - @Override - public Optional findContentHashForRouteByAgents(String routeId, List agentIds) { - if (agentIds == null || agentIds.isEmpty()) { - return Optional.empty(); - } - String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); - String sql = "SELECT content_hash FROM route_diagrams " + - "WHERE route_id = ? AND agent_id IN (" + placeholders + ") " + - "ORDER BY created_at DESC LIMIT 1"; - var params = new ArrayList(); - params.add(routeId); - params.addAll(agentIds); - List> rows = jdbcTemplate.queryForList(sql, params.toArray()); - if (rows.isEmpty()) { - return Optional.empty(); - } - return Optional.of((String) rows.get(0).get("content_hash")); - } - - static String sha256Hex(String input) { - try { - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); - return HexFormat.of().formatHex(hash); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("SHA-256 not available", e); - } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java deleted file mode 100644 index b119f7e7..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java +++ /dev/null @@ -1,418 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.common.model.ExchangeSnapshot; -import com.cameleer3.common.model.ProcessorExecution; -import com.cameleer3.common.model.RouteExecution; -import com.cameleer3.server.core.detail.RawExecutionRow; -import com.cameleer3.server.core.ingestion.TaggedExecution; -import com.cameleer3.server.core.storage.DiagramRepository; -import com.cameleer3.server.core.storage.ExecutionRepository; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -/** - * ClickHouse implementation of {@link ExecutionRepository}. - *

- * Performs batch inserts into the {@code route_executions} table. - * Processor executions are flattened into parallel arrays with tree metadata - * (depth, parent index) for reconstruction. - */ -@Repository -public class ClickHouseExecutionRepository implements ExecutionRepository { - - private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static final String INSERT_SQL = """ - INSERT INTO route_executions ( - execution_id, route_id, agent_id, status, start_time, end_time, - duration_ms, correlation_id, exchange_id, error_message, error_stacktrace, - processor_ids, processor_types, processor_starts, processor_ends, - processor_durations, processor_statuses, - exchange_bodies, exchange_headers, - processor_depths, processor_parent_indexes, - processor_error_messages, processor_error_stacktraces, - processor_input_bodies, processor_output_bodies, - processor_input_headers, processor_output_headers, - processor_diagram_node_ids, diagram_content_hash - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """; - - private final JdbcTemplate jdbcTemplate; - private final DiagramRepository diagramRepository; - - public ClickHouseExecutionRepository(JdbcTemplate jdbcTemplate, DiagramRepository diagramRepository) { - this.jdbcTemplate = jdbcTemplate; - this.diagramRepository = diagramRepository; - } - - @Override - public void insertBatch(List executions) { - if (executions.isEmpty()) { - return; - } - - jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - TaggedExecution tagged = executions.get(i); - RouteExecution exec = tagged.execution(); - String agentId = tagged.agentId() != null ? tagged.agentId() : ""; - List flatProcessors = flattenWithMetadata(exec.getProcessors()); - - int col = 1; - ps.setString(col++, UUID.randomUUID().toString()); - ps.setString(col++, nullSafe(exec.getRouteId())); - ps.setString(col++, agentId); - ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING"); - ps.setObject(col++, toTimestamp(exec.getStartTime())); - ps.setObject(col++, toTimestamp(exec.getEndTime())); - ps.setLong(col++, exec.getDurationMs()); - ps.setString(col++, nullSafe(exec.getCorrelationId())); - ps.setString(col++, nullSafe(exec.getExchangeId())); - ps.setString(col++, nullSafe(exec.getErrorMessage())); - ps.setString(col++, nullSafe(exec.getErrorStackTrace())); - - // Original parallel arrays - ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorId())).toArray(String[]::new)); - ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorType())).toArray(String[]::new)); - ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getStartTime())).toArray(Timestamp[]::new)); - ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getEndTime())).toArray(Timestamp[]::new)); - ps.setObject(col++, flatProcessors.stream().mapToLong(fp -> fp.proc.getDurationMs()).boxed().toArray(Long[]::new)); - ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc.getStatus() != null ? fp.proc.getStatus().name() : "RUNNING").toArray(String[]::new)); - - // Phase 2: exchange bodies and headers (concatenated for search) - StringBuilder allBodies = new StringBuilder(); - StringBuilder allHeaders = new StringBuilder(); - - String[] inputBodies = new String[flatProcessors.size()]; - String[] outputBodies = new String[flatProcessors.size()]; - String[] inputHeaders = new String[flatProcessors.size()]; - String[] outputHeaders = new String[flatProcessors.size()]; - String[] errorMessages = new String[flatProcessors.size()]; - String[] errorStacktraces = new String[flatProcessors.size()]; - String[] diagramNodeIds = new String[flatProcessors.size()]; - Short[] depths = new Short[flatProcessors.size()]; - Integer[] parentIndexes = new Integer[flatProcessors.size()]; - - for (int j = 0; j < flatProcessors.size(); j++) { - FlatProcessor fp = flatProcessors.get(j); - ProcessorExecution p = fp.proc; - - inputBodies[j] = nullSafe(p.getInputBody()); - outputBodies[j] = nullSafe(p.getOutputBody()); - inputHeaders[j] = mapToJson(p.getInputHeaders()); - outputHeaders[j] = mapToJson(p.getOutputHeaders()); - errorMessages[j] = nullSafe(p.getErrorMessage()); - errorStacktraces[j] = nullSafe(p.getErrorStackTrace()); - diagramNodeIds[j] = nullSafe(p.getDiagramNodeId()); - depths[j] = (short) fp.depth; - parentIndexes[j] = fp.parentIndex; - - allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' '); - allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' '); - } - - // Include route-level input/output snapshot in searchable text - appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders); - appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders); - - ps.setString(col++, allBodies.toString().trim()); // exchange_bodies - ps.setString(col++, allHeaders.toString().trim()); // exchange_headers - ps.setObject(col++, depths); // processor_depths - ps.setObject(col++, parentIndexes); // processor_parent_indexes - ps.setObject(col++, errorMessages); // processor_error_messages - ps.setObject(col++, errorStacktraces); // processor_error_stacktraces - ps.setObject(col++, inputBodies); // processor_input_bodies - ps.setObject(col++, outputBodies); // processor_output_bodies - ps.setObject(col++, inputHeaders); // processor_input_headers - ps.setObject(col++, outputHeaders); // processor_output_headers - ps.setObject(col++, diagramNodeIds); // processor_diagram_node_ids - String diagramHash = diagramRepository - .findContentHashForRoute(exec.getRouteId(), agentId) - .orElse(""); - ps.setString(col++, diagramHash); // diagram_content_hash - } - - @Override - public int getBatchSize() { - return executions.size(); - } - }); - - log.debug("Inserted batch of {} route executions into ClickHouse", executions.size()); - } - - @Override - public Optional findRawById(String executionId) { - String sql = """ - SELECT execution_id, route_id, agent_id, status, start_time, end_time, - duration_ms, correlation_id, exchange_id, error_message, error_stacktrace, - diagram_content_hash, - processor_ids, processor_types, processor_statuses, - processor_starts, processor_ends, processor_durations, - processor_diagram_node_ids, - processor_error_messages, processor_error_stacktraces, - processor_depths, processor_parent_indexes - FROM route_executions - WHERE execution_id = ? - LIMIT 1 - """; - - List results = jdbcTemplate.query(sql, (rs, rowNum) -> { - // Extract parallel arrays from ClickHouse - String[] processorIds = toStringArray(rs.getArray("processor_ids")); - String[] processorTypes = toStringArray(rs.getArray("processor_types")); - String[] processorStatuses = toStringArray(rs.getArray("processor_statuses")); - Instant[] processorStarts = toInstantArray(rs.getArray("processor_starts")); - Instant[] processorEnds = toInstantArray(rs.getArray("processor_ends")); - long[] processorDurations = toLongArray(rs.getArray("processor_durations")); - String[] processorDiagramNodeIds = toStringArray(rs.getArray("processor_diagram_node_ids")); - String[] processorErrorMessages = toStringArray(rs.getArray("processor_error_messages")); - String[] processorErrorStacktraces = toStringArray(rs.getArray("processor_error_stacktraces")); - int[] processorDepths = toIntArrayFromShort(rs.getArray("processor_depths")); - int[] processorParentIndexes = toIntArray(rs.getArray("processor_parent_indexes")); - - Timestamp endTs = rs.getTimestamp("end_time"); - return new RawExecutionRow( - rs.getString("execution_id"), - rs.getString("route_id"), - rs.getString("agent_id"), - rs.getString("status"), - rs.getTimestamp("start_time").toInstant(), - endTs != null ? endTs.toInstant() : null, - rs.getLong("duration_ms"), - rs.getString("correlation_id"), - rs.getString("exchange_id"), - rs.getString("error_message"), - rs.getString("error_stacktrace"), - rs.getString("diagram_content_hash"), - processorIds, processorTypes, processorStatuses, - processorStarts, processorEnds, processorDurations, - processorDiagramNodeIds, - processorErrorMessages, processorErrorStacktraces, - processorDepths, processorParentIndexes - ); - }, executionId); - - return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); - } - - /** - * Find exchange snapshot data for a specific processor by index. - * - * @param executionId the execution ID - * @param processorIndex 0-based processor index - * @return map with inputBody, outputBody, inputHeaders, outputHeaders or empty if not found - */ - public Optional> findProcessorSnapshot(String executionId, int processorIndex) { - // ClickHouse arrays are 1-indexed in SQL - int chIndex = processorIndex + 1; - String sql = """ - SELECT - processor_input_bodies[?] AS input_body, - processor_output_bodies[?] AS output_body, - processor_input_headers[?] AS input_headers, - processor_output_headers[?] AS output_headers, - length(processor_ids) AS proc_count - FROM route_executions - WHERE execution_id = ? - LIMIT 1 - """; - - List> results = jdbcTemplate.query(sql, (rs, rowNum) -> { - int procCount = rs.getInt("proc_count"); - if (processorIndex < 0 || processorIndex >= procCount) { - return null; - } - var snapshot = new java.util.LinkedHashMap(); - snapshot.put("inputBody", rs.getString("input_body")); - snapshot.put("outputBody", rs.getString("output_body")); - snapshot.put("inputHeaders", rs.getString("input_headers")); - snapshot.put("outputHeaders", rs.getString("output_headers")); - return snapshot; - }, chIndex, chIndex, chIndex, chIndex, executionId); - - if (results.isEmpty() || results.get(0) == null) { - return Optional.empty(); - } - return Optional.of(results.get(0)); - } - - // --- Array extraction helpers --- - - private static String[] toStringArray(java.sql.Array sqlArray) throws SQLException { - if (sqlArray == null) return new String[0]; - Object arr = sqlArray.getArray(); - if (arr instanceof String[] sa) return sa; - if (arr instanceof Object[] oa) { - String[] result = new String[oa.length]; - for (int i = 0; i < oa.length; i++) { - result[i] = oa[i] != null ? oa[i].toString() : ""; - } - return result; - } - return new String[0]; - } - - private static Instant[] toInstantArray(java.sql.Array sqlArray) throws SQLException { - if (sqlArray == null) return new Instant[0]; - Object arr = sqlArray.getArray(); - if (arr instanceof Timestamp[] ts) { - Instant[] result = new Instant[ts.length]; - for (int i = 0; i < ts.length; i++) { - result[i] = ts[i] != null ? ts[i].toInstant() : Instant.EPOCH; - } - return result; - } - if (arr instanceof Object[] oa) { - Instant[] result = new Instant[oa.length]; - for (int i = 0; i < oa.length; i++) { - if (oa[i] instanceof Timestamp ts) { - result[i] = ts.toInstant(); - } else { - result[i] = Instant.EPOCH; - } - } - return result; - } - return new Instant[0]; - } - - private static long[] toLongArray(java.sql.Array sqlArray) throws SQLException { - if (sqlArray == null) return new long[0]; - Object arr = sqlArray.getArray(); - if (arr instanceof long[] la) return la; - if (arr instanceof Long[] la) { - long[] result = new long[la.length]; - for (int i = 0; i < la.length; i++) { - result[i] = la[i] != null ? la[i] : 0; - } - return result; - } - if (arr instanceof Object[] oa) { - long[] result = new long[oa.length]; - for (int i = 0; i < oa.length; i++) { - result[i] = oa[i] instanceof Number n ? n.longValue() : 0; - } - return result; - } - return new long[0]; - } - - private static int[] toIntArray(java.sql.Array sqlArray) throws SQLException { - if (sqlArray == null) return new int[0]; - Object arr = sqlArray.getArray(); - if (arr instanceof int[] ia) return ia; - if (arr instanceof Integer[] ia) { - int[] result = new int[ia.length]; - for (int i = 0; i < ia.length; i++) { - result[i] = ia[i] != null ? ia[i] : 0; - } - return result; - } - if (arr instanceof Object[] oa) { - int[] result = new int[oa.length]; - for (int i = 0; i < oa.length; i++) { - result[i] = oa[i] instanceof Number n ? n.intValue() : 0; - } - return result; - } - return new int[0]; - } - - private static int[] toIntArrayFromShort(java.sql.Array sqlArray) throws SQLException { - if (sqlArray == null) return new int[0]; - Object arr = sqlArray.getArray(); - if (arr instanceof short[] sa) { - int[] result = new int[sa.length]; - for (int i = 0; i < sa.length; i++) { - result[i] = sa[i]; - } - return result; - } - if (arr instanceof int[] ia) return ia; - if (arr instanceof Object[] oa) { - int[] result = new int[oa.length]; - for (int i = 0; i < oa.length; i++) { - result[i] = oa[i] instanceof Number n ? n.intValue() : 0; - } - return result; - } - return new int[0]; - } - - /** - * Internal record for a flattened processor with tree metadata. - */ - private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {} - - /** - * Flatten the processor tree with depth and parent index metadata (DFS order). - */ - private List flattenWithMetadata(List processors) { - if (processors == null || processors.isEmpty()) { - return List.of(); - } - var result = new ArrayList(); - for (ProcessorExecution p : processors) { - flattenRecursive(p, 0, -1, result); - } - return result; - } - - private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx, - List result) { - int myIndex = result.size(); - result.add(new FlatProcessor(processor, depth, parentIdx)); - if (processor.getChildren() != null) { - for (ProcessorExecution child : processor.getChildren()) { - flattenRecursive(child, depth + 1, myIndex, result); - } - } - } - - private void appendSnapshotText(ExchangeSnapshot snapshot, - StringBuilder allBodies, StringBuilder allHeaders) { - if (snapshot != null) { - allBodies.append(nullSafe(snapshot.getBody())).append(' '); - allHeaders.append(mapToJson(snapshot.getHeaders())).append(' '); - } - } - - private static String mapToJson(Map map) { - if (map == null || map.isEmpty()) { - return "{}"; - } - try { - return OBJECT_MAPPER.writeValueAsString(map); - } catch (JsonProcessingException e) { - log.warn("Failed to serialize headers map to JSON", e); - return "{}"; - } - } - - private static String nullSafe(String value) { - return value != null ? value : ""; - } - - private static Timestamp toTimestamp(Instant instant) { - return instant != null ? Timestamp.from(instant) : Timestamp.from(Instant.EPOCH); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java deleted file mode 100644 index a72ea26d..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.storage.MetricsRepository; -import com.cameleer3.server.core.storage.model.MetricsSnapshot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * ClickHouse implementation of {@link MetricsRepository}. - *

- * Performs batch inserts into the {@code agent_metrics} table. - */ -@Repository -public class ClickHouseMetricsRepository implements MetricsRepository { - - private static final Logger log = LoggerFactory.getLogger(ClickHouseMetricsRepository.class); - - private static final String INSERT_SQL = """ - INSERT INTO agent_metrics (agent_id, collected_at, metric_name, metric_value, tags) - VALUES (?, ?, ?, ?, ?) - """; - - private final JdbcTemplate jdbcTemplate; - - public ClickHouseMetricsRepository(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - } - - @Override - public void insertBatch(List metrics) { - if (metrics.isEmpty()) { - return; - } - - jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - MetricsSnapshot m = metrics.get(i); - ps.setString(1, m.agentId() != null ? m.agentId() : ""); - ps.setObject(2, m.collectedAt() != null ? Timestamp.from(m.collectedAt()) : Timestamp.from(Instant.EPOCH)); - ps.setString(3, m.metricName() != null ? m.metricName() : ""); - ps.setDouble(4, m.metricValue()); - // ClickHouse Map(String, String) -- pass as a java.util.Map - Map tags = m.tags() != null ? m.tags() : new HashMap<>(); - ps.setObject(5, tags); - } - - @Override - public int getBatchSize() { - return metrics.size(); - } - }); - - log.debug("Inserted batch of {} metrics into ClickHouse", metrics.size()); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java deleted file mode 100644 index 92b08d54..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.security.OidcConfig; -import com.cameleer3.server.core.security.OidcConfigRepository; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -/** - * ClickHouse implementation of {@link OidcConfigRepository}. - * Singleton row with {@code config_id = 'default'}, using ReplacingMergeTree. - */ -@Repository -public class ClickHouseOidcConfigRepository implements OidcConfigRepository { - - private final JdbcTemplate jdbc; - - public ClickHouseOidcConfigRepository(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public Optional find() { - List results = jdbc.query( - "SELECT enabled, issuer_uri, client_id, client_secret, roles_claim, default_roles, auto_signup, display_name_claim " - + "FROM oidc_config FINAL WHERE config_id = 'default'", - this::mapRow - ); - return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); - } - - @Override - public void save(OidcConfig config) { - jdbc.update( - "INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret, roles_claim, default_roles, auto_signup, display_name_claim, updated_at) " - + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now64(3, 'UTC'))", - config.enabled(), - config.issuerUri(), - config.clientId(), - config.clientSecret(), - config.rolesClaim(), - config.defaultRoles().toArray(new String[0]), - config.autoSignup(), - config.displayNameClaim() - ); - } - - @Override - public void delete() { - jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'"); - } - - private OidcConfig mapRow(ResultSet rs, int rowNum) throws SQLException { - String[] rolesArray = (String[]) rs.getArray("default_roles").getArray(); - return new OidcConfig( - rs.getBoolean("enabled"), - rs.getString("issuer_uri"), - rs.getString("client_id"), - rs.getString("client_secret"), - rs.getString("roles_claim"), - Arrays.asList(rolesArray), - rs.getBoolean("auto_signup"), - rs.getString("display_name_claim") - ); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java deleted file mode 100644 index b5090a1e..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.security.UserInfo; -import com.cameleer3.server.core.security.UserRepository; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.Instant; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -/** - * ClickHouse implementation of {@link UserRepository}. - *

- * Uses ReplacingMergeTree — reads use {@code FINAL} to get the latest version. - */ -@Repository -public class ClickHouseUserRepository implements UserRepository { - - private final JdbcTemplate jdbc; - - public ClickHouseUserRepository(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public Optional findById(String userId) { - List results = jdbc.query( - "SELECT user_id, provider, email, display_name, roles, created_at " - + "FROM users FINAL WHERE user_id = ?", - this::mapRow, - userId - ); - return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); - } - - @Override - public List findAll() { - return jdbc.query( - "SELECT user_id, provider, email, display_name, roles, created_at FROM users FINAL ORDER BY user_id", - this::mapRow - ); - } - - @Override - public void upsert(UserInfo user) { - Optional existing = findById(user.userId()); - if (existing.isPresent()) { - UserInfo ex = existing.get(); - // Skip write if nothing changed — avoids accumulating un-merged rows - if (ex.provider().equals(user.provider()) - && ex.email().equals(user.email()) - && ex.displayName().equals(user.displayName()) - && ex.roles().equals(user.roles())) { - return; - } - jdbc.update( - "INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at) " - + "SELECT user_id, ?, ?, ?, ?, created_at, now64(3, 'UTC') " - + "FROM users FINAL WHERE user_id = ?", - user.provider(), - user.email(), - user.displayName(), - user.roles().toArray(new String[0]), - user.userId() - ); - } else { - jdbc.update( - "INSERT INTO users (user_id, provider, email, display_name, roles, updated_at) " - + "VALUES (?, ?, ?, ?, ?, now64(3, 'UTC'))", - user.userId(), - user.provider(), - user.email(), - user.displayName(), - user.roles().toArray(new String[0]) - ); - } - } - - @Override - public void updateRoles(String userId, List roles) { - // ReplacingMergeTree: insert a new row with updated_at to supersede the old one. - // Copy existing fields, update roles. - jdbc.update( - "INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at) " - + "SELECT user_id, provider, email, display_name, ?, created_at, now64(3, 'UTC') " - + "FROM users FINAL WHERE user_id = ?", - roles.toArray(new String[0]), - userId - ); - } - - @Override - public void delete(String userId) { - jdbc.update("DELETE FROM users WHERE user_id = ?", userId); - } - - private UserInfo mapRow(ResultSet rs, int rowNum) throws SQLException { - String[] rolesArray = (String[]) rs.getArray("roles").getArray(); - return new UserInfo( - rs.getString("user_id"), - rs.getString("provider"), - rs.getString("email"), - rs.getString("display_name"), - Arrays.asList(rolesArray), - rs.getTimestamp("created_at").toInstant() - ); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java index f5867fec..6985b2a3 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java @@ -61,9 +61,11 @@ public class PostgresUserRepository implements UserRepository { private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException { Array rolesArray = rs.getArray("roles"); String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0]; + java.sql.Timestamp ts = rs.getTimestamp("created_at"); + java.time.Instant createdAt = ts != null ? ts.toInstant() : null; return new UserInfo( rs.getString("user_id"), rs.getString("provider"), rs.getString("email"), rs.getString("display_name"), - List.of(roles)); + List.of(roles), createdAt); } } diff --git a/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql b/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql deleted file mode 100644 index ab56da70..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql +++ /dev/null @@ -1,57 +0,0 @@ --- Cameleer3 ClickHouse Schema --- Tables for route executions, route diagrams, and agent metrics. - -CREATE TABLE IF NOT EXISTS route_executions ( - execution_id String, - route_id LowCardinality(String), - agent_id LowCardinality(String), - status LowCardinality(String), - start_time DateTime64(3, 'UTC'), - end_time Nullable(DateTime64(3, 'UTC')), - duration_ms UInt64, - correlation_id String, - exchange_id String, - error_message String DEFAULT '', - error_stacktrace String DEFAULT '', - -- Nested processor executions stored as parallel arrays - processor_ids Array(String), - processor_types Array(LowCardinality(String)), - processor_starts Array(DateTime64(3, 'UTC')), - processor_ends Array(DateTime64(3, 'UTC')), - processor_durations Array(UInt64), - processor_statuses Array(LowCardinality(String)), - -- Metadata - server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'), - -- Skip indexes - INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4, - INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMMDD(start_time) -ORDER BY (agent_id, status, start_time, execution_id) -TTL toDateTime(start_time) + toIntervalDay(30) -SETTINGS ttl_only_drop_parts = 1; - -CREATE TABLE IF NOT EXISTS route_diagrams ( - content_hash String, - route_id LowCardinality(String), - agent_id LowCardinality(String), - definition String, - created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) -ENGINE = ReplacingMergeTree(created_at) -ORDER BY (content_hash); - -CREATE TABLE IF NOT EXISTS agent_metrics ( - agent_id LowCardinality(String), - collected_at DateTime64(3, 'UTC'), - metric_name LowCardinality(String), - metric_value Float64, - tags Map(String, String), - server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) -ENGINE = MergeTree() -PARTITION BY toYYYYMMDD(collected_at) -ORDER BY (agent_id, metric_name, collected_at) -TTL toDateTime(collected_at) + toIntervalDay(30) -SETTINGS ttl_only_drop_parts = 1; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql b/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql deleted file mode 100644 index 2b11b435..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql +++ /dev/null @@ -1,25 +0,0 @@ --- Phase 2: Schema extension for search, detail, and diagram linking columns. --- Adds exchange snapshot data, processor tree metadata, and diagram content hash. - -ALTER TABLE route_executions - ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '', - ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '', - ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT ''; - --- Skip indexes for full-text search on new text columns -ALTER TABLE route_executions - ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4, - ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; - --- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05) -ALTER TABLE route_executions - ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql b/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql deleted file mode 100644 index 9dc7ce7a..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE TABLE IF NOT EXISTS users ( - user_id String, - provider LowCardinality(String), - email String DEFAULT '', - display_name String DEFAULT '', - roles Array(LowCardinality(String)), - created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'), - updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) ENGINE = ReplacingMergeTree(updated_at) -ORDER BY (user_id); diff --git a/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql b/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql deleted file mode 100644 index 35b4d896..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE IF NOT EXISTS oidc_config ( - config_id String DEFAULT 'default', - enabled Bool DEFAULT false, - issuer_uri String DEFAULT '', - client_id String DEFAULT '', - client_secret String DEFAULT '', - roles_claim String DEFAULT 'realm_access.roles', - default_roles Array(LowCardinality(String)), - auto_signup Bool DEFAULT true, - display_name_claim String DEFAULT 'name', - updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) ENGINE = ReplacingMergeTree(updated_at) -ORDER BY (config_id); diff --git a/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql b/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql deleted file mode 100644 index 643a69ea..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS auto_signup Bool DEFAULT true; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql b/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql deleted file mode 100644 index ef1870bd..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS display_name_claim String DEFAULT 'name'; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql b/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql deleted file mode 100644 index 5d1efe24..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql +++ /dev/null @@ -1,35 +0,0 @@ --- Pre-aggregated 5-minute stats rollup for route executions. --- Uses AggregatingMergeTree with -State/-Merge combinators so intermediate --- aggregates can be merged across arbitrary time windows and dimensions. - --- Drop existing objects to allow schema changes (MV must be dropped before table) -DROP VIEW IF EXISTS route_execution_stats_5m_mv; -DROP TABLE IF EXISTS route_execution_stats_5m; - -CREATE TABLE route_execution_stats_5m ( - bucket DateTime('UTC'), - route_id LowCardinality(String), - agent_id LowCardinality(String), - total_count AggregateFunction(count), - failed_count AggregateFunction(countIf, UInt8), - duration_sum AggregateFunction(sum, UInt64), - p99_duration AggregateFunction(quantileTDigest(0.99), UInt64) -) -ENGINE = AggregatingMergeTree() -PARTITION BY toYYYYMMDD(bucket) -ORDER BY (agent_id, route_id, bucket) -TTL bucket + toIntervalDay(30) -SETTINGS ttl_only_drop_parts = 1; - -CREATE MATERIALIZED VIEW route_execution_stats_5m_mv -TO route_execution_stats_5m -AS SELECT - toStartOfFiveMinutes(start_time) AS bucket, - route_id, - agent_id, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - sumState(duration_ms) AS duration_sum, - quantileTDigestState(0.99)(duration_ms) AS p99_duration -FROM route_executions -GROUP BY bucket, route_id, agent_id; diff --git a/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql b/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql deleted file mode 100644 index 5e80a23a..00000000 --- a/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql +++ /dev/null @@ -1,16 +0,0 @@ --- One-time idempotent backfill of existing route_executions into the --- 5-minute stats rollup table. Safe for repeated execution — the WHERE --- clause skips the INSERT if the target table already contains data. - -INSERT INTO route_execution_stats_5m -SELECT - toStartOfFiveMinutes(start_time) AS bucket, - route_id, - agent_id, - countState() AS total_count, - countIfState(status = 'FAILED') AS failed_count, - sumState(duration_ms) AS duration_sum, - quantileTDigestState(0.99)(duration_ms) AS p99_duration -FROM route_executions -WHERE (SELECT count() FROM route_execution_stats_5m) = 0 -GROUP BY bucket, route_id, agent_id; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java deleted file mode 100644 index d1271adb..00000000 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.cameleer3.server.app; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.clickhouse.ClickHouseContainer; - -import org.junit.jupiter.api.BeforeAll; - -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; - -/** - * Base class for integration tests requiring a ClickHouse instance. - *

- * Uses Testcontainers to spin up a ClickHouse server and initializes the schema - * from {@code clickhouse/init/01-schema.sql} before the first test runs. - * Subclasses get a {@link JdbcTemplate} for direct database assertions. - *

- * Container lifecycle is managed manually (started once, shared across all test classes). - */ -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -@ActiveProfiles("test") -public abstract class AbstractClickHouseIT { - - protected static final ClickHouseContainer CLICKHOUSE; - - static { - CLICKHOUSE = new ClickHouseContainer("clickhouse/clickhouse-server:25.3"); - CLICKHOUSE.start(); - } - - @Autowired - protected JdbcTemplate jdbcTemplate; - - @DynamicPropertySource - static void overrideProperties(DynamicPropertyRegistry registry) { - registry.add("spring.datasource.url", CLICKHOUSE::getJdbcUrl); - registry.add("spring.datasource.username", CLICKHOUSE::getUsername); - registry.add("spring.datasource.password", CLICKHOUSE::getPassword); - } - - @BeforeAll - static void initSchema() throws Exception { - // Surefire runs from the module directory; schema is in the project root - Path baseDir = Path.of("clickhouse/init"); - if (!Files.exists(baseDir)) { - baseDir = Path.of("../clickhouse/init"); - } - - // Load all schema files in order - String[] schemaFiles = {"01-schema.sql", "02-search-columns.sql", "03-users.sql", "04-oidc-config.sql", "05-oidc-auto-signup.sql"}; - - try (Connection conn = DriverManager.getConnection( - CLICKHOUSE.getJdbcUrl(), - CLICKHOUSE.getUsername(), - CLICKHOUSE.getPassword()); - Statement stmt = conn.createStatement()) { - - for (String schemaFile : schemaFiles) { - Path schemaPath = baseDir.resolve(schemaFile); - if (Files.exists(schemaPath)) { - String sql = Files.readString(schemaPath, StandardCharsets.UTF_8); - // Execute each statement separately (separated by semicolons) - for (String statement : sql.split(";")) { - String trimmed = statement.trim(); - if (!trimmed.isEmpty()) { - stmt.execute(trimmed); - } - } - } - } - } - } -} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index 26faf84a..490e20a9 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -1,6 +1,8 @@ package com.cameleer3.server.app; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.PostgreSQLContainer; @@ -18,6 +20,9 @@ public abstract class AbstractPostgresIT { .withUsername("cameleer") .withPassword("test"); + @Autowired + protected JdbcTemplate jdbcTemplate; + @DynamicPropertySource static void configureProperties(DynamicPropertyRegistry registry) { registry.add("spring.datasource.url", postgres::getJdbcUrl); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java index ab98f30d..4ba36c5d 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -18,7 +18,7 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; -class AgentCommandControllerIT extends AbstractClickHouseIT { +class AgentCommandControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java index 652f92d8..763646b9 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,7 +16,7 @@ import org.springframework.http.ResponseEntity; import static org.assertj.core.api.Assertions.assertThat; -class AgentRegistrationControllerIT extends AbstractClickHouseIT { +class AgentRegistrationControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java index 1af16ed5..fddc7152 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -class AgentSseControllerIT extends AbstractClickHouseIT { +class AgentSseControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java index aa8baa17..ee3db1fe 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.cameleer3.server.core.ingestion.IngestionService; import org.junit.jupiter.api.BeforeEach; @@ -13,21 +13,20 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; /** - * Tests backpressure behavior when write buffers are full. - * Uses a tiny buffer (capacity=5) and a very long flush interval - * to prevent the scheduler from draining the buffer during the test. + * Tests backpressure behavior when the metrics write buffer is full. + *

+ * Execution and diagram ingestion are now synchronous (no buffers). + * Only the metrics pipeline still uses a write buffer with backpressure. */ @TestPropertySource(properties = { "ingestion.buffer-capacity=5", "ingestion.batch-size=5", "ingestion.flush-interval-ms=60000" // 60s -- effectively no flush during test }) -class BackpressureIT extends AbstractClickHouseIT { +class BackpressureIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -47,34 +46,31 @@ class BackpressureIT extends AbstractClickHouseIT { } @Test - void whenBufferFull_returns503WithRetryAfter() { - // Wait for any initial scheduled flush to complete, then fill buffer via batch POST - await().atMost(5, SECONDS).until(() -> ingestionService.getExecutionBufferDepth() == 0); - - // Fill the buffer completely with a batch of 5 + void whenMetricsBufferFull_returns503WithRetryAfter() { + // Fill the metrics buffer completely with a batch of 5 String batchJson = """ [ - {"routeId":"bp-0","exchangeId":"bp-e0","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}, - {"routeId":"bp-1","exchangeId":"bp-e1","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}, - {"routeId":"bp-2","exchangeId":"bp-e2","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}, - {"routeId":"bp-3","exchangeId":"bp-e3","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}, - {"routeId":"bp-4","exchangeId":"bp-e4","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]} + {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:00Z","metrics":{}}, + {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:01Z","metrics":{}}, + {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:02Z","metrics":{}}, + {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:03Z","metrics":{}}, + {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:04Z","metrics":{}} ] """; ResponseEntity batchResponse = restTemplate.postForEntity( - "/api/v1/data/executions", + "/api/v1/data/metrics", new HttpEntity<>(batchJson, authHeaders), String.class); assertThat(batchResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); // Now buffer should be full -- next POST should get 503 String overflowJson = """ - {"routeId":"bp-overflow","exchangeId":"bp-overflow-e","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]} + [{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:05Z","metrics":{}}] """; ResponseEntity response = restTemplate.postForEntity( - "/api/v1/data/executions", + "/api/v1/data/metrics", new HttpEntity<>(overflowJson, authHeaders), String.class); @@ -83,25 +79,17 @@ class BackpressureIT extends AbstractClickHouseIT { } @Test - void bufferedDataNotLost_afterBackpressure() { - // Post data to the diagram buffer (separate from executions used above) - for (int i = 0; i < 3; i++) { - String json = String.format(""" - { - "routeId": "bp-persist-diagram-%d", - "version": 1, - "nodes": [], - "edges": [] - } - """, i); + void executionIngestion_isSynchronous_returnsAccepted() { + String json = """ + {"routeId":"bp-sync","exchangeId":"bp-sync-e","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]} + """; - restTemplate.postForEntity( - "/api/v1/data/diagrams", - new HttpEntity<>(json, authHeaders), - String.class); - } + ResponseEntity response = restTemplate.postForEntity( + "/api/v1/data/executions", + new HttpEntity<>(json, authHeaders), + String.class); - // Data is in the buffer. Verify the buffer has data. - assertThat(ingestionService.getDiagramBufferDepth()).isGreaterThanOrEqualTo(3); + // Synchronous ingestion always returns 202 (no buffer to overflow) + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java index cdd29df7..83fa17b1 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,7 +23,7 @@ import static org.awaitility.Awaitility.await; * Integration tests for the detail and processor snapshot endpoints. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -class DetailControllerIT extends AbstractClickHouseIT { +class DetailControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -121,7 +121,7 @@ class DetailControllerIT extends AbstractClickHouseIT { // Wait for flush and get the execution_id await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions WHERE route_id = 'detail-test-route'", + "SELECT count(*) FROM route_executions WHERE route_id = 'detail-test-route'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java index 832967fc..af6f274d 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,7 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -class DiagramControllerIT extends AbstractClickHouseIT { +class DiagramControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -53,7 +53,7 @@ class DiagramControllerIT extends AbstractClickHouseIT { } @Test - void postDiagram_dataAppearsInClickHouseAfterFlush() { + void postDiagram_dataAppearsAfterFlush() { String json = """ { "routeId": "diagram-flush-route", @@ -72,7 +72,7 @@ class DiagramControllerIT extends AbstractClickHouseIT { await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count() FROM route_diagrams WHERE route_id = 'diagram-flush-route'", + "SELECT count(*) FROM route_diagrams WHERE route_id = 'diagram-flush-route'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java index f4b0308d..af0b8668 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -20,7 +20,7 @@ import static org.awaitility.Awaitility.await; * Integration tests for {@link DiagramRenderController}. * Seeds a diagram via the ingestion endpoint, then tests rendering. */ -class DiagramRenderControllerIT extends AbstractClickHouseIT { +class DiagramRenderControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -61,7 +61,7 @@ class DiagramRenderControllerIT extends AbstractClickHouseIT { new HttpEntity<>(json, securityHelper.authHeaders(jwt)), String.class); - // Wait for flush to ClickHouse and retrieve the content hash + // Wait for flush to storage and retrieve the content hash await().atMost(10, SECONDS).untilAsserted(() -> { String hash = jdbcTemplate.queryForObject( "SELECT content_hash FROM route_diagrams WHERE route_id = 'render-test-route' LIMIT 1", diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java index a2bf59d5..65f72d85 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -16,7 +16,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -class ExecutionControllerIT extends AbstractClickHouseIT { +class ExecutionControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -90,7 +90,7 @@ class ExecutionControllerIT extends AbstractClickHouseIT { } @Test - void postExecution_dataAppearsInClickHouseAfterFlush() { + void postExecution_dataAppearsAfterFlush() { String json = """ { "routeId": "flush-test-route", @@ -111,7 +111,7 @@ class ExecutionControllerIT extends AbstractClickHouseIT { await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions WHERE route_id = 'flush-test-route'", + "SELECT count(*) FROM route_executions WHERE route_id = 'flush-test-route'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java index 9d68212d..555bbf7c 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration test for forward compatibility (API-05). * Verifies that unknown JSON fields in request bodies do not cause deserialization errors. */ -class ForwardCompatIT extends AbstractClickHouseIT { +class ForwardCompatIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java index c701af3b..9ca31887 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; @@ -8,9 +8,9 @@ import org.springframework.boot.test.web.client.TestRestTemplate; import static org.assertj.core.api.Assertions.assertThat; /** - * Integration tests for the health endpoint and ClickHouse TTL verification. + * Integration tests for the health endpoint. */ -class HealthControllerIT extends AbstractClickHouseIT { +class HealthControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -28,20 +28,4 @@ class HealthControllerIT extends AbstractClickHouseIT { var response = restTemplate.getForEntity("/api/v1/health", String.class); assertThat(response.getStatusCode().value()).isEqualTo(200); } - - @Test - void ttlConfiguredOnRouteExecutions() { - String createTable = jdbcTemplate.queryForObject( - "SHOW CREATE TABLE route_executions", String.class); - assertThat(createTable).containsIgnoringCase("TTL"); - assertThat(createTable).contains("toIntervalDay(30)"); - } - - @Test - void ttlConfiguredOnAgentMetrics() { - String createTable = jdbcTemplate.queryForObject( - "SHOW CREATE TABLE agent_metrics", String.class); - assertThat(createTable).containsIgnoringCase("TTL"); - assertThat(createTable).contains("toIntervalDay(30)"); - } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java index d0eb9793..8f0d8a14 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,7 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -class MetricsControllerIT extends AbstractClickHouseIT { +class MetricsControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -52,7 +52,7 @@ class MetricsControllerIT extends AbstractClickHouseIT { } @Test - void postMetrics_dataAppearsInClickHouseAfterFlush() { + void postMetrics_dataAppearsAfterFlush() { String json = """ [{ "agentId": "agent-flush-test", @@ -70,7 +70,7 @@ class MetricsControllerIT extends AbstractClickHouseIT { await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-flush-test'", + "SELECT count(*) FROM agent_metrics WHERE agent_id = 'agent-flush-test'", Integer.class); assertThat(count).isGreaterThanOrEqualTo(1); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java index e474f2b8..a8ceb053 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; @@ -10,7 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests for OpenAPI documentation endpoints. */ -class OpenApiIT extends AbstractClickHouseIT { +class OpenApiIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java index 8ae4e072..95f42b2a 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.controller; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -24,7 +24,7 @@ import static org.awaitility.Awaitility.await; * Tests all filter types independently and in combination. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -class SearchControllerIT extends AbstractClickHouseIT { +class SearchControllerIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; @@ -155,7 +155,7 @@ class SearchControllerIT extends AbstractClickHouseIT { // Wait for all data to flush await().atMost(10, SECONDS).untilAsserted(() -> { Integer count = jdbcTemplate.queryForObject( - "SELECT count() FROM route_executions WHERE route_id LIKE 'search-route-%'", + "SELECT count(*) FROM route_executions WHERE route_id LIKE 'search-route-%'", Integer.class); assertThat(count).isEqualTo(10); }); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java index 26e8d5a9..35d0c0d1 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.interceptor; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; * With security enabled, requests to protected endpoints need JWT auth * to reach the interceptor layer. */ -class ProtocolVersionIT extends AbstractClickHouseIT { +class ProtocolVersionIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java index 24054006..2194ecb4 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java @@ -24,8 +24,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT { @Container static final OpensearchContainer opensearch = - new OpensearchContainer<>("opensearchproject/opensearch:2.19.0") - .withSecurityEnabled(false); + new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); @DynamicPropertySource static void configureOpenSearch(DynamicPropertyRegistry registry) { @@ -58,7 +57,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT { SearchResult result = searchIndex.search(request); assertTrue(result.total() > 0); - assertEquals("search-1", result.items().get(0).executionId()); + assertEquals("search-1", result.data().get(0).executionId()); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java index 1309517b..3ce87894 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.security; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -17,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests verifying bootstrap token validation on the registration endpoint. */ -class BootstrapTokenIT extends AbstractClickHouseIT { +class BootstrapTokenIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java index 7e40e0a1..87ddf25e 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.security; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import com.cameleer3.server.core.security.JwtService; import com.fasterxml.jackson.databind.JsonNode; @@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests for the JWT refresh flow. */ -class JwtRefreshIT extends AbstractClickHouseIT { +class JwtRefreshIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java index abd35524..e4ee5da4 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.security; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests verifying that registration returns security credentials * and that those credentials can be used to access protected endpoints. */ -class RegistrationSecurityIT extends AbstractClickHouseIT { +class RegistrationSecurityIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java index 38f25766..ba8dfcbb 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.security; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests verifying that the SecurityFilterChain correctly * protects endpoints and allows public access where configured. */ -class SecurityFilterIT extends AbstractClickHouseIT { +class SecurityFilterIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java index ccbb8af9..d611520b 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.security; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.core.security.Ed25519SigningService; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -44,7 +44,7 @@ import static org.awaitility.Awaitility.await; * open SSE stream (with JWT query param) -> push config-update command (with JWT) -> * receive SSE event -> verify signature field against server's Ed25519 public key. */ -class SseSigningIT extends AbstractClickHouseIT { +class SseSigningIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java index 7322ec26..ab0f01c3 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.storage; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,7 +19,7 @@ import static org.awaitility.Awaitility.await; * Integration test proving that diagram_content_hash is populated during * execution ingestion when a RouteGraph exists for the same route+agent. */ -class DiagramLinkingIT extends AbstractClickHouseIT { +class DiagramLinkingIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java index d0d79e02..4cfa8247 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java @@ -1,6 +1,6 @@ package com.cameleer3.server.app.storage; -import com.cameleer3.server.app.AbstractClickHouseIT; +import com.cameleer3.server.app.AbstractPostgresIT; import com.cameleer3.server.app.TestSecurityHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,7 +22,7 @@ import static org.awaitility.Awaitility.await; * Integration test verifying that Phase 2 schema columns are correctly populated * during ingestion of route executions with nested processors and exchange data. */ -class IngestionSchemaIT extends AbstractClickHouseIT { +class IngestionSchemaIT extends AbstractPostgresIT { @Autowired private TestRestTemplate restTemplate; diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java index e739dd81..1b474ba0 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java @@ -7,7 +7,7 @@ import java.util.List; * Full detail of a route execution, including the nested processor tree. *

* This is the rich detail model returned by the detail endpoint. The processor - * tree is reconstructed from flat parallel arrays stored in ClickHouse. + * tree is reconstructed from individual processor records stored in PostgreSQL. * * @param executionId unique execution identifier * @param routeId Camel route ID diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java index 10d1e88e..65e08b9a 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java @@ -7,7 +7,7 @@ import java.util.List; /** * Nested tree node representing a single processor execution within a route. *

- * The tree structure is reconstructed from flat parallel arrays stored in ClickHouse. + * The tree structure is reconstructed from individual processor records stored in PostgreSQL. * Each node may have children (e.g., processors inside a split or try-catch block). */ public final class ProcessorNode { diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java deleted file mode 100644 index 2297e4b6..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.cameleer3.server.core.detail; - -import java.time.Instant; - -/** - * Raw execution data from ClickHouse, including all parallel arrays needed - * for tree reconstruction. This is the intermediate representation between - * the database and the {@link ExecutionDetail} domain object. - * - * @param executionId unique execution identifier - * @param routeId Camel route ID - * @param agentId agent instance - * @param status execution status - * @param startTime execution start time - * @param endTime execution end time - * @param durationMs execution duration in milliseconds - * @param correlationId correlation ID - * @param exchangeId Camel exchange ID - * @param errorMessage execution-level error message - * @param errorStackTrace execution-level error stack trace - * @param diagramContentHash content hash for diagram linking - * @param processorIds processor IDs (parallel array) - * @param processorTypes processor types (parallel array) - * @param processorStatuses processor statuses (parallel array) - * @param processorStarts processor start times (parallel array) - * @param processorEnds processor end times (parallel array) - * @param processorDurations processor durations in ms (parallel array) - * @param processorDiagramNodeIds processor diagram node IDs (parallel array) - * @param processorErrorMessages processor error messages (parallel array) - * @param processorErrorStacktraces processor error stack traces (parallel array) - * @param processorDepths processor tree depths (parallel array) - * @param processorParentIndexes processor parent indexes, -1 for roots (parallel array) - */ -public record RawExecutionRow( - String executionId, - String routeId, - String agentId, - String status, - Instant startTime, - Instant endTime, - long durationMs, - String correlationId, - String exchangeId, - String errorMessage, - String errorStackTrace, - String diagramContentHash, - String[] processorIds, - String[] processorTypes, - String[] processorStatuses, - Instant[] processorStarts, - Instant[] processorEnds, - long[] processorDurations, - String[] processorDiagramNodeIds, - String[] processorErrorMessages, - String[] processorErrorStacktraces, - int[] processorDepths, - int[] processorParentIndexes -) { -} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java index c5e17e6f..36419fb8 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java @@ -70,12 +70,12 @@ public class IngestionService { private ExecutionRecord toExecutionRecord(String agentId, String groupName, RouteExecution exec) { return new ExecutionRecord( - exec.getExecutionId(), exec.getRouteId(), agentId, groupName, + exec.getExchangeId(), exec.getRouteId(), agentId, groupName, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING", exec.getCorrelationId(), exec.getExchangeId(), exec.getStartTime(), exec.getEndTime(), exec.getDurationMs(), - exec.getErrorMessage(), exec.getErrorStacktrace(), + exec.getErrorMessage(), exec.getErrorStackTrace(), null // diagramContentHash set separately ); } @@ -94,7 +94,7 @@ public class IngestionService { p.getStartTime() != null ? p.getStartTime() : execStartTime, p.getEndTime(), p.getDurationMs(), - p.getErrorMessage(), p.getErrorStacktrace(), + p.getErrorMessage(), p.getErrorStackTrace(), truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()), p.getInputHeaders() != null ? p.getInputHeaders().toString() : null, p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java index 267de43c..bcd1077c 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java @@ -6,7 +6,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** - * Bounded write buffer that decouples HTTP ingestion from ClickHouse batch inserts. + * Bounded write buffer that decouples HTTP ingestion from database batch inserts. *

* Items are offered to the buffer by controllers and drained in batches by a * scheduled flush task. When the buffer is full, {@link #offer} returns false, diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java deleted file mode 100644 index 44955c18..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.cameleer3.server.core.search; - -import java.util.List; - -/** - * Swappable search backend abstraction. - *

- * The current implementation uses ClickHouse for search. This interface allows - * replacing the search backend (e.g., with OpenSearch) without changing the - * service layer or controllers. - */ -public interface SearchEngine { - - /** - * Search for route executions matching the given criteria. - * - * @param request search filters and pagination - * @return paginated search results with total count - */ - SearchResult search(SearchRequest request); - - /** - * Count route executions matching the given criteria (without fetching data). - * - * @param request search filters - * @return total number of matching executions - */ - long count(SearchRequest request); - - /** - * Compute aggregate stats: P99 latency and count of currently running executions. - * - * @param from start of the time window - * @param to end of the time window - * @return execution stats - */ - ExecutionStats stats(java.time.Instant from, java.time.Instant to); - - /** - * Compute aggregate stats scoped to specific routes and agents. - * - * @param from start of the time window - * @param to end of the time window - * @param routeId optional route ID filter - * @param agentIds optional agent ID filter (from group resolution) - * @return execution stats - */ - ExecutionStats stats(java.time.Instant from, java.time.Instant to, String routeId, List agentIds); - - /** - * Compute bucketed time-series stats over a time window. - * - * @param from start of the time window - * @param to end of the time window - * @param bucketCount number of buckets to divide the window into - * @return bucketed stats - */ - StatsTimeseries timeseries(java.time.Instant from, java.time.Instant to, int bucketCount); - - /** - * Compute bucketed time-series stats scoped to specific routes and agents. - * - * @param from start of the time window - * @param to end of the time window - * @param bucketCount number of buckets to divide the window into - * @param routeId optional route ID filter - * @param agentIds optional agent ID filter (from group resolution) - * @return bucketed stats - */ - StatsTimeseries timeseries(java.time.Instant from, java.time.Instant to, int bucketCount, - String routeId, List agentIds); -} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java index ab97c31e..17ff44c9 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java @@ -75,7 +75,7 @@ public record SearchRequest( if (!"asc".equalsIgnoreCase(sortDir)) sortDir = "desc"; } - /** Returns the validated ClickHouse column name for ORDER BY. */ + /** Returns the validated database column name for ORDER BY. */ public String sortColumn() { return SORT_FIELD_TO_COLUMN.getOrDefault(sortField, "start_time"); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java deleted file mode 100644 index 3a2c4bd6..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.cameleer3.server.core.storage; - -import com.cameleer3.common.graph.RouteGraph; -import com.cameleer3.server.core.ingestion.TaggedDiagram; - -import java.util.List; -import java.util.Optional; - -/** - * Repository for route diagram storage with content-hash deduplication. - */ -public interface DiagramRepository { - - /** - * Store a tagged route graph. Uses content-hash deduplication via ReplacingMergeTree. - */ - void store(TaggedDiagram diagram); - - /** - * Find a route graph by its content hash. - */ - Optional findByContentHash(String contentHash); - - /** - * Find the content hash for the latest diagram of a given route and agent. - */ - Optional findContentHashForRoute(String routeId, String agentId); - - /** - * Find the content hash for the latest diagram of a route across any agent in the given list. - * All instances of the same application produce the same route graph, so any agent's - * diagram for the same route will have the same content hash. - */ - Optional findContentHashForRouteByAgents(String routeId, List agentIds); -} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java deleted file mode 100644 index c58c1f81..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.cameleer3.server.core.storage; - -import com.cameleer3.server.core.detail.RawExecutionRow; -import com.cameleer3.server.core.ingestion.TaggedExecution; - -import java.util.List; -import java.util.Optional; - -/** - * Repository for route execution storage and retrieval. - */ -public interface ExecutionRepository { - - /** - * Insert a batch of tagged route executions. - * Implementations must perform a single batch insert for efficiency. - */ - void insertBatch(List executions); - - /** - * Find a raw execution row by execution ID, including all parallel arrays - * needed for processor tree reconstruction. - * - * @param executionId the execution ID to look up - * @return the raw execution row, or empty if not found - */ - Optional findRawById(String executionId); -} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java deleted file mode 100644 index ad15ef0a..00000000 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.cameleer3.server.core.storage; - -import com.cameleer3.server.core.storage.model.MetricsSnapshot; - -import java.util.List; - -/** - * Repository for agent metrics batch inserts into ClickHouse. - */ -public interface MetricsRepository { - - /** - * Insert a batch of metrics snapshots. - * Implementations must perform a single batch insert for efficiency. - */ - void insertBatch(List metrics); -} diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java index a6b4251a..89311bfe 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java @@ -1,6 +1,7 @@ package com.cameleer3.server.core.detail; -import com.cameleer3.server.core.storage.ExecutionRepository; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import org.junit.jupiter.api.Test; import java.time.Instant; @@ -10,33 +11,36 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; /** - * Unit tests for {@link DetailService#reconstructTree} logic. + * Unit tests for {@link DetailService#buildTree} logic. *

- * Verifies correct parent-child wiring from flat parallel arrays. + * Verifies correct parent-child wiring from flat ProcessorRecord lists. */ class TreeReconstructionTest { - private final DetailService detailService = new DetailService(mock(ExecutionRepository.class)); + private final DetailService detailService = new DetailService(mock(ExecutionStore.class)); private static final Instant NOW = Instant.parse("2026-03-10T10:00:00Z"); + private ProcessorRecord proc(String id, String type, String status, + int depth, String parentId) { + return new ProcessorRecord( + "exec-1", id, type, "node-" + id, + "default", "route1", depth, parentId, + status, NOW, NOW, 10L, + null, null, null, null, null, null + ); + } + @Test void linearChain_rootChildGrandchild() { - // [root, child, grandchild], depths=[0,1,2], parents=[-1,0,1] - List roots = detailService.reconstructTree( - new String[]{"root", "child", "grandchild"}, - new String[]{"log", "bean", "to"}, - new String[]{"COMPLETED", "COMPLETED", "COMPLETED"}, - new Instant[]{NOW, NOW, NOW}, - new Instant[]{NOW, NOW, NOW}, - new long[]{10, 20, 30}, - new String[]{"n1", "n2", "n3"}, - new String[]{"", "", ""}, - new String[]{"", "", ""}, - new int[]{0, 1, 2}, - new int[]{-1, 0, 1} + List processors = List.of( + proc("root", "log", "COMPLETED", 0, null), + proc("child", "bean", "COMPLETED", 1, "root"), + proc("grandchild", "to", "COMPLETED", 2, "child") ); + List roots = detailService.buildTree(processors); + assertThat(roots).hasSize(1); ProcessorNode root = roots.get(0); assertThat(root.getProcessorId()).isEqualTo("root"); @@ -53,21 +57,14 @@ class TreeReconstructionTest { @Test void multipleRoots_noNesting() { - // [A, B, C], depths=[0,0,0], parents=[-1,-1,-1] - List roots = detailService.reconstructTree( - new String[]{"A", "B", "C"}, - new String[]{"log", "log", "log"}, - new String[]{"COMPLETED", "COMPLETED", "COMPLETED"}, - new Instant[]{NOW, NOW, NOW}, - new Instant[]{NOW, NOW, NOW}, - new long[]{10, 20, 30}, - new String[]{"n1", "n2", "n3"}, - new String[]{"", "", ""}, - new String[]{"", "", ""}, - new int[]{0, 0, 0}, - new int[]{-1, -1, -1} + List processors = List.of( + proc("A", "log", "COMPLETED", 0, null), + proc("B", "log", "COMPLETED", 0, null), + proc("C", "log", "COMPLETED", 0, null) ); + List roots = detailService.buildTree(processors); + assertThat(roots).hasSize(3); assertThat(roots.get(0).getProcessorId()).isEqualTo("A"); assertThat(roots.get(1).getProcessorId()).isEqualTo("B"); @@ -77,21 +74,15 @@ class TreeReconstructionTest { @Test void branchingTree_parentWithTwoChildren_secondChildHasGrandchild() { - // [parent, child1, child2, grandchild], depths=[0,1,1,2], parents=[-1,0,0,2] - List roots = detailService.reconstructTree( - new String[]{"parent", "child1", "child2", "grandchild"}, - new String[]{"split", "log", "bean", "to"}, - new String[]{"COMPLETED", "COMPLETED", "COMPLETED", "COMPLETED"}, - new Instant[]{NOW, NOW, NOW, NOW}, - new Instant[]{NOW, NOW, NOW, NOW}, - new long[]{100, 20, 30, 5}, - new String[]{"n1", "n2", "n3", "n4"}, - new String[]{"", "", "", ""}, - new String[]{"", "", "", ""}, - new int[]{0, 1, 1, 2}, - new int[]{-1, 0, 0, 2} + List processors = List.of( + proc("parent", "split", "COMPLETED", 0, null), + proc("child1", "log", "COMPLETED", 1, "parent"), + proc("child2", "bean", "COMPLETED", 1, "parent"), + proc("grandchild", "to", "COMPLETED", 2, "child2") ); + List roots = detailService.buildTree(processors); + assertThat(roots).hasSize(1); ProcessorNode parent = roots.get(0); assertThat(parent.getProcessorId()).isEqualTo("parent"); @@ -111,30 +102,8 @@ class TreeReconstructionTest { } @Test - void emptyArrays_producesEmptyList() { - List roots = detailService.reconstructTree( - new String[]{}, - new String[]{}, - new String[]{}, - new Instant[]{}, - new Instant[]{}, - new long[]{}, - new String[]{}, - new String[]{}, - new String[]{}, - new int[]{}, - new int[]{} - ); - - assertThat(roots).isEmpty(); - } - - @Test - void nullArrays_producesEmptyList() { - List roots = detailService.reconstructTree( - null, null, null, null, null, null, null, null, null, null, null - ); - + void emptyList_producesEmptyRoots() { + List roots = detailService.buildTree(List.of()); assertThat(roots).isEmpty(); } } diff --git a/clickhouse/init/01-schema.sql b/clickhouse/init/01-schema.sql deleted file mode 100644 index ab56da70..00000000 --- a/clickhouse/init/01-schema.sql +++ /dev/null @@ -1,57 +0,0 @@ --- Cameleer3 ClickHouse Schema --- Tables for route executions, route diagrams, and agent metrics. - -CREATE TABLE IF NOT EXISTS route_executions ( - execution_id String, - route_id LowCardinality(String), - agent_id LowCardinality(String), - status LowCardinality(String), - start_time DateTime64(3, 'UTC'), - end_time Nullable(DateTime64(3, 'UTC')), - duration_ms UInt64, - correlation_id String, - exchange_id String, - error_message String DEFAULT '', - error_stacktrace String DEFAULT '', - -- Nested processor executions stored as parallel arrays - processor_ids Array(String), - processor_types Array(LowCardinality(String)), - processor_starts Array(DateTime64(3, 'UTC')), - processor_ends Array(DateTime64(3, 'UTC')), - processor_durations Array(UInt64), - processor_statuses Array(LowCardinality(String)), - -- Metadata - server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'), - -- Skip indexes - INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4, - INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMMDD(start_time) -ORDER BY (agent_id, status, start_time, execution_id) -TTL toDateTime(start_time) + toIntervalDay(30) -SETTINGS ttl_only_drop_parts = 1; - -CREATE TABLE IF NOT EXISTS route_diagrams ( - content_hash String, - route_id LowCardinality(String), - agent_id LowCardinality(String), - definition String, - created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) -ENGINE = ReplacingMergeTree(created_at) -ORDER BY (content_hash); - -CREATE TABLE IF NOT EXISTS agent_metrics ( - agent_id LowCardinality(String), - collected_at DateTime64(3, 'UTC'), - metric_name LowCardinality(String), - metric_value Float64, - tags Map(String, String), - server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) -ENGINE = MergeTree() -PARTITION BY toYYYYMMDD(collected_at) -ORDER BY (agent_id, metric_name, collected_at) -TTL toDateTime(collected_at) + toIntervalDay(30) -SETTINGS ttl_only_drop_parts = 1; diff --git a/clickhouse/init/02-search-columns.sql b/clickhouse/init/02-search-columns.sql deleted file mode 100644 index 2b11b435..00000000 --- a/clickhouse/init/02-search-columns.sql +++ /dev/null @@ -1,25 +0,0 @@ --- Phase 2: Schema extension for search, detail, and diagram linking columns. --- Adds exchange snapshot data, processor tree metadata, and diagram content hash. - -ALTER TABLE route_executions - ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '', - ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '', - ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [], - ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT ''; - --- Skip indexes for full-text search on new text columns -ALTER TABLE route_executions - ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4, - ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; - --- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05) -ALTER TABLE route_executions - ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; diff --git a/clickhouse/init/03-users.sql b/clickhouse/init/03-users.sql deleted file mode 100644 index 9dc7ce7a..00000000 --- a/clickhouse/init/03-users.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE TABLE IF NOT EXISTS users ( - user_id String, - provider LowCardinality(String), - email String DEFAULT '', - display_name String DEFAULT '', - roles Array(LowCardinality(String)), - created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'), - updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) ENGINE = ReplacingMergeTree(updated_at) -ORDER BY (user_id); diff --git a/clickhouse/init/04-oidc-config.sql b/clickhouse/init/04-oidc-config.sql deleted file mode 100644 index 35b4d896..00000000 --- a/clickhouse/init/04-oidc-config.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE IF NOT EXISTS oidc_config ( - config_id String DEFAULT 'default', - enabled Bool DEFAULT false, - issuer_uri String DEFAULT '', - client_id String DEFAULT '', - client_secret String DEFAULT '', - roles_claim String DEFAULT 'realm_access.roles', - default_roles Array(LowCardinality(String)), - auto_signup Bool DEFAULT true, - display_name_claim String DEFAULT 'name', - updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC') -) ENGINE = ReplacingMergeTree(updated_at) -ORDER BY (config_id); diff --git a/clickhouse/init/05-oidc-auto-signup.sql b/clickhouse/init/05-oidc-auto-signup.sql deleted file mode 100644 index 643a69ea..00000000 --- a/clickhouse/init/05-oidc-auto-signup.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS auto_signup Bool DEFAULT true; diff --git a/clickhouse/init/06-oidc-display-name-claim.sql b/clickhouse/init/06-oidc-display-name-claim.sql deleted file mode 100644 index ef1870bd..00000000 --- a/clickhouse/init/06-oidc-display-name-claim.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS display_name_claim String DEFAULT 'name'; diff --git a/pom.xml b/pom.xml index bca775b0..2f27d0fd 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,13 @@ cameleer3-server-core ${project.version} + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import +