# Storage Layer Refactor Implementation Plan > **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends. **Architecture:** PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations. **Tech Stack:** Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client **Spec:** `docs/superpowers/specs/2026-03-16-storage-layer-design.md` --- ## File Structure ### New files **Core module** (`cameleer-server-core/src/main/java/com/cameleer/server/core/`): - `storage/ExecutionStore.java` — new interface replacing ExecutionRepository - `storage/StatsStore.java` — new interface for stats from continuous aggregates - `storage/SearchIndex.java` — new interface for OpenSearch operations - `storage/DiagramStore.java` — new interface replacing DiagramRepository - `storage/MetricsStore.java` — new interface replacing MetricsRepository - `storage/model/ExecutionDocument.java` — document model for OpenSearch indexing - `search/StatsRequest.java` — request DTO for stats queries (level, scope, time range) - `search/TimeSeriesRequest.java` — request DTO for time-series queries (bucket size) - `indexing/SearchIndexer.java` — debounced event listener for OpenSearch indexing - `indexing/ExecutionUpdatedEvent.java` — event published after execution write **App module** (`cameleer-server-app/src/main/java/com/cameleer/server/app/`): - `storage/PostgresExecutionStore.java` — ExecutionStore impl with upsert - `storage/PostgresStatsStore.java` — StatsStore impl querying continuous aggregates - 
`storage/PostgresDiagramStore.java` — DiagramStore impl - `storage/PostgresUserRepository.java` — UserRepository impl (keeps existing core interface) - `storage/PostgresOidcConfigRepository.java` — OidcConfigRepository impl (keeps existing core interface) - `storage/PostgresMetricsStore.java` — MetricsStore impl - `search/OpenSearchIndex.java` — SearchIndex impl - `config/OpenSearchConfig.java` — OpenSearch client bean - `config/StorageBeanConfig.java` — wires all store beans - `ingestion/MetricsFlushScheduler.java` — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only) - `retention/RetentionScheduler.java` — scheduled job for drop_chunks and OpenSearch index deletion **Flyway migrations** (`cameleer-server-app/src/main/resources/db/migration/`): - `V1__extensions.sql` — CREATE EXTENSION timescaledb, timescaledb_toolkit - `V2__executions.sql` — executions hypertable - `V3__processor_executions.sql` — processor_executions hypertable - `V4__agent_metrics.sql` — agent_metrics hypertable - `V5__route_diagrams.sql` — route_diagrams table - `V6__users.sql` — users table - `V7__oidc_config.sql` — oidc_config table - `V8__continuous_aggregates.sql` — all 4 continuous aggregates + refresh policies Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by `RetentionScheduler` at runtime with configurable intervals. 
**Test files** (`cameleer-server-app/src/test/java/com/cameleer/server/app/`): - `AbstractPostgresIT.java` — replaces AbstractClickHouseIT (TimescaleDB Testcontainer) - `storage/PostgresExecutionStoreIT.java` — upsert, dedup, chunked arrival tests - `storage/PostgresStatsStoreIT.java` — continuous aggregate query tests - `storage/PostgresDiagramStoreIT.java` — content-hash dedup tests - `storage/PostgresUserRepositoryIT.java` — CRUD tests - `search/OpenSearchIndexIT.java` — index, search, wildcard tests ### Files to modify - `pom.xml` (root) — no changes needed - `cameleer-server-app/pom.xml` — swap clickhouse-jdbc for postgresql + opensearch-java + flyway - `cameleer-server-core/.../core/search/SearchService.java` — split: search delegates to SearchIndex, stats/timeseries to StatsStore - `cameleer-server-core/.../core/detail/DetailService.java` — use ExecutionStore instead of ExecutionRepository - `cameleer-server-core/.../core/detail/RawExecutionRow.java` — remove (replaced by normalized model) - `cameleer-server-core/.../core/ingestion/IngestionService.java` — synchronous execution/diagram writes, keep buffer for metrics - `cameleer-server-app/.../app/config/SearchBeanConfig.java` — wire StatsStore into SearchService - `cameleer-server-app/.../app/config/IngestionBeanConfig.java` — update bean wiring - `cameleer-server-app/src/main/resources/application.yml` — PostgreSQL + OpenSearch config - `cameleer-server-app/src/test/resources/application-test.yml` — test config ### Files to delete - `cameleer-server-app/.../app/storage/ClickHouseExecutionRepository.java` - `cameleer-server-app/.../app/storage/ClickHouseDiagramRepository.java` - `cameleer-server-app/.../app/storage/ClickHouseMetricsRepository.java` - `cameleer-server-app/.../app/storage/ClickHouseUserRepository.java` - `cameleer-server-app/.../app/storage/ClickHouseOidcConfigRepository.java` - `cameleer-server-app/.../app/search/ClickHouseSearchEngine.java` - 
`cameleer-server-app/.../app/ingestion/ClickHouseFlushScheduler.java` - `cameleer-server-app/.../app/config/ClickHouseConfig.java` - `cameleer-server-core/.../core/storage/ExecutionRepository.java` - `cameleer-server-core/.../core/storage/DiagramRepository.java` - `cameleer-server-core/.../core/storage/MetricsRepository.java` - `cameleer-server-core/.../core/search/SearchEngine.java` - `cameleer-server-core/.../core/detail/RawExecutionRow.java` Note: `UserRepository` and `OidcConfigRepository` interfaces in `core.security` are **kept** — the new Postgres implementations implement these existing interfaces. No rename needed since their contracts are unchanged. - `cameleer-server-app/src/main/resources/clickhouse/*.sql` (all 8 files) - `cameleer-server-app/src/test/.../app/AbstractClickHouseIT.java` --- ## Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure ### Task 1: Update Maven dependencies **Files:** - Modify: `cameleer-server-app/pom.xml` - [ ] **Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client** In `cameleer-server-app/pom.xml`, replace the ClickHouse dependency and add new ones: Remove: ```xml com.clickhouse clickhouse-jdbc 0.9.7 all ``` Add: ```xml org.postgresql postgresql org.flywaydb flyway-core org.flywaydb flyway-database-postgresql org.opensearch.client opensearch-java 2.19.0 org.opensearch.client opensearch-rest-client 2.19.0 ``` Replace the ClickHouse Testcontainer: ```xml org.testcontainers postgresql test org.opensearch opensearch-testcontainers 2.1.1 test ``` Note: `postgresql` driver and `flyway-core` versions are managed by Spring Boot parent POM. Testcontainers BOM version is `${testcontainers.version}` (2.0.3) from root POM. 
- [ ] **Step 2: Commit** (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected) ```bash git add cameleer-server-app/pom.xml git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch" ``` ### Task 2: Write Flyway migrations **Files:** - Create: `cameleer-server-app/src/main/resources/db/migration/V1__extensions.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V2__executions.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V3__processor_executions.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V4__agent_metrics.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V5__route_diagrams.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V6__users.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V7__oidc_config.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql` - Create: `cameleer-server-app/src/main/resources/db/migration/V9__retention_policies.sql` - [ ] **Step 1: Create V1__extensions.sql** ```sql CREATE EXTENSION IF NOT EXISTS timescaledb; CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit; ``` - [ ] **Step 2: Create V2__executions.sql** ```sql CREATE TABLE executions ( execution_id TEXT NOT NULL, route_id TEXT NOT NULL, agent_id TEXT NOT NULL, group_name TEXT NOT NULL, status TEXT NOT NULL, correlation_id TEXT, exchange_id TEXT, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ, duration_ms BIGINT, error_message TEXT, error_stacktrace TEXT, diagram_content_hash TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), PRIMARY KEY (execution_id, start_time) ); SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day'); CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC); CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC); 
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC); CREATE INDEX idx_executions_correlation ON executions (correlation_id); ``` - [ ] **Step 3: Create V3__processor_executions.sql** ```sql CREATE TABLE processor_executions ( id BIGSERIAL, execution_id TEXT NOT NULL, processor_id TEXT NOT NULL, processor_type TEXT NOT NULL, diagram_node_id TEXT, group_name TEXT NOT NULL, route_id TEXT NOT NULL, depth INT NOT NULL, parent_processor_id TEXT, status TEXT NOT NULL, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ, duration_ms BIGINT, error_message TEXT, error_stacktrace TEXT, input_body TEXT, output_body TEXT, input_headers JSONB, output_headers JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (execution_id, processor_id, start_time) ); SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day'); CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id); CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC); ``` - [ ] **Step 4: Create V4__agent_metrics.sql** ```sql CREATE TABLE agent_metrics ( agent_id TEXT NOT NULL, metric_name TEXT NOT NULL, metric_value DOUBLE PRECISION NOT NULL, tags JSONB, collected_at TIMESTAMPTZ NOT NULL, server_received_at TIMESTAMPTZ NOT NULL DEFAULT now() ); SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day'); CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC); ``` - [ ] **Step 5: Create V5__route_diagrams.sql** ```sql CREATE TABLE route_diagrams ( content_hash TEXT PRIMARY KEY, route_id TEXT NOT NULL, agent_id TEXT NOT NULL, definition TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id); ``` - [ ] **Step 6: Create V6__users.sql** ```sql CREATE TABLE users ( user_id TEXT PRIMARY KEY, provider TEXT NOT NULL, email 
TEXT, display_name TEXT, roles TEXT[] NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); ``` - [ ] **Step 7: Create V7__oidc_config.sql** ```sql CREATE TABLE oidc_config ( config_id TEXT PRIMARY KEY DEFAULT 'default', enabled BOOLEAN NOT NULL DEFAULT false, issuer_uri TEXT, client_id TEXT, client_secret TEXT, roles_claim TEXT, default_roles TEXT[] NOT NULL DEFAULT '{}', auto_signup BOOLEAN DEFAULT false, display_name_claim TEXT, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); ``` - [ ] **Step 8: Create V8__continuous_aggregates.sql** ```sql -- Global stats CREATE MATERIALIZED VIEW stats_1m_all WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', start_time) AS bucket, COUNT(*) AS total_count, COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, SUM(duration_ms) AS duration_sum, MAX(duration_ms) AS duration_max, approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration FROM executions WHERE status IS NOT NULL GROUP BY bucket; SELECT add_continuous_aggregate_policy('stats_1m_all', start_offset => INTERVAL '1 hour', end_offset => INTERVAL '1 minute', schedule_interval => INTERVAL '1 minute'); -- Per-application stats CREATE MATERIALIZED VIEW stats_1m_app WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', start_time) AS bucket, group_name, COUNT(*) AS total_count, COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, SUM(duration_ms) AS duration_sum, MAX(duration_ms) AS duration_max, approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration FROM executions WHERE status IS NOT NULL GROUP BY bucket, group_name; SELECT add_continuous_aggregate_policy('stats_1m_app', start_offset => INTERVAL '1 hour', end_offset => INTERVAL '1 minute', schedule_interval => INTERVAL '1 minute'); -- Per-route 
stats CREATE MATERIALIZED VIEW stats_1m_route WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', start_time) AS bucket, group_name, route_id, COUNT(*) AS total_count, COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, SUM(duration_ms) AS duration_sum, MAX(duration_ms) AS duration_max, approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration FROM executions WHERE status IS NOT NULL GROUP BY bucket, group_name, route_id; SELECT add_continuous_aggregate_policy('stats_1m_route', start_offset => INTERVAL '1 hour', end_offset => INTERVAL '1 minute', schedule_interval => INTERVAL '1 minute'); -- Per-processor stats (uses denormalized group_name/route_id on processor_executions) CREATE MATERIALIZED VIEW stats_1m_processor WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', start_time) AS bucket, group_name, route_id, processor_type, COUNT(*) AS total_count, COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, SUM(duration_ms) AS duration_sum, MAX(duration_ms) AS duration_max, approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration FROM processor_executions GROUP BY bucket, group_name, route_id, processor_type; SELECT add_continuous_aggregate_policy('stats_1m_processor', start_offset => INTERVAL '1 hour', end_offset => INTERVAL '1 minute', schedule_interval => INTERVAL '1 minute'); ``` - [ ] **Step 9: Commit** ```bash git add cameleer-server-app/src/main/resources/db/migration/ git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema" ``` ### Task 3: Create test base class with TimescaleDB Testcontainer **Files:** - Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java` - [ ] **Step 1: Write AbstractPostgresIT** ```java package com.cameleer.server.app; import org.springframework.boot.test.context.SpringBootTest; import 
org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @Testcontainers public abstract class AbstractPostgresIT { @Container static final PostgreSQLContainer postgres = new PostgreSQLContainer<>("timescale/timescaledb:latest-pg16") .withDatabaseName("cameleer") .withUsername("cameleer") .withPassword("test"); @DynamicPropertySource static void configureProperties(DynamicPropertyRegistry registry) { registry.add("spring.datasource.url", postgres::getJdbcUrl); registry.add("spring.datasource.username", postgres::getUsername); registry.add("spring.datasource.password", postgres::getPassword); registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver"); registry.add("spring.flyway.enabled", () -> "true"); } } ``` - [ ] **Step 2: Write a smoke test to verify migrations run** Create `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java`: ```java package com.cameleer.server.app.storage; import com.cameleer.server.app.AbstractPostgresIT; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import static org.junit.jupiter.api.Assertions.*; class FlywayMigrationIT extends AbstractPostgresIT { @Autowired JdbcTemplate jdbcTemplate; @Test void allMigrationsApplySuccessfully() { // Verify core tables exist Integer execCount = jdbcTemplate.queryForObject( "SELECT COUNT(*) FROM executions", Integer.class); assertEquals(0, execCount); Integer procCount = jdbcTemplate.queryForObject( "SELECT COUNT(*) FROM processor_executions", Integer.class); assertEquals(0, procCount); Integer userCount = jdbcTemplate.queryForObject( "SELECT 
COUNT(*) FROM users", Integer.class); assertEquals(0, userCount); // Verify continuous aggregates exist Integer caggCount = jdbcTemplate.queryForObject( "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates", Integer.class); assertEquals(4, caggCount); } } ``` - [ ] **Step 3: Verify test passes** (this test will not compile until Task 16 deletes ClickHouse code. Run it after Task 16 is complete. Listed here for logical grouping.) Run: `mvn test -pl cameleer-server-app -Dtest=FlywayMigrationIT -q` Expected: PASS — all migrations apply, tables and continuous aggregates exist - [ ] **Step 4: Commit** ```bash git add cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test" ``` ### Task 4: Update application.yml for PostgreSQL + OpenSearch **Files:** - Modify: `cameleer-server-app/src/main/resources/application.yml` - Modify: `cameleer-server-app/src/test/resources/application-test.yml` - [ ] **Step 1: Update application.yml datasource section** Replace: ```yaml spring: datasource: url: jdbc:ch://localhost:8123/cameleer username: cameleer password: cameleer_dev driver-class-name: com.clickhouse.jdbc.ClickHouseDriver ``` With: ```yaml spring: datasource: url: jdbc:postgresql://localhost:5432/cameleer username: cameleer password: ${CAMELEER_DB_PASSWORD:cameleer_dev} driver-class-name: org.postgresql.Driver flyway: enabled: true locations: classpath:db/migration ``` Add OpenSearch config section: ```yaml opensearch: url: ${OPENSEARCH_URL:http://localhost:9200} queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000} debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000} ``` Add body size limit: ```yaml cameleer: body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} retention-days: ${CAMELEER_RETENTION_DAYS:30} ``` Remove the `clickhouse:` section. 
- [ ] **Step 2: Update application-test.yml** ```yaml spring: flyway: enabled: true opensearch: url: http://localhost:9200 ``` - [ ] **Step 3: Commit** ```bash git add cameleer-server-app/src/main/resources/application.yml git add cameleer-server-app/src/test/resources/application-test.yml git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config" ``` --- ## Chunk 2: Core Module Interfaces and Models ### Task 5: Create new storage interfaces in core module **Files:** - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java` - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java` - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/SearchIndex.java` - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/DiagramStore.java` - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsStore.java` - [ ] **Step 1: Create ExecutionStore interface** ```java package com.cameleer.server.core.storage; import com.cameleer.server.core.detail.ProcessorNode; import java.time.Instant; import java.util.List; import java.util.Optional; public interface ExecutionStore { void upsert(ExecutionRecord execution); void upsertProcessors(String executionId, Instant startTime, String groupName, String routeId, List processors); Optional findById(String executionId); List findProcessors(String executionId); record ExecutionRecord( String executionId, String routeId, String agentId, String groupName, String status, String correlationId, String exchangeId, Instant startTime, Instant endTime, Long durationMs, String errorMessage, String errorStacktrace, String diagramContentHash ) {} record ProcessorRecord( String executionId, String processorId, String processorType, String diagramNodeId, String groupName, String routeId, int depth, String parentProcessorId, String status, Instant startTime, Instant endTime, 
Long durationMs, String errorMessage, String errorStacktrace, String inputBody, String outputBody, String inputHeaders, String outputHeaders ) {} } ``` - [ ] **Step 2: Create StatsStore interface** Supports all 4 aggregation levels: global, per-app, per-route, per-processor. ```java package com.cameleer.server.core.storage; import com.cameleer.server.core.search.ExecutionStats; import com.cameleer.server.core.search.StatsTimeseries; import java.time.Instant; import java.util.List; public interface StatsStore { // Global stats (stats_1m_all) ExecutionStats stats(Instant from, Instant to); // Per-app stats (stats_1m_app) ExecutionStats statsForApp(Instant from, Instant to, String groupName); // Per-route stats (stats_1m_route), optionally scoped to specific agents ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds); // Per-processor stats (stats_1m_processor) ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType); // Global timeseries StatsTimeseries timeseries(Instant from, Instant to, int bucketCount); // Per-app timeseries StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName); // Per-route timeseries, optionally scoped to specific agents StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount, String routeId, List agentIds); // Per-processor timeseries StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount, String routeId, String processorType); } ``` - [ ] **Step 3: Create SearchIndex interface** ```java package com.cameleer.server.core.storage; import com.cameleer.server.core.search.ExecutionSummary; import com.cameleer.server.core.search.SearchRequest; import com.cameleer.server.core.search.SearchResult; import com.cameleer.server.core.storage.model.ExecutionDocument; public interface SearchIndex { SearchResult search(SearchRequest request); long count(SearchRequest request); void 
index(ExecutionDocument document); void delete(String executionId); } ``` - [ ] **Step 4: Create DiagramStore interface** ```java package com.cameleer.server.core.storage; import com.cameleer.common.graph.RouteGraph; import com.cameleer.server.core.ingestion.TaggedDiagram; import java.util.List; import java.util.Optional; public interface DiagramStore { void store(TaggedDiagram diagram); Optional findByContentHash(String contentHash); Optional findContentHashForRoute(String routeId, String agentId); Optional findContentHashForRouteByAgents(String routeId, List agentIds); } ``` - [ ] **Step 5: Create MetricsStore interface** ```java package com.cameleer.server.core.storage; import com.cameleer.server.core.storage.model.MetricsSnapshot; import java.util.List; public interface MetricsStore { void insertBatch(List snapshots); } ``` - [ ] **Step 6: Commit** ```bash git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends" ``` ### Task 6: Create ExecutionDocument model and indexing event **Files:** - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java` - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/ExecutionUpdatedEvent.java` - [ ] **Step 1: Create ExecutionDocument** ```java package com.cameleer.server.core.storage.model; import java.time.Instant; import java.util.List; public record ExecutionDocument( String executionId, String routeId, String agentId, String groupName, String status, String correlationId, String exchangeId, Instant startTime, Instant endTime, Long durationMs, String errorMessage, String errorStacktrace, List processors ) { public record ProcessorDoc( String processorId, String processorType, String status, String errorMessage, String errorStacktrace, String inputBody, String outputBody, String inputHeaders, String outputHeaders ) {} } ``` - [ ] **Step 2: Create 
ExecutionUpdatedEvent** ```java package com.cameleer.server.core.indexing; import java.time.Instant; public record ExecutionUpdatedEvent(String executionId, Instant startTime) {} ``` - [ ] **Step 3: Commit** ```bash git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java git add cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/ git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent" ``` ### Task 7: Update SearchService to use StatsStore for stats/timeseries **Files:** - Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java` - [ ] **Step 1: Refactor SearchService to accept SearchIndex + StatsStore** Replace the single `SearchEngine` dependency with two dependencies: ```java package com.cameleer.server.core.search; import com.cameleer.server.core.storage.SearchIndex; import com.cameleer.server.core.storage.StatsStore; import java.time.Instant; import java.util.List; public class SearchService { private final SearchIndex searchIndex; private final StatsStore statsStore; public SearchService(SearchIndex searchIndex, StatsStore statsStore) { this.searchIndex = searchIndex; this.statsStore = statsStore; } public SearchResult search(SearchRequest request) { return searchIndex.search(request); } public long count(SearchRequest request) { return searchIndex.count(request); } public ExecutionStats stats(Instant from, Instant to) { return statsStore.stats(from, to); } public ExecutionStats stats(Instant from, Instant to, String routeId, List agentIds) { return statsStore.statsForRoute(from, to, routeId, agentIds); } public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) { return statsStore.timeseries(from, to, bucketCount); } public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount, String routeId, List agentIds) { return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds); } } ``` - [ ] 
**Step 2: Commit** ```bash git add cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine" ``` ### Task 8: Update DetailService to use ExecutionStore **Files:** - Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java` - [ ] **Step 1: Rewrite DetailService to use ExecutionStore** The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with `parentProcessorId` for tree structure. ```java package com.cameleer.server.core.detail; import com.cameleer.server.core.storage.ExecutionStore; import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord; import java.util.*; public class DetailService { private final ExecutionStore executionStore; public DetailService(ExecutionStore executionStore) { this.executionStore = executionStore; } public Optional getDetail(String executionId) { return executionStore.findById(executionId) .map(exec -> { List processors = executionStore.findProcessors(executionId); List roots = buildTree(processors); return new ExecutionDetail( exec.executionId(), exec.routeId(), exec.agentId(), exec.status(), exec.startTime(), exec.endTime(), exec.durationMs() != null ? exec.durationMs() : 0L, exec.correlationId(), exec.exchangeId(), exec.errorMessage(), exec.errorStacktrace(), exec.diagramContentHash(), roots ); }); } List buildTree(List processors) { if (processors.isEmpty()) return List.of(); Map nodeMap = new LinkedHashMap<>(); for (ProcessorRecord p : processors) { nodeMap.put(p.processorId(), new ProcessorNode( p.processorId(), p.processorType(), p.status(), p.startTime(), p.endTime(), p.durationMs() != null ? 
p.durationMs() : 0L, p.diagramNodeId(), p.errorMessage(), p.errorStacktrace() )); } List roots = new ArrayList<>(); for (ProcessorRecord p : processors) { ProcessorNode node = nodeMap.get(p.processorId()); if (p.parentProcessorId() == null) { roots.add(node); } else { ProcessorNode parent = nodeMap.get(p.parentProcessorId()); if (parent != null) { parent.addChild(node); } else { roots.add(node); // orphan safety } } } return roots; } } ``` - [ ] **Step 2: Commit** ```bash git add cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId" ``` ### Task 9: Update IngestionService for synchronous writes **Files:** - Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java` - [ ] **Step 1: Rewrite IngestionService** Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing. 
```java
package com.cameleer.server.core.ingestion;

import com.cameleer.common.model.ProcessorExecution;
import com.cameleer.common.model.RouteExecution;
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer.server.core.storage.DiagramStore;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer.server.core.storage.model.MetricsSnapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);
        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                execution.getProcessors(), record.executionId(), record.startTime(),
                groupName, execution.getRouteId(), null, 0);
            executionStore.upsertProcessors(
                record.executionId(), record.startTime(), groupName,
                execution.getRouteId(), processors);
        }
        eventPublisher.accept(new ExecutionUpdatedEvent(
            record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
        diagramStore.store(diagram);
    }

    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }

    private ExecutionRecord toExecutionRecord(String agentId, String groupName, RouteExecution exec) {
        return new ExecutionRecord(
            exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
            exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
            exec.getCorrelationId(), exec.getExchangeId(),
            exec.getStartTime(), exec.getEndTime(), exec.getDurationMs(),
            exec.getErrorMessage(), exec.getErrorStacktrace(),
            null // diagramContentHash set separately
        );
    }

    private List<ProcessorRecord> flattenProcessors(
            List<ProcessorExecution> processors, String executionId,
            java.time.Instant execStartTime, String groupName, String routeId,
            String parentProcessorId, int depth) {
        List<ProcessorRecord> flat = new ArrayList<>();
        for (ProcessorExecution p : processors) {
            flat.add(new ProcessorRecord(
                executionId, p.getProcessorId(), p.getProcessorType(), p.getDiagramNodeId(),
                groupName, routeId, depth, parentProcessorId,
                p.getStatus() != null ? p.getStatus().name() : "RUNNING",
                p.getStartTime() != null ? p.getStartTime() : execStartTime,
                p.getEndTime(), p.getDurationMs(),
                p.getErrorMessage(), p.getErrorStacktrace(),
                truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
                p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
                p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
            ));
            if (p.getChildren() != null) {
                flat.addAll(flattenProcessors(
                    p.getChildren(), executionId, execStartTime,
                    groupName, routeId, p.getProcessorId(), depth + 1));
            }
        }
        return flat;
    }

    private String truncateBody(String body) {
        if (body == null) return null;
        if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
        return body;
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
```

---

## Chunk 3: PostgreSQL Store Implementations

### Task 10: Implement PostgresExecutionStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java`

- [ ] **Step 1: Write the failing test**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.*;

class PostgresExecutionStoreIT extends AbstractPostgresIT {

    @Autowired
    ExecutionStore executionStore;

    @Test
    void upsertAndFindById() {
        Instant now = Instant.now();
        ExecutionRecord record = new ExecutionRecord(
            "exec-1", "route-a", "agent-1", "app-1", "COMPLETED",
            "corr-1", "exchange-1", now, now.plusMillis(100), 100L,
            null, null, null);
        executionStore.upsert(record);

        Optional<ExecutionRecord> found = executionStore.findById("exec-1");
        assertTrue(found.isPresent());
        assertEquals("exec-1",
found.get().executionId());
        assertEquals("COMPLETED", found.get().status());
    }

    @Test
    void upsertDeduplicatesByExecutionId() {
        Instant now = Instant.now();
        ExecutionRecord first = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1", "RUNNING",
            null, null, now, null, null, null, null, null);
        ExecutionRecord second = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1", "COMPLETED",
            null, null, now, now.plusMillis(200), 200L, null, null, null);
        executionStore.upsert(first);
        executionStore.upsert(second);

        Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
        assertTrue(found.isPresent());
        assertEquals("COMPLETED", found.get().status());
        assertEquals(200L, found.get().durationMs());
    }

    @Test
    void upsertProcessorsAndFind() {
        Instant now = Instant.now();
        ExecutionRecord exec = new ExecutionRecord(
            "exec-proc", "route-a", "agent-1", "app-1", "COMPLETED",
            null, null, now, now.plusMillis(50), 50L, null, null, null);
        executionStore.upsert(exec);

        List<ProcessorRecord> processors = List.of(
            new ProcessorRecord("exec-proc", "proc-1", "log", null, "app-1", "route-a",
                0, null, "COMPLETED", now, now.plusMillis(10), 10L,
                null, null, "input body", "output body", null, null),
            new ProcessorRecord("exec-proc", "proc-2", "to", null, "app-1", "route-a",
                1, "proc-1", "COMPLETED", now.plusMillis(10), now.plusMillis(30), 20L,
                null, null, null, null, null, null)
        );
        executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

        List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
        assertEquals(2, found.size());
        assertEquals("proc-1", found.get(0).processorId());
        assertEquals("proc-2", found.get(1).processorId());
        assertEquals("proc-1", found.get(1).parentProcessorId());
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: FAIL — `ExecutionStore` bean not found

- [ ] **Step 3: Implement PostgresExecutionStore**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresExecutionStore implements ExecutionStore {

    private final JdbcTemplate jdbc;

    public PostgresExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void upsert(ExecutionRecord execution) {
        jdbc.update("""
            INSERT INTO executions (execution_id, route_id, agent_id, group_name, status,
                correlation_id, exchange_id, start_time, end_time, duration_ms,
                error_message, error_stacktrace, diagram_content_hash, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
            ON CONFLICT (execution_id, start_time) DO UPDATE SET
                status = CASE
                    WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') AND executions.status = 'RUNNING'
                        THEN EXCLUDED.status
                    WHEN EXCLUDED.status = executions.status THEN executions.status
                    ELSE EXCLUDED.status
                END,
                end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                updated_at = now()
            """,
            execution.executionId(), execution.routeId(), execution.agentId(),
            execution.groupName(), execution.status(), execution.correlationId(),
            execution.exchangeId(), Timestamp.from(execution.startTime()),
            execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
            execution.durationMs(), execution.errorMessage(), execution.errorStacktrace(),
            execution.diagramContentHash());
    }

    @Override
    public void upsertProcessors(String executionId, Instant startTime, String groupName,
                                 String routeId, List<ProcessorRecord> processors) {
        jdbc.batchUpdate("""
            INSERT INTO processor_executions (execution_id, processor_id, processor_type,
                diagram_node_id, group_name, route_id, depth, parent_processor_id, status,
                start_time, end_time, duration_ms, error_message, error_stacktrace,
                input_body, output_body, input_headers, output_headers)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
            ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
                status = EXCLUDED.status,
                end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
                input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
                output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
                input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
                output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
            """,
            processors.stream().map(p -> new Object[]{
                p.executionId(), p.processorId(), p.processorType(), p.diagramNodeId(),
                p.groupName(), p.routeId(), p.depth(), p.parentProcessorId(), p.status(),
                Timestamp.from(p.startTime()),
                p.endTime() != null ? Timestamp.from(p.endTime()) : null,
                p.durationMs(), p.errorMessage(), p.errorStacktrace(),
                p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
            }).toList());
    }

    @Override
    public Optional<ExecutionRecord> findById(String executionId) {
        List<ExecutionRecord> results = jdbc.query(
            "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
            EXECUTION_MAPPER, executionId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<ProcessorRecord> findProcessors(String executionId) {
        return jdbc.query(
            "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
            PROCESSOR_MAPPER, executionId);
    }

    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) -> new ExecutionRecord(
        rs.getString("execution_id"), rs.getString("route_id"), rs.getString("agent_id"),
        rs.getString("group_name"), rs.getString("status"), rs.getString("correlation_id"),
        rs.getString("exchange_id"), toInstant(rs, "start_time"), toInstant(rs, "end_time"),
        rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
        rs.getString("error_message"), rs.getString("error_stacktrace"),
        rs.getString("diagram_content_hash"));

    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) -> new ProcessorRecord(
        rs.getString("execution_id"), rs.getString("processor_id"), rs.getString("processor_type"),
        rs.getString("diagram_node_id"), rs.getString("group_name"), rs.getString("route_id"),
        rs.getInt("depth"), rs.getString("parent_processor_id"), rs.getString("status"),
        toInstant(rs, "start_time"), toInstant(rs, "end_time"),
        rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
        rs.getString("error_message"), rs.getString("error_stacktrace"),
        rs.getString("input_body"), rs.getString("output_body"),
        rs.getString("input_headers"), rs.getString("output_headers"));

    private static Instant toInstant(ResultSet rs, String column) throws SQLException {
        Timestamp ts = rs.getTimestamp(column);
        return ts != null ?
ts.toInstant() : null;
    }
}
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: PASS — all 3 tests green

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
```

### Task 11: Implement PostgresStatsStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresStatsStoreIT.java`

- [ ] **Step 1: Write the failing test**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.junit.jupiter.api.Assertions.*;

class PostgresStatsStoreIT extends AbstractPostgresIT {

    @Autowired
    StatsStore statsStore;

    @Autowired
    ExecutionStore executionStore;

    @Autowired
    JdbcTemplate jdbc;

    @Test
    void statsReturnsCountsForTimeWindow() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
        insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
        insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);

        // Force continuous aggregate refresh
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
        assertEquals(3, stats.totalCount());
        assertEquals(1, stats.failedCount());
    }

    @Test
    void timeseriesReturnsBuckets() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
        for (int i = 0; i < 10; i++) {
            insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED", now.plusSeconds(i * 30), 100L + i);
        }
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        StatsTimeseries ts = statsStore.timeseries(
            now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
        assertNotNull(ts);
        assertFalse(ts.buckets().isEmpty());
    }

    private void insertExecution(String id, String routeId, String groupName, String status,
                                 Instant startTime, long durationMs) {
        executionStore.upsert(new ExecutionRecord(
            id, routeId, "agent-1", groupName, status, null, null,
            startTime, startTime.plusMillis(durationMs), durationMs,
            status.equals("FAILED") ? "error" : null, null, null));
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: FAIL — `StatsStore` bean not found

- [ ] **Step 3: Implement PostgresStatsStore**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

@Repository
public class PostgresStatsStore implements StatsStore {

    private final JdbcTemplate jdbc;

    public PostgresStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of());
    }

    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
        return queryStats("stats_1m_app", from, to, List.of(
            new Filter("group_name", groupName)));
    }

    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // Note: agentIds is accepted for interface compatibility but not filterable
        // on the continuous aggregate (it groups by route_id, not agent_id).
        // All agents for the same route contribute to the same aggregate.
        return queryStats("stats_1m_route", from, to, List.of(
            new Filter("route_id", routeId)));
    }

    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryStats("stats_1m_processor", from, to, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)));
    }

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
            new Filter("group_name", groupName)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                              String routeId, List<String> agentIds) {
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
            new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                  String routeId, String processorType) {
        // stats_1m_processor does NOT have running_count column
        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)), false);
    }

    private record Filter(String column, String value) {}

    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
        // running_count only exists on execution-level aggregates, not processor
        boolean hasRunning = !view.equals("stats_1m_processor");
        String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";
        String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
            runningCol + " AS active_count " +
            "FROM " + view + " WHERE bucket >= ? AND bucket < ?";
        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }

        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
            rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("avg_duration"),
            rs.getLong("p99_duration"), rs.getLong("active_count")
        }, params.toArray());
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
            p99Duration = r[3]; activeCount = r[4];
        }

        // Previous period (shifted back 24h)
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        List<Object> prevParams = new ArrayList<>();
        prevParams.add(Timestamp.from(prevFrom));
        prevParams.add(Timestamp.from(prevTo));
        for (Filter f : filters) prevParams.add(f.value());
        String prevSql = sql; // same shape, different time params
        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
            rs.getLong("total_count"), rs.getLong("failed_count"),
            rs.getLong("avg_duration"), rs.getLong("p99_duration")
        }, prevParams.toArray());
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
        }

        // Today total (from midnight UTC)
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        List<Object> todayParams = new ArrayList<>();
        todayParams.add(Timestamp.from(todayStart));
        todayParams.add(Timestamp.from(Instant.now()));
        for (Filter f : filters) todayParams.add(f.value());
        String todaySql = sql;
        long totalToday = 0;
        var todayResult = jdbc.query(todaySql,
            (rs, rowNum) -> rs.getLong("total_count"), todayParams.toArray());
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);

        return new ExecutionStats(
            totalCount, failedCount, avgDuration, p99Duration, activeCount,
            totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to, int bucketCount,
                                            List<Filter> filters, boolean hasRunningCount) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;
        String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";
        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
            "COALESCE(SUM(total_count), 0) AS total_count, " +
            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
            runningCol + " AS active_count " +
            "FROM " + view + " WHERE bucket >= ? AND bucket < ?";
        List<Object> params = new ArrayList<>();
        params.add(intervalSeconds);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }
        sql += " GROUP BY period ORDER BY period";

        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) -> new TimeseriesBucket(
            rs.getTimestamp("period").toInstant(),
            rs.getLong("total_count"), rs.getLong("failed_count"),
            rs.getLong("avg_duration"), rs.getLong("p99_duration"),
            rs.getLong("active_count")
        ), params.toArray());
        return new StatsTimeseries(buckets);
    }
}
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresStatsStoreIT.java
git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
```

### Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresDiagramStore.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresUserRepository.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresOidcConfigRepository.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsStore.java`

- [ ] **Step 1: Write failing test for PostgresDiagramStore**

Create `PostgresDiagramStoreIT.java` with tests for:
- `store()` + `findByContentHash()` roundtrip
- Content-hash dedup (store same hash twice, verify single row)
- `findContentHashForRoute()` returns latest for route+agent
- `findContentHashForRouteByAgents()` returns across agent list

- [ ] **Step 2: Implement PostgresDiagramStore**
Straightforward CRUD with `ON CONFLICT (content_hash) DO NOTHING`. Port the SHA-256 hashing logic from `ClickHouseDiagramRepository`. Use `JdbcTemplate` queries.

- [ ] **Step 3: Run diagram tests, verify pass**

- [ ] **Step 4: Implement PostgresUserRepository**

Implements `UserRepository` interface (existing interface in `core.security`, unchanged).

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.security.UserInfo;
import com.cameleer.server.core.security.UserRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Array;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresUserRepository implements UserRepository {

    private final JdbcTemplate jdbc;

    public PostgresUserRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Optional<UserInfo> findById(String userId) {
        var results = jdbc.query(
            "SELECT * FROM users WHERE user_id = ?",
            (rs, rowNum) -> mapUser(rs), userId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<UserInfo> findAll() {
        return jdbc.query("SELECT * FROM users ORDER BY user_id", (rs, rowNum) -> mapUser(rs));
    }

    @Override
    public void upsert(UserInfo user) {
        jdbc.update("""
            INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, now(), now())
            ON CONFLICT (user_id) DO UPDATE SET
                provider = EXCLUDED.provider,
                email = EXCLUDED.email,
                display_name = EXCLUDED.display_name,
                roles = EXCLUDED.roles,
                updated_at = now()
            """,
            user.userId(), user.provider(), user.email(), user.displayName(),
            user.roles().toArray(new String[0]));
    }

    @Override
    public void updateRoles(String userId, List<String> roles) {
        jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
            roles.toArray(new String[0]), userId);
    }

    @Override
    public void delete(String userId) {
        jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
    }

    private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
        Array rolesArray = rs.getArray("roles");
        String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
        return new UserInfo(
            rs.getString("user_id"), rs.getString("provider"), rs.getString("email"),
            rs.getString("display_name"), List.of(roles));
    }
}
```

- [ ] **Step 5: Implement PostgresOidcConfigRepository**

Implements `OidcConfigRepository` interface (existing interface in `core.security`).
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.security.OidcConfig;
import com.cameleer.server.core.security.OidcConfigRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Array;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresOidcConfigRepository implements OidcConfigRepository {

    private final JdbcTemplate jdbc;

    public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Optional<OidcConfig> find() {
        var results = jdbc.query(
            "SELECT * FROM oidc_config WHERE config_id = 'default'",
            (rs, rowNum) -> {
                Array arr = rs.getArray("default_roles");
                String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
                return new OidcConfig(
                    rs.getBoolean("enabled"), rs.getString("issuer_uri"),
                    rs.getString("client_id"), rs.getString("client_secret"),
                    rs.getString("roles_claim"), List.of(roles),
                    rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
            });
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public void save(OidcConfig config) {
        jdbc.update("""
            INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
                roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
            VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
            ON CONFLICT (config_id) DO UPDATE SET
                enabled = EXCLUDED.enabled,
                issuer_uri = EXCLUDED.issuer_uri,
                client_id = EXCLUDED.client_id,
                client_secret = EXCLUDED.client_secret,
                roles_claim = EXCLUDED.roles_claim,
                default_roles = EXCLUDED.default_roles,
                auto_signup = EXCLUDED.auto_signup,
                display_name_claim = EXCLUDED.display_name_claim,
                updated_at = now()
            """,
            config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
            config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
            config.autoSignup(), config.displayNameClaim());
    }

    @Override
    public void delete() {
        jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
    }
}
```

- [ ] **Step 6: Implement PostgresMetricsStore**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.MetricsStore;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.util.List;

@Repository
public class PostgresMetricsStore implements MetricsStore {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JdbcTemplate jdbc;

    public PostgresMetricsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void insertBatch(List<MetricsSnapshot> snapshots) {
        jdbc.batchUpdate("""
            INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
                collected_at, server_received_at)
            VALUES (?, ?, ?, ?::jsonb, ?, now())
            """,
            snapshots.stream().map(s -> new Object[]{
                s.agentId(), s.metricName(), s.metricValue(),
                tagsToJson(s.tags()), Timestamp.from(s.collectedAt())
            }).toList());
    }

    private String tagsToJson(java.util.Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) return null;
        try {
            return MAPPER.writeValueAsString(tags);
        } catch (JsonProcessingException e) {
            return null;
        }
    }
}
```

- [ ] **Step 7: Run all store tests, verify pass**

Run: `mvn test -pl cameleer-server-app -Dtest="Postgres*IT" -q`

- [ ] **Step 8: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/Postgres*.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/Postgres*.java
git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
```

---

## Chunk 4: OpenSearch Integration

### Task 13: Implement OpenSearchIndex

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/OpenSearchConfig.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/OpenSearchIndexIT.java`

- [ ] **Step 1: Write failing test**

```java
package com.cameleer.server.app.search;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.SearchIndex;
import com.cameleer.server.core.storage.model.ExecutionDocument;
import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;

import java.time.Instant;
import
java.util.List;

import static org.junit.jupiter.api.Assertions.*;

// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
class OpenSearchIndexIT extends AbstractPostgresIT {

    @Container
    static final OpensearchContainer<?> opensearch =
        new OpensearchContainer<>("opensearchproject/opensearch:2.19.0")
            .withSecurityEnabled(false);

    @DynamicPropertySource
    static void configureOpenSearch(DynamicPropertyRegistry registry) {
        registry.add("opensearch.url", opensearch::getHttpHostAddress);
    }

    @Autowired
    SearchIndex searchIndex;

    @Test
    void indexAndSearchByText() throws Exception {
        Instant now = Instant.now();
        ExecutionDocument doc = new ExecutionDocument(
            "search-1", "route-a", "agent-1", "app-1", "FAILED", "corr-1", "exch-1",
            now, now.plusMillis(100), 100L,
            "OrderNotFoundException: order-12345 not found", null,
            List.of(new ProcessorDoc("proc-1", "log", "COMPLETED", null, null,
                "request body with customer-99", null, null, null)));
        searchIndex.index(doc);
        Thread.sleep(1500); // Allow OpenSearch refresh

        SearchRequest request = new SearchRequest(
            null, now.minusSeconds(60), now.plusSeconds(60), null, null, null,
            "OrderNotFoundException", null, null, null, null, null, null, null, null,
            0, 50, "startTime", "desc");
        SearchResult<ExecutionSummary> result = searchIndex.search(request);
        assertTrue(result.total() > 0);
        assertEquals("search-1", result.items().get(0).executionId());
    }

    @Test
    void wildcardSearchFindsSubstring() throws Exception {
        Instant now = Instant.now();
        ExecutionDocument doc = new ExecutionDocument(
            "wild-1", "route-b", "agent-1", "app-1", "COMPLETED", null, null,
            now, now.plusMillis(50), 50L, null, null,
            List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED", null, null,
                "UniquePayloadIdentifier12345", null, null, null)));
        searchIndex.index(doc);
        Thread.sleep(1500);

        SearchRequest request = new SearchRequest(
            null, now.minusSeconds(60), now.plusSeconds(60), null, null, null,
            "PayloadIdentifier", null, null, null, null, null, null, null, null,
            0, 50, "startTime", "desc");
        SearchResult<ExecutionSummary> result = searchIndex.search(request);
        assertTrue(result.total() > 0);
    }
}
```

- [ ] **Step 2: Create OpenSearchConfig**

```java
package com.cameleer.server.app.config;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenSearchConfig {

    @Value("${opensearch.url:http://localhost:9200}")
    private String opensearchUrl;

    @Bean
    public OpenSearchClient openSearchClient() {
        HttpHost host = HttpHost.create(opensearchUrl);
        var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
        return new OpenSearchClient(transport);
    }
}
```

- [ ] **Step 3: Implement OpenSearchIndex**

```java
package com.cameleer.server.app.search;

import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.SearchIndex;
import com.cameleer.server.core.storage.model.ExecutionDocument;
import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.analysis.TokenChar;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

@Repository
public class OpenSearchIndex implements SearchIndex {

    private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
    private static final String INDEX_PREFIX = "executions-";
    private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
        .withZone(ZoneOffset.UTC);

    private final OpenSearchClient client;

    public OpenSearchIndex(OpenSearchClient client) {
        this.client = client;
    }

    @PostConstruct
    void ensureIndexTemplate() {
        // Full template with ngram analyzer for infix wildcard search.
        // The template JSON matches the spec's OpenSearch index template definition.
        try {
            boolean exists = client.indices().existsIndexTemplate(
                ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
            if (!exists) {
                client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
                    .name("executions-template")
                    .indexPatterns(List.of("executions-*"))
                    .template(t -> t
                        .settings(s -> s
                            .numberOfShards("3")
                            .numberOfReplicas("1")
                            .analysis(a -> a
                                .analyzer("ngram_analyzer", an -> an
                                    .custom(c -> c
                                        .tokenizer("ngram_tokenizer")
                                        .filter("lowercase")))
                                .tokenizer("ngram_tokenizer", tk -> tk
                                    .definition(d -> d
                                        .ngram(ng -> ng
                                            .minGram(3)
                                            .maxGram(4)
                                            .tokenChars(TokenChar.Letter, TokenChar.Digit,
                                                TokenChar.Punctuation, TokenChar.Symbol)))))))));
                log.info("OpenSearch index template created with ngram analyzer");
            }
        } catch (IOException e) {
            log.error("Failed to create index template", e);
        }
    }

    @Override
    public void index(ExecutionDocument doc) {
        String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
        try {
            client.index(IndexRequest.of(b -> b
                .index(indexName)
                .id(doc.executionId())
                .document(toMap(doc))));
        } catch (IOException e) {
            log.error("Failed to index execution {}", doc.executionId(), e);
        }
    }

    @Override
    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        try {
            var searchReq = buildSearchRequest(request, request.limit());
            var response = client.search(searchReq, Map.class);
            List<ExecutionSummary> items = response.hits().hits().stream()
                .map(this::hitToSummary)
                .collect(Collectors.toList());
            long total = response.hits().total() != null ? response.hits().total().value() : 0;
            return new SearchResult<>(items, total);
        } catch (IOException e) {
            log.error("Search failed", e);
            return new SearchResult<>(List.of(), 0);
        }
    }

    @Override
    public long count(SearchRequest request) {
        try {
            var countReq = CountRequest.of(b -> b
                .index(INDEX_PREFIX + "*")
                .query(buildQuery(request)));
            return client.count(countReq).count();
        } catch (IOException e) {
            log.error("Count failed", e);
            return 0;
        }
    }

    @Override
    public void delete(String executionId) {
        try {
            client.deleteByQuery(DeleteByQueryRequest.of(b -> b
                .index(List.of(INDEX_PREFIX + "*"))
                .query(Query.of(q -> q.term(t -> t
                    .field("execution_id").value(executionId))))));
        } catch (IOException e) {
            log.error("Failed to delete execution {}", executionId, e);
        }
    }

    private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
            SearchRequest request, int size) {
        return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
            b.index(INDEX_PREFIX + "*")
                .query(buildQuery(request))
                .size(size)
                .from(request.offset())
                .sort(s -> s.field(f -> f
                    .field(request.sortColumn())
                    .order("asc".equalsIgnoreCase(request.sortDir()) ?
SortOrder.Asc : SortOrder.Desc))); return b; }); } private Query buildQuery(SearchRequest request) { List must = new ArrayList<>(); List filter = new ArrayList<>(); // Time range if (request.timeFrom() != null || request.timeTo() != null) { filter.add(Query.of(q -> q.range(r -> { r.field("start_time"); if (request.timeFrom() != null) r.gte(jakarta.json.Json.createValue(request.timeFrom().toString())); if (request.timeTo() != null) r.lte(jakarta.json.Json.createValue(request.timeTo().toString())); return r; }))); } // Keyword filters if (request.status() != null) filter.add(termQuery("status", request.status())); if (request.routeId() != null) filter.add(termQuery("route_id", request.routeId())); if (request.agentId() != null) filter.add(termQuery("agent_id", request.agentId())); if (request.correlationId() != null) filter.add(termQuery("correlation_id", request.correlationId())); // Full-text search across all fields + nested processor fields if (request.text() != null && !request.text().isBlank()) { String text = request.text(); List textQueries = new ArrayList<>(); // Search top-level text fields textQueries.add(Query.of(q -> q.multiMatch(m -> m .query(text) .fields("error_message", "error_stacktrace", "error_message.ngram", "error_stacktrace.ngram")))); // Search nested processor fields textQueries.add(Query.of(q -> q.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(text) .fields("processors.input_body", "processors.output_body", "processors.input_headers", "processors.output_headers", "processors.error_message", "processors.error_stacktrace", "processors.input_body.ngram", "processors.output_body.ngram", "processors.input_headers.ngram", "processors.output_headers.ngram", "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))); // Also try keyword fields for exact matches textQueries.add(Query.of(q -> q.multiMatch(m -> m .query(text) .fields("execution_id", "route_id", "agent_id", "correlation_id", 
"exchange_id")))); must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1")))); } // Scoped text searches if (request.textInBody() != null && !request.textInBody().isBlank()) { must.add(Query.of(q -> q.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(request.textInBody()) .fields("processors.input_body", "processors.output_body", "processors.input_body.ngram", "processors.output_body.ngram")))))); } if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { must.add(Query.of(q -> q.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(request.textInHeaders()) .fields("processors.input_headers", "processors.output_headers", "processors.input_headers.ngram", "processors.output_headers.ngram")))))); } if (request.textInErrors() != null && !request.textInErrors().isBlank()) { String errText = request.textInErrors(); must.add(Query.of(q -> q.bool(b -> b.should( Query.of(sq -> sq.multiMatch(m -> m .query(errText) .fields("error_message", "error_stacktrace", "error_message.ngram", "error_stacktrace.ngram"))), Query.of(sq -> sq.nested(n -> n .path("processors") .query(nq -> nq.multiMatch(m -> m .query(errText) .fields("processors.error_message", "processors.error_stacktrace", "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))) ).minimumShouldMatch("1")))); } // Duration range if (request.durationMin() != null || request.durationMax() != null) { filter.add(Query.of(q -> q.range(r -> { r.field("duration_ms"); if (request.durationMin() != null) r.gte(jakarta.json.Json.createValue(request.durationMin())); if (request.durationMax() != null) r.lte(jakarta.json.Json.createValue(request.durationMax())); return r; }))); } return Query.of(q -> q.bool(b -> { if (!must.isEmpty()) b.must(must); if (!filter.isEmpty()) b.filter(filter); if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m))); return b; })); } private Query termQuery(String field, 
String value) { return Query.of(q -> q.term(t -> t.field(field).value(value))); } private Map toMap(ExecutionDocument doc) { Map map = new LinkedHashMap<>(); map.put("execution_id", doc.executionId()); map.put("route_id", doc.routeId()); map.put("agent_id", doc.agentId()); map.put("group_name", doc.groupName()); map.put("status", doc.status()); map.put("correlation_id", doc.correlationId()); map.put("exchange_id", doc.exchangeId()); map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null); map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null); map.put("duration_ms", doc.durationMs()); map.put("error_message", doc.errorMessage()); map.put("error_stacktrace", doc.errorStacktrace()); if (doc.processors() != null) { map.put("processors", doc.processors().stream().map(p -> { Map pm = new LinkedHashMap<>(); pm.put("processor_id", p.processorId()); pm.put("processor_type", p.processorType()); pm.put("status", p.status()); pm.put("error_message", p.errorMessage()); pm.put("error_stacktrace", p.errorStacktrace()); pm.put("input_body", p.inputBody()); pm.put("output_body", p.outputBody()); pm.put("input_headers", p.inputHeaders()); pm.put("output_headers", p.outputHeaders()); return pm; }).toList()); } return map; } @SuppressWarnings("unchecked") private ExecutionSummary hitToSummary(Hit hit) { Map src = hit.source(); if (src == null) return null; return new ExecutionSummary( (String) src.get("execution_id"), (String) src.get("route_id"), (String) src.get("agent_id"), (String) src.get("status"), src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null, src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null, src.get("duration_ms") != null ? 
((Number) src.get("duration_ms")).longValue() : 0L, (String) src.get("correlation_id"), (String) src.get("error_message")); } } ``` - [ ] **Step 4: Run tests to verify they pass** Run: `mvn test -pl cameleer-server-app -Dtest=OpenSearchIndexIT -q` Expected: PASS - [ ] **Step 5: Commit** ```bash git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/OpenSearchConfig.java git add cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java git add cameleer-server-app/src/test/java/com/cameleer/server/app/search/OpenSearchIndexIT.java git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search" ``` ### Task 14: Implement SearchIndexer (debounced event-driven indexer) **Files:** - Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/SearchIndexer.java` - [ ] **Step 1: Implement SearchIndexer** ```java package com.cameleer.server.core.indexing; import com.cameleer.server.core.storage.ExecutionStore; import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord; import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord; import com.cameleer.server.core.storage.SearchIndex; import com.cameleer.server.core.storage.model.ExecutionDocument; import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.concurrent.*; public class SearchIndexer { private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class); private final ExecutionStore executionStore; private final SearchIndex searchIndex; private final long debounceMs; private final int queueCapacity; private final Map> pending = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; }); public SearchIndexer(ExecutionStore 
executionStore, SearchIndex searchIndex, long debounceMs, int queueCapacity) { this.executionStore = executionStore; this.searchIndex = searchIndex; this.debounceMs = debounceMs; this.queueCapacity = queueCapacity; } public void onExecutionUpdated(ExecutionUpdatedEvent event) { if (pending.size() >= queueCapacity) { log.warn("Search indexer queue full, dropping event for {}", event.executionId()); return; } ScheduledFuture existing = pending.put(event.executionId(), scheduler.schedule(() -> indexExecution(event.executionId()), debounceMs, TimeUnit.MILLISECONDS)); if (existing != null) { existing.cancel(false); } } private void indexExecution(String executionId) { pending.remove(executionId); try { ExecutionRecord exec = executionStore.findById(executionId).orElse(null); if (exec == null) return; List processors = executionStore.findProcessors(executionId); List processorDocs = processors.stream() .map(p -> new ProcessorDoc( p.processorId(), p.processorType(), p.status(), p.errorMessage(), p.errorStacktrace(), p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders())) .toList(); searchIndex.index(new ExecutionDocument( exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(), exec.status(), exec.correlationId(), exec.exchangeId(), exec.startTime(), exec.endTime(), exec.durationMs(), exec.errorMessage(), exec.errorStacktrace(), processorDocs)); } catch (Exception e) { log.error("Failed to index execution {}", executionId, e); } } public void shutdown() { scheduler.shutdown(); } } ``` - [ ] **Step 2: Commit** ```bash git add cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/SearchIndexer.java git commit -m "feat: implement debounced SearchIndexer for async OpenSearch indexing" ``` --- ## Chunk 5: Wiring, Cleanup, and Integration ### Task 15: Create bean configuration and wire everything **Files:** - Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java` - Create: 
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/SearchBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java`

- [ ] **Step 1: Create StorageBeanConfig**

Wire `DetailService`, `SearchIndexer`, and `IngestionService` with the new store beans:

```java
package com.cameleer.server.app.config;

import com.cameleer.server.core.detail.DetailService;
import com.cameleer.server.core.indexing.SearchIndexer;
import com.cameleer.server.core.ingestion.IngestionService;
import com.cameleer.server.core.ingestion.WriteBuffer;
import com.cameleer.server.core.storage.*;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StorageBeanConfig {

    @Bean
    public DetailService detailService(ExecutionStore executionStore) {
        return new DetailService(executionStore);
    }

    @Bean(destroyMethod = "shutdown")
    public SearchIndexer searchIndexer(ExecutionStore executionStore,
                                       SearchIndex searchIndex,
                                       @Value("${opensearch.debounce-ms:2000}") long debounceMs,
                                       @Value("${opensearch.queue-size:10000}") int queueSize) {
        return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
    }

    @Bean
    public IngestionService ingestionService(ExecutionStore executionStore,
                                             DiagramStore diagramStore,
                                             WriteBuffer<MetricsSnapshot> metricsBuffer,
                                             SearchIndexer searchIndexer,
                                             @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
        return new IngestionService(executionStore, diagramStore, metricsBuffer,
                searchIndexer::onExecutionUpdated, bodySizeLimit);
    }
}
```

- [ ] **Step 2: Update SearchBeanConfig**

Wire `SearchService` with `SearchIndex` + `StatsStore`:

```java
@Bean
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
    return new SearchService(searchIndex, statsStore);
}
```

- [ ] **Step 3: Update IngestionBeanConfig**

Remove the execution and diagram write buffers. Keep only the metrics write buffer:

```java
@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
```

- [ ] **Step 4: Create MetricsFlushScheduler**

Create `cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java`:

```java
package com.cameleer.server.app.ingestion;

import com.cameleer.server.app.config.IngestionConfig;
import com.cameleer.server.core.ingestion.WriteBuffer;
import com.cameleer.server.core.storage.MetricsStore;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MetricsFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);

    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final MetricsStore metricsStore;
    private final int batchSize;
    private volatile boolean running = false;

    public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
                                 MetricsStore metricsStore,
                                 IngestionConfig config) {
        this.metricsBuffer = metricsBuffer;
        this.metricsStore = metricsStore;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        try {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                metricsStore.insertBatch(batch);
                log.debug("Flushed {} metrics to PostgreSQL", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush metrics", e);
        }
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        // Drain remaining on shutdown
        while (metricsBuffer.size() > 0) {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (batch.isEmpty()) break;
            try {
                metricsStore.insertBatch(batch);
            } catch (Exception e) {
                log.error("Failed to flush metrics during shutdown", e);
                break;
            }
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 1;
    }
}
```

- [ ] **Step 5: Create RetentionScheduler**

Create `cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java`:

```java
package com.cameleer.server.app.retention;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RetentionScheduler {

    private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);

    private final JdbcTemplate jdbc;
    private final int retentionDays;

    public RetentionScheduler(JdbcTemplate jdbc,
                              @Value("${cameleer.retention-days:30}") int retentionDays) {
        this.jdbc = jdbc;
        this.retentionDays = retentionDays;
    }

    // Daily at 2 AM UTC. Intervals are built from a config int, so the
    // string concatenation below is not an injection risk.
    @Scheduled(cron = "0 0 2 * * *")
    public void dropExpiredChunks() {
        String interval = retentionDays + " days";
        try {
            // Raw data
            jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");
            // Continuous aggregates (keep 3x longer)
            String caggInterval = (retentionDays * 3) + " days";
            jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");
            log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
                    retentionDays, retentionDays * 3);
        } catch (Exception e) {
            log.error("Retention job failed", e);
        }
    }

    // Note: OpenSearch daily index deletion should be handled via an ISM
    // (Index State Management) policy configured at deployment time,
    // not in application code.
}
```

- [ ] **Step 6: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/SearchBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"
```

### Task 16: Delete ClickHouse code and old interfaces

**Files:**

- Delete all files listed in the "Files to delete" section above

- [ ] **Step 1: Delete ClickHouse implementations**

```bash
rm cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouse*.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchEngine.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseConfig.java
```

- [ ] **Step 2: Delete old core interfaces replaced by new ones**

`UserRepository` and `OidcConfigRepository` in `core.security` are **kept** — the new Postgres implementations implement them. Only interfaces replaced by new storage interfaces are deleted.
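As a shape check for the kept-interface rule, the sketch below shows why no caller changes are needed when only the implementation moves. It is a hypothetical stand-in: the real `UserRepository` lives in `cameleer-server-core` and its method names may differ; `findUsername` and the in-memory `Map` (standing in for a JDBC-backed lookup) are illustration only.

```java
import java.util.Map;
import java.util.Optional;

// Stand-in for the kept core interface (method name hypothetical).
interface UserRepository {
    Optional<String> findUsername(String userId);
}

// Stand-in for PostgresUserRepository: same interface, new backing store.
class PostgresUserRepository implements UserRepository {
    private final Map<String, String> table; // stands in for a JDBC-backed table

    PostgresUserRepository(Map<String, String> table) {
        this.table = table;
    }

    @Override
    public Optional<String> findUsername(String userId) {
        return Optional.ofNullable(table.get(userId));
    }
}

public class KeptInterfaceSketch {
    // Call sites depend only on UserRepository, so swapping the ClickHouse-era
    // implementation for the Postgres one requires no caller changes.
    static String lookup(String userId) {
        UserRepository repo = new PostgresUserRepository(Map.of("u1", "alice"));
        return repo.findUsername(userId).orElse("<none>");
    }

    public static void main(String[] args) {
        System.out.println(lookup("u1")); // prints "alice"
    }
}
```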
```bash
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/DiagramRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchEngine.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/detail/RawExecutionRow.java
```

- [ ] **Step 3: Delete ClickHouse SQL migrations**

```bash
rm -r cameleer-server-app/src/main/resources/clickhouse/
```

- [ ] **Step 4: Delete old test base class**

```bash
rm cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractClickHouseIT.java
```

- [ ] **Step 5: Fix compilation errors in specific files**

Each file and what to change:

- `DiagramRenderController.java` — change `DiagramRepository` to `DiagramStore`
- `DiagramController.java` — change `DiagramRepository` to `DiagramStore`
- `SearchController.java` — already uses `SearchService`, verify no direct `SearchEngine` refs
- `DetailController.java` — already uses `DetailService`, verify no direct `ExecutionRepository` refs
- `TreeReconstructionTest.java` — rewrite to test `DetailService.buildTree()` with `ProcessorRecord` list input
- `ExecutionController.java` — update to call `IngestionService.ingestExecution()` (synchronous, catches exceptions for 503)
- `DiagramController.java` — update to call `IngestionService.ingestDiagram()` (synchronous)
- `MetricsController.java` — update to call `IngestionService.acceptMetrics()` (still buffered)
- `SearchBeanConfig.java` — wire `SearchService(SearchIndex, StatsStore)` instead of `SearchService(SearchEngine)`
- `IngestionBeanConfig.java` — remove execution/diagram buffer beans, add `bodySizeLimit` to `IngestionService` constructor
- All ITs extending `AbstractClickHouseIT` — change to extend `AbstractPostgresIT`

- [ ] **Step 6: Verify compilation**

Run: `mvn compile -q`
Expected: BUILD SUCCESS

- [ ] **Step 7: Commit**

```bash
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"
```

### Task 17: Update existing integration tests

**Files:**

- Modify: all IT files under `cameleer-server-app/src/test/`

- [ ] **Step 1: Update all ITs to extend AbstractPostgresIT**

Every IT that currently extends `AbstractClickHouseIT` must extend `AbstractPostgresIT` instead. Also add an OpenSearch Testcontainer where search tests are needed.

- [ ] **Step 2: Update ExecutionControllerIT**

Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success, 503 on DB write failure.

- [ ] **Step 3: Update SearchControllerIT**

Search now hits OpenSearch. Add an OpenSearch Testcontainer. Allow time for async indexing (use Awaitility to poll search results rather than `Thread.sleep`).

- [ ] **Step 4: Update DetailControllerIT**

Detail now reads from `PostgresExecutionStore` directly. Simpler setup — just insert the execution + processors.

- [ ] **Step 5: Run full test suite**

Run: `mvn verify -q`
Expected: BUILD SUCCESS, all tests pass

- [ ] **Step 6: Commit**

```bash
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"
```

### Task 18: Update Dockerfile and K8s manifests

**Files:**

- Modify: `Dockerfile`
- Modify: `deploy/*.yaml` (K8s manifests)

- [ ] **Step 1: Update Dockerfile**

No JDBC driver changes needed in the Dockerfile (drivers are in the fat JAR). Just verify the `REGISTRY_TOKEN` build arg still works for `cameleer-common` resolution.
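Task 17's Step 3 replaces fixed sleeps with polling. The Awaitility call the plan suggests (`await().atMost(...).until(...)`) boils down to a poll-until-true loop; the plain-Java sketch below shows that pattern. The `PollUntil` helper name and timings are illustrative, not part of the plan — the real tests would use Awaitility directly.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Poll-until-true helper: the shape Awaitility provides. Used so a test can
// wait for async OpenSearch indexing instead of a fixed Thread.sleep.
public class PollUntil {
    static void until(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                throw new TimeoutException("condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis()); // back off between polls
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Condition flips true after ~150 ms, standing in for "document is searchable".
        until(() -> System.currentTimeMillis() - start > 150,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("indexed");
    }
}
```

Unlike a fixed `Thread.sleep(1500)`, this returns as soon as the condition holds and fails loudly (with a `TimeoutException`) instead of passing flakily when indexing is slow.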
- [ ] **Step 2: Update K8s manifests**

- Replace ClickHouse StatefulSet with PostgreSQL/TimescaleDB StatefulSet
- Add OpenSearch StatefulSet (single-node for dev, clustered for prod)
- Update server Deployment env vars: `SPRING_DATASOURCE_URL`, `OPENSEARCH_URL`
- Update secrets: replace `clickhouse-credentials` with `postgres-credentials`
- Update health check probes: PostgreSQL uses standard port 5432, OpenSearch uses 9200

- [ ] **Step 3: Commit**

```bash
git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"
```

### Task 19: Update CI workflow

**Files:**

- Modify: `.gitea/workflows/ci.yml`

- [ ] **Step 1: Update CI workflow**

Changes needed:

- Docker build: no ClickHouse references in the image (it's a fat JAR, the driver is bundled)
- Deploy step: update K8s secret names from `clickhouse-credentials` to `postgres-credentials`
- Deploy step: add OpenSearch deployment manifests
- Verify `REGISTRY_TOKEN` build arg still works for `cameleer-common`
- Integration tests still skipped in CI (`-DskipITs`) — Testcontainers needs Docker-in-Docker

- [ ] **Step 2: Commit**

```bash
git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"
```

### Task 20: Update OpenAPI spec

- [ ] **Step 1: Regenerate openapi.json**

Run the server locally (or via test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend. Run: `mvn spring-boot:run` (briefly, to generate the spec), then fetch `/api/v1/api-docs`.

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"
```

### Task 21: Final verification

- [ ] **Step 1: Full build**

Run: `mvn clean verify -q`
Expected: BUILD SUCCESS

- [ ] **Step 2: Manual smoke test**

Start the server against a local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):

1. POST an execution → verify 202
2. GET `/api/v1/search?text=...` → verify search works
3. GET `/api/v1/stats` → verify stats return
4. GET `/api/v1/detail/{id}` → verify detail with processor tree

- [ ] **Step 3: Commit any remaining fixes**

```bash
git add -A
git commit -m "fix: final adjustments from smoke testing"
```