feat: remove TimescaleDB, dead PG stores, and storage feature flags
Complete the ClickHouse migration by removing all PostgreSQL analytics code. PostgreSQL now serves only RBAC, config, and audit; all observability data is exclusively in ClickHouse.

- Delete 6 dead PostgreSQL store classes (executions, stats, diagrams, events, metrics, metrics-query) and 2 integration tests
- Delete RetentionScheduler (ClickHouse TTL handles retention)
- Remove all 7 cameleer.storage.* feature flags from application.yml
- Remove all @ConditionalOnProperty from ClickHouse beans in StorageBeanConfig
- Consolidate 14 Flyway migrations (V1-V14) into a single clean V1 with only RBAC/config/audit tables (no TimescaleDB, no analytics tables)
- Switch from timescale/timescaledb-ha:pg16 to postgres:16 everywhere (docker-compose, deploy/postgres.yaml, test containers)
- Remove the TimescaleDB check and /metrics-pipeline endpoint from DatabaseAdminController
- Set the clickhouse.enabled default to true

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -7,9 +7,6 @@ import com.cameleer3.server.app.storage.ClickHouseDiagramStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore;
import com.cameleer3.server.app.storage.ClickHouseMetricsStore;
import com.cameleer3.server.app.storage.ClickHouseStatsStore;
import com.cameleer3.server.app.storage.PostgresExecutionStore;
import com.cameleer3.server.app.storage.PostgresMetricsQueryStore;
import com.cameleer3.server.app.storage.PostgresMetricsStore;
import com.cameleer3.server.core.admin.AuditRepository;
import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.agent.AgentEventRepository;
@@ -64,48 +61,26 @@ public class StorageBeanConfig {
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
    public MetricsStore clickHouseMetricsStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseMetricsStore(clickHouseJdbc);
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
    public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) {
        return new PostgresMetricsStore(jdbc);
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
    public MetricsQueryStore clickHouseMetricsQueryStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseMetricsQueryStore(clickHouseJdbc);
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
    public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
        return new PostgresMetricsQueryStore(jdbc);
    }

    // ── Execution Store ──────────────────────────────────────────────────

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
    public ClickHouseExecutionStore clickHouseExecutionStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseExecutionStore(clickHouseJdbc);
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "postgres")
    public ExecutionStore executionStorePostgres(JdbcTemplate jdbc) {
        return new PostgresExecutionStore(jdbc);
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
    public ChunkAccumulator chunkAccumulator(
            WriteBuffer<MergedExecution> executionBuffer,
            WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
@@ -118,7 +93,6 @@ public class StorageBeanConfig {
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.executions", havingValue = "clickhouse", matchIfMissing = true)
    public ExecutionFlushScheduler executionFlushScheduler(
            WriteBuffer<MergedExecution> executionBuffer,
            WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
@@ -130,7 +104,6 @@ public class StorageBeanConfig {
    }

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse", matchIfMissing = true)
    public SearchIndex clickHouseSearchIndex(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseSearchIndex(clickHouseJdbc);
@@ -139,7 +112,6 @@ public class StorageBeanConfig {
    // ── ClickHouse Stats Store ─────────────────────────────────────────

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse", matchIfMissing = true)
    public StatsStore clickHouseStatsStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseStatsStore(clickHouseJdbc);
@@ -148,7 +120,6 @@ public class StorageBeanConfig {
    // ── ClickHouse Diagram Store ──────────────────────────────────────

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "clickhouse", matchIfMissing = true)
    public DiagramStore clickHouseDiagramStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseDiagramStore(clickHouseJdbc);
@@ -157,7 +128,6 @@ public class StorageBeanConfig {
    // ── ClickHouse Agent Event Repository ─────────────────────────────

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "clickhouse", matchIfMissing = true)
    public AgentEventRepository clickHouseAgentEventRepository(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseAgentEventRepository(clickHouseJdbc);
@@ -166,7 +136,6 @@ public class StorageBeanConfig {
    // ── ClickHouse Log Store ──────────────────────────────────────────

    @Bean
    @ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "clickhouse", matchIfMissing = true)
    public LogIndex clickHouseLogStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseLogStore(clickHouseJdbc);
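After this change the ClickHouse beans above are registered unconditionally. A minimal sketch of the resulting shape, assuming the surviving methods keep the names and signatures shown in the deleted lines (the post-change file contents are not part of this diff):

    // Illustrative: the same bean as above, minus @ConditionalOnProperty,
    // since ClickHouse is now the only backend for metrics.
    @Bean
    public MetricsStore clickHouseMetricsStore(
            @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
        return new ClickHouseMetricsStore(clickHouseJdbc);
    }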
@@ -7,7 +7,6 @@ import com.cameleer3.server.app.dto.TableSizeResponse;
import com.cameleer3.server.core.admin.AuditCategory;
import com.cameleer3.server.core.admin.AuditResult;
import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import io.swagger.v3.oas.annotations.Operation;
@@ -25,9 +24,7 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

import javax.sql.DataSource;
import java.time.Instant;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/admin/database")
@@ -38,14 +35,12 @@ public class DatabaseAdminController {
    private final JdbcTemplate jdbc;
    private final DataSource dataSource;
    private final AuditService auditService;
    private final IngestionService ingestionService;

    public DatabaseAdminController(JdbcTemplate jdbc, DataSource dataSource,
                                   AuditService auditService, IngestionService ingestionService) {
                                   AuditService auditService) {
        this.jdbc = jdbc;
        this.dataSource = dataSource;
        this.auditService = auditService;
        this.ingestionService = ingestionService;
    }

    @GetMapping("/status")
@@ -53,14 +48,12 @@ public class DatabaseAdminController {
    public ResponseEntity<DatabaseStatusResponse> getStatus() {
        try {
            String version = jdbc.queryForObject("SELECT version()", String.class);
            boolean timescaleDb = Boolean.TRUE.equals(
                    jdbc.queryForObject("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')", Boolean.class));
            String schema = jdbc.queryForObject("SELECT current_schema()", String.class);
            String host = extractHost(dataSource);
            return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema, timescaleDb));
            return ResponseEntity.ok(new DatabaseStatusResponse(true, version, host, schema));
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
                    .body(new DatabaseStatusResponse(false, null, null, null, false));
                    .body(new DatabaseStatusResponse(false, null, null, null));
        }
    }

@@ -124,29 +117,6 @@ public class DatabaseAdminController {
        return ResponseEntity.ok().build();
    }

    @GetMapping("/metrics-pipeline")
    @Operation(summary = "Get metrics ingestion pipeline diagnostics")
    public ResponseEntity<Map<String, Object>> getMetricsPipeline() {
        int bufferDepth = ingestionService.getMetricsBufferDepth();

        Long totalRows = jdbc.queryForObject(
                "SELECT count(*) FROM agent_metrics", Long.class);
        List<String> agentIds = jdbc.queryForList(
                "SELECT DISTINCT instance_id FROM agent_metrics ORDER BY instance_id", String.class);
        Instant latestCollected = jdbc.queryForObject(
                "SELECT max(collected_at) FROM agent_metrics", Instant.class);
        List<String> metricNames = jdbc.queryForList(
                "SELECT DISTINCT metric_name FROM agent_metrics ORDER BY metric_name", String.class);

        return ResponseEntity.ok(Map.of(
                "bufferDepth", bufferDepth,
                "totalRows", totalRows != null ? totalRows : 0,
                "distinctAgents", agentIds,
                "distinctMetrics", metricNames,
                "latestCollectedAt", latestCollected != null ? latestCollected.toString() : "none"
        ));
    }

    private String extractHost(DataSource ds) {
        try {
            if (ds instanceof HikariDataSource hds) {
@@ -7,6 +7,5 @@ public record DatabaseStatusResponse(
        @Schema(description = "Whether the database is reachable") boolean connected,
        @Schema(description = "PostgreSQL version string") String version,
        @Schema(description = "Database host") String host,
        @Schema(description = "Current schema search path") String schema,
        @Schema(description = "Whether TimescaleDB extension is available") boolean timescaleDb
        @Schema(description = "Current schema") String schema
) {}
@@ -1,46 +0,0 @@
package com.cameleer3.server.app.retention;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RetentionScheduler {

    private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);

    private final JdbcTemplate jdbc;
    private final int retentionDays;

    public RetentionScheduler(JdbcTemplate jdbc,
                              @Value("${cameleer.retention-days:30}") int retentionDays) {
        this.jdbc = jdbc;
        this.retentionDays = retentionDays;
    }

    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
    public void dropExpiredChunks() {
        String interval = retentionDays + " days";
        try {
            // Raw data
            jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");

            // Continuous aggregates (keep 3x longer)
            String caggInterval = (retentionDays * 3) + " days";
            jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");

            log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
                    retentionDays, retentionDays * 3);
        } catch (Exception e) {
            log.error("Retention job failed", e);
        }
    }
}
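For context on the replacement: the drop_chunks job deleted above is superseded by ClickHouse table TTLs, which expire old parts during background merges with no application-side scheduler. A minimal sketch of the idea, with illustrative table and column names (the real definitions live in the ClickHouse migrations, which this diff does not show):

import org.springframework.jdbc.core.JdbcTemplate;

// Illustrative one-time setup, not code from this repository: attach a
// retention TTL to an executions table so ClickHouse drops expired data
// on its own during background merges.
class ClickHouseRetentionSketch {
    static void applyTtl(JdbcTemplate clickHouseJdbc, int retentionDays) {
        clickHouseJdbc.execute(
                "ALTER TABLE executions MODIFY TTL toDateTime(start_time) + INTERVAL "
                        + retentionDays + " DAY");
    }
}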
@@ -1,64 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.agent.AgentEventRecord;
import com.cameleer3.server.core.agent.AgentEventRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

@Repository
@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")
public class PostgresAgentEventRepository implements AgentEventRepository {

    private final JdbcTemplate jdbc;

    public PostgresAgentEventRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void insert(String instanceId, String applicationId, String eventType, String detail) {
        jdbc.update(
                "INSERT INTO agent_events (instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?)",
                instanceId, applicationId, eventType, detail);
    }

    @Override
    public List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit) {
        var sql = new StringBuilder("SELECT id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE 1=1");
        var params = new ArrayList<Object>();

        if (applicationId != null) {
            sql.append(" AND application_id = ?");
            params.add(applicationId);
        }
        if (instanceId != null) {
            sql.append(" AND instance_id = ?");
            params.add(instanceId);
        }
        if (from != null) {
            sql.append(" AND timestamp >= ?");
            params.add(Timestamp.from(from));
        }
        if (to != null) {
            sql.append(" AND timestamp < ?");
            params.add(Timestamp.from(to));
        }
        sql.append(" ORDER BY timestamp DESC LIMIT ?");
        params.add(limit);

        return jdbc.query(sql.toString(), (rs, rowNum) -> new AgentEventRecord(
                rs.getLong("id"),
                rs.getString("instance_id"),
                rs.getString("application_id"),
                rs.getString("event_type"),
                rs.getString("detail"),
                rs.getTimestamp("timestamp").toInstant()
        ), params.toArray());
    }
}
@@ -1,147 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.cameleer3.server.core.storage.DiagramStore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * PostgreSQL implementation of {@link DiagramStore}.
 * <p>
 * Stores route graphs as JSON with SHA-256 content-hash deduplication.
 * Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts.
 */
@Repository
@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")
public class PostgresDiagramStore implements DiagramStore {

    private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);

    private static final String INSERT_SQL = """
            INSERT INTO route_diagrams (content_hash, route_id, instance_id, application_id, definition)
            VALUES (?, ?, ?, ?, ?::jsonb)
            ON CONFLICT (content_hash) DO NOTHING
            """;

    private static final String SELECT_BY_HASH = """
            SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1
            """;

    private static final String SELECT_HASH_FOR_ROUTE = """
            SELECT content_hash FROM route_diagrams
            WHERE route_id = ? AND instance_id = ?
            ORDER BY created_at DESC LIMIT 1
            """;

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    public PostgresDiagramStore(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    @Override
    public void store(TaggedDiagram diagram) {
        try {
            RouteGraph graph = diagram.graph();
            String agentId = diagram.instanceId() != null ? diagram.instanceId() : "";
            String applicationId = diagram.applicationId() != null ? diagram.applicationId() : "";
            String json = objectMapper.writeValueAsString(graph);
            String contentHash = sha256Hex(json);
            String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";

            jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, applicationId, json);
            log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
        }
    }

    @Override
    public Optional<RouteGraph> findByContentHash(String contentHash) {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash);
        if (rows.isEmpty()) {
            return Optional.empty();
        }
        String json = (String) rows.get(0).get("definition");
        try {
            return Optional.of(objectMapper.readValue(json, RouteGraph.class));
        } catch (JsonProcessingException e) {
            log.error("Failed to deserialize RouteGraph from PostgreSQL", e);
            return Optional.empty();
        }
    }

    @Override
    public Optional<String> findContentHashForRoute(String routeId, String agentId) {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId);
        if (rows.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of((String) rows.get(0).get("content_hash"));
    }

    @Override
    public Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds) {
        if (agentIds == null || agentIds.isEmpty()) {
            return Optional.empty();
        }
        String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
        String sql = "SELECT content_hash FROM route_diagrams " +
                "WHERE route_id = ? AND instance_id IN (" + placeholders + ") " +
                "ORDER BY created_at DESC LIMIT 1";
        var params = new ArrayList<Object>();
        params.add(routeId);
        params.addAll(agentIds);
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, params.toArray());
        if (rows.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of((String) rows.get(0).get("content_hash"));
    }

    @Override
    public Map<String, String> findProcessorRouteMapping(String applicationId) {
        Map<String, String> mapping = new HashMap<>();
        jdbcTemplate.query("""
                SELECT DISTINCT rd.route_id, node_elem->>'id' AS processor_id
                FROM route_diagrams rd,
                     jsonb_array_elements(rd.definition::jsonb->'nodes') AS node_elem
                WHERE rd.application_id = ?
                  AND node_elem->>'id' IS NOT NULL
                """,
                rs -> { mapping.put(rs.getString("processor_id"), rs.getString("route_id")); },
                applicationId);
        return mapping;
    }

    static String sha256Hex(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-256 not available", e);
        }
    }
}
@@ -1,214 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class PostgresExecutionStore implements ExecutionStore {

    private final JdbcTemplate jdbc;

    public PostgresExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void upsert(ExecutionRecord execution) {
        jdbc.update("""
                INSERT INTO executions (execution_id, route_id, instance_id, application_id,
                    status, correlation_id, exchange_id, start_time, end_time,
                    duration_ms, error_message, error_stacktrace, diagram_content_hash,
                    engine_level, input_body, output_body, input_headers, output_headers,
                    attributes,
                    error_type, error_category, root_cause_type, root_cause_message,
                    trace_id, span_id,
                    processors_json, has_trace_data, is_replay,
                    created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
                    ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, now(), now())
                ON CONFLICT (execution_id, start_time) DO UPDATE SET
                    status = CASE
                        WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
                            AND executions.status = 'RUNNING'
                            THEN EXCLUDED.status
                        WHEN EXCLUDED.status = executions.status THEN executions.status
                        ELSE EXCLUDED.status
                    END,
                    end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
                    duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                    error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
                    error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                    diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                    engine_level = COALESCE(EXCLUDED.engine_level, executions.engine_level),
                    input_body = COALESCE(EXCLUDED.input_body, executions.input_body),
                    output_body = COALESCE(EXCLUDED.output_body, executions.output_body),
                    input_headers = COALESCE(EXCLUDED.input_headers, executions.input_headers),
                    output_headers = COALESCE(EXCLUDED.output_headers, executions.output_headers),
                    attributes = COALESCE(EXCLUDED.attributes, executions.attributes),
                    error_type = COALESCE(EXCLUDED.error_type, executions.error_type),
                    error_category = COALESCE(EXCLUDED.error_category, executions.error_category),
                    root_cause_type = COALESCE(EXCLUDED.root_cause_type, executions.root_cause_type),
                    root_cause_message = COALESCE(EXCLUDED.root_cause_message, executions.root_cause_message),
                    trace_id = COALESCE(EXCLUDED.trace_id, executions.trace_id),
                    span_id = COALESCE(EXCLUDED.span_id, executions.span_id),
                    processors_json = COALESCE(EXCLUDED.processors_json, executions.processors_json),
                    has_trace_data = EXCLUDED.has_trace_data OR executions.has_trace_data,
                    is_replay = EXCLUDED.is_replay OR executions.is_replay,
                    updated_at = now()
                """,
                execution.executionId(), execution.routeId(), execution.instanceId(),
                execution.applicationId(), execution.status(), execution.correlationId(),
                execution.exchangeId(),
                Timestamp.from(execution.startTime()),
                execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
                execution.durationMs(), execution.errorMessage(),
                execution.errorStacktrace(), execution.diagramContentHash(),
                execution.engineLevel(),
                execution.inputBody(), execution.outputBody(),
                execution.inputHeaders(), execution.outputHeaders(),
                execution.attributes(),
                execution.errorType(), execution.errorCategory(),
                execution.rootCauseType(), execution.rootCauseMessage(),
                execution.traceId(), execution.spanId(),
                execution.processorsJson(), execution.hasTraceData(), execution.isReplay());
    }

    @Override
    public void upsertProcessors(String executionId, Instant startTime,
                                 String applicationId, String routeId,
                                 List<ProcessorRecord> processors) {
        jdbc.batchUpdate("""
                INSERT INTO processor_executions (execution_id, processor_id, processor_type,
                    application_id, route_id, depth, parent_processor_id,
                    status, start_time, end_time, duration_ms, error_message, error_stacktrace,
                    input_body, output_body, input_headers, output_headers, attributes,
                    loop_index, loop_size, split_index, split_size, multicast_index,
                    resolved_endpoint_uri,
                    error_type, error_category, root_cause_type, root_cause_message,
                    error_handler_type, circuit_breaker_state, fallback_triggered)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb,
                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
                    status = EXCLUDED.status,
                    end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
                    duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
                    error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
                    error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
                    input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
                    output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
                    input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
                    output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers),
                    attributes = COALESCE(EXCLUDED.attributes, processor_executions.attributes),
                    loop_index = COALESCE(EXCLUDED.loop_index, processor_executions.loop_index),
                    loop_size = COALESCE(EXCLUDED.loop_size, processor_executions.loop_size),
                    split_index = COALESCE(EXCLUDED.split_index, processor_executions.split_index),
                    split_size = COALESCE(EXCLUDED.split_size, processor_executions.split_size),
                    multicast_index = COALESCE(EXCLUDED.multicast_index, processor_executions.multicast_index),
                    resolved_endpoint_uri = COALESCE(EXCLUDED.resolved_endpoint_uri, processor_executions.resolved_endpoint_uri),
                    error_type = COALESCE(EXCLUDED.error_type, processor_executions.error_type),
                    error_category = COALESCE(EXCLUDED.error_category, processor_executions.error_category),
                    root_cause_type = COALESCE(EXCLUDED.root_cause_type, processor_executions.root_cause_type),
                    root_cause_message = COALESCE(EXCLUDED.root_cause_message, processor_executions.root_cause_message),
                    error_handler_type = COALESCE(EXCLUDED.error_handler_type, processor_executions.error_handler_type),
                    circuit_breaker_state = COALESCE(EXCLUDED.circuit_breaker_state, processor_executions.circuit_breaker_state),
                    fallback_triggered = COALESCE(EXCLUDED.fallback_triggered, processor_executions.fallback_triggered)
                """,
                processors.stream().map(p -> new Object[]{
                        p.executionId(), p.processorId(), p.processorType(),
                        p.applicationId(), p.routeId(),
                        p.depth(), p.parentProcessorId(), p.status(),
                        Timestamp.from(p.startTime()),
                        p.endTime() != null ? Timestamp.from(p.endTime()) : null,
                        p.durationMs(), p.errorMessage(), p.errorStacktrace(),
                        p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders(),
                        p.attributes(),
                        p.loopIndex(), p.loopSize(), p.splitIndex(), p.splitSize(),
                        p.multicastIndex(),
                        p.resolvedEndpointUri(),
                        p.errorType(), p.errorCategory(),
                        p.rootCauseType(), p.rootCauseMessage(),
                        p.errorHandlerType(), p.circuitBreakerState(),
                        p.fallbackTriggered()
                }).toList());
    }

    @Override
    public Optional<ExecutionRecord> findById(String executionId) {
        List<ExecutionRecord> results = jdbc.query(
                "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
                EXECUTION_MAPPER, executionId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<ProcessorRecord> findProcessors(String executionId) {
        return jdbc.query(
                "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
                PROCESSOR_MAPPER, executionId);
    }

    @Override
    public Optional<ProcessorRecord> findProcessorById(String executionId, String processorId) {
        String sql = "SELECT * FROM processor_executions WHERE execution_id = ? AND processor_id = ? LIMIT 1";
        List<ProcessorRecord> results = jdbc.query(sql, PROCESSOR_MAPPER, executionId, processorId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
            new ExecutionRecord(
                    rs.getString("execution_id"), rs.getString("route_id"),
                    rs.getString("instance_id"), rs.getString("application_id"),
                    rs.getString("status"), rs.getString("correlation_id"),
                    rs.getString("exchange_id"),
                    toInstant(rs, "start_time"), toInstant(rs, "end_time"),
                    rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
                    rs.getString("error_message"), rs.getString("error_stacktrace"),
                    rs.getString("diagram_content_hash"),
                    rs.getString("engine_level"),
                    rs.getString("input_body"), rs.getString("output_body"),
                    rs.getString("input_headers"), rs.getString("output_headers"),
                    rs.getString("attributes"),
                    rs.getString("error_type"), rs.getString("error_category"),
                    rs.getString("root_cause_type"), rs.getString("root_cause_message"),
                    rs.getString("trace_id"), rs.getString("span_id"),
                    rs.getString("processors_json"),
                    rs.getBoolean("has_trace_data"),
                    rs.getBoolean("is_replay"));

    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
            new ProcessorRecord(
                    rs.getString("execution_id"), rs.getString("processor_id"),
                    rs.getString("processor_type"),
                    rs.getString("application_id"), rs.getString("route_id"),
                    rs.getInt("depth"), rs.getString("parent_processor_id"),
                    rs.getString("status"),
                    toInstant(rs, "start_time"), toInstant(rs, "end_time"),
                    rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
                    rs.getString("error_message"), rs.getString("error_stacktrace"),
                    rs.getString("input_body"), rs.getString("output_body"),
                    rs.getString("input_headers"), rs.getString("output_headers"),
                    rs.getString("attributes"),
                    rs.getObject("loop_index") != null ? rs.getInt("loop_index") : null,
                    rs.getObject("loop_size") != null ? rs.getInt("loop_size") : null,
                    rs.getObject("split_index") != null ? rs.getInt("split_index") : null,
                    rs.getObject("split_size") != null ? rs.getInt("split_size") : null,
                    rs.getObject("multicast_index") != null ? rs.getInt("multicast_index") : null,
                    rs.getString("resolved_endpoint_uri"),
                    rs.getString("error_type"), rs.getString("error_category"),
                    rs.getString("root_cause_type"), rs.getString("root_cause_message"),
                    rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"),
                    rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
                    null, null, null, null, null, null);

    private static Instant toInstant(ResultSet rs, String column) throws SQLException {
        Timestamp ts = rs.getTimestamp(column);
        return ts != null ? ts.toInstant() : null;
    }
}
@@ -1,55 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsQueryStore;
import com.cameleer3.server.core.storage.model.MetricTimeSeries;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;

public class PostgresMetricsQueryStore implements MetricsQueryStore {

    private final JdbcTemplate jdbc;

    public PostgresMetricsQueryStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
            String agentId, List<String> metricNames,
            Instant from, Instant to, int buckets) {

        long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
        String intervalStr = intervalMs + " milliseconds";

        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
        for (String name : metricNames) {
            result.put(name.trim(), new ArrayList<>());
        }

        String sql = """
                SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
                       metric_name,
                       AVG(metric_value) AS avg_value
                FROM agent_metrics
                WHERE instance_id = ?
                  AND collected_at >= ? AND collected_at < ?
                  AND metric_name = ANY(?)
                GROUP BY bucket, metric_name
                ORDER BY bucket
                """;

        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
        jdbc.query(sql, rs -> {
            String metricName = rs.getString("metric_name");
            Instant bucket = rs.getTimestamp("bucket").toInstant();
            double value = rs.getDouble("avg_value");
            result.computeIfAbsent(metricName, k -> new ArrayList<>())
                    .add(new MetricTimeSeries.Bucket(bucket, value));
        }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);

        return result;
    }
}
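A porting note: time_bucket() above is TimescaleDB-specific, so the ClickHouse store that replaces this class cannot reuse the SQL as-is. A hedged sketch of the equivalent bucketing expression (illustrative only, with a fixed 60-second bucket; the actual ClickHouseMetricsQueryStore is not shown in this diff):

// Illustrative ClickHouse counterpart of the time_bucket() query above,
// not code from this repository. toStartOfInterval() plays the role that
// time_bucket() played in TimescaleDB.
String clickHouseSql = """
        SELECT toStartOfInterval(collected_at, INTERVAL 60 SECOND) AS bucket,
               metric_name,
               avg(metric_value) AS avg_value
        FROM agent_metrics
        WHERE instance_id = ?
          AND collected_at >= ? AND collected_at < ?
          AND metric_name = ?
        GROUP BY bucket, metric_name
        ORDER BY bucket
        """;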
@@ -1,40 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;
import java.util.List;

public class PostgresMetricsStore implements MetricsStore {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final JdbcTemplate jdbc;

    public PostgresMetricsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void insertBatch(List<MetricsSnapshot> snapshots) {
        jdbc.batchUpdate("""
                INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags,
                    collected_at, server_received_at)
                VALUES (?, ?, ?, ?::jsonb, ?, now())
                """,
                snapshots.stream().map(s -> new Object[]{
                        s.instanceId(), s.metricName(), s.metricValue(),
                        tagsToJson(s.tags()),
                        Timestamp.from(s.collectedAt())
                }).toList());
    }

    private String tagsToJson(java.util.Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) return null;
        try { return MAPPER.writeValueAsString(tags); }
        catch (JsonProcessingException e) { return null; }
    }
}
@@ -1,430 +0,0 @@
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.search.TopError;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Repository
@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres")
public class PostgresStatsStore implements StatsStore {

    private final JdbcTemplate jdbc;

    public PostgresStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of());
    }

    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String applicationId) {
        return queryStats("stats_1m_app", from, to, List.of(
                new Filter("application_id", applicationId)));
    }

    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // Note: agentIds is accepted for interface compatibility but not filterable
        // on the continuous aggregate (it groups by route_id, not instance_id).
        // All agents for the same route contribute to the same aggregate.
        return queryStats("stats_1m_route", from, to, List.of(
                new Filter("route_id", routeId)));
    }

    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryStats("stats_1m_processor", from, to, List.of(
                new Filter("route_id", routeId),
                new Filter("processor_type", processorType)));
    }

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String applicationId) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
                new Filter("application_id", applicationId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                              String routeId, List<String> agentIds) {
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
                new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                  String routeId, String processorType) {
        // stats_1m_processor does NOT have running_count column
        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
                new Filter("route_id", routeId),
                new Filter("processor_type", processorType)), false);
    }

    private record Filter(String column, String value) {}

    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
        // running_count only exists on execution-level aggregates, not processor
        boolean hasRunning = !view.equals("stats_1m_processor");
        String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count, " +
                "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
                "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }

        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                rs.getLong("active_count")
        }, params.toArray());
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
            p99Duration = r[3]; activeCount = r[4];
        }

        // Previous period (shifted back 24h)
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        List<Object> prevParams = new ArrayList<>();
        prevParams.add(Timestamp.from(prevFrom));
        prevParams.add(Timestamp.from(prevTo));
        for (Filter f : filters) prevParams.add(f.value());
        String prevSql = sql; // same shape, different time params

        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                rs.getLong("avg_duration"), rs.getLong("p99_duration")
        }, prevParams.toArray());
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
        }

        // Today total (from midnight UTC)
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        List<Object> todayParams = new ArrayList<>();
        todayParams.add(Timestamp.from(todayStart));
        todayParams.add(Timestamp.from(Instant.now()));
        for (Filter f : filters) todayParams.add(f.value());
        String todaySql = sql;

        long totalToday = 0;
        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
                todayParams.toArray());
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);

        return new ExecutionStats(
                totalCount, failedCount, avgDuration, p99Duration, activeCount,
                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
                                            int bucketCount, List<Filter> filters,
                                            boolean hasRunningCount) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;

        String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
                "COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count, " +
                "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
                "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(intervalSeconds);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }
        sql += " GROUP BY period ORDER BY period";

        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
                new TimeseriesBucket(
                        rs.getTimestamp("period").toInstant(),
                        rs.getLong("total_count"), rs.getLong("failed_count"),
                        rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                        rs.getLong("active_count")
                ), params.toArray());

        return new StatsTimeseries(buckets);
    }

    // ── Grouped timeseries ────────────────────────────────────────────────

    @Override
    public Map<String, StatsTimeseries> timeseriesGroupedByApp(Instant from, Instant to, int bucketCount) {
        return queryGroupedTimeseries("stats_1m_app", "application_id", from, to,
                bucketCount, List.of());
    }

    @Override
    public Map<String, StatsTimeseries> timeseriesGroupedByRoute(Instant from, Instant to,
                                                                 int bucketCount, String applicationId) {
        return queryGroupedTimeseries("stats_1m_route", "route_id", from, to,
                bucketCount, List.of(new Filter("application_id", applicationId)));
    }

    private Map<String, StatsTimeseries> queryGroupedTimeseries(
            String view, String groupCol, Instant from, Instant to,
            int bucketCount, List<Filter> filters) {

        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;

        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
                groupCol + " AS group_key, " +
                "COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count, " +
                "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
                "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
                "COALESCE(SUM(running_count), 0) AS active_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(intervalSeconds);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }
        sql += " GROUP BY period, group_key ORDER BY period, group_key";

        Map<String, List<TimeseriesBucket>> grouped = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            String key = rs.getString("group_key");
            TimeseriesBucket bucket = new TimeseriesBucket(
                    rs.getTimestamp("period").toInstant(),
                    rs.getLong("total_count"), rs.getLong("failed_count"),
                    rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                    rs.getLong("active_count"));
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(bucket);
        }, params.toArray());

        Map<String, StatsTimeseries> result = new LinkedHashMap<>();
        grouped.forEach((key, buckets) -> result.put(key, new StatsTimeseries(buckets)));
        return result;
    }

    // ── SLA compliance ────────────────────────────────────────────────────

    @Override
    public double slaCompliance(Instant from, Instant to, int thresholdMs,
                                String applicationId, String routeId) {
        String sql = "SELECT " +
                "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
                "FROM executions WHERE start_time >= ? AND start_time < ?";

        List<Object> params = new ArrayList<>();
        params.add(thresholdMs);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationId != null) {
            sql += " AND application_id = ?";
            params.add(applicationId);
        }
        if (routeId != null) {
            sql += " AND route_id = ?";
            params.add(routeId);
        }

        return jdbc.query(sql, (rs, rowNum) -> {
            long total = rs.getLong("total");
            if (total == 0) return 1.0;
            return rs.getLong("compliant") * 100.0 / total;
        }, params.toArray()).stream().findFirst().orElse(1.0);
    }

    @Override
    public Map<String, long[]> slaCountsByApp(Instant from, Instant to, int defaultThresholdMs) {
        String sql = "SELECT application_id, " +
                "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
                "FROM executions WHERE start_time >= ? AND start_time < ? " +
                "GROUP BY application_id";

        Map<String, long[]> result = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            result.put(rs.getString("application_id"),
                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
        }, defaultThresholdMs, Timestamp.from(from), Timestamp.from(to));
        return result;
    }

    @Override
    public Map<String, long[]> slaCountsByRoute(Instant from, Instant to,
                                                String applicationId, int thresholdMs) {
        String sql = "SELECT route_id, " +
                "COUNT(*) FILTER (WHERE duration_ms <= ? AND status != 'RUNNING') AS compliant, " +
                "COUNT(*) FILTER (WHERE status != 'RUNNING') AS total " +
                "FROM executions WHERE start_time >= ? AND start_time < ? " +
                "AND application_id = ? GROUP BY route_id";

        Map<String, long[]> result = new LinkedHashMap<>();
        jdbc.query(sql, (rs) -> {
            result.put(rs.getString("route_id"),
                    new long[]{rs.getLong("compliant"), rs.getLong("total")});
        }, thresholdMs, Timestamp.from(from), Timestamp.from(to), applicationId);
        return result;
    }

    // ── Top errors ────────────────────────────────────────────────────────

    @Override
    public List<TopError> topErrors(Instant from, Instant to, String applicationId,
                                    String routeId, int limit) {
        StringBuilder where = new StringBuilder(
                "status = 'FAILED' AND start_time >= ? AND start_time < ?");
        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationId != null) {
            where.append(" AND application_id = ?");
            params.add(applicationId);
        }

        String table;
        String groupId;
        if (routeId != null) {
            // L3: attribute errors to processors
            table = "processor_executions";
            groupId = "processor_id";
            where.append(" AND route_id = ?");
            params.add(routeId);
        } else {
            // L1/L2: attribute errors to routes
            table = "executions";
            groupId = "route_id";
        }

        Instant fiveMinAgo = Instant.now().minus(5, ChronoUnit.MINUTES);
        Instant tenMinAgo = Instant.now().minus(10, ChronoUnit.MINUTES);

        String sql = "WITH counted AS (" +
                "  SELECT COALESCE(error_type, LEFT(error_message, 200)) AS error_key, " +
                "         " + groupId + " AS group_id, " +
                "         COUNT(*) AS cnt, MAX(start_time) AS last_seen " +
                "  FROM " + table + " WHERE " + where +
                "  GROUP BY error_key, group_id ORDER BY cnt DESC LIMIT ?" +
                "), velocity AS (" +
                "  SELECT COALESCE(error_type, LEFT(error_message, 200)) AS error_key, " +
                "         COUNT(*) FILTER (WHERE start_time >= ?) AS recent_5m, " +
                "         COUNT(*) FILTER (WHERE start_time >= ? AND start_time < ?) AS prev_5m " +
                "  FROM " + table + " WHERE " + where +
                "  GROUP BY error_key" +
                ") SELECT c.error_key, c.group_id, c.cnt, c.last_seen, " +
                "         COALESCE(v.recent_5m, 0) / 5.0 AS velocity, " +
                "         CASE " +
                "           WHEN COALESCE(v.recent_5m, 0) > COALESCE(v.prev_5m, 0) * 1.2 THEN 'accelerating' " +
                "           WHEN COALESCE(v.recent_5m, 0) < COALESCE(v.prev_5m, 0) * 0.8 THEN 'decelerating' " +
                "           ELSE 'stable' END AS trend " +
                "FROM counted c LEFT JOIN velocity v ON c.error_key = v.error_key " +
                "ORDER BY c.cnt DESC";

        // Build full params: counted-where params + limit + velocity timestamps + velocity-where params
        List<Object> fullParams = new ArrayList<>(params);
        fullParams.add(limit);
        fullParams.add(Timestamp.from(fiveMinAgo));
        fullParams.add(Timestamp.from(tenMinAgo));
        fullParams.add(Timestamp.from(fiveMinAgo));
        fullParams.addAll(params); // same where clause for velocity CTE

        return jdbc.query(sql, (rs, rowNum) -> {
            String errorKey = rs.getString("error_key");
            String gid = rs.getString("group_id");
            return new TopError(
                    errorKey,
                    routeId != null ? routeId : gid, // routeId
                    routeId != null ? gid : null,    // processorId (only at L3)
                    rs.getLong("cnt"),
                    rs.getDouble("velocity"),
                    rs.getString("trend"),
                    rs.getTimestamp("last_seen").toInstant());
        }, fullParams.toArray());
    }

    @Override
    public int activeErrorTypes(Instant from, Instant to, String applicationId) {
        String sql = "SELECT COUNT(DISTINCT COALESCE(error_type, LEFT(error_message, 200))) " +
                "FROM executions WHERE status = 'FAILED' AND start_time >= ? AND start_time < ?";

        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationId != null) {
            sql += " AND application_id = ?";
            params.add(applicationId);
        }

        Integer count = jdbc.queryForObject(sql, Integer.class, params.toArray());
        return count != null ? count : 0;
    }

    // ── Punchcard ─────────────────────────────────────────────────────────

    @Override
    public List<PunchcardCell> punchcard(Instant from, Instant to, String applicationId) {
        String view = applicationId != null ? "stats_1m_app" : "stats_1m_all";
        String sql = "SELECT EXTRACT(DOW FROM bucket) AS weekday, " +
                "EXTRACT(HOUR FROM bucket) AS hour, " +
                "COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        if (applicationId != null) {
            sql += " AND application_id = ?";
            params.add(applicationId);
        }
        sql += " GROUP BY weekday, hour ORDER BY weekday, hour";

        return jdbc.query(sql, (rs, rowNum) -> new PunchcardCell(
                rs.getInt("weekday"), rs.getInt("hour"),
                rs.getLong("total_count"), rs.getLong("failed_count")),
                params.toArray());
    }
}