From 188810e54b111f1c9482cb4b5fbfb8e8d6d49efa Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 20:10:58 +0200 Subject: [PATCH] feat: remove TimescaleDB, dead PG stores, and storage feature flags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Complete the ClickHouse migration by removing all PostgreSQL analytics code. PostgreSQL now serves only RBAC, config, and audit — all observability data is exclusively in ClickHouse. - Delete 6 dead PostgreSQL store classes (executions, stats, diagrams, events, metrics, metrics-query) and 2 integration tests - Delete RetentionScheduler (ClickHouse TTL handles retention) - Remove all 7 cameleer.storage.* feature flags from application.yml - Remove all @ConditionalOnProperty from ClickHouse beans in StorageBeanConfig - Consolidate 14 Flyway migrations (V1-V14) into single clean V1 with only RBAC/config/audit tables (no TimescaleDB, no analytics tables) - Switch from timescale/timescaledb-ha:pg16 to postgres:16 everywhere (docker-compose, deploy/postgres.yaml, test containers) - Remove TimescaleDB check and /metrics-pipeline from DatabaseAdminController - Set clickhouse.enabled default to true Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 2 +- HOWTO.md | 2 +- .../server/app/config/StorageBeanConfig.java | 31 -- .../controller/DatabaseAdminController.java | 36 +- .../app/dto/DatabaseStatusResponse.java | 3 +- .../app/retention/RetentionScheduler.java | 46 -- .../storage/PostgresAgentEventRepository.java | 64 --- .../app/storage/PostgresDiagramStore.java | 147 ------ .../app/storage/PostgresExecutionStore.java | 214 --------- .../storage/PostgresMetricsQueryStore.java | 55 --- .../app/storage/PostgresMetricsStore.java | 40 -- .../app/storage/PostgresStatsStore.java | 430 ------------------ .../src/main/resources/application.yml | 11 +- .../clickhouse/V4__stats_tables_and_mvs.sql | 2 +- .../V10__error_categorization_and_tracing.sql | 23 - .../db/migration/V11__has_trace_data.sql | 10 - .../db/migration/V12__app_settings.sql | 11 - .../resources/db/migration/V13__is_replay.sql | 7 - .../V14__rename_identity_columns.sql | 16 - .../main/resources/db/migration/V1__init.sql | 232 ++-------- .../resources/db/migration/V2__policies.sql | 38 -- .../V3__engine_level_and_snapshots.sql | 9 - .../db/migration/V4__application_config.sql | 9 - .../resources/db/migration/V5__attributes.sql | 2 - .../db/migration/V6__drop_diagram_node_id.sql | 1 - .../V7__diagram_application_name.sql | 2 - .../V8__processor_iteration_fields.sql | 5 - .../migration/V9__resolved_endpoint_uri.sql | 3 - .../server/app/AbstractPostgresIT.java | 8 +- .../server/app/storage/FlywayMigrationIT.java | 39 +- .../app/storage/PostgresExecutionStoreIT.java | 98 ---- .../app/storage/PostgresStatsStoreIT.java | 66 --- deploy/postgres.yaml | 2 +- docker-compose.yml | 2 +- ui/src/api/queries/admin/database.ts | 1 - ui/src/api/schema.d.ts | 4 +- ui/src/pages/Admin/DatabaseAdminPage.tsx | 1 - 37 files changed, 65 insertions(+), 1607 deletions(-) delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresAgentEventRepository.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V11__has_trace_data.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V12__app_settings.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V14__rename_identity_columns.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V2__policies.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V4__application_config.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V5__attributes.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V6__drop_diagram_node_id.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql delete mode 100644 cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql delete mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java delete mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java diff --git a/CLAUDE.md b/CLAUDE.md index 9beec93d..c17d8e9a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,7 +38,7 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar - Jackson `JavaTimeModule` for `Instant` deserialization - Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands (config-update, deep-trace, replay, route-control) - Maintains agent instance registry with states: LIVE → STALE → DEAD -- Storage: PostgreSQL (TimescaleDB) for structured data, ClickHouse for analytics, search, logs, and time-series +- Storage: PostgreSQL for RBAC, config, and audit; ClickHouse for all observability data (executions, search, logs, metrics, stats, diagrams) - Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration - OIDC: Optional external identity provider support (token exchange pattern). Configured via admin API, stored in database (`server_config` table) - User persistence: PostgreSQL `users` table, admin CRUD at `/api/v1/admin/users` diff --git a/HOWTO.md b/HOWTO.md index 0d2367fb..22b5d18e 100644 --- a/HOWTO.md +++ b/HOWTO.md @@ -27,7 +27,7 @@ Start PostgreSQL: docker compose up -d ``` -This starts TimescaleDB (PostgreSQL 16). The database schema is applied automatically via Flyway migrations on server startup. ClickHouse tables are created by the schema initializer on startup. +This starts PostgreSQL 16. The database schema is applied automatically via Flyway migrations on server startup. ClickHouse tables are created by the schema initializer on startup. | Service | Port | Purpose | |------------|------|----------------------| diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 94c4c11d..9d0b0f6c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -7,9 +7,6 @@ import com.cameleer3.server.app.storage.ClickHouseDiagramStore; import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; import com.cameleer3.server.app.storage.ClickHouseMetricsStore; import com.cameleer3.server.app.storage.ClickHouseStatsStore; -import com.cameleer3.server.app.storage.PostgresExecutionStore; -import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; -import com.cameleer3.server.app.storage.PostgresMetricsStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.agent.AgentEventRepository; @@ -64,48 +61,26 @@ public class StorageBeanConfig { } @Bean - @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") public MetricsStore clickHouseMetricsStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseMetricsStore(clickHouseJdbc); } @Bean - @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) - public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) { - return new PostgresMetricsStore(jdbc); - } - - @Bean - @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") public MetricsQueryStore clickHouseMetricsQueryStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseMetricsQueryStore(clickHouseJdbc); } - @Bean - @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) - public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { - return new PostgresMetricsQueryStore(jdbc); - } - // ── Execution Store ────────────────────────────────────────────────── @Bean - @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) public ClickHouseExecutionStore clickHouseExecutionStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseExecutionStore(clickHouseJdbc); } @Bean - @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "postgres") - public ExecutionStore executionStorePostgres(JdbcTemplate jdbc) { - return new PostgresExecutionStore(jdbc); - } - - @Bean - @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) public ChunkAccumulator chunkAccumulator( WriteBuffer executionBuffer, WriteBuffer processorBatchBuffer, @@ -118,7 +93,6 @@ public class StorageBeanConfig { } @Bean - @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true) public ExecutionFlushScheduler executionFlushScheduler( WriteBuffer executionBuffer, WriteBuffer processorBatchBuffer, @@ -130,7 +104,6 @@ public class StorageBeanConfig { } @Bean - @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse", matchIfMissing = true) public SearchIndex clickHouseSearchIndex( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseSearchIndex(clickHouseJdbc); @@ -139,7 +112,6 @@ public class StorageBeanConfig { // ── ClickHouse Stats Store ───────────────────────────────────────── @Bean - @ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse", matchIfMissing = true) public StatsStore clickHouseStatsStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseStatsStore(clickHouseJdbc); @@ -148,7 +120,6 @@ public class StorageBeanConfig { // ── ClickHouse Diagram Store ────────────────────────────────────── @Bean - @ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "clickhouse", matchIfMissing = true) public DiagramStore clickHouseDiagramStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseDiagramStore(clickHouseJdbc); @@ -157,7 +128,6 @@ public class StorageBeanConfig { // ── ClickHouse Agent Event Repository ───────────────────────────── @Bean - @ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "clickhouse", matchIfMissing = true) public AgentEventRepository clickHouseAgentEventRepository( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseAgentEventRepository(clickHouseJdbc); @@ -166,7 +136,6 @@ public class StorageBeanConfig { // ── ClickHouse Log Store ────────────────────────────────────────── @Bean - @ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "clickhouse", matchIfMissing = true) public LogIndex clickHouseLogStore( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseLogStore(clickHouseJdbc); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DatabaseAdminController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DatabaseAdminController.java index e9b83032..fb02ddd6 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DatabaseAdminController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DatabaseAdminController.java @@ -7,7 +7,6 @@ import com.cameleer3.server.app.dto.TableSizeResponse; import com.cameleer3.server.core.admin.AuditCategory; import com.cameleer3.server.core.admin.AuditResult; import com.cameleer3.server.core.admin.AuditService; -import com.cameleer3.server.core.ingestion.IngestionService; import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariPoolMXBean; import io.swagger.v3.oas.annotations.Operation; @@ -25,9 +24,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import javax.sql.DataSource; -import java.time.Instant; import java.util.List; -import java.util.Map; @RestController @RequestMapping("/api/v1/admin/database") @@ -38,14 +35,12 @@ public class DatabaseAdminController { private final JdbcTemplate jdbc; private final DataSource dataSource; private final AuditService auditService; - private final IngestionService ingestionService; public DatabaseAdminController(JdbcTemplate jdbc, DataSource dataSource, - AuditService auditService, IngestionService ingestionService) { + AuditService auditService) { this.jdbc = jdbc; this.dataSource = dataSource; this.auditService = auditService; - this.ingestionService = ingestionService; } @GetMapping("/status") @@ -53,14 +48,12 @@ public class DatabaseAdminController { public ResponseEntity getStatus() { try { String version = jdbc.queryForObject("SELECT version()", String.class); - boolean timescaleDb = Boolean.TRUE.equals( - jdbc.queryForObject("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')", Boolean.class)); String schema = jdbc.queryForObject("SELECT current_schema()", String.class); String host = extractHost(dataSource); - return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb)); + return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema)); } catch (Exception e) { return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) - .body(new DatabaseStatusResponse(false, null, null, null, false)); + .body(new DatabaseStatusResponse(false, null, null, null)); } } @@ -124,29 +117,6 @@ public class DatabaseAdminController { return ResponseEntity.ok().build(); } - @GetMapping("/metrics-pipeline") - @Operation(summary = "Get metrics ingestion pipeline diagnostics") - public ResponseEntity> getMetricsPipeline() { - int bufferDepth = ingestionService.getMetricsBufferDepth(); - - Long totalRows = jdbc.queryForObject( - "SELECT count(*) FROM agent_metrics", Long.class); - List agentIds = jdbc.queryForList( - "SELECT DISTINCT instance_id FROM agent_metrics ORDER BY instance_id", String.class); - Instant latestCollected = jdbc.queryForObject( - "SELECT max(collected_at) FROM agent_metrics", Instant.class); - List metricNames = jdbc.queryForList( - "SELECT DISTINCT metric_name FROM agent_metrics ORDER BY metric_name", String.class); - - return ResponseEntity.ok(Map.of( - "bufferDepth", bufferDepth, - "totalRows", totalRows != null ? totalRows : 0, - "distinctAgents", agentIds, - "distinctMetrics", metricNames, - "latestCollectedAt", latestCollected != null ? latestCollected.toString() : "none" - )); - } - private String extractHost(DataSource ds) { try { if (ds instanceof HikariDataSource hds) { diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/DatabaseStatusResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/DatabaseStatusResponse.java index c845c355..153a2cac 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/DatabaseStatusResponse.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/DatabaseStatusResponse.java @@ -7,6 +7,5 @@ public record DatabaseStatusResponse( @Schema(description = "Whether the database is reachable") boolean connected, @Schema(description = "PostgreSQL version string") String version, @Schema(description = "Database host") String host, - @Schema(description = "Current schema search path") String schema, - @Schema(description = "Whether TimescaleDB extension is available") boolean timescaleDb + @Schema(description = "Current schema") String schema ) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java deleted file mode 100644 index 1a48d3fd..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.cameleer3.server.app.retention; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -@Component -public class RetentionScheduler { - - private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class); - - private final JdbcTemplate jdbc; - private final int retentionDays; - - public RetentionScheduler(JdbcTemplate jdbc, - @Value("${cameleer.retention-days:30}") int retentionDays) { - this.jdbc = jdbc; - this.retentionDays = retentionDays; - } - - @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC - public void dropExpiredChunks() { - String interval = retentionDays + " days"; - try { - // Raw data - jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')"); - jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')"); - jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')"); - - // Continuous aggregates (keep 3x longer) - String caggInterval = (retentionDays * 3) + " days"; - jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')"); - jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')"); - jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')"); - jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')"); - - log.info("Retention: dropped chunks older than {} days (aggregates: {} days)", - retentionDays, retentionDays * 3); - } catch (Exception e) { - log.error("Retention job failed", e); - } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresAgentEventRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresAgentEventRepository.java deleted file mode 100644 index 0cfb3a41..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresAgentEventRepository.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.agent.AgentEventRecord; -import com.cameleer3.server.core.agent.AgentEventRepository; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.Timestamp; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; - -@Repository -@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres") -public class PostgresAgentEventRepository implements AgentEventRepository { - - private final JdbcTemplate jdbc; - - public PostgresAgentEventRepository(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public void insert(String instanceId, String applicationId, String eventType, String detail) { - jdbc.update( - "INSERT INTO agent_events (instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?)", - instanceId, applicationId, eventType, detail); - } - - @Override - public List query(String applicationId, String instanceId, Instant from, Instant to, int limit) { - var sql = new StringBuilder("SELECT id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE 1=1"); - var params = new ArrayList(); - - if (applicationId != null) { - sql.append(" AND application_id = ?"); - params.add(applicationId); - } - if (instanceId != null) { - sql.append(" AND instance_id = ?"); - params.add(instanceId); - } - if (from != null) { - sql.append(" AND timestamp >= ?"); - params.add(Timestamp.from(from)); - } - if (to != null) { - sql.append(" AND timestamp < ?"); - params.add(Timestamp.from(to)); - } - sql.append(" ORDER BY timestamp DESC LIMIT ?"); - params.add(limit); - - return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord( - rs.getLong("id"), - rs.getString("instance_id"), - rs.getString("application_id"), - rs.getString("event_type"), - rs.getString("detail"), - rs.getTimestamp("timestamp").toInstant() - ), params.toArray()); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java deleted file mode 100644 index 84ddc16f..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.common.graph.RouteGraph; -import com.cameleer3.server.core.ingestion.TaggedDiagram; -import com.cameleer3.server.core.storage.DiagramStore; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HexFormat; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -/** - * PostgreSQL implementation of {@link DiagramStore}. - *

- * Stores route graphs as JSON with SHA-256 content-hash deduplication. - * Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts. - */ -@Repository -@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres") -public class PostgresDiagramStore implements DiagramStore { - - private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class); - - private static final String INSERT_SQL = """ - INSERT INTO route_diagrams (content_hash, route_id, instance_id, application_id, definition) - VALUES (?, ?, ?, ?, ?::jsonb) - ON CONFLICT (content_hash) DO NOTHING - """; - - private static final String SELECT_BY_HASH = """ - SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1 - """; - - private static final String SELECT_HASH_FOR_ROUTE = """ - SELECT content_hash FROM route_diagrams - WHERE route_id = ? AND instance_id = ? - ORDER BY created_at DESC LIMIT 1 - """; - - private final JdbcTemplate jdbcTemplate; - private final ObjectMapper objectMapper; - - public PostgresDiagramStore(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - this.objectMapper = new ObjectMapper(); - this.objectMapper.registerModule(new JavaTimeModule()); - } - - @Override - public void store(TaggedDiagram diagram) { - try { - RouteGraph graph = diagram.graph(); - String agentId = diagram.instanceId() != null ? diagram.instanceId() : ""; - String applicationId = diagram.applicationId() != null ? diagram.applicationId() : ""; - String json = objectMapper.writeValueAsString(graph); - String contentHash = sha256Hex(json); - String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; - - jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationId, json); - log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); - } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); - } - } - - @Override - public Optional findByContentHash(String contentHash) { - List> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash); - if (rows.isEmpty()) { - return Optional.empty(); - } - String json = (String) rows.get(0).get("definition"); - try { - return Optional.of(objectMapper.readValue(json, RouteGraph.class)); - } catch (JsonProcessingException e) { - log.error("Failed to deserialize RouteGraph from PostgreSQL", e); - return Optional.empty(); - } - } - - @Override - public Optional findContentHashForRoute(String routeId, String agentId) { - List> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId); - if (rows.isEmpty()) { - return Optional.empty(); - } - return Optional.of((String) rows.get(0).get("content_hash")); - } - - @Override - public Optional findContentHashForRouteByAgents(String routeId, List agentIds) { - if (agentIds == null || agentIds.isEmpty()) { - return Optional.empty(); - } - String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); - String sql = "SELECT content_hash FROM route_diagrams " + - "WHERE route_id = ? AND instance_id IN (" + placeholders + ") " + - "ORDER BY created_at DESC LIMIT 1"; - var params = new ArrayList(); - params.add(routeId); - params.addAll(agentIds); - List> rows = jdbcTemplate.queryForList(sql, params.toArray()); - if (rows.isEmpty()) { - return Optional.empty(); - } - return Optional.of((String) rows.get(0).get("content_hash")); - } - - @Override - public Map findProcessorRouteMapping(String applicationId) { - Map mapping = new HashMap<>(); - jdbcTemplate.query(""" - SELECT DISTINCT rd.route_id, node_elem->>'id' AS processor_id - FROM route_diagrams rd, - jsonb_array_elements(rd.definition::jsonb->'nodes') AS node_elem - WHERE rd.application_id = ? - AND node_elem->>'id' IS NOT NULL - """, - rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); }, - applicationId); - return mapping; - } - - static String sha256Hex(String input) { - try { - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); - return HexFormat.of().formatHex(hash); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("SHA-256 not available", e); - } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java deleted file mode 100644 index fea3c42a..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +++ /dev/null @@ -1,214 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.storage.ExecutionStore; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.List; -import java.util.Optional; - -public class PostgresExecutionStore implements ExecutionStore { - - private final JdbcTemplate jdbc; - - public PostgresExecutionStore(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public void upsert(ExecutionRecord execution) { - jdbc.update(""" - INSERT INTO executions (execution_id, route_id, instance_id, application_id, - status, correlation_id, exchange_id, start_time, end_time, - duration_ms, error_message, error_stacktrace, diagram_content_hash, - engine_level, input_body, output_body, input_headers, output_headers, - attributes, - error_type, error_category, root_cause_type, root_cause_message, - trace_id, span_id, - processors_json, has_trace_data, is_replay, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, - ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now()) - ON CONFLICT (execution_id, start_time) DO UPDATE SET - status = CASE - WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') - AND executions.status = 'RUNNING' - THEN EXCLUDED.status - WHEN EXCLUDED.status = executions.status THEN executions.status - ELSE EXCLUDED.status - END, - end_time = COALESCE(EXCLUDED.end_time, executions.end_time), - duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms), - error_message = COALESCE(EXCLUDED.error_message, executions.error_message), - error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace), - diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash), - engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level), - input_body = COALESCE(EXCLUDED.input_body, executions.input_body), - output_body = COALESCE(EXCLUDED.output_body, executions.output_body), - input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers), - output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers), - attributes = COALESCE(EXCLUDED.attributes, executions.attributes), - error_type = COALESCE(EXCLUDED.error_type, executions.error_type), - error_category = COALESCE(EXCLUDED.error_category, executions.error_category), - root_cause_type = COALESCE(EXCLUDED.root_cause_type, executions.root_cause_type), - root_cause_message = COALESCE(EXCLUDED.root_cause_message, executions.root_cause_message), - trace_id = COALESCE(EXCLUDED.trace_id, executions.trace_id), - span_id = COALESCE(EXCLUDED.span_id, executions.span_id), - processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json), - has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data, - is_replay = EXCLUDED.is_replay OR executions.is_replay, - updated_at = now() - """, - execution.executionId(), execution.routeId(), execution.instanceId(), - execution.applicationId(), execution.status(), execution.correlationId(), - execution.exchangeId(), - Timestamp.from(execution.startTime()), - execution.endTime() != null ? Timestamp.from(execution.endTime()) : null, - execution.durationMs(), execution.errorMessage(), - execution.errorStacktrace(), execution.diagramContentHash(), - execution.engineLevel(), - execution.inputBody(), execution.outputBody(), - execution.inputHeaders(), execution.outputHeaders(), - execution.attributes(), - execution.errorType(), execution.errorCategory(), - execution.rootCauseType(), execution.rootCauseMessage(), - execution.traceId(), execution.spanId(), - execution.processorsJson(), execution.hasTraceData(), execution.isReplay()); - } - - @Override - public void upsertProcessors(String executionId, Instant startTime, - String applicationId, String routeId, - List processors) { - jdbc.batchUpdate(""" - INSERT INTO processor_executions (execution_id, processor_id, processor_type, - application_id, route_id, depth, parent_processor_id, - status, start_time, end_time, duration_ms, error_message, error_stacktrace, - input_body, output_body, input_headers, output_headers, attributes, - loop_index, loop_size, split_index, split_size, multicast_index, - resolved_endpoint_uri, - error_type, error_category, root_cause_type, root_cause_message, - error_handler_type, circuit_breaker_state, fallback_triggered) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET - status = EXCLUDED.status, - end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time), - duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms), - error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message), - error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace), - input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body), - output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body), - input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers), - output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers), - attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes), - loop_index = COALESCE(EXCLUDED.loop_index, processor_executions.loop_index), - loop_size = COALESCE(EXCLUDED.loop_size, processor_executions.loop_size), - split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index), - split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size), - multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index), - resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri), - error_type = COALESCE(EXCLUDED.error_type, processor_executions.error_type), - error_category = COALESCE(EXCLUDED.error_category, processor_executions.error_category), - root_cause_type = COALESCE(EXCLUDED.root_cause_type, processor_executions.root_cause_type), - root_cause_message = COALESCE(EXCLUDED.root_cause_message, processor_executions.root_cause_message), - error_handler_type = COALESCE(EXCLUDED.error_handler_type, processor_executions.error_handler_type), - circuit_breaker_state = COALESCE(EXCLUDED.circuit_breaker_state, processor_executions.circuit_breaker_state), - fallback_triggered = COALESCE(EXCLUDED.fallback_triggered, processor_executions.fallback_triggered) - """, - processors.stream().map(p -> new Object[]{ - p.executionId(), p.processorId(), p.processorType(), - p.applicationId(), p.routeId(), - p.depth(), p.parentProcessorId(), p.status(), - Timestamp.from(p.startTime()), - p.endTime() != null ? Timestamp.from(p.endTime()) : null, - p.durationMs(), p.errorMessage(), p.errorStacktrace(), - p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(), - p.attributes(), - p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(), - p.multicastIndex(), - p.resolvedEndpointUri(), - p.errorType(), p.errorCategory(), - p.rootCauseType(), p.rootCauseMessage(), - p.errorHandlerType(), p.circuitBreakerState(), - p.fallbackTriggered() - }).toList()); - } - - @Override - public Optional findById(String executionId) { - List results = jdbc.query( - "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1", - EXECUTION_MAPPER, executionId); - return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); - } - - @Override - public List findProcessors(String executionId) { - return jdbc.query( - "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time", - PROCESSOR_MAPPER, executionId); - } - - @Override - public Optional findProcessorById(String executionId, String processorId) { - String sql = "SELECT * FROM processor_executions WHERE execution_id = ? AND processor_id = ? LIMIT 1"; - List results = jdbc.query(sql, PROCESSOR_MAPPER, executionId, processorId); - return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); - } - - private static final RowMapper EXECUTION_MAPPER = (rs, rowNum) -> - new ExecutionRecord( - rs.getString("execution_id"), rs.getString("route_id"), - rs.getString("instance_id"), rs.getString("application_id"), - rs.getString("status"), rs.getString("correlation_id"), - rs.getString("exchange_id"), - toInstant(rs, "start_time"), toInstant(rs, "end_time"), - rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, - rs.getString("error_message"), rs.getString("error_stacktrace"), - rs.getString("diagram_content_hash"), - rs.getString("engine_level"), - rs.getString("input_body"), rs.getString("output_body"), - rs.getString("input_headers"), rs.getString("output_headers"), - rs.getString("attributes"), - rs.getString("error_type"), rs.getString("error_category"), - rs.getString("root_cause_type"), rs.getString("root_cause_message"), - rs.getString("trace_id"), rs.getString("span_id"), - rs.getString("processors_json"), - rs.getBoolean("has_trace_data"), - rs.getBoolean("is_replay")); - - private static final RowMapper PROCESSOR_MAPPER = (rs, rowNum) -> - new ProcessorRecord( - rs.getString("execution_id"), rs.getString("processor_id"), - rs.getString("processor_type"), - rs.getString("application_id"), rs.getString("route_id"), - rs.getInt("depth"), rs.getString("parent_processor_id"), - rs.getString("status"), - toInstant(rs, "start_time"), toInstant(rs, "end_time"), - rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null, - rs.getString("error_message"), rs.getString("error_stacktrace"), - rs.getString("input_body"), rs.getString("output_body"), - rs.getString("input_headers"), rs.getString("output_headers"), - rs.getString("attributes"), - rs.getObject("loop_index") != null ? rs.getInt("loop_index") : null, - rs.getObject("loop_size") != null ? rs.getInt("loop_size") : null, - rs.getObject("split_index") != null ? rs.getInt("split_index") : null, - rs.getObject("split_size") != null ? rs.getInt("split_size") : null, - rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null, - rs.getString("resolved_endpoint_uri"), - rs.getString("error_type"), rs.getString("error_category"), - rs.getString("root_cause_type"), rs.getString("root_cause_message"), - rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"), - rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null, - null, null, null, null, null, null); - - private static Instant toInstant(ResultSet rs, String column) throws SQLException { - Timestamp ts = rs.getTimestamp(column); - return ts != null ? ts.toInstant() : null; - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java deleted file mode 100644 index c9377703..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.storage.MetricsQueryStore; -import com.cameleer3.server.core.storage.model.MetricTimeSeries; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.sql.Timestamp; -import java.time.Instant; -import java.util.*; - -public class PostgresMetricsQueryStore implements MetricsQueryStore { - - private final JdbcTemplate jdbc; - - public PostgresMetricsQueryStore(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public Map> queryTimeSeries( - String agentId, List metricNames, - Instant from, Instant to, int buckets) { - - long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); - String intervalStr = intervalMs + " milliseconds"; - - Map> result = new LinkedHashMap<>(); - for (String name : metricNames) { - result.put(name.trim(), new ArrayList<>()); - } - - String sql = """ - SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, - metric_name, - AVG(metric_value) AS avg_value - FROM agent_metrics - WHERE instance_id = ? - AND collected_at >= ? AND collected_at < ? - AND metric_name = ANY(?) - GROUP BY bucket, metric_name - ORDER BY bucket - """; - - String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); - jdbc.query(sql, rs -> { - String metricName = rs.getString("metric_name"); - Instant bucket = rs.getTimestamp("bucket").toInstant(); - double value = rs.getDouble("avg_value"); - result.computeIfAbsent(metricName, k -> new ArrayList<>()) - .add(new MetricTimeSeries.Bucket(bucket, value)); - }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); - - return result; - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java deleted file mode 100644 index ec36ca45..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.storage.MetricsStore; -import com.cameleer3.server.core.storage.model.MetricsSnapshot; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.sql.Timestamp; -import java.util.List; - -public class PostgresMetricsStore implements MetricsStore { - - private static final ObjectMapper MAPPER = new ObjectMapper(); - private final JdbcTemplate jdbc; - - public PostgresMetricsStore(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public void insertBatch(List snapshots) { - jdbc.batchUpdate(""" - INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags, - collected_at, server_received_at) - VALUES (?, ?, ?, ?::jsonb, ?, now()) - """, - snapshots.stream().map(s -> new Object[]{ - s.instanceId(), s.metricName(), s.metricValue(), - tagsToJson(s.tags()), - Timestamp.from(s.collectedAt()) - }).toList()); - } - - private String tagsToJson(java.util.Map tags) { - if (tags == null || tags.isEmpty()) return null; - try { return MAPPER.writeValueAsString(tags); } - catch (JsonProcessingException e) { return null; } - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java deleted file mode 100644 index f690e312..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java +++ /dev/null @@ -1,430 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.core.search.ExecutionStats; -import com.cameleer3.server.core.search.StatsTimeseries; -import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; -import com.cameleer3.server.core.search.TopError; -import com.cameleer3.server.core.storage.StatsStore; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.Timestamp; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -@Repository -@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres") -public class PostgresStatsStore implements StatsStore { - - private final JdbcTemplate jdbc; - - public PostgresStatsStore(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - @Override - public ExecutionStats stats(Instant from, Instant to) { - return queryStats("stats_1m_all", from, to, List.of()); - } - - @Override - public ExecutionStats statsForApp(Instant from, Instant to, String applicationId) { - return queryStats("stats_1m_app", from, to, List.of( - new Filter("application_id", applicationId))); - } - - @Override - public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds) { - // Note: agentIds is accepted for interface compatibility but not filterable - // on the continuous aggregate (it groups by route_id, not instance_id). - // All agents for the same route contribute to the same aggregate. - return queryStats("stats_1m_route", from, to, List.of( - new Filter("route_id", routeId))); - } - - @Override - public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) { - return queryStats("stats_1m_processor", from, to, List.of( - new Filter("route_id", routeId), - new Filter("processor_type", processorType))); - } - - @Override - public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { - return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true); - } - - @Override - public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId) { - return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of( - new Filter("application_id", applicationId)), true); - } - - @Override - public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, - String routeId, List agentIds) { - return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of( - new Filter("route_id", routeId)), true); - } - - @Override - public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount, - String routeId, String processorType) { - // stats_1m_processor does NOT have running_count column - return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of( - new Filter("route_id", routeId), - new Filter("processor_type", processorType)), false); - } - - private record Filter(String column, String value) {} - - private ExecutionStats queryStats(String view, Instant from, Instant to, List filters) { - // running_count only exists on execution-level aggregates, not processor - boolean hasRunning = !view.equals("stats_1m_processor"); - String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0"; - - String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " + - "COALESCE(SUM(failed_count), 0) AS failed_count, " + - "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + - "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + - runningCol + " AS active_count " + - "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; - - List params = new ArrayList<>(); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - for (Filter f : filters) { - sql += " AND " + f.column() + " = ?"; - params.add(f.value()); - } - - long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0; - var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ - rs.getLong("total_count"), rs.getLong("failed_count"), - rs.getLong("avg_duration"), rs.getLong("p99_duration"), - rs.getLong("active_count") - }, params.toArray()); - if (!currentResult.isEmpty()) { - long[] r = currentResult.get(0); - totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; - p99Duration = r[3]; activeCount = r[4]; - } - - // Previous period (shifted back 24h) - Instant prevFrom = from.minus(Duration.ofHours(24)); - Instant prevTo = to.minus(Duration.ofHours(24)); - List prevParams = new ArrayList<>(); - prevParams.add(Timestamp.from(prevFrom)); - prevParams.add(Timestamp.from(prevTo)); - for (Filter f : filters) prevParams.add(f.value()); - String prevSql = sql; // same shape, different time params - - long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; - var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{ - rs.getLong("total_count"), rs.getLong("failed_count"), - rs.getLong("avg_duration"), rs.getLong("p99_duration") - }, prevParams.toArray()); - if (!prevResult.isEmpty()) { - long[] r = prevResult.get(0); - prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; - } - - // Today total (from midnight UTC) - Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); - List todayParams = new ArrayList<>(); - todayParams.add(Timestamp.from(todayStart)); - todayParams.add(Timestamp.from(Instant.now())); - for (Filter f : filters) todayParams.add(f.value()); - String todaySql = sql; - - long totalToday = 0; - var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"), - todayParams.toArray()); - if (!todayResult.isEmpty()) totalToday = todayResult.get(0); - - return new ExecutionStats( - totalCount, failedCount, avgDuration, p99Duration, activeCount, - totalToday, prevTotal, prevFailed, prevAvg, prevP99); - } - - private StatsTimeseries queryTimeseries(String view, Instant from, Instant to, - int bucketCount, List filters, - boolean hasRunningCount) { - long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); - if (intervalSeconds < 60) intervalSeconds = 60; - - String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0"; - - String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " + - "COALESCE(SUM(total_count), 0) AS total_count, " + - "COALESCE(SUM(failed_count), 0) AS failed_count, " + - "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + - "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + - runningCol + " AS active_count " + - "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; - - List params = new ArrayList<>(); - params.add(intervalSeconds); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - for (Filter f : filters) { - sql += " AND " + f.column() + " = ?"; - params.add(f.value()); - } - sql += " GROUP BY period ORDER BY period"; - - List buckets = jdbc.query(sql, (rs, rowNum) -> - new TimeseriesBucket( - rs.getTimestamp("period").toInstant(), - rs.getLong("total_count"), rs.getLong("failed_count"), - rs.getLong("avg_duration"), rs.getLong("p99_duration"), - rs.getLong("active_count") - ), params.toArray()); - - return new StatsTimeseries(buckets); - } - - // ── Grouped timeseries ──────────────────────────────────────────────── - - @Override - public Map timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) { - return queryGroupedTimeseries("stats_1m_app", "application_id", from, to, - bucketCount, List.of()); - } - - @Override - public Map timeseriesGroupedByRoute(Instant from, Instant to, - int bucketCount, String applicationId) { - return queryGroupedTimeseries("stats_1m_route", "route_id", from, to, - bucketCount, List.of(new Filter("application_id", applicationId))); - } - - private Map queryGroupedTimeseries( - String view, String groupCol, Instant from, Instant to, - int bucketCount, List filters) { - - long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1); - if (intervalSeconds < 60) intervalSeconds = 60; - - String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " + - groupCol + " AS group_key, " + - "COALESCE(SUM(total_count), 0) AS total_count, " + - "COALESCE(SUM(failed_count), 0) AS failed_count, " + - "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + - "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + - "COALESCE(SUM(running_count), 0) AS active_count " + - "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; - - List params = new ArrayList<>(); - params.add(intervalSeconds); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - for (Filter f : filters) { - sql += " AND " + f.column() + " = ?"; - params.add(f.value()); - } - sql += " GROUP BY period, group_key ORDER BY period, group_key"; - - Map> grouped = new LinkedHashMap<>(); - jdbc.query(sql, (rs) -> { - String key = rs.getString("group_key"); - TimeseriesBucket bucket = new TimeseriesBucket( - rs.getTimestamp("period").toInstant(), - rs.getLong("total_count"), rs.getLong("failed_count"), - rs.getLong("avg_duration"), rs.getLong("p99_duration"), - rs.getLong("active_count")); - grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket); - }, params.toArray()); - - Map result = new LinkedHashMap<>(); - grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets))); - return result; - } - - // ── SLA compliance ──────────────────────────────────────────────────── - - @Override - public double slaCompliance(Instant from, Instant to, int thresholdMs, - String applicationId, String routeId) { - String sql = "SELECT " + - "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " + - "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " + - "FROM executions WHERE start_time >= ? AND start_time < ?"; - - List params = new ArrayList<>(); - params.add(thresholdMs); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - if (applicationId != null) { - sql += " AND application_id = ?"; - params.add(applicationId); - } - if (routeId != null) { - sql += " AND route_id = ?"; - params.add(routeId); - } - - return jdbc.query(sql, (rs, rowNum) -> { - long total = rs.getLong("total"); - if (total == 0) return 1.0; - return rs.getLong("compliant") * 100.0 / total; - }, params.toArray()).stream().findFirst().orElse(1.0); - } - - @Override - public Map slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) { - String sql = "SELECT application_id, " + - "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " + - "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " + - "FROM executions WHERE start_time >= ? AND start_time < ? " + - "GROUP BY application_id"; - - Map result = new LinkedHashMap<>(); - jdbc.query(sql, (rs) -> { - result.put(rs.getString("application_id"), - new long[]{rs.getLong("compliant"), rs.getLong("total")}); - }, defaultThresholdMs, Timestamp.from(from), Timestamp.from(to)); - return result; - } - - @Override - public Map slaCountsByRoute(Instant from, Instant to, - String applicationId, int thresholdMs) { - String sql = "SELECT route_id, " + - "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " + - "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " + - "FROM executions WHERE start_time >= ? AND start_time < ? " + - "AND application_id = ? GROUP BY route_id"; - - Map result = new LinkedHashMap<>(); - jdbc.query(sql, (rs) -> { - result.put(rs.getString("route_id"), - new long[]{rs.getLong("compliant"), rs.getLong("total")}); - }, thresholdMs, Timestamp.from(from), Timestamp.from(to), applicationId); - return result; - } - - // ── Top errors ──────────────────────────────────────────────────────── - - @Override - public List topErrors(Instant from, Instant to, String applicationId, - String routeId, int limit) { - StringBuilder where = new StringBuilder( - "status = 'FAILED' AND start_time >= ? AND start_time < ?"); - List params = new ArrayList<>(); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - if (applicationId != null) { - where.append(" AND application_id = ?"); - params.add(applicationId); - } - - String table; - String groupId; - if (routeId != null) { - // L3: attribute errors to processors - table = "processor_executions"; - groupId = "processor_id"; - where.append(" AND route_id = ?"); - params.add(routeId); - } else { - // L1/L2: attribute errors to routes - table = "executions"; - groupId = "route_id"; - } - - Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES); - Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES); - - String sql = "WITH counted AS (" + - " SELECT COALESCE(error_type, LEFT(error_message, 200)) AS error_key, " + - " " + groupId + " AS group_id, " + - " COUNT(*) AS cnt, MAX(start_time) AS last_seen " + - " FROM " + table + " WHERE " + where + - " GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" + - "), velocity AS (" + - " SELECT COALESCE(error_type, LEFT(error_message, 200)) AS error_key, " + - " COUNT(*) FILTER (WHERE start_time >= ?) AS recent_5m, " + - " COUNT(*) FILTER (WHERE start_time >= ? AND start_time < ?) AS prev_5m " + - " FROM " + table + " WHERE " + where + - " GROUP BY error_key" + - ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " + - " COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " + - " CASE " + - " WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " + - " WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " + - " ELSE 'stable' END AS trend " + - "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " + - "ORDER BY c.cnt DESC"; - - // Build full params: counted-where params + limit + velocity timestamps + velocity-where params - List fullParams = new ArrayList<>(params); - fullParams.add(limit); - fullParams.add(Timestamp.from(fiveMinAgo)); - fullParams.add(Timestamp.from(tenMinAgo)); - fullParams.add(Timestamp.from(fiveMinAgo)); - fullParams.addAll(params); // same where clause for velocity CTE - - return jdbc.query(sql, (rs, rowNum) -> { - String errorKey = rs.getString("error_key"); - String gid = rs.getString("group_id"); - return new TopError( - errorKey, - routeId != null ? routeId : gid, // routeId - routeId != null ? gid : null, // processorId (only at L3) - rs.getLong("cnt"), - rs.getDouble("velocity"), - rs.getString("trend"), - rs.getTimestamp("last_seen").toInstant()); - }, fullParams.toArray()); - } - - @Override - public int activeErrorTypes(Instant from, Instant to, String applicationId) { - String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, LEFT(error_message, 200))) " + - "FROM executions WHERE status = 'FAILED' AND start_time >= ? AND start_time < ?"; - - List params = new ArrayList<>(); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - if (applicationId != null) { - sql += " AND application_id = ?"; - params.add(applicationId); - } - - Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray()); - return count != null ? count : 0; - } - - // ── Punchcard ───────────────────────────────────────────────────────── - - @Override - public List punchcard(Instant from, Instant to, String applicationId) { - String view = applicationId != null ? "stats_1m_app" : "stats_1m_all"; - String sql = "SELECT EXTRACT(DOW FROM bucket) AS weekday, " + - "EXTRACT(HOUR FROM bucket) AS hour, " + - "COALESCE(SUM(total_count), 0) AS total_count, " + - "COALESCE(SUM(failed_count), 0) AS failed_count " + - "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; - - List params = new ArrayList<>(); - params.add(Timestamp.from(from)); - params.add(Timestamp.from(to)); - if (applicationId != null) { - sql += " AND application_id = ?"; - params.add(applicationId); - } - sql += " GROUP BY weekday, hour ORDER BY weekday, hour"; - - return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell( - rs.getInt("weekday"), rs.getInt("hour"), - rs.getLong("total_count"), rs.getLong("failed_count")), - params.toArray()); - } -} diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index 8abf9284..2d6375b8 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -42,15 +42,6 @@ cameleer: indexer: debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000} queue-size: ${CAMELEER_INDEXER_QUEUE_SIZE:10000} - retention-days: ${CAMELEER_RETENTION_DAYS:30} - storage: - metrics: ${CAMELEER_STORAGE_METRICS:postgres} - search: ${CAMELEER_STORAGE_SEARCH:clickhouse} - stats: ${CAMELEER_STORAGE_STATS:clickhouse} - diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse} - events: ${CAMELEER_STORAGE_EVENTS:clickhouse} - logs: ${CAMELEER_STORAGE_LOGS:clickhouse} - executions: ${CAMELEER_STORAGE_EXECUTIONS:clickhouse} security: access-token-expiry-ms: 3600000 @@ -70,7 +61,7 @@ springdoc: path: /api/v1/swagger-ui clickhouse: - enabled: ${CLICKHOUSE_ENABLED:false} + enabled: ${CLICKHOUSE_ENABLED:true} url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer} username: ${CLICKHOUSE_USERNAME:default} password: ${CLICKHOUSE_PASSWORD:} diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql index 2eca3504..c35795ce 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql @@ -1,5 +1,5 @@ -- V4__stats_tables_and_mvs.sql --- Materialized views replacing TimescaleDB continuous aggregates. +-- Pre-aggregated statistics tables and materialized views. -- Tables use AggregatingMergeTree, MVs use -State combinators. -- stats_1m_all (global) diff --git a/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql b/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql deleted file mode 100644 index 8aab749a..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V10__error_categorization_and_tracing.sql +++ /dev/null @@ -1,23 +0,0 @@ --- executions: store raw processor tree for faithful detail response -ALTER TABLE executions ADD COLUMN processors_json JSONB; - --- executions: error categorization + OTel tracing -ALTER TABLE executions ADD COLUMN error_type TEXT; -ALTER TABLE executions ADD COLUMN error_category TEXT; -ALTER TABLE executions ADD COLUMN root_cause_type TEXT; -ALTER TABLE executions ADD COLUMN root_cause_message TEXT; -ALTER TABLE executions ADD COLUMN trace_id TEXT; -ALTER TABLE executions ADD COLUMN span_id TEXT; - --- processor_executions: error categorization + circuit breaker -ALTER TABLE processor_executions ADD COLUMN error_type TEXT; -ALTER TABLE processor_executions ADD COLUMN error_category TEXT; -ALTER TABLE processor_executions ADD COLUMN root_cause_type TEXT; -ALTER TABLE processor_executions ADD COLUMN root_cause_message TEXT; -ALTER TABLE processor_executions ADD COLUMN error_handler_type TEXT; -ALTER TABLE processor_executions ADD COLUMN circuit_breaker_state TEXT; -ALTER TABLE processor_executions ADD COLUMN fallback_triggered BOOLEAN; - --- Remove erroneous depth columns from V9 -ALTER TABLE processor_executions DROP COLUMN IF EXISTS split_depth; -ALTER TABLE processor_executions DROP COLUMN IF EXISTS loop_depth; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V11__has_trace_data.sql b/cameleer3-server-app/src/main/resources/db/migration/V11__has_trace_data.sql deleted file mode 100644 index 7d925e5a..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V11__has_trace_data.sql +++ /dev/null @@ -1,10 +0,0 @@ --- Flag indicating whether any processor in this execution captured trace data -ALTER TABLE executions ADD COLUMN IF NOT EXISTS has_trace_data BOOLEAN NOT NULL DEFAULT FALSE; - --- Backfill: set flag for existing executions that have processor trace data -UPDATE executions e SET has_trace_data = TRUE -WHERE EXISTS ( - SELECT 1 FROM processor_executions pe - WHERE pe.execution_id = e.execution_id - AND (pe.input_body IS NOT NULL OR pe.output_body IS NOT NULL) -); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V12__app_settings.sql b/cameleer3-server-app/src/main/resources/db/migration/V12__app_settings.sql deleted file mode 100644 index 50f4c42b..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V12__app_settings.sql +++ /dev/null @@ -1,11 +0,0 @@ --- Per-application dashboard settings (SLA thresholds, health dot thresholds) -CREATE TABLE app_settings ( - app_id TEXT PRIMARY KEY, - sla_threshold_ms INTEGER NOT NULL DEFAULT 300, - health_error_warn DOUBLE PRECISION NOT NULL DEFAULT 1.0, - health_error_crit DOUBLE PRECISION NOT NULL DEFAULT 5.0, - health_sla_warn DOUBLE PRECISION NOT NULL DEFAULT 99.0, - health_sla_crit DOUBLE PRECISION NOT NULL DEFAULT 95.0, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() -); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql b/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql deleted file mode 100644 index cd6f5be3..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V13__is_replay.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Flag indicating whether this execution is a replayed exchange -ALTER TABLE executions ADD COLUMN IF NOT EXISTS is_replay BOOLEAN NOT NULL DEFAULT FALSE; - --- Backfill: check inputHeaders JSON for X-Cameleer-Replay header -UPDATE executions SET is_replay = TRUE -WHERE input_headers IS NOT NULL - AND input_headers::jsonb ? 'X-Cameleer-Replay'; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V14__rename_identity_columns.sql b/cameleer3-server-app/src/main/resources/db/migration/V14__rename_identity_columns.sql deleted file mode 100644 index 82254763..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V14__rename_identity_columns.sql +++ /dev/null @@ -1,16 +0,0 @@ --- Rename agent identity columns for protocol v2 alignment. - -ALTER TABLE executions RENAME COLUMN agent_id TO instance_id; -ALTER TABLE executions RENAME COLUMN application_name TO application_id; - -ALTER TABLE processor_executions RENAME COLUMN application_name TO application_id; - -ALTER TABLE agent_metrics RENAME COLUMN agent_id TO instance_id; - -ALTER TABLE route_diagrams RENAME COLUMN agent_id TO instance_id; -ALTER TABLE route_diagrams RENAME COLUMN application_name TO application_id; - -ALTER TABLE agent_events RENAME COLUMN agent_id TO instance_id; -ALTER TABLE agent_events RENAME COLUMN app_id TO application_id; - -ALTER TABLE app_settings RENAME COLUMN app_id TO application_id; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V1__init.sql b/cameleer3-server-app/src/main/resources/db/migration/V1__init.sql index 950f2ac8..81d8ca6f 100644 --- a/cameleer3-server-app/src/main/resources/db/migration/V1__init.sql +++ b/cameleer3-server-app/src/main/resources/db/migration/V1__init.sql @@ -1,8 +1,6 @@ --- V1__init.sql - Consolidated schema for Cameleer3 - --- Extensions -CREATE EXTENSION IF NOT EXISTS timescaledb; -CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit; +-- V1__init.sql — PostgreSQL schema for Cameleer3 Server +-- PostgreSQL stores RBAC, configuration, and audit data only. +-- All observability data (executions, metrics, diagrams, logs, stats) is in ClickHouse. -- ============================================================= -- RBAC @@ -40,7 +38,6 @@ CREATE TABLE groups ( created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); --- Built-in Admins group INSERT INTO groups (id, name) VALUES ('00000000-0000-0000-0000-000000000010', 'Admins'); @@ -50,7 +47,6 @@ CREATE TABLE group_roles ( PRIMARY KEY (group_id, role_id) ); --- Assign ADMIN role to Admins group INSERT INTO group_roles (group_id, role_id) VALUES ('00000000-0000-0000-0000-000000000010', '00000000-0000-0000-0000-000000000004'); @@ -71,113 +67,6 @@ CREATE INDEX idx_user_groups_user_id ON user_groups(user_id); CREATE INDEX idx_group_roles_group_id ON group_roles(group_id); CREATE INDEX idx_groups_parent ON groups(parent_group_id); --- ============================================================= --- Execution data (TimescaleDB hypertables) --- ============================================================= - -CREATE TABLE executions ( - execution_id TEXT NOT NULL, - route_id TEXT NOT NULL, - agent_id TEXT NOT NULL, - application_name TEXT NOT NULL, - status TEXT NOT NULL, - correlation_id TEXT, - exchange_id TEXT, - start_time TIMESTAMPTZ NOT NULL, - end_time TIMESTAMPTZ, - duration_ms BIGINT, - error_message TEXT, - error_stacktrace TEXT, - diagram_content_hash TEXT, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - PRIMARY KEY (execution_id, start_time) -); - -SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day'); - -CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC); -CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC); -CREATE INDEX idx_executions_app_time ON executions (application_name, start_time DESC); -CREATE INDEX idx_executions_correlation ON executions (correlation_id); - -CREATE TABLE processor_executions ( - id BIGSERIAL, - execution_id TEXT NOT NULL, - processor_id TEXT NOT NULL, - processor_type TEXT NOT NULL, - diagram_node_id TEXT, - application_name TEXT NOT NULL, - route_id TEXT NOT NULL, - depth INT NOT NULL, - parent_processor_id TEXT, - status TEXT NOT NULL, - start_time TIMESTAMPTZ NOT NULL, - end_time TIMESTAMPTZ, - duration_ms BIGINT, - error_message TEXT, - error_stacktrace TEXT, - input_body TEXT, - output_body TEXT, - input_headers JSONB, - output_headers JSONB, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - UNIQUE (execution_id, processor_id, start_time) -); - -SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day'); - -CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id); -CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC); - --- ============================================================= --- Agent metrics --- ============================================================= - -CREATE TABLE agent_metrics ( - agent_id TEXT NOT NULL, - metric_name TEXT NOT NULL, - metric_value DOUBLE PRECISION NOT NULL, - tags JSONB, - collected_at TIMESTAMPTZ NOT NULL, - server_received_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day'); - -CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC); - --- ============================================================= --- Route diagrams --- ============================================================= - -CREATE TABLE route_diagrams ( - content_hash TEXT PRIMARY KEY, - route_id TEXT NOT NULL, - agent_id TEXT NOT NULL, - definition TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id); - --- ============================================================= --- Agent events --- ============================================================= - -CREATE TABLE agent_events ( - id BIGSERIAL PRIMARY KEY, - agent_id TEXT NOT NULL, - app_id TEXT NOT NULL, - event_type TEXT NOT NULL, - detail TEXT, - timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW() -); - -CREATE INDEX idx_agent_events_agent ON agent_events(agent_id, timestamp DESC); -CREATE INDEX idx_agent_events_app ON agent_events(app_id, timestamp DESC); -CREATE INDEX idx_agent_events_time ON agent_events(timestamp DESC); - -- ============================================================= -- Server configuration -- ============================================================= @@ -190,7 +79,30 @@ CREATE TABLE server_config ( ); -- ============================================================= --- Admin +-- Application configuration +-- ============================================================= + +CREATE TABLE application_config ( + application TEXT PRIMARY KEY, + config_val JSONB NOT NULL, + version INTEGER NOT NULL DEFAULT 1, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_by TEXT +); + +CREATE TABLE app_settings ( + application_id TEXT PRIMARY KEY, + sla_threshold_ms INTEGER NOT NULL DEFAULT 300, + health_error_warn DOUBLE PRECISION NOT NULL DEFAULT 1.0, + health_error_crit DOUBLE PRECISION NOT NULL DEFAULT 5.0, + health_sla_warn DOUBLE PRECISION NOT NULL DEFAULT 99.0, + health_sla_crit DOUBLE PRECISION NOT NULL DEFAULT 95.0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- ============================================================= +-- Audit log -- ============================================================= CREATE TABLE audit_log ( @@ -211,93 +123,3 @@ CREATE INDEX idx_audit_log_username ON audit_log (username); CREATE INDEX idx_audit_log_category ON audit_log (category); CREATE INDEX idx_audit_log_action ON audit_log (action); CREATE INDEX idx_audit_log_target ON audit_log (target); - --- ============================================================= --- Continuous aggregates --- ============================================================= - -CREATE MATERIALIZED VIEW stats_1m_all -WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS -SELECT - time_bucket('1 minute', start_time) AS bucket, - COUNT(*) AS total_count, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, - COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, - SUM(duration_ms) AS duration_sum, - MAX(duration_ms) AS duration_max, - approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration -FROM executions -WHERE status IS NOT NULL -GROUP BY bucket -WITH NO DATA; - - -CREATE MATERIALIZED VIEW stats_1m_app -WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS -SELECT - time_bucket('1 minute', start_time) AS bucket, - application_name, - COUNT(*) AS total_count, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, - COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, - SUM(duration_ms) AS duration_sum, - MAX(duration_ms) AS duration_max, - approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration -FROM executions -WHERE status IS NOT NULL -GROUP BY bucket, application_name -WITH NO DATA; - - -CREATE MATERIALIZED VIEW stats_1m_route -WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS -SELECT - time_bucket('1 minute', start_time) AS bucket, - application_name, - route_id, - COUNT(*) AS total_count, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, - COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, - SUM(duration_ms) AS duration_sum, - MAX(duration_ms) AS duration_max, - approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration -FROM executions -WHERE status IS NOT NULL -GROUP BY bucket, application_name, route_id -WITH NO DATA; - - -CREATE MATERIALIZED VIEW stats_1m_processor -WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS -SELECT - time_bucket('1 minute', start_time) AS bucket, - application_name, - route_id, - processor_type, - COUNT(*) AS total_count, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, - SUM(duration_ms) AS duration_sum, - MAX(duration_ms) AS duration_max, - approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration -FROM processor_executions -GROUP BY bucket, application_name, route_id, processor_type -WITH NO DATA; - - -CREATE MATERIALIZED VIEW stats_1m_processor_detail -WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS -SELECT - time_bucket('1 minute', start_time) AS bucket, - application_name, - route_id, - processor_id, - processor_type, - COUNT(*) AS total_count, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, - SUM(duration_ms) AS duration_sum, - MAX(duration_ms) AS duration_max, - approx_percentile(0.99, percentile_agg(duration_ms)) AS p99_duration -FROM processor_executions -GROUP BY bucket, application_name, route_id, processor_id, processor_type -WITH NO DATA; - diff --git a/cameleer3-server-app/src/main/resources/db/migration/V2__policies.sql b/cameleer3-server-app/src/main/resources/db/migration/V2__policies.sql deleted file mode 100644 index 76a087a7..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V2__policies.sql +++ /dev/null @@ -1,38 +0,0 @@ --- V2__policies.sql - TimescaleDB policies (must run outside transaction) --- flyway:executeInTransaction=false - --- Agent metrics retention & compression -ALTER TABLE agent_metrics SET (timescaledb.compress); -SELECT add_retention_policy('agent_metrics', INTERVAL '90 days', if_not_exists => true); -SELECT add_compression_policy('agent_metrics', INTERVAL '7 days', if_not_exists => true); - --- Continuous aggregate refresh policies -SELECT add_continuous_aggregate_policy('stats_1m_all', - start_offset => INTERVAL '1 hour', - end_offset => INTERVAL '1 minute', - schedule_interval => INTERVAL '1 minute', - if_not_exists => true); - -SELECT add_continuous_aggregate_policy('stats_1m_app', - start_offset => INTERVAL '1 hour', - end_offset => INTERVAL '1 minute', - schedule_interval => INTERVAL '1 minute', - if_not_exists => true); - -SELECT add_continuous_aggregate_policy('stats_1m_route', - start_offset => INTERVAL '1 hour', - end_offset => INTERVAL '1 minute', - schedule_interval => INTERVAL '1 minute', - if_not_exists => true); - -SELECT add_continuous_aggregate_policy('stats_1m_processor', - start_offset => INTERVAL '1 hour', - end_offset => INTERVAL '1 minute', - schedule_interval => INTERVAL '1 minute', - if_not_exists => true); - -SELECT add_continuous_aggregate_policy('stats_1m_processor_detail', - start_offset => INTERVAL '1 hour', - end_offset => INTERVAL '1 minute', - schedule_interval => INTERVAL '1 minute', - if_not_exists => true); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql b/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql deleted file mode 100644 index a8d65e14..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V3__engine_level_and_snapshots.sql +++ /dev/null @@ -1,9 +0,0 @@ --- Add engine level and route-level snapshot columns to executions table. --- Required for REGULAR engine level where route-level payloads exist but --- no processor execution records are created. - -ALTER TABLE executions ADD COLUMN IF NOT EXISTS engine_level VARCHAR(16); -ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_body TEXT; -ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_body TEXT; -ALTER TABLE executions ADD COLUMN IF NOT EXISTS input_headers JSONB; -ALTER TABLE executions ADD COLUMN IF NOT EXISTS output_headers JSONB; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V4__application_config.sql b/cameleer3-server-app/src/main/resources/db/migration/V4__application_config.sql deleted file mode 100644 index ffff157a..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V4__application_config.sql +++ /dev/null @@ -1,9 +0,0 @@ --- Per-application configuration for agent observability settings. --- Agents download this at startup and receive updates via SSE CONFIG_UPDATE. -CREATE TABLE application_config ( - application TEXT PRIMARY KEY, - config_val JSONB NOT NULL, - version INTEGER NOT NULL DEFAULT 1, - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_by TEXT -); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V5__attributes.sql b/cameleer3-server-app/src/main/resources/db/migration/V5__attributes.sql deleted file mode 100644 index 817c07ea..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V5__attributes.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE executions ADD COLUMN IF NOT EXISTS attributes JSONB; -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS attributes JSONB; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V6__drop_diagram_node_id.sql b/cameleer3-server-app/src/main/resources/db/migration/V6__drop_diagram_node_id.sql deleted file mode 100644 index f9447571..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V6__drop_diagram_node_id.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE processor_executions DROP COLUMN IF EXISTS diagram_node_id; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql b/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql deleted file mode 100644 index 708c1960..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V7__diagram_application_name.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE route_diagrams ADD COLUMN IF NOT EXISTS application_name TEXT NOT NULL DEFAULT ''; -CREATE INDEX IF NOT EXISTS idx_diagrams_application ON route_diagrams (application_name); diff --git a/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql b/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql deleted file mode 100644 index 5adb0cce..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V8__processor_iteration_fields.sql +++ /dev/null @@ -1,5 +0,0 @@ -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_index INTEGER; -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS loop_size INTEGER; -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_index INTEGER; -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS split_size INTEGER; -ALTER TABLE processor_executions ADD COLUMN IF NOT EXISTS multicast_index INTEGER; diff --git a/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql b/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql deleted file mode 100644 index 04c2ff11..00000000 --- a/cameleer3-server-app/src/main/resources/db/migration/V9__resolved_endpoint_uri.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE processor_executions ADD COLUMN resolved_endpoint_uri TEXT; -ALTER TABLE processor_executions ADD COLUMN split_depth INTEGER DEFAULT 0; -ALTER TABLE processor_executions ADD COLUMN loop_depth INTEGER DEFAULT 0; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index 8a59a6db..1a7faa2f 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -8,21 +8,17 @@ import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.utility.DockerImageName; + @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @ActiveProfiles("test") public abstract class AbstractPostgresIT { - private static final DockerImageName TIMESCALEDB_IMAGE = - DockerImageName.parse("timescale/timescaledb-ha:pg16") - .asCompatibleSubstituteFor("postgres"); - static final PostgreSQLContainer postgres; static final ClickHouseContainer clickhouse; static { - postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE) + postgres = new PostgreSQLContainer<>("postgres:16") .withDatabaseName("cameleer3") .withUsername("cameleer") .withPassword("test"); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java index 227a4236..68ed2a65 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java @@ -14,23 +14,34 @@ class FlywayMigrationIT extends AbstractPostgresIT { @Test void allMigrationsApplySuccessfully() { - // Verify core tables exist - Integer execCount = jdbcTemplate.queryForObject( - "SELECT COUNT(*) FROM executions", Integer.class); - assertEquals(0, execCount); - - Integer procCount = jdbcTemplate.queryForObject( - "SELECT COUNT(*) FROM processor_executions", Integer.class); - assertEquals(0, procCount); - + // Verify RBAC tables exist Integer userCount = jdbcTemplate.queryForObject( "SELECT COUNT(*) FROM users", Integer.class); assertEquals(0, userCount); - // Verify continuous aggregates exist - Integer caggCount = jdbcTemplate.queryForObject( - "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates", - Integer.class); - assertEquals(4, caggCount); + Integer roleCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM roles", Integer.class); + assertEquals(4, roleCount); // AGENT, VIEWER, OPERATOR, ADMIN + + Integer groupCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM groups", Integer.class); + assertEquals(1, groupCount); // Admins + + // Verify config/audit tables exist + Integer configCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM server_config", Integer.class); + assertEquals(0, configCount); + + Integer auditCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM audit_log", Integer.class); + assertEquals(0, auditCount); + + Integer appConfigCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM application_config", Integer.class); + assertEquals(0, appConfigCount); + + Integer appSettingsCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM app_settings", Integer.class); + assertEquals(0, appSettingsCount); } } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java deleted file mode 100644 index affa0c29..00000000 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.app.AbstractPostgresIT; -import com.cameleer3.server.core.storage.ExecutionStore; -import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; -import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; - -import java.time.Instant; -import java.util.List; -import java.util.Optional; - -import static org.junit.jupiter.api.Assertions.*; - -class PostgresExecutionStoreIT extends AbstractPostgresIT { - - @Autowired - ExecutionStore executionStore; - - @Test - void upsertAndFindById() { - Instant now = Instant.now(); - ExecutionRecord record = new ExecutionRecord( - "exec-1", "route-a", "agent-1", "app-1", - "COMPLETED", "corr-1", "exchange-1", - now, now.plusMillis(100), 100L, - null, null, null, - "REGULAR", null, null, null, null, null, - null, null, null, null, null, null, null, false, false); - - executionStore.upsert(record); - Optional found = executionStore.findById("exec-1"); - - assertTrue(found.isPresent()); - assertEquals("exec-1", found.get().executionId()); - assertEquals("COMPLETED", found.get().status()); - assertEquals("REGULAR", found.get().engineLevel()); - } - - @Test - void upsertDeduplicatesByExecutionId() { - Instant now = Instant.now(); - ExecutionRecord first = new ExecutionRecord( - "exec-dup", "route-a", "agent-1", "app-1", - "RUNNING", null, null, now, null, null, null, null, null, - null, null, null, null, null, null, - null, null, null, null, null, null, null, false, false); - ExecutionRecord second = new ExecutionRecord( - "exec-dup", "route-a", "agent-1", "app-1", - "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null, - "COMPLETE", null, null, null, null, null, - null, null, null, null, null, null, null, false, false); - - executionStore.upsert(first); - executionStore.upsert(second); - - Optional found = executionStore.findById("exec-dup"); - assertTrue(found.isPresent()); - assertEquals("COMPLETED", found.get().status()); - assertEquals(200L, found.get().durationMs()); - } - - @Test - void upsertProcessorsAndFind() { - Instant now = Instant.now(); - ExecutionRecord exec = new ExecutionRecord( - "exec-proc", "route-a", "agent-1", "app-1", - "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null, - "COMPLETE", null, null, null, null, null, - null, null, null, null, null, null, null, false, false); - executionStore.upsert(exec); - - List processors = List.of( - new ProcessorRecord("exec-proc", "proc-1", "log", - "app-1", "route-a", 0, null, "COMPLETED", - now, now.plusMillis(10), 10L, null, null, - "input body", "output body", null, null, null, - null, null, null, null, null, - null, null, null, null, null, null, null, null, - null, null, null, null, null, null), - new ProcessorRecord("exec-proc", "proc-2", "to", - "app-1", "route-a", 1, "proc-1", "COMPLETED", - now.plusMillis(10), now.plusMillis(30), 20L, null, null, - null, null, null, null, null, - null, null, null, null, null, - null, null, null, null, null, null, null, null, - null, null, null, null, null, null) - ); - executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors); - - List found = executionStore.findProcessors("exec-proc"); - assertEquals(2, found.size()); - assertEquals("proc-1", found.get(0).processorId()); - assertEquals("proc-2", found.get(1).processorId()); - assertEquals("proc-1", found.get(1).parentProcessorId()); - } -} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java deleted file mode 100644 index 15c76706..00000000 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.cameleer3.server.app.storage; - -import com.cameleer3.server.app.AbstractPostgresIT; -import com.cameleer3.server.core.search.ExecutionStats; -import com.cameleer3.server.core.search.StatsTimeseries; -import com.cameleer3.server.core.storage.ExecutionStore; -import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; -import com.cameleer3.server.core.storage.StatsStore; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.time.Instant; -import java.time.temporal.ChronoUnit; - -import static org.junit.jupiter.api.Assertions.*; - -class PostgresStatsStoreIT extends AbstractPostgresIT { - - @Autowired StatsStore statsStore; - @Autowired ExecutionStore executionStore; - @Autowired JdbcTemplate jdbc; - - @Test - void statsReturnsCountsForTimeWindow() { - // Use a unique route + statsForRoute to avoid data contamination from other tests - String uniqueRoute = "stats-route-" + System.nanoTime(); - Instant base = Instant.now().minus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS); - insertExecution("stats-1-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base, 100L); - insertExecution("stats-2-" + uniqueRoute, uniqueRoute, "app-stats", "FAILED", base.plusSeconds(10), 200L); - insertExecution("stats-3-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base.plusSeconds(20), 50L); - - // Force continuous aggregate refresh - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); - - ExecutionStats stats = statsStore.statsForRoute(base.minusSeconds(60), base.plusSeconds(60), uniqueRoute, null); - assertEquals(3, stats.totalCount()); - assertEquals(1, stats.failedCount()); - } - - @Test - void timeseriesReturnsBuckets() { - String uniqueRoute = "ts-route-" + System.nanoTime(); - Instant base = Instant.now().minus(10, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MINUTES); - for (int i = 0; i < 10; i++) { - insertExecution("ts-" + i + "-" + uniqueRoute, uniqueRoute, "app-ts", "COMPLETED", - base.plusSeconds(i * 30), 100L + i); - } - - jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')"); - - StatsTimeseries ts = statsStore.timeseriesForRoute(base.minus(1, ChronoUnit.MINUTES), base.plus(10, ChronoUnit.MINUTES), 5, uniqueRoute, null); - assertNotNull(ts); - assertFalse(ts.buckets().isEmpty()); - } - - private void insertExecution(String id, String routeId, String applicationId, - String status, Instant startTime, long durationMs) { - executionStore.upsert(new ExecutionRecord( - id, routeId, "agent-1", applicationId, status, null, null, - startTime, startTime.plusMillis(durationMs), durationMs, - status.equals("FAILED") ? "error" : null, null, null, - null, null, null, null, null, null, - null, null, null, null, null, null, null, false, false)); - } -} diff --git a/deploy/postgres.yaml b/deploy/postgres.yaml index 046e4486..fc4a600f 100644 --- a/deploy/postgres.yaml +++ b/deploy/postgres.yaml @@ -16,7 +16,7 @@ spec: spec: containers: - name: postgres - image: timescale/timescaledb-ha:pg16 + image: postgres:16 ports: - containerPort: 5432 name: postgres diff --git a/docker-compose.yml b/docker-compose.yml index 1cd5d6d5..360f0d00 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: postgres: - image: timescale/timescaledb-ha:pg16 + image: postgres:16 ports: - "5432:5432" environment: diff --git a/ui/src/api/queries/admin/database.ts b/ui/src/api/queries/admin/database.ts index 44cae732..4a0ea71e 100644 --- a/ui/src/api/queries/admin/database.ts +++ b/ui/src/api/queries/admin/database.ts @@ -9,7 +9,6 @@ export interface DatabaseStatus { version: string | null; host: string | null; schema: string | null; - timescaleDb: boolean; } export interface PoolStats { diff --git a/ui/src/api/schema.d.ts b/ui/src/api/schema.d.ts index 90afbf8a..27d6da13 100644 --- a/ui/src/api/schema.d.ts +++ b/ui/src/api/schema.d.ts @@ -2189,10 +2189,8 @@ export interface components { version?: string; /** @description Database host */ host?: string; - /** @description Current schema search path */ + /** @description Current schema */ schema?: string; - /** @description Whether TimescaleDB extension is available */ - timescaleDb?: boolean; }; /** @description Currently running database query */ ActiveQueryResponse: { diff --git a/ui/src/pages/Admin/DatabaseAdminPage.tsx b/ui/src/pages/Admin/DatabaseAdminPage.tsx index 7b9c2943..f8459db5 100644 --- a/ui/src/pages/Admin/DatabaseAdminPage.tsx +++ b/ui/src/pages/Admin/DatabaseAdminPage.tsx @@ -37,7 +37,6 @@ export default function DatabaseAdminPage() {
-
{pool && (