diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java new file mode 100644 index 00000000..0c7dbbf8 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java @@ -0,0 +1,128 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.common.graph.RouteGraph; +import com.cameleer3.server.core.ingestion.TaggedDiagram; +import com.cameleer3.server.core.storage.DiagramStore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HexFormat; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * PostgreSQL implementation of {@link DiagramStore}. + *

+ * Stores route graphs as JSON with SHA-256 content-hash deduplication. + * Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts. + */ +@Repository +public class PostgresDiagramStore implements DiagramStore { + + private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class); + + private static final String INSERT_SQL = """ + INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition) + VALUES (?, ?, ?, ?::jsonb) + ON CONFLICT (content_hash) DO NOTHING + """; + + private static final String SELECT_BY_HASH = """ + SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1 + """; + + private static final String SELECT_HASH_FOR_ROUTE = """ + SELECT content_hash FROM route_diagrams + WHERE route_id = ? AND agent_id = ? + ORDER BY created_at DESC LIMIT 1 + """; + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public PostgresDiagramStore(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + } + + @Override + public void store(TaggedDiagram diagram) { + try { + RouteGraph graph = diagram.graph(); + String agentId = diagram.agentId() != null ? diagram.agentId() : ""; + String json = objectMapper.writeValueAsString(graph); + String contentHash = sha256Hex(json); + String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; + + jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json); + log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize RouteGraph to JSON", e); + } + } + + @Override + public Optional findByContentHash(String contentHash) { + List> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash); + if (rows.isEmpty()) { + return Optional.empty(); + } + String json = (String) rows.get(0).get("definition"); + try { + return Optional.of(objectMapper.readValue(json, RouteGraph.class)); + } catch (JsonProcessingException e) { + log.error("Failed to deserialize RouteGraph from PostgreSQL", e); + return Optional.empty(); + } + } + + @Override + public Optional findContentHashForRoute(String routeId, String agentId) { + List> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId); + if (rows.isEmpty()) { + return Optional.empty(); + } + return Optional.of((String) rows.get(0).get("content_hash")); + } + + @Override + public Optional findContentHashForRouteByAgents(String routeId, List agentIds) { + if (agentIds == null || agentIds.isEmpty()) { + return Optional.empty(); + } + String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?")); + String sql = "SELECT content_hash FROM route_diagrams " + + "WHERE route_id = ? AND agent_id IN (" + placeholders + ") " + + "ORDER BY created_at DESC LIMIT 1"; + var params = new ArrayList(); + params.add(routeId); + params.addAll(agentIds); + List> rows = jdbcTemplate.queryForList(sql, params.toArray()); + if (rows.isEmpty()) { + return Optional.empty(); + } + return Optional.of((String) rows.get(0).get("content_hash")); + } + + static String sha256Hex(String input) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); + return HexFormat.of().formatHex(hash); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 not available", e); + } + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java new file mode 100644 index 00000000..8b8fed63 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java @@ -0,0 +1,42 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsStore; +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.util.List; + +@Repository +public class PostgresMetricsStore implements MetricsStore { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final JdbcTemplate jdbc; + + public PostgresMetricsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void insertBatch(List snapshots) { + jdbc.batchUpdate(""" + INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, + collected_at, server_received_at) + VALUES (?, ?, ?, ?::jsonb, ?, now()) + """, + snapshots.stream().map(s -> new Object[]{ + s.agentId(), s.metricName(), s.metricValue(), + tagsToJson(s.tags()), + Timestamp.from(s.collectedAt()) + }).toList()); + } + + private String tagsToJson(java.util.Map tags) { + if (tags == null || tags.isEmpty()) return null; + try { return MAPPER.writeValueAsString(tags); } + catch (JsonProcessingException e) { return null; } + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java new file mode 100644 index 00000000..6da18993 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java @@ -0,0 +1,59 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.security.OidcConfig; +import com.cameleer3.server.core.security.OidcConfigRepository; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Array; +import java.util.List; +import java.util.Optional; + +@Repository +public class PostgresOidcConfigRepository implements OidcConfigRepository { + + private final JdbcTemplate jdbc; + + public PostgresOidcConfigRepository(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Optional find() { + var results = jdbc.query( + "SELECT * FROM oidc_config WHERE config_id = 'default'", + (rs, rowNum) -> { + Array arr = rs.getArray("default_roles"); + String[] roles = arr != null ? (String[]) arr.getArray() : new String[0]; + return new OidcConfig( + rs.getBoolean("enabled"), rs.getString("issuer_uri"), + rs.getString("client_id"), rs.getString("client_secret"), + rs.getString("roles_claim"), List.of(roles), + rs.getBoolean("auto_signup"), rs.getString("display_name_claim")); + }); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public void save(OidcConfig config) { + jdbc.update(""" + INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret, + roles_claim, default_roles, auto_signup, display_name_claim, updated_at) + VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now()) + ON CONFLICT (config_id) DO UPDATE SET + enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri, + client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret, + roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles, + auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim, + updated_at = now() + """, + config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(), + config.rolesClaim(), config.defaultRoles().toArray(new String[0]), + config.autoSignup(), config.displayNameClaim()); + } + + @Override + public void delete() { + jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'"); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java new file mode 100644 index 00000000..f5867fec --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java @@ -0,0 +1,69 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.security.UserInfo; +import com.cameleer3.server.core.security.UserRepository; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Array; +import java.sql.Timestamp; +import java.util.List; +import java.util.Optional; + +@Repository +public class PostgresUserRepository implements UserRepository { + + private final JdbcTemplate jdbc; + + public PostgresUserRepository(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Optional findById(String userId) { + var results = jdbc.query( + "SELECT * FROM users WHERE user_id = ?", + (rs, rowNum) -> mapUser(rs), userId); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public List findAll() { + return jdbc.query("SELECT * FROM users ORDER BY user_id", + (rs, rowNum) -> mapUser(rs)); + } + + @Override + public void upsert(UserInfo user) { + jdbc.update(""" + INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, now(), now()) + ON CONFLICT (user_id) DO UPDATE SET + provider = EXCLUDED.provider, email = EXCLUDED.email, + display_name = EXCLUDED.display_name, roles = EXCLUDED.roles, + updated_at = now() + """, + user.userId(), user.provider(), user.email(), user.displayName(), + user.roles().toArray(new String[0])); + } + + @Override + public void updateRoles(String userId, List roles) { + jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?", + roles.toArray(new String[0]), userId); + } + + @Override + public void delete(String userId) { + jdbc.update("DELETE FROM users WHERE user_id = ?", userId); + } + + private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException { + Array rolesArray = rs.getArray("roles"); + String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0]; + return new UserInfo( + rs.getString("user_id"), rs.getString("provider"), + rs.getString("email"), rs.getString("display_name"), + List.of(roles)); + } +}