feat: add InfrastructureService with PG and CH queries
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
280
src/main/java/net/siegeln/cameleer/saas/vendor/InfrastructureService.java
vendored
Normal file
280
src/main/java/net/siegeln/cameleer/saas/vendor/InfrastructureService.java
vendored
Normal file
@@ -0,0 +1,280 @@
|
||||
package net.siegeln.cameleer.saas.vendor;
|
||||
|
||||
import net.siegeln.cameleer.saas.provisioning.ProvisioningProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
public class InfrastructureService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(InfrastructureService.class);
|
||||
|
||||
private static final String[] CH_TABLES = {
|
||||
"executions", "processor_executions", "logs", "agent_events", "usage_events"
|
||||
};
|
||||
|
||||
private final ProvisioningProperties props;
|
||||
|
||||
public InfrastructureService(ProvisioningProperties props) {
|
||||
this.props = props;
|
||||
}
|
||||
|
||||
// --- Response records ---
|
||||
|
||||
public record PostgresOverview(String version, long databaseSizeBytes, int activeConnections) {}
|
||||
|
||||
public record TenantPgStats(String slug, long schemaSizeBytes, int tableCount, long totalRows) {}
|
||||
|
||||
public record TableStats(String tableName, long rowCount, long dataSizeBytes, long indexSizeBytes) {}
|
||||
|
||||
public record ClickHouseOverview(
|
||||
String version,
|
||||
long uptimeSeconds,
|
||||
long totalDiskBytes,
|
||||
long totalUncompressedBytes,
|
||||
double compressionRatio,
|
||||
long totalRows,
|
||||
int activeMerges
|
||||
) {}
|
||||
|
||||
public record TenantChStats(String tenantId, long totalRows, Map<String, Long> rowsByTable) {}
|
||||
|
||||
public record ChTableStats(String tableName, long rowCount) {}
|
||||
|
||||
// --- PostgreSQL methods ---
|
||||
|
||||
public PostgresOverview getPostgresOverview() {
|
||||
try (Connection conn = pgConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
|
||||
String version;
|
||||
try (ResultSet rs = stmt.executeQuery("SELECT version()")) {
|
||||
rs.next();
|
||||
version = rs.getString(1);
|
||||
}
|
||||
|
||||
long dbSize;
|
||||
try (ResultSet rs = stmt.executeQuery("SELECT pg_database_size(current_database())")) {
|
||||
rs.next();
|
||||
dbSize = rs.getLong(1);
|
||||
}
|
||||
|
||||
int activeConnections;
|
||||
try (ResultSet rs = stmt.executeQuery(
|
||||
"SELECT count(*) FROM pg_stat_activity WHERE datname = current_database()")) {
|
||||
rs.next();
|
||||
activeConnections = rs.getInt(1);
|
||||
}
|
||||
|
||||
return new PostgresOverview(version, dbSize, activeConnections);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get PostgreSQL overview: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get PostgreSQL overview", e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<TenantPgStats> getPostgresTenantStats() {
|
||||
String sql = """
|
||||
SELECT
|
||||
s.schema_name,
|
||||
coalesce(sum(pg_total_relation_size(quote_ident(s.schema_name) || '.' || quote_ident(t.table_name))), 0) AS schema_size,
|
||||
count(t.table_name) AS table_count,
|
||||
coalesce(sum(st.n_live_tup), 0) AS total_rows
|
||||
FROM information_schema.schemata s
|
||||
LEFT JOIN information_schema.tables t
|
||||
ON t.table_schema = s.schema_name AND t.table_type = 'BASE TABLE'
|
||||
LEFT JOIN pg_stat_user_tables st
|
||||
ON st.schemaname = s.schema_name AND st.relname = t.table_name
|
||||
WHERE s.schema_name LIKE 'tenant_%'
|
||||
GROUP BY s.schema_name
|
||||
ORDER BY schema_size DESC
|
||||
""";
|
||||
|
||||
try (Connection conn = pgConnection();
|
||||
Statement stmt = conn.createStatement();
|
||||
ResultSet rs = stmt.executeQuery(sql)) {
|
||||
|
||||
List<TenantPgStats> result = new ArrayList<>();
|
||||
while (rs.next()) {
|
||||
String schemaName = rs.getString("schema_name");
|
||||
String slug = schemaName.substring("tenant_".length());
|
||||
long schemaSize = rs.getLong("schema_size");
|
||||
int tableCount = rs.getInt("table_count");
|
||||
long totalRows = rs.getLong("total_rows");
|
||||
result.add(new TenantPgStats(slug, schemaSize, tableCount, totalRows));
|
||||
}
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get PostgreSQL tenant stats: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get PostgreSQL tenant stats", e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<TableStats> getPostgresTenantDetail(String slug) {
|
||||
String sql = """
|
||||
SELECT
|
||||
st.relname AS table_name,
|
||||
st.n_live_tup AS row_count,
|
||||
pg_table_size(quote_ident(st.schemaname) || '.' || quote_ident(st.relname)) AS data_size,
|
||||
pg_indexes_size(quote_ident(st.schemaname) || '.' || quote_ident(st.relname)) AS index_size
|
||||
FROM pg_stat_user_tables st
|
||||
WHERE st.schemaname = ?
|
||||
ORDER BY data_size DESC
|
||||
""";
|
||||
|
||||
String schema = "tenant_" + slug;
|
||||
try (Connection conn = pgConnection();
|
||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
|
||||
ps.setString(1, schema);
|
||||
try (ResultSet rs = ps.executeQuery()) {
|
||||
List<TableStats> result = new ArrayList<>();
|
||||
while (rs.next()) {
|
||||
result.add(new TableStats(
|
||||
rs.getString("table_name"),
|
||||
rs.getLong("row_count"),
|
||||
rs.getLong("data_size"),
|
||||
rs.getLong("index_size")
|
||||
));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get PostgreSQL tenant detail for '{}': {}", slug, e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get PostgreSQL tenant detail for: " + slug, e);
|
||||
}
|
||||
}
|
||||
|
||||
// --- ClickHouse methods ---
|
||||
|
||||
public ClickHouseOverview getClickHouseOverview() {
|
||||
try (Connection conn = chConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
|
||||
String version;
|
||||
long uptimeSeconds;
|
||||
try (ResultSet rs = stmt.executeQuery("SELECT version(), uptime()")) {
|
||||
rs.next();
|
||||
version = rs.getString(1);
|
||||
uptimeSeconds = rs.getLong(2);
|
||||
}
|
||||
|
||||
long totalDiskBytes;
|
||||
long totalUncompressedBytes;
|
||||
long totalRows;
|
||||
try (ResultSet rs = stmt.executeQuery(
|
||||
"SELECT sum(bytes_on_disk), sum(data_uncompressed_bytes), sum(rows) " +
|
||||
"FROM system.parts WHERE database = currentDatabase() AND active")) {
|
||||
rs.next();
|
||||
totalDiskBytes = rs.getLong(1);
|
||||
totalUncompressedBytes = rs.getLong(2);
|
||||
totalRows = rs.getLong(3);
|
||||
}
|
||||
|
||||
double compressionRatio = totalDiskBytes == 0 ? 0.0
|
||||
: BigDecimal.valueOf((double) totalUncompressedBytes / totalDiskBytes)
|
||||
.setScale(2, RoundingMode.HALF_UP)
|
||||
.doubleValue();
|
||||
|
||||
int activeMerges;
|
||||
try (ResultSet rs = stmt.executeQuery(
|
||||
"SELECT count() FROM system.merges WHERE database = currentDatabase()")) {
|
||||
rs.next();
|
||||
activeMerges = rs.getInt(1);
|
||||
}
|
||||
|
||||
return new ClickHouseOverview(version, uptimeSeconds, totalDiskBytes,
|
||||
totalUncompressedBytes, compressionRatio, totalRows, activeMerges);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get ClickHouse overview: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get ClickHouse overview", e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<TenantChStats> getClickHouseTenantStats() {
|
||||
// tenantId -> tableName -> count
|
||||
Map<String, Map<String, Long>> aggregated = new HashMap<>();
|
||||
|
||||
try (Connection conn = chConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
|
||||
for (String table : CH_TABLES) {
|
||||
try (ResultSet rs = stmt.executeQuery(
|
||||
"SELECT tenant_id, count() AS cnt FROM " + table + " GROUP BY tenant_id")) {
|
||||
while (rs.next()) {
|
||||
String tenantId = rs.getString("tenant_id");
|
||||
long cnt = rs.getLong("cnt");
|
||||
aggregated
|
||||
.computeIfAbsent(tenantId, k -> new HashMap<>())
|
||||
.put(table, cnt);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to query ClickHouse table '{}' for tenant stats: {}", table, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get ClickHouse tenant stats: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get ClickHouse tenant stats", e);
|
||||
}
|
||||
|
||||
List<TenantChStats> result = new ArrayList<>();
|
||||
for (Map.Entry<String, Map<String, Long>> entry : aggregated.entrySet()) {
|
||||
String tenantId = entry.getKey();
|
||||
Map<String, Long> rowsByTable = entry.getValue();
|
||||
long totalRows = rowsByTable.values().stream().mapToLong(Long::longValue).sum();
|
||||
result.add(new TenantChStats(tenantId, totalRows, rowsByTable));
|
||||
}
|
||||
result.sort(Comparator.comparingLong(TenantChStats::totalRows).reversed());
|
||||
return result;
|
||||
}
|
||||
|
||||
public List<ChTableStats> getClickHouseTenantDetail(String tenantId) {
|
||||
List<ChTableStats> result = new ArrayList<>();
|
||||
|
||||
try (Connection conn = chConnection()) {
|
||||
for (String table : CH_TABLES) {
|
||||
String sql = "SELECT count() AS cnt FROM " + table + " WHERE tenant_id = ?";
|
||||
try (PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
ps.setString(1, tenantId);
|
||||
try (ResultSet rs = ps.executeQuery()) {
|
||||
rs.next();
|
||||
result.add(new ChTableStats(table, rs.getLong("cnt")));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to query ClickHouse table '{}' for tenant '{}': {}",
|
||||
table, tenantId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get ClickHouse tenant detail for '{}': {}", tenantId, e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to get ClickHouse tenant detail for: " + tenantId, e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// --- Private helpers ---
|
||||
|
||||
private Connection pgConnection() throws SQLException {
|
||||
return DriverManager.getConnection(props.datasourceUrl(), "cameleer", "cameleer_dev");
|
||||
}
|
||||
|
||||
private Connection chConnection() throws SQLException {
|
||||
return DriverManager.getConnection(props.clickhouseUrl());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user