diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
deleted file mode 100644
index ee5d8db5..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.cameleer3.server.app.config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.io.Resource;
-import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import jakarta.annotation.PostConstruct;
-import javax.sql.DataSource;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.stream.Collectors;
-
-/**
- * ClickHouse configuration.
- *
- * Spring Boot auto-configures the DataSource from {@code spring.datasource.*} properties.
- * This class exposes a JdbcTemplate bean and initializes the schema on startup.
- *
- * The ClickHouse container's {@code CLICKHOUSE_DB} env var creates the database;
- * this class creates the tables within it.
- *
- * Migration files are discovered automatically from {@code classpath:clickhouse/*.sql}
- * and executed in filename order (numeric prefix sort).
- */
-@Configuration
-public class ClickHouseConfig {
-
- private static final Logger log = LoggerFactory.getLogger(ClickHouseConfig.class);
- private static final String MIGRATION_PATTERN = "classpath:clickhouse/*.sql";
-
- private final DataSource dataSource;
-
- public ClickHouseConfig(DataSource dataSource) {
- this.dataSource = dataSource;
- }
-
- @Bean
- public JdbcTemplate jdbcTemplate() {
- return new JdbcTemplate(dataSource);
- }
-
- @PostConstruct
- void initSchema() {
- var jdbc = new JdbcTemplate(dataSource);
- try {
- Resource[] resources = new PathMatchingResourcePatternResolver()
- .getResources(MIGRATION_PATTERN);
- Arrays.sort(resources, Comparator.comparing(Resource::getFilename));
-
- for (Resource resource : resources) {
- String filename = resource.getFilename();
- try {
- String sql = resource.getContentAsString(StandardCharsets.UTF_8);
- String stripped = sql.lines()
- .filter(line -> !line.trim().startsWith("--"))
- .collect(Collectors.joining("\n"));
- for (String statement : stripped.split(";")) {
- String trimmed = statement.trim();
- if (!trimmed.isEmpty()) {
- jdbc.execute(trimmed);
- }
- }
- log.info("Applied schema: {}", filename);
- } catch (Exception e) {
- log.error("Failed to apply schema: {}", filename, e);
- throw new RuntimeException("Schema initialization failed: " + filename, e);
- }
- }
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException("Failed to discover migration files", e);
- }
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
index 0ed581ad..3ff7edea 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
@@ -1,9 +1,10 @@
package com.cameleer3.server.app.config;
-import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
-import org.apache.hc.core5.http.HttpHost;
+import org.apache.http.HttpHost;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
-import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -14,10 +15,14 @@ public class OpenSearchConfig {
@Value("${opensearch.url:http://localhost:9200}")
private String opensearchUrl;
+ @Bean(destroyMethod = "close")
+ public RestClient opensearchRestClient() {
+ return RestClient.builder(HttpHost.create(opensearchUrl)).build();
+ }
+
@Bean
- public OpenSearchClient openSearchClient() {
- HttpHost host = HttpHost.create(opensearchUrl);
- var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
+ public OpenSearchClient openSearchClient(RestClient restClient) {
+ var transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
return new OpenSearchClient(transport);
}
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java
index 3e0ca0c4..2bd6ea55 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DetailController.java
@@ -1,8 +1,9 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.storage.ClickHouseExecutionRepository;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.detail.ExecutionDetail;
+import com.cameleer3.server.core.storage.ExecutionStore;
+import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -12,14 +13,16 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/**
* Endpoints for retrieving execution details and processor snapshots.
*
* The detail endpoint returns a nested processor tree reconstructed from
- * flat parallel arrays stored in ClickHouse. The snapshot endpoint returns
- * per-processor exchange data (bodies and headers).
+ * individual processor records stored in PostgreSQL. The snapshot endpoint
+ * returns per-processor exchange data (bodies and headers).
*/
@RestController
@RequestMapping("/api/v1/executions")
@@ -27,12 +30,12 @@ import java.util.Map;
public class DetailController {
private final DetailService detailService;
- private final ClickHouseExecutionRepository executionRepository;
+ private final ExecutionStore executionStore;
public DetailController(DetailService detailService,
- ClickHouseExecutionRepository executionRepository) {
+ ExecutionStore executionStore) {
this.detailService = detailService;
- this.executionRepository = executionRepository;
+ this.executionStore = executionStore;
}
@GetMapping("/{executionId}")
@@ -52,8 +55,18 @@ public class DetailController {
public ResponseEntity<Map<String, Object>> getProcessorSnapshot(
@PathVariable String executionId,
@PathVariable int index) {
- return executionRepository.findProcessorSnapshot(executionId, index)
- .map(ResponseEntity::ok)
- .orElse(ResponseEntity.notFound().build());
+ List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
+ if (index < 0 || index >= processors.size()) {
+ return ResponseEntity.notFound().build();
+ }
+
+ ProcessorRecord p = processors.get(index);
+ Map<String, Object> snapshot = new LinkedHashMap<>();
+ if (p.inputBody() != null) snapshot.put("inputBody", p.inputBody());
+ if (p.outputBody() != null) snapshot.put("outputBody", p.outputBody());
+ if (p.inputHeaders() != null) snapshot.put("inputHeaders", p.inputHeaders());
+ if (p.outputHeaders() != null) snapshot.put("outputHeaders", p.outputHeaders());
+
+ return ResponseEntity.ok(snapshot);
}
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java
index d4359968..5cdaf176 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java
@@ -11,7 +11,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -25,8 +24,8 @@ import java.util.List;
/**
* Ingestion endpoint for route diagrams.
*
- * Accepts both single {@link RouteGraph} and arrays. Data is buffered
- * and flushed to ClickHouse by the flush scheduler.
+ * Accepts both single {@link RouteGraph} and arrays. Data is written
+ * synchronously to PostgreSQL via {@link IngestionService}.
*/
@RestController
@RequestMapping("/api/v1/data")
@@ -47,26 +46,12 @@ public class DiagramController {
@Operation(summary = "Ingest route diagram data",
description = "Accepts a single RouteGraph or an array of RouteGraphs")
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
- @ApiResponse(responseCode = "503", description = "Buffer full, retry later")
public ResponseEntity<Void> ingestDiagrams(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
List<RouteGraph> graphs = parsePayload(body);
- List<TaggedDiagram> tagged = graphs.stream()
- .map(graph -> new TaggedDiagram(agentId, graph))
- .toList();
- boolean accepted;
- if (tagged.size() == 1) {
- accepted = ingestionService.acceptDiagram(tagged.get(0));
- } else {
- accepted = ingestionService.acceptDiagrams(tagged);
- }
-
- if (!accepted) {
- log.warn("Diagram buffer full, returning 503");
- return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
- .header("Retry-After", "5")
- .build();
+ for (RouteGraph graph : graphs) {
+ ingestionService.ingestDiagram(new TaggedDiagram(agentId, graph));
}
return ResponseEntity.accepted().build();
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java
index b1ca3775..d8f722e7 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramRenderController.java
@@ -5,7 +5,7 @@ import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.diagram.DiagramLayout;
import com.cameleer3.server.core.diagram.DiagramRenderer;
-import com.cameleer3.server.core.storage.DiagramRepository;
+import com.cameleer3.server.core.storage.DiagramStore;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -39,14 +39,14 @@ public class DiagramRenderController {
private static final MediaType SVG_MEDIA_TYPE = MediaType.valueOf("image/svg+xml");
- private final DiagramRepository diagramRepository;
+ private final DiagramStore diagramStore;
private final DiagramRenderer diagramRenderer;
private final AgentRegistryService registryService;
- public DiagramRenderController(DiagramRepository diagramRepository,
+ public DiagramRenderController(DiagramStore diagramStore,
DiagramRenderer diagramRenderer,
AgentRegistryService registryService) {
- this.diagramRepository = diagramRepository;
+ this.diagramStore = diagramStore;
this.diagramRenderer = diagramRenderer;
this.registryService = registryService;
}
@@ -64,7 +64,7 @@ public class DiagramRenderController {
@PathVariable String contentHash,
HttpServletRequest request) {
- Optional<RouteGraph> graphOpt = diagramRepository.findByContentHash(contentHash);
+ Optional<RouteGraph> graphOpt = diagramStore.findByContentHash(contentHash);
if (graphOpt.isEmpty()) {
return ResponseEntity.notFound().build();
}
@@ -105,12 +105,12 @@ public class DiagramRenderController {
return ResponseEntity.notFound().build();
}
- Optional<String> contentHash = diagramRepository.findContentHashForRouteByAgents(routeId, agentIds);
+ Optional<String> contentHash = diagramStore.findContentHashForRouteByAgents(routeId, agentIds);
if (contentHash.isEmpty()) {
return ResponseEntity.notFound().build();
}
- Optional<RouteGraph> graphOpt = diagramRepository.findByContentHash(contentHash.get());
+ Optional<RouteGraph> graphOpt = diagramStore.findByContentHash(contentHash.get());
if (graphOpt.isEmpty()) {
return ResponseEntity.notFound().build();
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java
index e44f2645..bea76037 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java
@@ -1,8 +1,9 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.RouteExecution;
+import com.cameleer3.server.core.agent.AgentInfo;
+import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.ingestion.IngestionService;
-import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -11,7 +12,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -25,9 +25,8 @@ import java.util.List;
/**
* Ingestion endpoint for route execution data.
*
- * Accepts both single {@link RouteExecution} and arrays. Data is buffered
- * in a {@link com.cameleer3.server.core.ingestion.WriteBuffer} and flushed
- * to ClickHouse by the flush scheduler.
+ * Accepts both single {@link RouteExecution} and arrays. Data is written
+ * synchronously to PostgreSQL via {@link IngestionService}.
*/
@RestController
@RequestMapping("/api/v1/data")
@@ -37,10 +36,14 @@ public class ExecutionController {
private static final Logger log = LoggerFactory.getLogger(ExecutionController.class);
private final IngestionService ingestionService;
+ private final AgentRegistryService registryService;
private final ObjectMapper objectMapper;
- public ExecutionController(IngestionService ingestionService, ObjectMapper objectMapper) {
+ public ExecutionController(IngestionService ingestionService,
+ AgentRegistryService registryService,
+ ObjectMapper objectMapper) {
this.ingestionService = ingestionService;
+ this.registryService = registryService;
this.objectMapper = objectMapper;
}
@@ -48,26 +51,13 @@ public class ExecutionController {
@Operation(summary = "Ingest route execution data",
description = "Accepts a single RouteExecution or an array of RouteExecutions")
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
- @ApiResponse(responseCode = "503", description = "Buffer full, retry later")
public ResponseEntity<Void> ingestExecutions(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
+ String groupName = resolveGroupName(agentId);
List<RouteExecution> executions = parsePayload(body);
- List<TaggedExecution> tagged = executions.stream()
- .map(exec -> new TaggedExecution(agentId, exec))
- .toList();
- boolean accepted;
- if (tagged.size() == 1) {
- accepted = ingestionService.acceptExecution(tagged.get(0));
- } else {
- accepted = ingestionService.acceptExecutions(tagged);
- }
-
- if (!accepted) {
- log.warn("Execution buffer full, returning 503");
- return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
- .header("Retry-After", "5")
- .build();
+ for (RouteExecution execution : executions) {
+ ingestionService.ingestExecution(agentId, groupName, execution);
}
return ResponseEntity.accepted().build();
@@ -78,6 +68,11 @@ public class ExecutionController {
return auth != null ? auth.getName() : "";
}
+ private String resolveGroupName(String agentId) {
+ AgentInfo agent = registryService.findById(agentId);
+ return agent != null ? agent.group() : "";
+ }
+
private List<RouteExecution> parsePayload(String body) throws JsonProcessingException {
String trimmed = body.strip();
if (trimmed.startsWith("[")) {
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java
index e947942d..a7ee03d2 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java
@@ -23,7 +23,7 @@ import java.util.List;
* Ingestion endpoint for agent metrics.
*
* Accepts an array of {@link MetricsSnapshot}. Data is buffered
- * and flushed to ClickHouse by the flush scheduler.
+ * and flushed to PostgreSQL by the flush scheduler.
*/
@RestController
@RequestMapping("/api/v1/data")
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
deleted file mode 100644
index e48a2a92..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
+++ /dev/null
@@ -1,159 +0,0 @@
-package com.cameleer3.server.app.ingestion;
-
-import com.cameleer3.server.app.config.IngestionConfig;
-import com.cameleer3.server.core.ingestion.TaggedDiagram;
-import com.cameleer3.server.core.ingestion.TaggedExecution;
-import com.cameleer3.server.core.ingestion.WriteBuffer;
-import com.cameleer3.server.core.storage.DiagramRepository;
-import com.cameleer3.server.core.storage.ExecutionRepository;
-import com.cameleer3.server.core.storage.MetricsRepository;
-import com.cameleer3.server.core.storage.model.MetricsSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.SmartLifecycle;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * Scheduled task that drains the write buffers and batch-inserts into ClickHouse.
- *
- * Implements {@link SmartLifecycle} to ensure all remaining buffered data is
- * flushed on application shutdown.
- */
-@Component
-public class ClickHouseFlushScheduler implements SmartLifecycle {
-
- private static final Logger log = LoggerFactory.getLogger(ClickHouseFlushScheduler.class);
-
- private final WriteBuffer<TaggedExecution> executionBuffer;
- private final WriteBuffer<TaggedDiagram> diagramBuffer;
- private final WriteBuffer<MetricsSnapshot> metricsBuffer;
- private final ExecutionRepository executionRepository;
- private final DiagramRepository diagramRepository;
- private final MetricsRepository metricsRepository;
- private final int batchSize;
-
- private volatile boolean running = false;
-
- public ClickHouseFlushScheduler(WriteBuffer<TaggedExecution> executionBuffer,
- WriteBuffer<TaggedDiagram> diagramBuffer,
- WriteBuffer<MetricsSnapshot> metricsBuffer,
- ExecutionRepository executionRepository,
- DiagramRepository diagramRepository,
- MetricsRepository metricsRepository,
- IngestionConfig config) {
- this.executionBuffer = executionBuffer;
- this.diagramBuffer = diagramBuffer;
- this.metricsBuffer = metricsBuffer;
- this.executionRepository = executionRepository;
- this.diagramRepository = diagramRepository;
- this.metricsRepository = metricsRepository;
- this.batchSize = config.getBatchSize();
- }
-
- @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
- public void flushAll() {
- flushExecutions();
- flushDiagrams();
- flushMetrics();
- }
-
- private void flushExecutions() {
- try {
- List<TaggedExecution> batch = executionBuffer.drain(batchSize);
- if (!batch.isEmpty()) {
- executionRepository.insertBatch(batch);
- log.debug("Flushed {} executions to ClickHouse", batch.size());
- }
- } catch (Exception e) {
- log.error("Failed to flush executions to ClickHouse", e);
- }
- }
-
- private void flushDiagrams() {
- try {
- List<TaggedDiagram> batch = diagramBuffer.drain(batchSize);
- for (TaggedDiagram diagram : batch) {
- diagramRepository.store(diagram);
- }
- if (!batch.isEmpty()) {
- log.debug("Flushed {} diagrams to ClickHouse", batch.size());
- }
- } catch (Exception e) {
- log.error("Failed to flush diagrams to ClickHouse", e);
- }
- }
-
- private void flushMetrics() {
- try {
- List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
- if (!batch.isEmpty()) {
- metricsRepository.insertBatch(batch);
- log.debug("Flushed {} metrics to ClickHouse", batch.size());
- }
- } catch (Exception e) {
- log.error("Failed to flush metrics to ClickHouse", e);
- }
- }
-
- // SmartLifecycle -- flush remaining data on shutdown
-
- @Override
- public void start() {
- running = true;
- log.info("ClickHouseFlushScheduler started");
- }
-
- @Override
- public void stop() {
- log.info("ClickHouseFlushScheduler stopping -- flushing remaining data");
- drainAll();
- running = false;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- @Override
- public int getPhase() {
- // Run after most beans but before DataSource shutdown
- return Integer.MAX_VALUE - 1;
- }
-
- /**
- * Drain all buffers completely (loop until empty).
- */
- private void drainAll() {
- drainBufferCompletely("executions", executionBuffer, batch -> executionRepository.insertBatch(batch));
- drainBufferCompletely("diagrams", diagramBuffer, batch -> {
- for (TaggedDiagram d : batch) {
- diagramRepository.store(d);
- }
- });
- drainBufferCompletely("metrics", metricsBuffer, batch -> metricsRepository.insertBatch(batch));
- }
-
- private <T> void drainBufferCompletely(String name, WriteBuffer<T> buffer, java.util.function.Consumer<List<T>> inserter) {
- int total = 0;
- while (buffer.size() > 0) {
- List<T> batch = buffer.drain(batchSize);
- if (batch.isEmpty()) {
- break;
- }
- try {
- inserter.accept(batch);
- total += batch.size();
- } catch (Exception e) {
- log.error("Failed to flush remaining {} during shutdown", name, e);
- break;
- }
- }
- if (total > 0) {
- log.info("Flushed {} remaining {} during shutdown", total, name);
- }
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
deleted file mode 100644
index ed6a0b13..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
+++ /dev/null
@@ -1,357 +0,0 @@
-package com.cameleer3.server.app.search;
-
-import com.cameleer3.server.core.search.ExecutionStats;
-import com.cameleer3.server.core.search.ExecutionSummary;
-import com.cameleer3.server.core.search.SearchEngine;
-import com.cameleer3.server.core.search.SearchRequest;
-import com.cameleer3.server.core.search.SearchResult;
-import com.cameleer3.server.core.search.StatsTimeseries;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.sql.Timestamp;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * ClickHouse implementation of {@link SearchEngine}.
- *
- * Builds dynamic WHERE clauses from non-null {@link SearchRequest} fields
- * and queries the {@code route_executions} table. LIKE patterns are properly
- * escaped to prevent injection.
- */
-public class ClickHouseSearchEngine implements SearchEngine {
-
- /** Per-query memory cap (1 GiB) — prevents a single query from OOMing ClickHouse. */
- private static final String SETTINGS = " SETTINGS max_memory_usage = 1000000000";
-
- private final JdbcTemplate jdbcTemplate;
-
- public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) {
- this.jdbcTemplate = jdbcTemplate;
- }
-
- @Override
- public SearchResult search(SearchRequest request) {
- var conditions = new ArrayList<String>();
- var params = new ArrayList<Object>();
-
- buildWhereClause(request, conditions, params);
-
- String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
-
- // Count query
- var countParams = params.toArray();
- Long total = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, countParams);
- if (total == null) total = 0L;
-
- if (total == 0) {
- return SearchResult.empty(request.offset(), request.limit());
- }
-
- // Data query
- params.add(request.limit());
- params.add(request.offset());
- String orderDir = "asc".equalsIgnoreCase(request.sortDir()) ? "ASC" : "DESC";
- String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " +
- "duration_ms, correlation_id, error_message, diagram_content_hash " +
- "FROM route_executions" + where +
- " ORDER BY " + request.sortColumn() + " " + orderDir + " LIMIT ? OFFSET ?" + SETTINGS;
-
- List<ExecutionSummary> data = jdbcTemplate.query(dataSql, (rs, rowNum) -> {
- Timestamp endTs = rs.getTimestamp("end_time");
- return new ExecutionSummary(
- rs.getString("execution_id"),
- rs.getString("route_id"),
- rs.getString("agent_id"),
- rs.getString("status"),
- rs.getTimestamp("start_time").toInstant(),
- endTs != null ? endTs.toInstant() : null,
- rs.getLong("duration_ms"),
- rs.getString("correlation_id"),
- rs.getString("error_message"),
- rs.getString("diagram_content_hash")
- );
- }, params.toArray());
-
- return new SearchResult<>(data, total, request.offset(), request.limit());
- }
-
- @Override
- public long count(SearchRequest request) {
- var conditions = new ArrayList<String>();
- var params = new ArrayList<Object>();
- buildWhereClause(request, conditions, params);
-
- String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
- Long result = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions" + where + SETTINGS, Long.class, params.toArray());
- return result != null ? result : 0L;
- }
-
- @Override
- public ExecutionStats stats(Instant from, Instant to) {
- return stats(from, to, null, null);
- }
-
- @Override
- public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
- // Current period — read from rollup
- var conditions = new ArrayList();
- var params = new ArrayList();
- conditions.add("bucket >= ?");
- params.add(bucketTimestamp(floorToFiveMinutes(from)));
- conditions.add("bucket <= ?");
- params.add(bucketTimestamp(to));
- addScopeFilters(routeId, agentIds, conditions, params);
-
- String where = " WHERE " + String.join(" AND ", conditions);
-
- String rollupSql = "SELECT " +
- "countMerge(total_count) AS cnt, " +
- "countIfMerge(failed_count) AS failed, " +
- "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
- "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
- "FROM route_execution_stats_5m" + where + SETTINGS;
-
- record PeriodStats(long totalCount, long failedCount, long avgDurationMs, long p99LatencyMs) {}
- PeriodStats current = jdbcTemplate.queryForObject(rollupSql,
- (rs, rowNum) -> new PeriodStats(
- rs.getLong("cnt"),
- rs.getLong("failed"),
- rs.getLong("avg_ms"),
- rs.getLong("p99_ms")),
- params.toArray());
-
- // Active count — PREWHERE reads only the status column before touching wide rows
- var scopeConditions = new ArrayList<String>();
- var activeParams = new ArrayList<Object>();
- addScopeFilters(routeId, agentIds, scopeConditions, activeParams);
- String scopeWhere = scopeConditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", scopeConditions);
- Long activeCount = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions PREWHERE status = 'RUNNING'" + scopeWhere + SETTINGS,
- Long.class, activeParams.toArray());
-
- // Previous period (same window shifted back 24h) — read from rollup
- Duration window = Duration.between(from, to);
- Instant prevFrom = from.minus(Duration.ofHours(24));
- Instant prevTo = prevFrom.plus(window);
- var prevConditions = new ArrayList<String>();
- var prevParams = new ArrayList<Object>();
- prevConditions.add("bucket >= ?");
- prevParams.add(bucketTimestamp(floorToFiveMinutes(prevFrom)));
- prevConditions.add("bucket <= ?");
- prevParams.add(bucketTimestamp(prevTo));
- addScopeFilters(routeId, agentIds, prevConditions, prevParams);
- String prevWhere = " WHERE " + String.join(" AND ", prevConditions);
-
- String prevRollupSql = "SELECT " +
- "countMerge(total_count) AS cnt, " +
- "countIfMerge(failed_count) AS failed, " +
- "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
- "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
- "FROM route_execution_stats_5m" + prevWhere + SETTINGS;
-
- PeriodStats prev = jdbcTemplate.queryForObject(prevRollupSql,
- (rs, rowNum) -> new PeriodStats(
- rs.getLong("cnt"),
- rs.getLong("failed"),
- rs.getLong("avg_ms"),
- rs.getLong("p99_ms")),
- prevParams.toArray());
-
- // Today total (midnight UTC to now) — read from rollup with same scope
- Instant todayStart = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS);
- var todayConditions = new ArrayList<String>();
- var todayParams = new ArrayList<Object>();
- todayConditions.add("bucket >= ?");
- todayParams.add(bucketTimestamp(floorToFiveMinutes(todayStart)));
- addScopeFilters(routeId, agentIds, todayConditions, todayParams);
- String todayWhere = " WHERE " + String.join(" AND ", todayConditions);
-
- Long totalToday = jdbcTemplate.queryForObject(
- "SELECT countMerge(total_count) FROM route_execution_stats_5m" + todayWhere + SETTINGS,
- Long.class, todayParams.toArray());
-
- return new ExecutionStats(
- current.totalCount, current.failedCount, current.avgDurationMs,
- current.p99LatencyMs, activeCount != null ? activeCount : 0L,
- totalToday != null ? totalToday : 0L,
- prev.totalCount, prev.failedCount, prev.avgDurationMs, prev.p99LatencyMs);
- }
-
- @Override
- public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
- return timeseries(from, to, bucketCount, null, null);
- }
-
- @Override
- public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
- String routeId, List<String> agentIds) {
- long intervalSeconds = Duration.between(from, to).getSeconds() / bucketCount;
- if (intervalSeconds < 1) intervalSeconds = 1;
-
- var conditions = new ArrayList<String>();
- var params = new ArrayList<Object>();
- conditions.add("bucket >= ?");
- params.add(bucketTimestamp(floorToFiveMinutes(from)));
- conditions.add("bucket <= ?");
- params.add(bucketTimestamp(to));
- addScopeFilters(routeId, agentIds, conditions, params);
-
- String where = " WHERE " + String.join(" AND ", conditions);
-
- // Re-aggregate 5-minute rollup buckets into the requested interval
- String sql = "SELECT " +
- "toDateTime(intDiv(toUInt32(bucket), " + intervalSeconds + ") * " + intervalSeconds + ") AS ts_bucket, " +
- "countMerge(total_count) AS cnt, " +
- "countIfMerge(failed_count) AS failed, " +
- "toInt64(ifNotFinite(sumMerge(duration_sum) / countMerge(total_count), 0)) AS avg_ms, " +
- "toInt64(ifNotFinite(quantileTDigestMerge(0.99)(p99_duration), 0)) AS p99_ms " +
- "FROM route_execution_stats_5m" + where +
- " GROUP BY ts_bucket ORDER BY ts_bucket" + SETTINGS;
-
- List<StatsTimeseries.TimeseriesBucket> buckets = jdbcTemplate.query(sql, (rs, rowNum) ->
- new StatsTimeseries.TimeseriesBucket(
- rs.getTimestamp("ts_bucket").toInstant(),
- rs.getLong("cnt"),
- rs.getLong("failed"),
- rs.getLong("avg_ms"),
- rs.getLong("p99_ms"),
- 0L
- ),
- params.toArray());
-
- return new StatsTimeseries(buckets);
- }
-
- private void buildWhereClause(SearchRequest req, List conditions, List params) {
- if (req.status() != null && !req.status().isBlank()) {
- String[] statuses = req.status().split(",");
- if (statuses.length == 1) {
- conditions.add("status = ?");
- params.add(statuses[0].trim());
- } else {
- String placeholders = String.join(", ", Collections.nCopies(statuses.length, "?"));
- conditions.add("status IN (" + placeholders + ")");
- for (String s : statuses) {
- params.add(s.trim());
- }
- }
- }
- if (req.timeFrom() != null) {
- conditions.add("start_time >= ?");
- params.add(Timestamp.from(req.timeFrom()));
- }
- if (req.timeTo() != null) {
- conditions.add("start_time <= ?");
- params.add(Timestamp.from(req.timeTo()));
- }
- if (req.durationMin() != null) {
- conditions.add("duration_ms >= ?");
- params.add(req.durationMin());
- }
- if (req.durationMax() != null) {
- conditions.add("duration_ms <= ?");
- params.add(req.durationMax());
- }
- if (req.correlationId() != null && !req.correlationId().isBlank()) {
- conditions.add("correlation_id = ?");
- params.add(req.correlationId());
- }
- if (req.routeId() != null && !req.routeId().isBlank()) {
- conditions.add("route_id = ?");
- params.add(req.routeId());
- }
- if (req.agentId() != null && !req.agentId().isBlank()) {
- conditions.add("agent_id = ?");
- params.add(req.agentId());
- }
- // agentIds from group resolution (takes precedence when agentId is not set)
- if ((req.agentId() == null || req.agentId().isBlank())
- && req.agentIds() != null && !req.agentIds().isEmpty()) {
- String placeholders = String.join(", ", Collections.nCopies(req.agentIds().size(), "?"));
- conditions.add("agent_id IN (" + placeholders + ")");
- params.addAll(req.agentIds());
- }
- if (req.processorType() != null && !req.processorType().isBlank()) {
- conditions.add("has(processor_types, ?)");
- params.add(req.processorType());
- }
- if (req.text() != null && !req.text().isBlank()) {
- String pattern = "%" + escapeLike(req.text()) + "%";
- String[] textColumns = {
- "execution_id", "route_id", "agent_id",
- "error_message", "error_stacktrace",
- "exchange_bodies", "exchange_headers"
- };
- var likeClauses = java.util.Arrays.stream(textColumns)
- .map(col -> col + " LIKE ?")
- .toList();
- conditions.add("(" + String.join(" OR ", likeClauses) + ")");
- for (int i = 0; i < textColumns.length; i++) {
- params.add(pattern);
- }
- }
- if (req.textInBody() != null && !req.textInBody().isBlank()) {
- conditions.add("exchange_bodies LIKE ?");
- params.add("%" + escapeLike(req.textInBody()) + "%");
- }
- if (req.textInHeaders() != null && !req.textInHeaders().isBlank()) {
- conditions.add("exchange_headers LIKE ?");
- params.add("%" + escapeLike(req.textInHeaders()) + "%");
- }
- if (req.textInErrors() != null && !req.textInErrors().isBlank()) {
- String pattern = "%" + escapeLike(req.textInErrors()) + "%";
- conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ?)");
- params.add(pattern);
- params.add(pattern);
- }
- }
-
- /**
- * Add route ID and agent IDs scope filters to conditions/params.
- */
- private void addScopeFilters(String routeId, List agentIds,
- List conditions, List params) {
- if (routeId != null && !routeId.isBlank()) {
- conditions.add("route_id = ?");
- params.add(routeId);
- }
- if (agentIds != null && !agentIds.isEmpty()) {
- String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
- conditions.add("agent_id IN (" + placeholders + ")");
- params.addAll(agentIds);
- }
- }
-
- /**
- * Floor an Instant to the start of its 5-minute bucket.
- */
- private static Instant floorToFiveMinutes(Instant instant) {
- long epochSecond = instant.getEpochSecond();
- return Instant.ofEpochSecond(epochSecond - (epochSecond % 300));
- }
-
- /**
- * Create a second-precision Timestamp for rollup bucket comparisons.
- * The bucket column is DateTime('UTC') (second precision); the JDBC driver
- * sends java.sql.Timestamp with nanoseconds which ClickHouse rejects.
- */
- private static Timestamp bucketTimestamp(Instant instant) {
- return Timestamp.from(instant.truncatedTo(java.time.temporal.ChronoUnit.SECONDS));
- }
-
- /**
- * Escape special LIKE characters to prevent LIKE injection.
- */
- static String escapeLike(String input) {
- return input
- .replace("\\", "\\\\")
- .replace("%", "\\%")
- .replace("_", "\\_");
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
index d130b1f6..892792fc 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
@@ -7,6 +7,7 @@ import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
+import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
@@ -41,8 +42,6 @@ public class OpenSearchIndex implements SearchIndex {
@PostConstruct
void ensureIndexTemplate() {
- // Full template with ngram analyzer for infix wildcard search.
- // The template JSON matches the spec's OpenSearch index template definition.
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
@@ -53,22 +52,8 @@ public class OpenSearchIndex implements SearchIndex {
.template(t -> t
.settings(s -> s
.numberOfShards("3")
- .numberOfReplicas("1")
- .analysis(a -> a
- .analyzer("ngram_analyzer", an -> an
- .custom(c -> c
- .tokenizer("ngram_tokenizer")
- .filter("lowercase")))
- .tokenizer("ngram_tokenizer", tk -> tk
- .definition(d -> d
- .ngram(ng -> ng
- .minGram(3)
- .maxGram(4)
- .tokenChars(TokenChar.Letter,
- TokenChar.Digit,
- TokenChar.Punctuation,
- TokenChar.Symbol)))))))));
- log.info("OpenSearch index template created with ngram analyzer");
+ .numberOfReplicas("1")))));
+ log.info("OpenSearch index template created");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
@@ -99,10 +84,10 @@ public class OpenSearchIndex implements SearchIndex {
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
- return new SearchResult<>(items, total);
+ return new SearchResult<>(items, total, request.offset(), request.limit());
} catch (IOException e) {
log.error("Search failed", e);
- return new SearchResult<>(List.of(), 0);
+ return SearchResult.empty(request.offset(), request.limit());
}
}
@@ -125,7 +110,8 @@ public class OpenSearchIndex implements SearchIndex {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(INDEX_PREFIX + "*"))
.query(Query.of(q -> q.term(t -> t
- .field("execution_id").value(executionId))))));
+ .field("execution_id")
+ .value(FieldValue.of(executionId)))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
@@ -155,9 +141,9 @@ public class OpenSearchIndex implements SearchIndex {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
- r.gte(jakarta.json.Json.createValue(request.timeFrom().toString()));
+ r.gte(JsonData.of(request.timeFrom().toString()));
if (request.timeTo() != null)
- r.lte(jakarta.json.Json.createValue(request.timeTo().toString()));
+ r.lte(JsonData.of(request.timeTo().toString()));
return r;
})));
}
@@ -180,8 +166,7 @@ public class OpenSearchIndex implements SearchIndex {
// Search top-level text fields
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
- .fields("error_message", "error_stacktrace",
- "error_message.ngram", "error_stacktrace.ngram"))));
+ .fields("error_message", "error_stacktrace"))));
// Search nested processor fields
textQueries.add(Query.of(q -> q.nested(n -> n
@@ -190,10 +175,7 @@ public class OpenSearchIndex implements SearchIndex {
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
- "processors.error_message", "processors.error_stacktrace",
- "processors.input_body.ngram", "processors.output_body.ngram",
- "processors.input_headers.ngram", "processors.output_headers.ngram",
- "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));
+ "processors.error_message", "processors.error_stacktrace"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
@@ -209,30 +191,26 @@ public class OpenSearchIndex implements SearchIndex {
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
- .fields("processors.input_body", "processors.output_body",
- "processors.input_body.ngram", "processors.output_body.ngram"))))));
+ .fields("processors.input_body", "processors.output_body"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
- .fields("processors.input_headers", "processors.output_headers",
- "processors.input_headers.ngram", "processors.output_headers.ngram"))))));
+ .fields("processors.input_headers", "processors.output_headers"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
- .fields("error_message", "error_stacktrace",
- "error_message.ngram", "error_stacktrace.ngram"))),
+ .fields("error_message", "error_stacktrace"))),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
- .fields("processors.error_message", "processors.error_stacktrace",
- "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
+ .fields("processors.error_message", "processors.error_stacktrace")))))
).minimumShouldMatch("1"))));
}
@@ -241,9 +219,9 @@ public class OpenSearchIndex implements SearchIndex {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
- r.gte(jakarta.json.Json.createValue(request.durationMin()));
+ r.gte(JsonData.of(request.durationMin()));
if (request.durationMax() != null)
- r.lte(jakarta.json.Json.createValue(request.durationMax()));
+ r.lte(JsonData.of(request.durationMax()));
return r;
})));
}
@@ -257,7 +235,7 @@ public class OpenSearchIndex implements SearchIndex {
}
private Query termQuery(String field, String value) {
- return Query.of(q -> q.term(t -> t.field(field).value(value)));
+ return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Map toMap(ExecutionDocument doc) {
@@ -305,6 +283,8 @@ public class OpenSearchIndex implements SearchIndex {
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
- (String) src.get("error_message"));
+ (String) src.get("error_message"),
+ null // diagramContentHash not stored in index
+ );
}
}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java
index ad48c345..5c0bdff5 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/security/SecurityBeanConfig.java
@@ -16,7 +16,7 @@ import java.util.List;
* that required security properties are set.
*
* Fails fast on startup if {@code CAMELEER_AUTH_TOKEN} is not set.
- * Seeds OIDC config from env vars into ClickHouse if DB is empty.
+ * Seeds OIDC config from env vars into the database if DB is empty.
*/
@Configuration
@EnableConfigurationProperties(SecurityProperties.class)
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java
deleted file mode 100644
index 11a0ed4f..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package com.cameleer3.server.app.storage;
-
-import com.cameleer3.common.graph.RouteGraph;
-import com.cameleer3.server.core.ingestion.TaggedDiagram;
-import com.cameleer3.server.core.storage.DiagramRepository;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HexFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * ClickHouse implementation of {@link DiagramRepository}.
- *
- * Stores route graphs as JSON with SHA-256 content-hash deduplication.
- * The underlying table uses ReplacingMergeTree keyed on content_hash.
- */
-@Repository
-public class ClickHouseDiagramRepository implements DiagramRepository {
-
- private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramRepository.class);
-
- private static final String INSERT_SQL = """
- INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
- VALUES (?, ?, ?, ?)
- """;
-
- private static final String SELECT_BY_HASH = """
- SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1
- """;
-
- private static final String SELECT_HASH_FOR_ROUTE = """
- SELECT content_hash FROM route_diagrams
- WHERE route_id = ? AND agent_id = ?
- ORDER BY created_at DESC LIMIT 1
- """;
-
- private final JdbcTemplate jdbcTemplate;
- private final ObjectMapper objectMapper;
-
- public ClickHouseDiagramRepository(JdbcTemplate jdbcTemplate) {
- this.jdbcTemplate = jdbcTemplate;
- this.objectMapper = new ObjectMapper();
- this.objectMapper.registerModule(new JavaTimeModule());
- }
-
- @Override
- public void store(TaggedDiagram diagram) {
- try {
- RouteGraph graph = diagram.graph();
- String agentId = diagram.agentId() != null ? diagram.agentId() : "";
- String json = objectMapper.writeValueAsString(graph);
- String contentHash = sha256Hex(json);
- String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
-
- jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json);
- log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
- } catch (JsonProcessingException e) {
- throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
- }
- }
-
- @Override
- public Optional findByContentHash(String contentHash) {
- List> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash);
- if (rows.isEmpty()) {
- return Optional.empty();
- }
- String json = (String) rows.get(0).get("definition");
- try {
- return Optional.of(objectMapper.readValue(json, RouteGraph.class));
- } catch (JsonProcessingException e) {
- log.error("Failed to deserialize RouteGraph from ClickHouse", e);
- return Optional.empty();
- }
- }
-
- @Override
- public Optional findContentHashForRoute(String routeId, String agentId) {
- List> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId);
- if (rows.isEmpty()) {
- return Optional.empty();
- }
- return Optional.of((String) rows.get(0).get("content_hash"));
- }
-
- @Override
- public Optional findContentHashForRouteByAgents(String routeId, List agentIds) {
- if (agentIds == null || agentIds.isEmpty()) {
- return Optional.empty();
- }
- String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
- String sql = "SELECT content_hash FROM route_diagrams " +
- "WHERE route_id = ? AND agent_id IN (" + placeholders + ") " +
- "ORDER BY created_at DESC LIMIT 1";
- var params = new ArrayList();
- params.add(routeId);
- params.addAll(agentIds);
- List> rows = jdbcTemplate.queryForList(sql, params.toArray());
- if (rows.isEmpty()) {
- return Optional.empty();
- }
- return Optional.of((String) rows.get(0).get("content_hash"));
- }
-
- static String sha256Hex(String input) {
- try {
- MessageDigest digest = MessageDigest.getInstance("SHA-256");
- byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
- return HexFormat.of().formatHex(hash);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException("SHA-256 not available", e);
- }
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java
deleted file mode 100644
index b119f7e7..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java
+++ /dev/null
@@ -1,418 +0,0 @@
-package com.cameleer3.server.app.storage;
-
-import com.cameleer3.common.model.ExchangeSnapshot;
-import com.cameleer3.common.model.ProcessorExecution;
-import com.cameleer3.common.model.RouteExecution;
-import com.cameleer3.server.core.detail.RawExecutionRow;
-import com.cameleer3.server.core.ingestion.TaggedExecution;
-import com.cameleer3.server.core.storage.DiagramRepository;
-import com.cameleer3.server.core.storage.ExecutionRepository;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.BatchPreparedStatementSetter;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
-/**
- * ClickHouse implementation of {@link ExecutionRepository}.
- *
- * Performs batch inserts into the {@code route_executions} table.
- * Processor executions are flattened into parallel arrays with tree metadata
- * (depth, parent index) for reconstruction.
- */
-@Repository
-public class ClickHouseExecutionRepository implements ExecutionRepository {
-
- private static final Logger log = LoggerFactory.getLogger(ClickHouseExecutionRepository.class);
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- private static final String INSERT_SQL = """
- INSERT INTO route_executions (
- execution_id, route_id, agent_id, status, start_time, end_time,
- duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
- processor_ids, processor_types, processor_starts, processor_ends,
- processor_durations, processor_statuses,
- exchange_bodies, exchange_headers,
- processor_depths, processor_parent_indexes,
- processor_error_messages, processor_error_stacktraces,
- processor_input_bodies, processor_output_bodies,
- processor_input_headers, processor_output_headers,
- processor_diagram_node_ids, diagram_content_hash
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """;
-
- private final JdbcTemplate jdbcTemplate;
- private final DiagramRepository diagramRepository;
-
- public ClickHouseExecutionRepository(JdbcTemplate jdbcTemplate, DiagramRepository diagramRepository) {
- this.jdbcTemplate = jdbcTemplate;
- this.diagramRepository = diagramRepository;
- }
-
- @Override
- public void insertBatch(List executions) {
- if (executions.isEmpty()) {
- return;
- }
-
- jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
- @Override
- public void setValues(PreparedStatement ps, int i) throws SQLException {
- TaggedExecution tagged = executions.get(i);
- RouteExecution exec = tagged.execution();
- String agentId = tagged.agentId() != null ? tagged.agentId() : "";
- List flatProcessors = flattenWithMetadata(exec.getProcessors());
-
- int col = 1;
- ps.setString(col++, UUID.randomUUID().toString());
- ps.setString(col++, nullSafe(exec.getRouteId()));
- ps.setString(col++, agentId);
- ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
- ps.setObject(col++, toTimestamp(exec.getStartTime()));
- ps.setObject(col++, toTimestamp(exec.getEndTime()));
- ps.setLong(col++, exec.getDurationMs());
- ps.setString(col++, nullSafe(exec.getCorrelationId()));
- ps.setString(col++, nullSafe(exec.getExchangeId()));
- ps.setString(col++, nullSafe(exec.getErrorMessage()));
- ps.setString(col++, nullSafe(exec.getErrorStackTrace()));
-
- // Original parallel arrays
- ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorId())).toArray(String[]::new));
- ps.setObject(col++, flatProcessors.stream().map(fp -> nullSafe(fp.proc.getProcessorType())).toArray(String[]::new));
- ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getStartTime())).toArray(Timestamp[]::new));
- ps.setObject(col++, flatProcessors.stream().map(fp -> toTimestamp(fp.proc.getEndTime())).toArray(Timestamp[]::new));
- ps.setObject(col++, flatProcessors.stream().mapToLong(fp -> fp.proc.getDurationMs()).boxed().toArray(Long[]::new));
- ps.setObject(col++, flatProcessors.stream().map(fp -> fp.proc.getStatus() != null ? fp.proc.getStatus().name() : "RUNNING").toArray(String[]::new));
-
- // Phase 2: exchange bodies and headers (concatenated for search)
- StringBuilder allBodies = new StringBuilder();
- StringBuilder allHeaders = new StringBuilder();
-
- String[] inputBodies = new String[flatProcessors.size()];
- String[] outputBodies = new String[flatProcessors.size()];
- String[] inputHeaders = new String[flatProcessors.size()];
- String[] outputHeaders = new String[flatProcessors.size()];
- String[] errorMessages = new String[flatProcessors.size()];
- String[] errorStacktraces = new String[flatProcessors.size()];
- String[] diagramNodeIds = new String[flatProcessors.size()];
- Short[] depths = new Short[flatProcessors.size()];
- Integer[] parentIndexes = new Integer[flatProcessors.size()];
-
- for (int j = 0; j < flatProcessors.size(); j++) {
- FlatProcessor fp = flatProcessors.get(j);
- ProcessorExecution p = fp.proc;
-
- inputBodies[j] = nullSafe(p.getInputBody());
- outputBodies[j] = nullSafe(p.getOutputBody());
- inputHeaders[j] = mapToJson(p.getInputHeaders());
- outputHeaders[j] = mapToJson(p.getOutputHeaders());
- errorMessages[j] = nullSafe(p.getErrorMessage());
- errorStacktraces[j] = nullSafe(p.getErrorStackTrace());
- diagramNodeIds[j] = nullSafe(p.getDiagramNodeId());
- depths[j] = (short) fp.depth;
- parentIndexes[j] = fp.parentIndex;
-
- allBodies.append(inputBodies[j]).append(' ').append(outputBodies[j]).append(' ');
- allHeaders.append(inputHeaders[j]).append(' ').append(outputHeaders[j]).append(' ');
- }
-
- // Include route-level input/output snapshot in searchable text
- appendSnapshotText(exec.getInputSnapshot(), allBodies, allHeaders);
- appendSnapshotText(exec.getOutputSnapshot(), allBodies, allHeaders);
-
- ps.setString(col++, allBodies.toString().trim()); // exchange_bodies
- ps.setString(col++, allHeaders.toString().trim()); // exchange_headers
- ps.setObject(col++, depths); // processor_depths
- ps.setObject(col++, parentIndexes); // processor_parent_indexes
- ps.setObject(col++, errorMessages); // processor_error_messages
- ps.setObject(col++, errorStacktraces); // processor_error_stacktraces
- ps.setObject(col++, inputBodies); // processor_input_bodies
- ps.setObject(col++, outputBodies); // processor_output_bodies
- ps.setObject(col++, inputHeaders); // processor_input_headers
- ps.setObject(col++, outputHeaders); // processor_output_headers
- ps.setObject(col++, diagramNodeIds); // processor_diagram_node_ids
- String diagramHash = diagramRepository
- .findContentHashForRoute(exec.getRouteId(), agentId)
- .orElse("");
- ps.setString(col++, diagramHash); // diagram_content_hash
- }
-
- @Override
- public int getBatchSize() {
- return executions.size();
- }
- });
-
- log.debug("Inserted batch of {} route executions into ClickHouse", executions.size());
- }
-
- @Override
- public Optional findRawById(String executionId) {
- String sql = """
- SELECT execution_id, route_id, agent_id, status, start_time, end_time,
- duration_ms, correlation_id, exchange_id, error_message, error_stacktrace,
- diagram_content_hash,
- processor_ids, processor_types, processor_statuses,
- processor_starts, processor_ends, processor_durations,
- processor_diagram_node_ids,
- processor_error_messages, processor_error_stacktraces,
- processor_depths, processor_parent_indexes
- FROM route_executions
- WHERE execution_id = ?
- LIMIT 1
- """;
-
- List results = jdbcTemplate.query(sql, (rs, rowNum) -> {
- // Extract parallel arrays from ClickHouse
- String[] processorIds = toStringArray(rs.getArray("processor_ids"));
- String[] processorTypes = toStringArray(rs.getArray("processor_types"));
- String[] processorStatuses = toStringArray(rs.getArray("processor_statuses"));
- Instant[] processorStarts = toInstantArray(rs.getArray("processor_starts"));
- Instant[] processorEnds = toInstantArray(rs.getArray("processor_ends"));
- long[] processorDurations = toLongArray(rs.getArray("processor_durations"));
- String[] processorDiagramNodeIds = toStringArray(rs.getArray("processor_diagram_node_ids"));
- String[] processorErrorMessages = toStringArray(rs.getArray("processor_error_messages"));
- String[] processorErrorStacktraces = toStringArray(rs.getArray("processor_error_stacktraces"));
- int[] processorDepths = toIntArrayFromShort(rs.getArray("processor_depths"));
- int[] processorParentIndexes = toIntArray(rs.getArray("processor_parent_indexes"));
-
- Timestamp endTs = rs.getTimestamp("end_time");
- return new RawExecutionRow(
- rs.getString("execution_id"),
- rs.getString("route_id"),
- rs.getString("agent_id"),
- rs.getString("status"),
- rs.getTimestamp("start_time").toInstant(),
- endTs != null ? endTs.toInstant() : null,
- rs.getLong("duration_ms"),
- rs.getString("correlation_id"),
- rs.getString("exchange_id"),
- rs.getString("error_message"),
- rs.getString("error_stacktrace"),
- rs.getString("diagram_content_hash"),
- processorIds, processorTypes, processorStatuses,
- processorStarts, processorEnds, processorDurations,
- processorDiagramNodeIds,
- processorErrorMessages, processorErrorStacktraces,
- processorDepths, processorParentIndexes
- );
- }, executionId);
-
- return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
- }
-
- /**
- * Find exchange snapshot data for a specific processor by index.
- *
- * @param executionId the execution ID
- * @param processorIndex 0-based processor index
- * @return map with inputBody, outputBody, inputHeaders, outputHeaders or empty if not found
- */
- public Optional> findProcessorSnapshot(String executionId, int processorIndex) {
- // ClickHouse arrays are 1-indexed in SQL
- int chIndex = processorIndex + 1;
- String sql = """
- SELECT
- processor_input_bodies[?] AS input_body,
- processor_output_bodies[?] AS output_body,
- processor_input_headers[?] AS input_headers,
- processor_output_headers[?] AS output_headers,
- length(processor_ids) AS proc_count
- FROM route_executions
- WHERE execution_id = ?
- LIMIT 1
- """;
-
- List> results = jdbcTemplate.query(sql, (rs, rowNum) -> {
- int procCount = rs.getInt("proc_count");
- if (processorIndex < 0 || processorIndex >= procCount) {
- return null;
- }
- var snapshot = new java.util.LinkedHashMap();
- snapshot.put("inputBody", rs.getString("input_body"));
- snapshot.put("outputBody", rs.getString("output_body"));
- snapshot.put("inputHeaders", rs.getString("input_headers"));
- snapshot.put("outputHeaders", rs.getString("output_headers"));
- return snapshot;
- }, chIndex, chIndex, chIndex, chIndex, executionId);
-
- if (results.isEmpty() || results.get(0) == null) {
- return Optional.empty();
- }
- return Optional.of(results.get(0));
- }
-
- // --- Array extraction helpers ---
-
- private static String[] toStringArray(java.sql.Array sqlArray) throws SQLException {
- if (sqlArray == null) return new String[0];
- Object arr = sqlArray.getArray();
- if (arr instanceof String[] sa) return sa;
- if (arr instanceof Object[] oa) {
- String[] result = new String[oa.length];
- for (int i = 0; i < oa.length; i++) {
- result[i] = oa[i] != null ? oa[i].toString() : "";
- }
- return result;
- }
- return new String[0];
- }
-
- private static Instant[] toInstantArray(java.sql.Array sqlArray) throws SQLException {
- if (sqlArray == null) return new Instant[0];
- Object arr = sqlArray.getArray();
- if (arr instanceof Timestamp[] ts) {
- Instant[] result = new Instant[ts.length];
- for (int i = 0; i < ts.length; i++) {
- result[i] = ts[i] != null ? ts[i].toInstant() : Instant.EPOCH;
- }
- return result;
- }
- if (arr instanceof Object[] oa) {
- Instant[] result = new Instant[oa.length];
- for (int i = 0; i < oa.length; i++) {
- if (oa[i] instanceof Timestamp ts) {
- result[i] = ts.toInstant();
- } else {
- result[i] = Instant.EPOCH;
- }
- }
- return result;
- }
- return new Instant[0];
- }
-
- private static long[] toLongArray(java.sql.Array sqlArray) throws SQLException {
- if (sqlArray == null) return new long[0];
- Object arr = sqlArray.getArray();
- if (arr instanceof long[] la) return la;
- if (arr instanceof Long[] la) {
- long[] result = new long[la.length];
- for (int i = 0; i < la.length; i++) {
- result[i] = la[i] != null ? la[i] : 0;
- }
- return result;
- }
- if (arr instanceof Object[] oa) {
- long[] result = new long[oa.length];
- for (int i = 0; i < oa.length; i++) {
- result[i] = oa[i] instanceof Number n ? n.longValue() : 0;
- }
- return result;
- }
- return new long[0];
- }
-
- private static int[] toIntArray(java.sql.Array sqlArray) throws SQLException {
- if (sqlArray == null) return new int[0];
- Object arr = sqlArray.getArray();
- if (arr instanceof int[] ia) return ia;
- if (arr instanceof Integer[] ia) {
- int[] result = new int[ia.length];
- for (int i = 0; i < ia.length; i++) {
- result[i] = ia[i] != null ? ia[i] : 0;
- }
- return result;
- }
- if (arr instanceof Object[] oa) {
- int[] result = new int[oa.length];
- for (int i = 0; i < oa.length; i++) {
- result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
- }
- return result;
- }
- return new int[0];
- }
-
- private static int[] toIntArrayFromShort(java.sql.Array sqlArray) throws SQLException {
- if (sqlArray == null) return new int[0];
- Object arr = sqlArray.getArray();
- if (arr instanceof short[] sa) {
- int[] result = new int[sa.length];
- for (int i = 0; i < sa.length; i++) {
- result[i] = sa[i];
- }
- return result;
- }
- if (arr instanceof int[] ia) return ia;
- if (arr instanceof Object[] oa) {
- int[] result = new int[oa.length];
- for (int i = 0; i < oa.length; i++) {
- result[i] = oa[i] instanceof Number n ? n.intValue() : 0;
- }
- return result;
- }
- return new int[0];
- }
-
- /**
- * Internal record for a flattened processor with tree metadata.
- */
- private record FlatProcessor(ProcessorExecution proc, int depth, int parentIndex) {}
-
- /**
- * Flatten the processor tree with depth and parent index metadata (DFS order).
- */
- private List flattenWithMetadata(List processors) {
- if (processors == null || processors.isEmpty()) {
- return List.of();
- }
- var result = new ArrayList();
- for (ProcessorExecution p : processors) {
- flattenRecursive(p, 0, -1, result);
- }
- return result;
- }
-
- private void flattenRecursive(ProcessorExecution processor, int depth, int parentIdx,
- List result) {
- int myIndex = result.size();
- result.add(new FlatProcessor(processor, depth, parentIdx));
- if (processor.getChildren() != null) {
- for (ProcessorExecution child : processor.getChildren()) {
- flattenRecursive(child, depth + 1, myIndex, result);
- }
- }
- }
-
- private void appendSnapshotText(ExchangeSnapshot snapshot,
- StringBuilder allBodies, StringBuilder allHeaders) {
- if (snapshot != null) {
- allBodies.append(nullSafe(snapshot.getBody())).append(' ');
- allHeaders.append(mapToJson(snapshot.getHeaders())).append(' ');
- }
- }
-
- private static String mapToJson(Map map) {
- if (map == null || map.isEmpty()) {
- return "{}";
- }
- try {
- return OBJECT_MAPPER.writeValueAsString(map);
- } catch (JsonProcessingException e) {
- log.warn("Failed to serialize headers map to JSON", e);
- return "{}";
- }
- }
-
- private static String nullSafe(String value) {
- return value != null ? value : "";
- }
-
- private static Timestamp toTimestamp(Instant instant) {
- return instant != null ? Timestamp.from(instant) : Timestamp.from(Instant.EPOCH);
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java
deleted file mode 100644
index a72ea26d..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.cameleer3.server.app.storage;
-
-import com.cameleer3.server.core.storage.MetricsRepository;
-import com.cameleer3.server.core.storage.model.MetricsSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.BatchPreparedStatementSetter;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * ClickHouse implementation of {@link MetricsRepository}.
- *
- * Performs batch inserts into the {@code agent_metrics} table.
- */
-@Repository
-public class ClickHouseMetricsRepository implements MetricsRepository {
-
- private static final Logger log = LoggerFactory.getLogger(ClickHouseMetricsRepository.class);
-
- private static final String INSERT_SQL = """
- INSERT INTO agent_metrics (agent_id, collected_at, metric_name, metric_value, tags)
- VALUES (?, ?, ?, ?, ?)
- """;
-
- private final JdbcTemplate jdbcTemplate;
-
- public ClickHouseMetricsRepository(JdbcTemplate jdbcTemplate) {
- this.jdbcTemplate = jdbcTemplate;
- }
-
- @Override
- public void insertBatch(List metrics) {
- if (metrics.isEmpty()) {
- return;
- }
-
- jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
- @Override
- public void setValues(PreparedStatement ps, int i) throws SQLException {
- MetricsSnapshot m = metrics.get(i);
- ps.setString(1, m.agentId() != null ? m.agentId() : "");
- ps.setObject(2, m.collectedAt() != null ? Timestamp.from(m.collectedAt()) : Timestamp.from(Instant.EPOCH));
- ps.setString(3, m.metricName() != null ? m.metricName() : "");
- ps.setDouble(4, m.metricValue());
- // ClickHouse Map(String, String) -- pass as a java.util.Map
- Map tags = m.tags() != null ? m.tags() : new HashMap<>();
- ps.setObject(5, tags);
- }
-
- @Override
- public int getBatchSize() {
- return metrics.size();
- }
- });
-
- log.debug("Inserted batch of {} metrics into ClickHouse", metrics.size());
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java
deleted file mode 100644
index 92b08d54..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseOidcConfigRepository.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.cameleer3.server.app.storage;
-
-import com.cameleer3.server.core.security.OidcConfig;
-import com.cameleer3.server.core.security.OidcConfigRepository;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * ClickHouse implementation of {@link OidcConfigRepository}.
- * Singleton row with {@code config_id = 'default'}, using ReplacingMergeTree.
- */
-@Repository
-public class ClickHouseOidcConfigRepository implements OidcConfigRepository {
-
- private final JdbcTemplate jdbc;
-
- public ClickHouseOidcConfigRepository(JdbcTemplate jdbc) {
- this.jdbc = jdbc;
- }
-
- @Override
- public Optional find() {
- List results = jdbc.query(
- "SELECT enabled, issuer_uri, client_id, client_secret, roles_claim, default_roles, auto_signup, display_name_claim "
- + "FROM oidc_config FINAL WHERE config_id = 'default'",
- this::mapRow
- );
- return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
- }
-
- @Override
- public void save(OidcConfig config) {
- jdbc.update(
- "INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret, roles_claim, default_roles, auto_signup, display_name_claim, updated_at) "
- + "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now64(3, 'UTC'))",
- config.enabled(),
- config.issuerUri(),
- config.clientId(),
- config.clientSecret(),
- config.rolesClaim(),
- config.defaultRoles().toArray(new String[0]),
- config.autoSignup(),
- config.displayNameClaim()
- );
- }
-
- @Override
- public void delete() {
- jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
- }
-
- private OidcConfig mapRow(ResultSet rs, int rowNum) throws SQLException {
- String[] rolesArray = (String[]) rs.getArray("default_roles").getArray();
- return new OidcConfig(
- rs.getBoolean("enabled"),
- rs.getString("issuer_uri"),
- rs.getString("client_id"),
- rs.getString("client_secret"),
- rs.getString("roles_claim"),
- Arrays.asList(rolesArray),
- rs.getBoolean("auto_signup"),
- rs.getString("display_name_claim")
- );
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java
deleted file mode 100644
index b5090a1e..00000000
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUserRepository.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.cameleer3.server.app.storage;
-
-import com.cameleer3.server.core.security.UserInfo;
-import com.cameleer3.server.core.security.UserRepository;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * ClickHouse implementation of {@link UserRepository}.
- *
- * Uses ReplacingMergeTree — reads use {@code FINAL} to get the latest version.
- */
-@Repository
-public class ClickHouseUserRepository implements UserRepository {
-
- private final JdbcTemplate jdbc;
-
- public ClickHouseUserRepository(JdbcTemplate jdbc) {
- this.jdbc = jdbc;
- }
-
- @Override
- public Optional findById(String userId) {
- List results = jdbc.query(
- "SELECT user_id, provider, email, display_name, roles, created_at "
- + "FROM users FINAL WHERE user_id = ?",
- this::mapRow,
- userId
- );
- return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
- }
-
- @Override
- public List findAll() {
- return jdbc.query(
- "SELECT user_id, provider, email, display_name, roles, created_at FROM users FINAL ORDER BY user_id",
- this::mapRow
- );
- }
-
- @Override
- public void upsert(UserInfo user) {
- Optional existing = findById(user.userId());
- if (existing.isPresent()) {
- UserInfo ex = existing.get();
- // Skip write if nothing changed — avoids accumulating un-merged rows
- if (ex.provider().equals(user.provider())
- && ex.email().equals(user.email())
- && ex.displayName().equals(user.displayName())
- && ex.roles().equals(user.roles())) {
- return;
- }
- jdbc.update(
- "INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at) "
- + "SELECT user_id, ?, ?, ?, ?, created_at, now64(3, 'UTC') "
- + "FROM users FINAL WHERE user_id = ?",
- user.provider(),
- user.email(),
- user.displayName(),
- user.roles().toArray(new String[0]),
- user.userId()
- );
- } else {
- jdbc.update(
- "INSERT INTO users (user_id, provider, email, display_name, roles, updated_at) "
- + "VALUES (?, ?, ?, ?, ?, now64(3, 'UTC'))",
- user.userId(),
- user.provider(),
- user.email(),
- user.displayName(),
- user.roles().toArray(new String[0])
- );
- }
- }
-
- @Override
- public void updateRoles(String userId, List roles) {
- // ReplacingMergeTree: insert a new row with updated_at to supersede the old one.
- // Copy existing fields, update roles.
- jdbc.update(
- "INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at) "
- + "SELECT user_id, provider, email, display_name, ?, created_at, now64(3, 'UTC') "
- + "FROM users FINAL WHERE user_id = ?",
- roles.toArray(new String[0]),
- userId
- );
- }
-
- @Override
- public void delete(String userId) {
- jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
- }
-
- private UserInfo mapRow(ResultSet rs, int rowNum) throws SQLException {
- String[] rolesArray = (String[]) rs.getArray("roles").getArray();
- return new UserInfo(
- rs.getString("user_id"),
- rs.getString("provider"),
- rs.getString("email"),
- rs.getString("display_name"),
- Arrays.asList(rolesArray),
- rs.getTimestamp("created_at").toInstant()
- );
- }
-}
diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java
index f5867fec..6985b2a3 100644
--- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java
+++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java
@@ -61,9 +61,11 @@ public class PostgresUserRepository implements UserRepository {
private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
Array rolesArray = rs.getArray("roles");
String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
+ java.sql.Timestamp ts = rs.getTimestamp("created_at");
+ java.time.Instant createdAt = ts != null ? ts.toInstant() : null;
return new UserInfo(
rs.getString("user_id"), rs.getString("provider"),
rs.getString("email"), rs.getString("display_name"),
- List.of(roles));
+ List.of(roles), createdAt);
}
}
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql b/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql
deleted file mode 100644
index ab56da70..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/01-schema.sql
+++ /dev/null
@@ -1,57 +0,0 @@
--- Cameleer3 ClickHouse Schema
--- Tables for route executions, route diagrams, and agent metrics.
-
-CREATE TABLE IF NOT EXISTS route_executions (
- execution_id String,
- route_id LowCardinality(String),
- agent_id LowCardinality(String),
- status LowCardinality(String),
- start_time DateTime64(3, 'UTC'),
- end_time Nullable(DateTime64(3, 'UTC')),
- duration_ms UInt64,
- correlation_id String,
- exchange_id String,
- error_message String DEFAULT '',
- error_stacktrace String DEFAULT '',
- -- Nested processor executions stored as parallel arrays
- processor_ids Array(String),
- processor_types Array(LowCardinality(String)),
- processor_starts Array(DateTime64(3, 'UTC')),
- processor_ends Array(DateTime64(3, 'UTC')),
- processor_durations Array(UInt64),
- processor_statuses Array(LowCardinality(String)),
- -- Metadata
- server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
- -- Skip indexes
- INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
- INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
-)
-ENGINE = MergeTree()
-PARTITION BY toYYYYMMDD(start_time)
-ORDER BY (agent_id, status, start_time, execution_id)
-TTL toDateTime(start_time) + toIntervalDay(30)
-SETTINGS ttl_only_drop_parts = 1;
-
-CREATE TABLE IF NOT EXISTS route_diagrams (
- content_hash String,
- route_id LowCardinality(String),
- agent_id LowCardinality(String),
- definition String,
- created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-)
-ENGINE = ReplacingMergeTree(created_at)
-ORDER BY (content_hash);
-
-CREATE TABLE IF NOT EXISTS agent_metrics (
- agent_id LowCardinality(String),
- collected_at DateTime64(3, 'UTC'),
- metric_name LowCardinality(String),
- metric_value Float64,
- tags Map(String, String),
- server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-)
-ENGINE = MergeTree()
-PARTITION BY toYYYYMMDD(collected_at)
-ORDER BY (agent_id, metric_name, collected_at)
-TTL toDateTime(collected_at) + toIntervalDay(30)
-SETTINGS ttl_only_drop_parts = 1;
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql b/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql
deleted file mode 100644
index 2b11b435..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/02-search-columns.sql
+++ /dev/null
@@ -1,25 +0,0 @@
--- Phase 2: Schema extension for search, detail, and diagram linking columns.
--- Adds exchange snapshot data, processor tree metadata, and diagram content hash.
-
-ALTER TABLE route_executions
- ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
- ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
- ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
-
--- Skip indexes for full-text search on new text columns
-ALTER TABLE route_executions
- ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
- ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
-
--- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05)
-ALTER TABLE route_executions
- ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql b/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql
deleted file mode 100644
index 9dc7ce7a..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/03-users.sql
+++ /dev/null
@@ -1,10 +0,0 @@
-CREATE TABLE IF NOT EXISTS users (
- user_id String,
- provider LowCardinality(String),
- email String DEFAULT '',
- display_name String DEFAULT '',
- roles Array(LowCardinality(String)),
- created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
- updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-) ENGINE = ReplacingMergeTree(updated_at)
-ORDER BY (user_id);
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql b/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql
deleted file mode 100644
index 35b4d896..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/04-oidc-config.sql
+++ /dev/null
@@ -1,13 +0,0 @@
-CREATE TABLE IF NOT EXISTS oidc_config (
- config_id String DEFAULT 'default',
- enabled Bool DEFAULT false,
- issuer_uri String DEFAULT '',
- client_id String DEFAULT '',
- client_secret String DEFAULT '',
- roles_claim String DEFAULT 'realm_access.roles',
- default_roles Array(LowCardinality(String)),
- auto_signup Bool DEFAULT true,
- display_name_claim String DEFAULT 'name',
- updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-) ENGINE = ReplacingMergeTree(updated_at)
-ORDER BY (config_id);
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql b/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql
deleted file mode 100644
index 643a69ea..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/05-oidc-auto-signup.sql
+++ /dev/null
@@ -1 +0,0 @@
-ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS auto_signup Bool DEFAULT true;
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql b/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql
deleted file mode 100644
index ef1870bd..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/06-oidc-display-name-claim.sql
+++ /dev/null
@@ -1 +0,0 @@
-ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS display_name_claim String DEFAULT 'name';
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql b/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql
deleted file mode 100644
index 5d1efe24..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/07-stats-rollup.sql
+++ /dev/null
@@ -1,35 +0,0 @@
--- Pre-aggregated 5-minute stats rollup for route executions.
--- Uses AggregatingMergeTree with -State/-Merge combinators so intermediate
--- aggregates can be merged across arbitrary time windows and dimensions.
-
--- Drop existing objects to allow schema changes (MV must be dropped before table)
-DROP VIEW IF EXISTS route_execution_stats_5m_mv;
-DROP TABLE IF EXISTS route_execution_stats_5m;
-
-CREATE TABLE route_execution_stats_5m (
- bucket DateTime('UTC'),
- route_id LowCardinality(String),
- agent_id LowCardinality(String),
- total_count AggregateFunction(count),
- failed_count AggregateFunction(countIf, UInt8),
- duration_sum AggregateFunction(sum, UInt64),
- p99_duration AggregateFunction(quantileTDigest(0.99), UInt64)
-)
-ENGINE = AggregatingMergeTree()
-PARTITION BY toYYYYMMDD(bucket)
-ORDER BY (agent_id, route_id, bucket)
-TTL bucket + toIntervalDay(30)
-SETTINGS ttl_only_drop_parts = 1;
-
-CREATE MATERIALIZED VIEW route_execution_stats_5m_mv
-TO route_execution_stats_5m
-AS SELECT
- toStartOfFiveMinutes(start_time) AS bucket,
- route_id,
- agent_id,
- countState() AS total_count,
- countIfState(status = 'FAILED') AS failed_count,
- sumState(duration_ms) AS duration_sum,
- quantileTDigestState(0.99)(duration_ms) AS p99_duration
-FROM route_executions
-GROUP BY bucket, route_id, agent_id;
diff --git a/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql b/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql
deleted file mode 100644
index 5e80a23a..00000000
--- a/cameleer3-server-app/src/main/resources/clickhouse/08-stats-rollup-backfill.sql
+++ /dev/null
@@ -1,16 +0,0 @@
--- One-time idempotent backfill of existing route_executions into the
--- 5-minute stats rollup table. Safe for repeated execution — the WHERE
--- clause skips the INSERT if the target table already contains data.
-
-INSERT INTO route_execution_stats_5m
-SELECT
- toStartOfFiveMinutes(start_time) AS bucket,
- route_id,
- agent_id,
- countState() AS total_count,
- countIfState(status = 'FAILED') AS failed_count,
- sumState(duration_ms) AS duration_sum,
- quantileTDigestState(0.99)(duration_ms) AS p99_duration
-FROM route_executions
-WHERE (SELECT count() FROM route_execution_stats_5m) = 0
-GROUP BY bucket, route_id, agent_id;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
deleted file mode 100644
index d1271adb..00000000
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.cameleer3.server.app;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.DynamicPropertyRegistry;
-import org.springframework.test.context.DynamicPropertySource;
-import org.testcontainers.clickhouse.ClickHouseContainer;
-
-import org.junit.jupiter.api.BeforeAll;
-
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-/**
- * Base class for integration tests requiring a ClickHouse instance.
- *
- * Uses Testcontainers to spin up a ClickHouse server and initializes the schema
- * from {@code clickhouse/init/01-schema.sql} before the first test runs.
- * Subclasses get a {@link JdbcTemplate} for direct database assertions.
- *
- * Container lifecycle is managed manually (started once, shared across all test classes).
- */
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
-@ActiveProfiles("test")
-public abstract class AbstractClickHouseIT {
-
- protected static final ClickHouseContainer CLICKHOUSE;
-
- static {
- CLICKHOUSE = new ClickHouseContainer("clickhouse/clickhouse-server:25.3");
- CLICKHOUSE.start();
- }
-
- @Autowired
- protected JdbcTemplate jdbcTemplate;
-
- @DynamicPropertySource
- static void overrideProperties(DynamicPropertyRegistry registry) {
- registry.add("spring.datasource.url", CLICKHOUSE::getJdbcUrl);
- registry.add("spring.datasource.username", CLICKHOUSE::getUsername);
- registry.add("spring.datasource.password", CLICKHOUSE::getPassword);
- }
-
- @BeforeAll
- static void initSchema() throws Exception {
- // Surefire runs from the module directory; schema is in the project root
- Path baseDir = Path.of("clickhouse/init");
- if (!Files.exists(baseDir)) {
- baseDir = Path.of("../clickhouse/init");
- }
-
- // Load all schema files in order
- String[] schemaFiles = {"01-schema.sql", "02-search-columns.sql", "03-users.sql", "04-oidc-config.sql", "05-oidc-auto-signup.sql"};
-
- try (Connection conn = DriverManager.getConnection(
- CLICKHOUSE.getJdbcUrl(),
- CLICKHOUSE.getUsername(),
- CLICKHOUSE.getPassword());
- Statement stmt = conn.createStatement()) {
-
- for (String schemaFile : schemaFiles) {
- Path schemaPath = baseDir.resolve(schemaFile);
- if (Files.exists(schemaPath)) {
- String sql = Files.readString(schemaPath, StandardCharsets.UTF_8);
- // Execute each statement separately (separated by semicolons)
- for (String statement : sql.split(";")) {
- String trimmed = statement.trim();
- if (!trimmed.isEmpty()) {
- stmt.execute(trimmed);
- }
- }
- }
- }
- }
- }
-}
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
index 26faf84a..490e20a9 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
@@ -1,6 +1,8 @@
package com.cameleer3.server.app;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -18,6 +20,9 @@ public abstract class AbstractPostgresIT {
.withUsername("cameleer")
.withPassword("test");
+ @Autowired
+ protected JdbcTemplate jdbcTemplate;
+
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java
index ab98f30d..4ba36c5d 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentCommandControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -18,7 +18,7 @@ import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-class AgentCommandControllerIT extends AbstractClickHouseIT {
+class AgentCommandControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
index 652f92d8..763646b9 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentRegistrationControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -16,7 +16,7 @@ import org.springframework.http.ResponseEntity;
import static org.assertj.core.api.Assertions.assertThat;
-class AgentRegistrationControllerIT extends AbstractClickHouseIT {
+class AgentRegistrationControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java
index 1af16ed5..fddc7152 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/AgentSseControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-class AgentSseControllerIT extends AbstractClickHouseIT {
+class AgentSseControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
index aa8baa17..ee3db1fe 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.cameleer3.server.core.ingestion.IngestionService;
import org.junit.jupiter.api.BeforeEach;
@@ -13,21 +13,20 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
/**
- * Tests backpressure behavior when write buffers are full.
- * Uses a tiny buffer (capacity=5) and a very long flush interval
- * to prevent the scheduler from draining the buffer during the test.
+ * Tests backpressure behavior when the metrics write buffer is full.
+ *
+ * Execution and diagram ingestion are now synchronous (no buffers).
+ * Only the metrics pipeline still uses a write buffer with backpressure.
*/
@TestPropertySource(properties = {
"ingestion.buffer-capacity=5",
"ingestion.batch-size=5",
"ingestion.flush-interval-ms=60000" // 60s -- effectively no flush during test
})
-class BackpressureIT extends AbstractClickHouseIT {
+class BackpressureIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -47,34 +46,31 @@ class BackpressureIT extends AbstractClickHouseIT {
}
@Test
- void whenBufferFull_returns503WithRetryAfter() {
- // Wait for any initial scheduled flush to complete, then fill buffer via batch POST
- await().atMost(5, SECONDS).until(() -> ingestionService.getExecutionBufferDepth() == 0);
-
- // Fill the buffer completely with a batch of 5
+ void whenMetricsBufferFull_returns503WithRetryAfter() {
+ // Fill the metrics buffer completely with a batch of 5
String batchJson = """
[
- {"routeId":"bp-0","exchangeId":"bp-e0","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]},
- {"routeId":"bp-1","exchangeId":"bp-e1","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]},
- {"routeId":"bp-2","exchangeId":"bp-e2","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]},
- {"routeId":"bp-3","exchangeId":"bp-e3","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]},
- {"routeId":"bp-4","exchangeId":"bp-e4","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}
+ {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:00Z","metrics":{}},
+ {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:01Z","metrics":{}},
+ {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:02Z","metrics":{}},
+ {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:03Z","metrics":{}},
+ {"agentId":"bp-agent","timestamp":"2026-03-11T10:00:04Z","metrics":{}}
]
""";
ResponseEntity batchResponse = restTemplate.postForEntity(
- "/api/v1/data/executions",
+ "/api/v1/data/metrics",
new HttpEntity<>(batchJson, authHeaders),
String.class);
assertThat(batchResponse.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
// Now buffer should be full -- next POST should get 503
String overflowJson = """
- {"routeId":"bp-overflow","exchangeId":"bp-overflow-e","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}
+ [{"agentId":"bp-agent","timestamp":"2026-03-11T10:00:05Z","metrics":{}}]
""";
ResponseEntity response = restTemplate.postForEntity(
- "/api/v1/data/executions",
+ "/api/v1/data/metrics",
new HttpEntity<>(overflowJson, authHeaders),
String.class);
@@ -83,25 +79,17 @@ class BackpressureIT extends AbstractClickHouseIT {
}
@Test
- void bufferedDataNotLost_afterBackpressure() {
- // Post data to the diagram buffer (separate from executions used above)
- for (int i = 0; i < 3; i++) {
- String json = String.format("""
- {
- "routeId": "bp-persist-diagram-%d",
- "version": 1,
- "nodes": [],
- "edges": []
- }
- """, i);
+ void executionIngestion_isSynchronous_returnsAccepted() {
+ String json = """
+ {"routeId":"bp-sync","exchangeId":"bp-sync-e","status":"COMPLETED","startTime":"2026-03-11T10:00:00Z","durationMs":100,"processors":[]}
+ """;
- restTemplate.postForEntity(
- "/api/v1/data/diagrams",
- new HttpEntity<>(json, authHeaders),
- String.class);
- }
+ ResponseEntity response = restTemplate.postForEntity(
+ "/api/v1/data/executions",
+ new HttpEntity<>(json, authHeaders),
+ String.class);
- // Data is in the buffer. Verify the buffer has data.
- assertThat(ingestionService.getDiagramBufferDepth()).isGreaterThanOrEqualTo(3);
+ // Synchronous ingestion always returns 202 (no buffer to overflow)
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
}
}
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java
index cdd29df7..83fa17b1 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DetailControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,7 +23,7 @@ import static org.awaitility.Awaitility.await;
* Integration tests for the detail and processor snapshot endpoints.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-class DetailControllerIT extends AbstractClickHouseIT {
+class DetailControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -121,7 +121,7 @@ class DetailControllerIT extends AbstractClickHouseIT {
// Wait for flush and get the execution_id
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions WHERE route_id = 'detail-test-route'",
+ "SELECT count(*) FROM route_executions WHERE route_id = 'detail-test-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java
index 832967fc..af6f274d 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -15,7 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-class DiagramControllerIT extends AbstractClickHouseIT {
+class DiagramControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -53,7 +53,7 @@ class DiagramControllerIT extends AbstractClickHouseIT {
}
@Test
- void postDiagram_dataAppearsInClickHouseAfterFlush() {
+ void postDiagram_dataAppearsAfterFlush() {
String json = """
{
"routeId": "diagram-flush-route",
@@ -72,7 +72,7 @@ class DiagramControllerIT extends AbstractClickHouseIT {
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_diagrams WHERE route_id = 'diagram-flush-route'",
+ "SELECT count(*) FROM route_diagrams WHERE route_id = 'diagram-flush-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java
index f4b0308d..af0b8668 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramRenderControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -20,7 +20,7 @@ import static org.awaitility.Awaitility.await;
* Integration tests for {@link DiagramRenderController}.
* Seeds a diagram via the ingestion endpoint, then tests rendering.
*/
-class DiagramRenderControllerIT extends AbstractClickHouseIT {
+class DiagramRenderControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -61,7 +61,7 @@ class DiagramRenderControllerIT extends AbstractClickHouseIT {
new HttpEntity<>(json, securityHelper.authHeaders(jwt)),
String.class);
- // Wait for flush to ClickHouse and retrieve the content hash
+ // Wait for flush to storage and retrieve the content hash
await().atMost(10, SECONDS).untilAsserted(() -> {
String hash = jdbcTemplate.queryForObject(
"SELECT content_hash FROM route_diagrams WHERE route_id = 'render-test-route' LIMIT 1",
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java
index a2bf59d5..65f72d85 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -16,7 +16,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-class ExecutionControllerIT extends AbstractClickHouseIT {
+class ExecutionControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -90,7 +90,7 @@ class ExecutionControllerIT extends AbstractClickHouseIT {
}
@Test
- void postExecution_dataAppearsInClickHouseAfterFlush() {
+ void postExecution_dataAppearsAfterFlush() {
String json = """
{
"routeId": "flush-test-route",
@@ -111,7 +111,7 @@ class ExecutionControllerIT extends AbstractClickHouseIT {
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions WHERE route_id = 'flush-test-route'",
+ "SELECT count(*) FROM route_executions WHERE route_id = 'flush-test-route'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java
index 9d68212d..555bbf7c 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ForwardCompatIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Integration test for forward compatibility (API-05).
* Verifies that unknown JSON fields in request bodies do not cause deserialization errors.
*/
-class ForwardCompatIT extends AbstractClickHouseIT {
+class ForwardCompatIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java
index c701af3b..9ca31887 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/HealthControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
@@ -8,9 +8,9 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Integration tests for the health endpoint and ClickHouse TTL verification.
+ * Integration tests for the health endpoint.
*/
-class HealthControllerIT extends AbstractClickHouseIT {
+class HealthControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -28,20 +28,4 @@ class HealthControllerIT extends AbstractClickHouseIT {
var response = restTemplate.getForEntity("/api/v1/health", String.class);
assertThat(response.getStatusCode().value()).isEqualTo(200);
}
-
- @Test
- void ttlConfiguredOnRouteExecutions() {
- String createTable = jdbcTemplate.queryForObject(
- "SHOW CREATE TABLE route_executions", String.class);
- assertThat(createTable).containsIgnoringCase("TTL");
- assertThat(createTable).contains("toIntervalDay(30)");
- }
-
- @Test
- void ttlConfiguredOnAgentMetrics() {
- String createTable = jdbcTemplate.queryForObject(
- "SHOW CREATE TABLE agent_metrics", String.class);
- assertThat(createTable).containsIgnoringCase("TTL");
- assertThat(createTable).contains("toIntervalDay(30)");
- }
}
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java
index d0eb9793..8f0d8a14 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -15,7 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-class MetricsControllerIT extends AbstractClickHouseIT {
+class MetricsControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -52,7 +52,7 @@ class MetricsControllerIT extends AbstractClickHouseIT {
}
@Test
- void postMetrics_dataAppearsInClickHouseAfterFlush() {
+ void postMetrics_dataAppearsAfterFlush() {
String json = """
[{
"agentId": "agent-flush-test",
@@ -70,7 +70,7 @@ class MetricsControllerIT extends AbstractClickHouseIT {
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
- "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-flush-test'",
+ "SELECT count(*) FROM agent_metrics WHERE agent_id = 'agent-flush-test'",
Integer.class);
assertThat(count).isGreaterThanOrEqualTo(1);
});
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java
index e474f2b8..a8ceb053 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenApiIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
@@ -10,7 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests for OpenAPI documentation endpoints.
*/
-class OpenApiIT extends AbstractClickHouseIT {
+class OpenApiIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java
index 8ae4e072..95f42b2a 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.controller;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -24,7 +24,7 @@ import static org.awaitility.Awaitility.await;
* Tests all filter types independently and in combination.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-class SearchControllerIT extends AbstractClickHouseIT {
+class SearchControllerIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
@@ -155,7 +155,7 @@ class SearchControllerIT extends AbstractClickHouseIT {
// Wait for all data to flush
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
- "SELECT count() FROM route_executions WHERE route_id LIKE 'search-route-%'",
+ "SELECT count(*) FROM route_executions WHERE route_id LIKE 'search-route-%'",
Integer.class);
assertThat(count).isEqualTo(10);
});
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java
index 26e8d5a9..35d0c0d1 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/interceptor/ProtocolVersionIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.interceptor;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* With security enabled, requests to protected endpoints need JWT auth
* to reach the interceptor layer.
*/
-class ProtocolVersionIT extends AbstractClickHouseIT {
+class ProtocolVersionIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
index 24054006..2194ecb4 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
@@ -24,8 +24,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
@Container
static final OpensearchContainer<?> opensearch =
- new OpensearchContainer<>("opensearchproject/opensearch:2.19.0")
- .withSecurityEnabled(false);
+ new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
@DynamicPropertySource
static void configureOpenSearch(DynamicPropertyRegistry registry) {
@@ -58,7 +57,7 @@ class OpenSearchIndexIT extends AbstractPostgresIT {
SearchResult result = searchIndex.search(request);
assertTrue(result.total() > 0);
- assertEquals("search-1", result.items().get(0).executionId());
+ assertEquals("search-1", result.data().get(0).executionId());
}
@Test
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java
index 1309517b..3ce87894 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/BootstrapTokenIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.security;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
@@ -17,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests verifying bootstrap token validation on the registration endpoint.
*/
-class BootstrapTokenIT extends AbstractClickHouseIT {
+class BootstrapTokenIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java
index 7e40e0a1..87ddf25e 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/JwtRefreshIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.security;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import com.cameleer3.server.core.security.JwtService;
import com.fasterxml.jackson.databind.JsonNode;
@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests for the JWT refresh flow.
*/
-class JwtRefreshIT extends AbstractClickHouseIT {
+class JwtRefreshIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java
index abd35524..e4ee5da4 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/RegistrationSecurityIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.security;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
@@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Integration tests verifying that registration returns security credentials
* and that those credentials can be used to access protected endpoints.
*/
-class RegistrationSecurityIT extends AbstractClickHouseIT {
+class RegistrationSecurityIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java
index 38f25766..ba8dfcbb 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SecurityFilterIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.security;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Integration tests verifying that the SecurityFilterChain correctly
* protects endpoints and allows public access where configured.
*/
-class SecurityFilterIT extends AbstractClickHouseIT {
+class SecurityFilterIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java
index ccbb8af9..d611520b 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/security/SseSigningIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.security;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.security.Ed25519SigningService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,7 +44,7 @@ import static org.awaitility.Awaitility.await;
* open SSE stream (with JWT query param) -> push config-update command (with JWT) ->
* receive SSE event -> verify signature field against server's Ed25519 public key.
*/
-class SseSigningIT extends AbstractClickHouseIT {
+class SseSigningIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java
index 7322ec26..ab0f01c3 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/DiagramLinkingIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.storage;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -19,7 +19,7 @@ import static org.awaitility.Awaitility.await;
* Integration test proving that diagram_content_hash is populated during
* execution ingestion when a RouteGraph exists for the same route+agent.
*/
-class DiagramLinkingIT extends AbstractClickHouseIT {
+class DiagramLinkingIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java
index d0d79e02..4cfa8247 100644
--- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java
+++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/IngestionSchemaIT.java
@@ -1,6 +1,6 @@
package com.cameleer3.server.app.storage;
-import com.cameleer3.server.app.AbstractClickHouseIT;
+import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.app.TestSecurityHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -22,7 +22,7 @@ import static org.awaitility.Awaitility.await;
* Integration test verifying that Phase 2 schema columns are correctly populated
* during ingestion of route executions with nested processors and exchange data.
*/
-class IngestionSchemaIT extends AbstractClickHouseIT {
+class IngestionSchemaIT extends AbstractPostgresIT {
@Autowired
private TestRestTemplate restTemplate;
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java
index e739dd81..1b474ba0 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ExecutionDetail.java
@@ -7,7 +7,7 @@ import java.util.List;
* Full detail of a route execution, including the nested processor tree.
*
* This is the rich detail model returned by the detail endpoint. The processor
- * tree is reconstructed from flat parallel arrays stored in ClickHouse.
+ * tree is reconstructed from individual processor records stored in PostgreSQL.
*
* @param executionId unique execution identifier
* @param routeId Camel route ID
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java
index 10d1e88e..65e08b9a 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/ProcessorNode.java
@@ -7,7 +7,7 @@ import java.util.List;
/**
* Nested tree node representing a single processor execution within a route.
*
- * The tree structure is reconstructed from flat parallel arrays stored in ClickHouse.
+ * The tree structure is reconstructed from individual processor records stored in PostgreSQL.
* Each node may have children (e.g., processors inside a split or try-catch block).
*/
public final class ProcessorNode {
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
deleted file mode 100644
index 2297e4b6..00000000
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.cameleer3.server.core.detail;
-
-import java.time.Instant;
-
-/**
- * Raw execution data from ClickHouse, including all parallel arrays needed
- * for tree reconstruction. This is the intermediate representation between
- * the database and the {@link ExecutionDetail} domain object.
- *
- * @param executionId unique execution identifier
- * @param routeId Camel route ID
- * @param agentId agent instance
- * @param status execution status
- * @param startTime execution start time
- * @param endTime execution end time
- * @param durationMs execution duration in milliseconds
- * @param correlationId correlation ID
- * @param exchangeId Camel exchange ID
- * @param errorMessage execution-level error message
- * @param errorStackTrace execution-level error stack trace
- * @param diagramContentHash content hash for diagram linking
- * @param processorIds processor IDs (parallel array)
- * @param processorTypes processor types (parallel array)
- * @param processorStatuses processor statuses (parallel array)
- * @param processorStarts processor start times (parallel array)
- * @param processorEnds processor end times (parallel array)
- * @param processorDurations processor durations in ms (parallel array)
- * @param processorDiagramNodeIds processor diagram node IDs (parallel array)
- * @param processorErrorMessages processor error messages (parallel array)
- * @param processorErrorStacktraces processor error stack traces (parallel array)
- * @param processorDepths processor tree depths (parallel array)
- * @param processorParentIndexes processor parent indexes, -1 for roots (parallel array)
- */
-public record RawExecutionRow(
- String executionId,
- String routeId,
- String agentId,
- String status,
- Instant startTime,
- Instant endTime,
- long durationMs,
- String correlationId,
- String exchangeId,
- String errorMessage,
- String errorStackTrace,
- String diagramContentHash,
- String[] processorIds,
- String[] processorTypes,
- String[] processorStatuses,
- Instant[] processorStarts,
- Instant[] processorEnds,
- long[] processorDurations,
- String[] processorDiagramNodeIds,
- String[] processorErrorMessages,
- String[] processorErrorStacktraces,
- int[] processorDepths,
- int[] processorParentIndexes
-) {
-}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
index c5e17e6f..36419fb8 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
@@ -70,12 +70,12 @@ public class IngestionService {
private ExecutionRecord toExecutionRecord(String agentId, String groupName,
RouteExecution exec) {
return new ExecutionRecord(
- exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
+ exec.getExchangeId(), exec.getRouteId(), agentId, groupName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
exec.getCorrelationId(), exec.getExchangeId(),
exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(),
- exec.getErrorMessage(), exec.getErrorStacktrace(),
+ exec.getErrorMessage(), exec.getErrorStackTrace(),
null // diagramContentHash set separately
);
}
@@ -94,7 +94,7 @@ public class IngestionService {
p.getStartTime() != null ? p.getStartTime() : execStartTime,
p.getEndTime(),
p.getDurationMs(),
- p.getErrorMessage(), p.getErrorStacktrace(),
+ p.getErrorMessage(), p.getErrorStackTrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java
index 267de43c..bcd1077c 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/WriteBuffer.java
@@ -6,7 +6,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
- * Bounded write buffer that decouples HTTP ingestion from ClickHouse batch inserts.
+ * Bounded write buffer that decouples HTTP ingestion from database batch inserts.
*
* Items are offered to the buffer by controllers and drained in batches by a
* scheduled flush task. When the buffer is full, {@link #offer} returns false,
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
deleted file mode 100644
index 44955c18..00000000
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.cameleer3.server.core.search;
-
-import java.util.List;
-
-/**
- * Swappable search backend abstraction.
- *
- * The current implementation uses ClickHouse for search. This interface allows
- * replacing the search backend (e.g., with OpenSearch) without changing the
- * service layer or controllers.
- */
-public interface SearchEngine {
-
- /**
- * Search for route executions matching the given criteria.
- *
- * @param request search filters and pagination
- * @return paginated search results with total count
- */
- SearchResult search(SearchRequest request);
-
- /**
- * Count route executions matching the given criteria (without fetching data).
- *
- * @param request search filters
- * @return total number of matching executions
- */
- long count(SearchRequest request);
-
- /**
- * Compute aggregate stats: P99 latency and count of currently running executions.
- *
- * @param from start of the time window
- * @param to end of the time window
- * @return execution stats
- */
- ExecutionStats stats(java.time.Instant from, java.time.Instant to);
-
- /**
- * Compute aggregate stats scoped to specific routes and agents.
- *
- * @param from start of the time window
- * @param to end of the time window
- * @param routeId optional route ID filter
- * @param agentIds optional agent ID filter (from group resolution)
- * @return execution stats
- */
- ExecutionStats stats(java.time.Instant from, java.time.Instant to, String routeId, List<String> agentIds);
-
- /**
- * Compute bucketed time-series stats over a time window.
- *
- * @param from start of the time window
- * @param to end of the time window
- * @param bucketCount number of buckets to divide the window into
- * @return bucketed stats
- */
- StatsTimeseries timeseries(java.time.Instant from, java.time.Instant to, int bucketCount);
-
- /**
- * Compute bucketed time-series stats scoped to specific routes and agents.
- *
- * @param from start of the time window
- * @param to end of the time window
- * @param bucketCount number of buckets to divide the window into
- * @param routeId optional route ID filter
- * @param agentIds optional agent ID filter (from group resolution)
- * @return bucketed stats
- */
- StatsTimeseries timeseries(java.time.Instant from, java.time.Instant to, int bucketCount,
- String routeId, List<String> agentIds);
-}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java
index ab97c31e..17ff44c9 100644
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java
+++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java
@@ -75,7 +75,7 @@ public record SearchRequest(
if (!"asc".equalsIgnoreCase(sortDir)) sortDir = "desc";
}
- /** Returns the validated ClickHouse column name for ORDER BY. */
+ /** Returns the validated database column name for ORDER BY. */
public String sortColumn() {
return SORT_FIELD_TO_COLUMN.getOrDefault(sortField, "start_time");
}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
deleted file mode 100644
index 3a2c4bd6..00000000
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.cameleer3.server.core.storage;
-
-import com.cameleer3.common.graph.RouteGraph;
-import com.cameleer3.server.core.ingestion.TaggedDiagram;
-
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Repository for route diagram storage with content-hash deduplication.
- */
-public interface DiagramRepository {
-
- /**
- * Store a tagged route graph. Uses content-hash deduplication via ReplacingMergeTree.
- */
- void store(TaggedDiagram diagram);
-
- /**
- * Find a route graph by its content hash.
- */
- Optional findByContentHash(String contentHash);
-
- /**
- * Find the content hash for the latest diagram of a given route and agent.
- */
- Optional findContentHashForRoute(String routeId, String agentId);
-
- /**
- * Find the content hash for the latest diagram of a route across any agent in the given list.
- * All instances of the same application produce the same route graph, so any agent's
- * diagram for the same route will have the same content hash.
- */
- Optional findContentHashForRouteByAgents(String routeId, List agentIds);
-}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
deleted file mode 100644
index c58c1f81..00000000
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.cameleer3.server.core.storage;
-
-import com.cameleer3.server.core.detail.RawExecutionRow;
-import com.cameleer3.server.core.ingestion.TaggedExecution;
-
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Repository for route execution storage and retrieval.
- */
-public interface ExecutionRepository {
-
- /**
- * Insert a batch of tagged route executions.
- * Implementations must perform a single batch insert for efficiency.
- */
- void insertBatch(List executions);
-
- /**
- * Find a raw execution row by execution ID, including all parallel arrays
- * needed for processor tree reconstruction.
- *
- * @param executionId the execution ID to look up
- * @return the raw execution row, or empty if not found
- */
- Optional findRawById(String executionId);
-}
diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
deleted file mode 100644
index ad15ef0a..00000000
--- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.cameleer3.server.core.storage;
-
-import com.cameleer3.server.core.storage.model.MetricsSnapshot;
-
-import java.util.List;
-
-/**
- * Repository for agent metrics batch inserts into ClickHouse.
- */
-public interface MetricsRepository {
-
- /**
- * Insert a batch of metrics snapshots.
- * Implementations must perform a single batch insert for efficiency.
- */
- void insertBatch(List metrics);
-}
diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java
index a6b4251a..89311bfe 100644
--- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java
+++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/detail/TreeReconstructionTest.java
@@ -1,6 +1,7 @@
package com.cameleer3.server.core.detail;
-import com.cameleer3.server.core.storage.ExecutionRepository;
+import com.cameleer3.server.core.storage.ExecutionStore;
+import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import java.time.Instant;
@@ -10,33 +11,36 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
- * Unit tests for {@link DetailService#reconstructTree} logic.
+ * Unit tests for {@link DetailService#buildTree} logic.
*
- * Verifies correct parent-child wiring from flat parallel arrays.
+ * Verifies correct parent-child wiring from flat ProcessorRecord lists.
*/
class TreeReconstructionTest {
- private final DetailService detailService = new DetailService(mock(ExecutionRepository.class));
+ private final DetailService detailService = new DetailService(mock(ExecutionStore.class));
private static final Instant NOW = Instant.parse("2026-03-10T10:00:00Z");
+ private ProcessorRecord proc(String id, String type, String status,
+ int depth, String parentId) {
+ return new ProcessorRecord(
+ "exec-1", id, type, "node-" + id,
+ "default", "route1", depth, parentId,
+ status, NOW, NOW, 10L,
+ null, null, null, null, null, null
+ );
+ }
+
@Test
void linearChain_rootChildGrandchild() {
- // [root, child, grandchild], depths=[0,1,2], parents=[-1,0,1]
- List roots = detailService.reconstructTree(
- new String[]{"root", "child", "grandchild"},
- new String[]{"log", "bean", "to"},
- new String[]{"COMPLETED", "COMPLETED", "COMPLETED"},
- new Instant[]{NOW, NOW, NOW},
- new Instant[]{NOW, NOW, NOW},
- new long[]{10, 20, 30},
- new String[]{"n1", "n2", "n3"},
- new String[]{"", "", ""},
- new String[]{"", "", ""},
- new int[]{0, 1, 2},
- new int[]{-1, 0, 1}
+ List processors = List.of(
+ proc("root", "log", "COMPLETED", 0, null),
+ proc("child", "bean", "COMPLETED", 1, "root"),
+ proc("grandchild", "to", "COMPLETED", 2, "child")
);
+ List roots = detailService.buildTree(processors);
+
assertThat(roots).hasSize(1);
ProcessorNode root = roots.get(0);
assertThat(root.getProcessorId()).isEqualTo("root");
@@ -53,21 +57,14 @@ class TreeReconstructionTest {
@Test
void multipleRoots_noNesting() {
- // [A, B, C], depths=[0,0,0], parents=[-1,-1,-1]
- List roots = detailService.reconstructTree(
- new String[]{"A", "B", "C"},
- new String[]{"log", "log", "log"},
- new String[]{"COMPLETED", "COMPLETED", "COMPLETED"},
- new Instant[]{NOW, NOW, NOW},
- new Instant[]{NOW, NOW, NOW},
- new long[]{10, 20, 30},
- new String[]{"n1", "n2", "n3"},
- new String[]{"", "", ""},
- new String[]{"", "", ""},
- new int[]{0, 0, 0},
- new int[]{-1, -1, -1}
+ List processors = List.of(
+ proc("A", "log", "COMPLETED", 0, null),
+ proc("B", "log", "COMPLETED", 0, null),
+ proc("C", "log", "COMPLETED", 0, null)
);
+ List roots = detailService.buildTree(processors);
+
assertThat(roots).hasSize(3);
assertThat(roots.get(0).getProcessorId()).isEqualTo("A");
assertThat(roots.get(1).getProcessorId()).isEqualTo("B");
@@ -77,21 +74,15 @@ class TreeReconstructionTest {
@Test
void branchingTree_parentWithTwoChildren_secondChildHasGrandchild() {
- // [parent, child1, child2, grandchild], depths=[0,1,1,2], parents=[-1,0,0,2]
- List roots = detailService.reconstructTree(
- new String[]{"parent", "child1", "child2", "grandchild"},
- new String[]{"split", "log", "bean", "to"},
- new String[]{"COMPLETED", "COMPLETED", "COMPLETED", "COMPLETED"},
- new Instant[]{NOW, NOW, NOW, NOW},
- new Instant[]{NOW, NOW, NOW, NOW},
- new long[]{100, 20, 30, 5},
- new String[]{"n1", "n2", "n3", "n4"},
- new String[]{"", "", "", ""},
- new String[]{"", "", "", ""},
- new int[]{0, 1, 1, 2},
- new int[]{-1, 0, 0, 2}
+ List processors = List.of(
+ proc("parent", "split", "COMPLETED", 0, null),
+ proc("child1", "log", "COMPLETED", 1, "parent"),
+ proc("child2", "bean", "COMPLETED", 1, "parent"),
+ proc("grandchild", "to", "COMPLETED", 2, "child2")
);
+ List roots = detailService.buildTree(processors);
+
assertThat(roots).hasSize(1);
ProcessorNode parent = roots.get(0);
assertThat(parent.getProcessorId()).isEqualTo("parent");
@@ -111,30 +102,8 @@ class TreeReconstructionTest {
}
@Test
- void emptyArrays_producesEmptyList() {
- List roots = detailService.reconstructTree(
- new String[]{},
- new String[]{},
- new String[]{},
- new Instant[]{},
- new Instant[]{},
- new long[]{},
- new String[]{},
- new String[]{},
- new String[]{},
- new int[]{},
- new int[]{}
- );
-
- assertThat(roots).isEmpty();
- }
-
- @Test
- void nullArrays_producesEmptyList() {
- List roots = detailService.reconstructTree(
- null, null, null, null, null, null, null, null, null, null, null
- );
-
+ void emptyList_producesEmptyRoots() {
+ List roots = detailService.buildTree(List.of());
assertThat(roots).isEmpty();
}
}
diff --git a/clickhouse/init/01-schema.sql b/clickhouse/init/01-schema.sql
deleted file mode 100644
index ab56da70..00000000
--- a/clickhouse/init/01-schema.sql
+++ /dev/null
@@ -1,57 +0,0 @@
--- Cameleer3 ClickHouse Schema
--- Tables for route executions, route diagrams, and agent metrics.
-
-CREATE TABLE IF NOT EXISTS route_executions (
- execution_id String,
- route_id LowCardinality(String),
- agent_id LowCardinality(String),
- status LowCardinality(String),
- start_time DateTime64(3, 'UTC'),
- end_time Nullable(DateTime64(3, 'UTC')),
- duration_ms UInt64,
- correlation_id String,
- exchange_id String,
- error_message String DEFAULT '',
- error_stacktrace String DEFAULT '',
- -- Nested processor executions stored as parallel arrays
- processor_ids Array(String),
- processor_types Array(LowCardinality(String)),
- processor_starts Array(DateTime64(3, 'UTC')),
- processor_ends Array(DateTime64(3, 'UTC')),
- processor_durations Array(UInt64),
- processor_statuses Array(LowCardinality(String)),
- -- Metadata
- server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
- -- Skip indexes
- INDEX idx_correlation correlation_id TYPE bloom_filter GRANULARITY 4,
- INDEX idx_error error_message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4
-)
-ENGINE = MergeTree()
-PARTITION BY toYYYYMMDD(start_time)
-ORDER BY (agent_id, status, start_time, execution_id)
-TTL toDateTime(start_time) + toIntervalDay(30)
-SETTINGS ttl_only_drop_parts = 1;
-
-CREATE TABLE IF NOT EXISTS route_diagrams (
- content_hash String,
- route_id LowCardinality(String),
- agent_id LowCardinality(String),
- definition String,
- created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-)
-ENGINE = ReplacingMergeTree(created_at)
-ORDER BY (content_hash);
-
-CREATE TABLE IF NOT EXISTS agent_metrics (
- agent_id LowCardinality(String),
- collected_at DateTime64(3, 'UTC'),
- metric_name LowCardinality(String),
- metric_value Float64,
- tags Map(String, String),
- server_received_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-)
-ENGINE = MergeTree()
-PARTITION BY toYYYYMMDD(collected_at)
-ORDER BY (agent_id, metric_name, collected_at)
-TTL toDateTime(collected_at) + toIntervalDay(30)
-SETTINGS ttl_only_drop_parts = 1;
diff --git a/clickhouse/init/02-search-columns.sql b/clickhouse/init/02-search-columns.sql
deleted file mode 100644
index 2b11b435..00000000
--- a/clickhouse/init/02-search-columns.sql
+++ /dev/null
@@ -1,25 +0,0 @@
--- Phase 2: Schema extension for search, detail, and diagram linking columns.
--- Adds exchange snapshot data, processor tree metadata, and diagram content hash.
-
-ALTER TABLE route_executions
- ADD COLUMN IF NOT EXISTS exchange_bodies String DEFAULT '',
- ADD COLUMN IF NOT EXISTS exchange_headers String DEFAULT '',
- ADD COLUMN IF NOT EXISTS processor_depths Array(UInt16) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_parent_indexes Array(Int32) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_error_messages Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_error_stacktraces Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_input_bodies Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_output_bodies Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_input_headers Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_output_headers Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS processor_diagram_node_ids Array(String) DEFAULT [],
- ADD COLUMN IF NOT EXISTS diagram_content_hash String DEFAULT '';
-
--- Skip indexes for full-text search on new text columns
-ALTER TABLE route_executions
- ADD INDEX IF NOT EXISTS idx_exchange_bodies exchange_bodies TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4,
- ADD INDEX IF NOT EXISTS idx_exchange_headers exchange_headers TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
-
--- Skip index on error_stacktrace (not indexed in 01-schema.sql, needed for SRCH-05)
-ALTER TABLE route_executions
- ADD INDEX IF NOT EXISTS idx_error_stacktrace error_stacktrace TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
diff --git a/clickhouse/init/03-users.sql b/clickhouse/init/03-users.sql
deleted file mode 100644
index 9dc7ce7a..00000000
--- a/clickhouse/init/03-users.sql
+++ /dev/null
@@ -1,10 +0,0 @@
-CREATE TABLE IF NOT EXISTS users (
- user_id String,
- provider LowCardinality(String),
- email String DEFAULT '',
- display_name String DEFAULT '',
- roles Array(LowCardinality(String)),
- created_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
- updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-) ENGINE = ReplacingMergeTree(updated_at)
-ORDER BY (user_id);
diff --git a/clickhouse/init/04-oidc-config.sql b/clickhouse/init/04-oidc-config.sql
deleted file mode 100644
index 35b4d896..00000000
--- a/clickhouse/init/04-oidc-config.sql
+++ /dev/null
@@ -1,13 +0,0 @@
-CREATE TABLE IF NOT EXISTS oidc_config (
- config_id String DEFAULT 'default',
- enabled Bool DEFAULT false,
- issuer_uri String DEFAULT '',
- client_id String DEFAULT '',
- client_secret String DEFAULT '',
- roles_claim String DEFAULT 'realm_access.roles',
- default_roles Array(LowCardinality(String)),
- auto_signup Bool DEFAULT true,
- display_name_claim String DEFAULT 'name',
- updated_at DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC')
-) ENGINE = ReplacingMergeTree(updated_at)
-ORDER BY (config_id);
diff --git a/clickhouse/init/05-oidc-auto-signup.sql b/clickhouse/init/05-oidc-auto-signup.sql
deleted file mode 100644
index 643a69ea..00000000
--- a/clickhouse/init/05-oidc-auto-signup.sql
+++ /dev/null
@@ -1 +0,0 @@
-ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS auto_signup Bool DEFAULT true;
diff --git a/clickhouse/init/06-oidc-display-name-claim.sql b/clickhouse/init/06-oidc-display-name-claim.sql
deleted file mode 100644
index ef1870bd..00000000
--- a/clickhouse/init/06-oidc-display-name-claim.sql
+++ /dev/null
@@ -1 +0,0 @@
-ALTER TABLE oidc_config ADD COLUMN IF NOT EXISTS display_name_claim String DEFAULT 'name';
diff --git a/pom.xml b/pom.xml
index bca775b0..2f27d0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,13 @@
cameleer3-server-core
${project.version}
+