feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,128 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.common.graph.RouteGraph;
|
||||
import com.cameleer3.server.core.ingestion.TaggedDiagram;
|
||||
import com.cameleer3.server.core.storage.DiagramStore;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HexFormat;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* PostgreSQL implementation of {@link DiagramStore}.
|
||||
* <p>
|
||||
* Stores route graphs as JSON with SHA-256 content-hash deduplication.
|
||||
* Uses {@code ON CONFLICT (content_hash) DO NOTHING} for idempotent inserts.
|
||||
*/
|
||||
@Repository
|
||||
public class PostgresDiagramStore implements DiagramStore {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PostgresDiagramStore.class);
|
||||
|
||||
private static final String INSERT_SQL = """
|
||||
INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
|
||||
VALUES (?, ?, ?, ?::jsonb)
|
||||
ON CONFLICT (content_hash) DO NOTHING
|
||||
""";
|
||||
|
||||
private static final String SELECT_BY_HASH = """
|
||||
SELECT definition FROM route_diagrams WHERE content_hash = ? LIMIT 1
|
||||
""";
|
||||
|
||||
private static final String SELECT_HASH_FOR_ROUTE = """
|
||||
SELECT content_hash FROM route_diagrams
|
||||
WHERE route_id = ? AND agent_id = ?
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
""";
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public PostgresDiagramStore(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(TaggedDiagram diagram) {
|
||||
try {
|
||||
RouteGraph graph = diagram.graph();
|
||||
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
|
||||
String json = objectMapper.writeValueAsString(graph);
|
||||
String contentHash = sha256Hex(json);
|
||||
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
|
||||
|
||||
jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json);
|
||||
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RouteGraph> findByContentHash(String contentHash) {
|
||||
List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_BY_HASH, contentHash);
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String json = (String) rows.get(0).get("definition");
|
||||
try {
|
||||
return Optional.of(objectMapper.readValue(json, RouteGraph.class));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to deserialize RouteGraph from PostgreSQL", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> findContentHashForRoute(String routeId, String agentId) {
|
||||
List<Map<String, Object>> rows = jdbcTemplate.queryForList(SELECT_HASH_FOR_ROUTE, routeId, agentId);
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds) {
|
||||
if (agentIds == null || agentIds.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String placeholders = String.join(", ", Collections.nCopies(agentIds.size(), "?"));
|
||||
String sql = "SELECT content_hash FROM route_diagrams " +
|
||||
"WHERE route_id = ? AND agent_id IN (" + placeholders + ") " +
|
||||
"ORDER BY created_at DESC LIMIT 1";
|
||||
var params = new ArrayList<Object>();
|
||||
params.add(routeId);
|
||||
params.addAll(agentIds);
|
||||
List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, params.toArray());
|
||||
if (rows.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of((String) rows.get(0).get("content_hash"));
|
||||
}
|
||||
|
||||
static String sha256Hex(String input) {
|
||||
try {
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
|
||||
return HexFormat.of().formatHex(hash);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException("SHA-256 not available", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.storage.MetricsStore;
|
||||
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public class PostgresMetricsStore implements MetricsStore {
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresMetricsStore(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertBatch(List<MetricsSnapshot> snapshots) {
|
||||
jdbc.batchUpdate("""
|
||||
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
|
||||
collected_at, server_received_at)
|
||||
VALUES (?, ?, ?, ?::jsonb, ?, now())
|
||||
""",
|
||||
snapshots.stream().map(s -> new Object[]{
|
||||
s.agentId(), s.metricName(), s.metricValue(),
|
||||
tagsToJson(s.tags()),
|
||||
Timestamp.from(s.collectedAt())
|
||||
}).toList());
|
||||
}
|
||||
|
||||
private String tagsToJson(java.util.Map<String, String> tags) {
|
||||
if (tags == null || tags.isEmpty()) return null;
|
||||
try { return MAPPER.writeValueAsString(tags); }
|
||||
catch (JsonProcessingException e) { return null; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.security.OidcConfig;
|
||||
import com.cameleer3.server.core.security.OidcConfigRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.Array;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Repository
|
||||
public class PostgresOidcConfigRepository implements OidcConfigRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OidcConfig> find() {
|
||||
var results = jdbc.query(
|
||||
"SELECT * FROM oidc_config WHERE config_id = 'default'",
|
||||
(rs, rowNum) -> {
|
||||
Array arr = rs.getArray("default_roles");
|
||||
String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
|
||||
return new OidcConfig(
|
||||
rs.getBoolean("enabled"), rs.getString("issuer_uri"),
|
||||
rs.getString("client_id"), rs.getString("client_secret"),
|
||||
rs.getString("roles_claim"), List.of(roles),
|
||||
rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
|
||||
});
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(OidcConfig config) {
|
||||
jdbc.update("""
|
||||
INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
|
||||
roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
|
||||
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
|
||||
ON CONFLICT (config_id) DO UPDATE SET
|
||||
enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
|
||||
client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
|
||||
roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
|
||||
auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
|
||||
updated_at = now()
|
||||
""",
|
||||
config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
|
||||
config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
|
||||
config.autoSignup(), config.displayNameClaim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete() {
|
||||
jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package com.cameleer3.server.app.storage;
|
||||
|
||||
import com.cameleer3.server.core.security.UserInfo;
|
||||
import com.cameleer3.server.core.security.UserRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.Array;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Repository
|
||||
public class PostgresUserRepository implements UserRepository {
|
||||
|
||||
private final JdbcTemplate jdbc;
|
||||
|
||||
public PostgresUserRepository(JdbcTemplate jdbc) {
|
||||
this.jdbc = jdbc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<UserInfo> findById(String userId) {
|
||||
var results = jdbc.query(
|
||||
"SELECT * FROM users WHERE user_id = ?",
|
||||
(rs, rowNum) -> mapUser(rs), userId);
|
||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<UserInfo> findAll() {
|
||||
return jdbc.query("SELECT * FROM users ORDER BY user_id",
|
||||
(rs, rowNum) -> mapUser(rs));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsert(UserInfo user) {
|
||||
jdbc.update("""
|
||||
INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, now(), now())
|
||||
ON CONFLICT (user_id) DO UPDATE SET
|
||||
provider = EXCLUDED.provider, email = EXCLUDED.email,
|
||||
display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
|
||||
updated_at = now()
|
||||
""",
|
||||
user.userId(), user.provider(), user.email(), user.displayName(),
|
||||
user.roles().toArray(new String[0]));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateRoles(String userId, List<String> roles) {
|
||||
jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
|
||||
roles.toArray(new String[0]), userId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(String userId) {
|
||||
jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
|
||||
}
|
||||
|
||||
private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
|
||||
Array rolesArray = rs.getArray("roles");
|
||||
String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
|
||||
return new UserInfo(
|
||||
rs.getString("user_id"), rs.getString("provider"),
|
||||
rs.getString("email"), rs.getString("display_name"),
|
||||
List.of(roles));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user