diff --git a/src/main/java/net/siegeln/cameleer/saas/vendor/InfrastructureService.java b/src/main/java/net/siegeln/cameleer/saas/vendor/InfrastructureService.java new file mode 100644 index 0000000..f8a58ec --- /dev/null +++ b/src/main/java/net/siegeln/cameleer/saas/vendor/InfrastructureService.java @@ -0,0 +1,280 @@ +package net.siegeln.cameleer.saas.vendor; + +import net.siegeln.cameleer.saas.provisioning.ProvisioningProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class InfrastructureService { + + private static final Logger log = LoggerFactory.getLogger(InfrastructureService.class); + + private static final String[] CH_TABLES = { + "executions", "processor_executions", "logs", "agent_events", "usage_events" + }; + + private final ProvisioningProperties props; + + public InfrastructureService(ProvisioningProperties props) { + this.props = props; + } + + // --- Response records --- + + public record PostgresOverview(String version, long databaseSizeBytes, int activeConnections) {} + + public record TenantPgStats(String slug, long schemaSizeBytes, int tableCount, long totalRows) {} + + public record TableStats(String tableName, long rowCount, long dataSizeBytes, long indexSizeBytes) {} + + public record ClickHouseOverview( + String version, + long uptimeSeconds, + long totalDiskBytes, + long totalUncompressedBytes, + double compressionRatio, + long totalRows, + int activeMerges + ) {} + + public record TenantChStats(String tenantId, long totalRows, Map rowsByTable) {} + + public record ChTableStats(String tableName, long rowCount) {} + + // --- PostgreSQL methods --- + + public PostgresOverview getPostgresOverview() { + try (Connection conn = pgConnection(); + Statement stmt = conn.createStatement()) { + + String version; + try (ResultSet rs = stmt.executeQuery("SELECT version()")) { + rs.next(); + version = rs.getString(1); + } + + long dbSize; + try (ResultSet rs = stmt.executeQuery("SELECT pg_database_size(current_database())")) { + rs.next(); + dbSize = rs.getLong(1); + } + + int activeConnections; + try (ResultSet rs = stmt.executeQuery( + "SELECT count(*) FROM pg_stat_activity WHERE datname = current_database()")) { + rs.next(); + activeConnections = rs.getInt(1); + } + + return new PostgresOverview(version, dbSize, activeConnections); + } catch (Exception e) { + log.error("Failed to get PostgreSQL overview: {}", e.getMessage(), e); + throw new RuntimeException("Failed to get PostgreSQL overview", e); + } + } + + public List getPostgresTenantStats() { + String sql = """ + SELECT + s.schema_name, + coalesce(sum(pg_total_relation_size(quote_ident(s.schema_name) || '.' || quote_ident(t.table_name))), 0) AS schema_size, + count(t.table_name) AS table_count, + coalesce(sum(st.n_live_tup), 0) AS total_rows + FROM information_schema.schemata s + LEFT JOIN information_schema.tables t + ON t.table_schema = s.schema_name AND t.table_type = 'BASE TABLE' + LEFT JOIN pg_stat_user_tables st + ON st.schemaname = s.schema_name AND st.relname = t.table_name + WHERE s.schema_name LIKE 'tenant_%' + GROUP BY s.schema_name + ORDER BY schema_size DESC + """; + + try (Connection conn = pgConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + + List result = new ArrayList<>(); + while (rs.next()) { + String schemaName = rs.getString("schema_name"); + String slug = schemaName.substring("tenant_".length()); + long schemaSize = rs.getLong("schema_size"); + int tableCount = rs.getInt("table_count"); + long totalRows = rs.getLong("total_rows"); + result.add(new TenantPgStats(slug, schemaSize, tableCount, totalRows)); + } + return result; + } catch (Exception e) { + log.error("Failed to get PostgreSQL tenant stats: {}", e.getMessage(), e); + throw new RuntimeException("Failed to get PostgreSQL tenant stats", e); + } + } + + public List getPostgresTenantDetail(String slug) { + String sql = """ + SELECT + st.relname AS table_name, + st.n_live_tup AS row_count, + pg_table_size(quote_ident(st.schemaname) || '.' || quote_ident(st.relname)) AS data_size, + pg_indexes_size(quote_ident(st.schemaname) || '.' || quote_ident(st.relname)) AS index_size + FROM pg_stat_user_tables st + WHERE st.schemaname = ? + ORDER BY data_size DESC + """; + + String schema = "tenant_" + slug; + try (Connection conn = pgConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + ps.setString(1, schema); + try (ResultSet rs = ps.executeQuery()) { + List result = new ArrayList<>(); + while (rs.next()) { + result.add(new TableStats( + rs.getString("table_name"), + rs.getLong("row_count"), + rs.getLong("data_size"), + rs.getLong("index_size") + )); + } + return result; + } + } catch (Exception e) { + log.error("Failed to get PostgreSQL tenant detail for '{}': {}", slug, e.getMessage(), e); + throw new RuntimeException("Failed to get PostgreSQL tenant detail for: " + slug, e); + } + } + + // --- ClickHouse methods --- + + public ClickHouseOverview getClickHouseOverview() { + try (Connection conn = chConnection(); + Statement stmt = conn.createStatement()) { + + String version; + long uptimeSeconds; + try (ResultSet rs = stmt.executeQuery("SELECT version(), uptime()")) { + rs.next(); + version = rs.getString(1); + uptimeSeconds = rs.getLong(2); + } + + long totalDiskBytes; + long totalUncompressedBytes; + long totalRows; + try (ResultSet rs = stmt.executeQuery( + "SELECT sum(bytes_on_disk), sum(data_uncompressed_bytes), sum(rows) " + + "FROM system.parts WHERE database = currentDatabase() AND active")) { + rs.next(); + totalDiskBytes = rs.getLong(1); + totalUncompressedBytes = rs.getLong(2); + totalRows = rs.getLong(3); + } + + double compressionRatio = totalDiskBytes == 0 ? 0.0 + : BigDecimal.valueOf((double) totalUncompressedBytes / totalDiskBytes) + .setScale(2, RoundingMode.HALF_UP) + .doubleValue(); + + int activeMerges; + try (ResultSet rs = stmt.executeQuery( + "SELECT count() FROM system.merges WHERE database = currentDatabase()")) { + rs.next(); + activeMerges = rs.getInt(1); + } + + return new ClickHouseOverview(version, uptimeSeconds, totalDiskBytes, + totalUncompressedBytes, compressionRatio, totalRows, activeMerges); + } catch (Exception e) { + log.error("Failed to get ClickHouse overview: {}", e.getMessage(), e); + throw new RuntimeException("Failed to get ClickHouse overview", e); + } + } + + public List getClickHouseTenantStats() { + // tenantId -> tableName -> count + Map> aggregated = new HashMap<>(); + + try (Connection conn = chConnection(); + Statement stmt = conn.createStatement()) { + + for (String table : CH_TABLES) { + try (ResultSet rs = stmt.executeQuery( + "SELECT tenant_id, count() AS cnt FROM " + table + " GROUP BY tenant_id")) { + while (rs.next()) { + String tenantId = rs.getString("tenant_id"); + long cnt = rs.getLong("cnt"); + aggregated + .computeIfAbsent(tenantId, k -> new HashMap<>()) + .put(table, cnt); + } + } catch (Exception e) { + log.error("Failed to query ClickHouse table '{}' for tenant stats: {}", table, e.getMessage(), e); + } + } + } catch (Exception e) { + log.error("Failed to get ClickHouse tenant stats: {}", e.getMessage(), e); + throw new RuntimeException("Failed to get ClickHouse tenant stats", e); + } + + List result = new ArrayList<>(); + for (Map.Entry> entry : aggregated.entrySet()) { + String tenantId = entry.getKey(); + Map rowsByTable = entry.getValue(); + long totalRows = rowsByTable.values().stream().mapToLong(Long::longValue).sum(); + result.add(new TenantChStats(tenantId, totalRows, rowsByTable)); + } + result.sort(Comparator.comparingLong(TenantChStats::totalRows).reversed()); + return result; + } + + public List getClickHouseTenantDetail(String tenantId) { + List result = new ArrayList<>(); + + try (Connection conn = chConnection()) { + for (String table : CH_TABLES) { + String sql = "SELECT count() AS cnt FROM " + table + " WHERE tenant_id = ?"; + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setString(1, tenantId); + try (ResultSet rs = ps.executeQuery()) { + rs.next(); + result.add(new ChTableStats(table, rs.getLong("cnt"))); + } + } catch (Exception e) { + log.error("Failed to query ClickHouse table '{}' for tenant '{}': {}", + table, tenantId, e.getMessage(), e); + } + } + } catch (Exception e) { + log.error("Failed to get ClickHouse tenant detail for '{}': {}", tenantId, e.getMessage(), e); + throw new RuntimeException("Failed to get ClickHouse tenant detail for: " + tenantId, e); + } + + return result; + } + + // --- Private helpers --- + + private Connection pgConnection() throws SQLException { + return DriverManager.getConnection(props.datasourceUrl(), "cameleer", "cameleer_dev"); + } + + private Connection chConnection() throws SQLException { + return DriverManager.getConnection(props.clickhouseUrl()); + } +}