diff --git a/docs/superpowers/plans/2026-03-16-storage-layer-refactor.md b/docs/superpowers/plans/2026-03-16-storage-layer-refactor.md new file mode 100644 index 00000000..b4f7777a --- /dev/null +++ b/docs/superpowers/plans/2026-03-16-storage-layer-refactor.md @@ -0,0 +1,2765 @@ +# Storage Layer Refactor Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends. + +**Architecture:** PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations. + +**Tech Stack:** Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client + +**Spec:** `docs/superpowers/specs/2026-03-16-storage-layer-design.md` + +--- + +## File Structure + +### New files + +**Core module** (`cameleer3-server-core/src/main/java/com/cameleer3/server/core/`): +- `storage/ExecutionStore.java` — new interface replacing ExecutionRepository +- `storage/StatsStore.java` — new interface for stats from continuous aggregates +- `storage/SearchIndex.java` — new interface for OpenSearch operations +- `storage/DiagramStore.java` — new interface replacing DiagramRepository +- `storage/MetricsStore.java` — new interface replacing MetricsRepository +- `storage/model/ExecutionDocument.java` — document model for OpenSearch indexing +- `search/StatsRequest.java` — request DTO for stats queries (level, scope, time range) +- `search/TimeSeriesRequest.java` — request DTO for time-series queries (bucket size) +- `indexing/SearchIndexer.java` — debounced event listener for OpenSearch indexing +- 
`indexing/ExecutionUpdatedEvent.java` — event published after execution write + +**App module** (`cameleer3-server-app/src/main/java/com/cameleer3/server/app/`): +- `storage/PostgresExecutionStore.java` — ExecutionStore impl with upsert +- `storage/PostgresStatsStore.java` — StatsStore impl querying continuous aggregates +- `storage/PostgresDiagramStore.java` — DiagramStore impl +- `storage/PostgresUserRepository.java` — UserRepository impl (keeps existing core interface) +- `storage/PostgresOidcConfigRepository.java` — OidcConfigRepository impl (keeps existing core interface) +- `storage/PostgresMetricsStore.java` — MetricsStore impl +- `search/OpenSearchIndex.java` — SearchIndex impl +- `config/OpenSearchConfig.java` — OpenSearch client bean +- `config/StorageBeanConfig.java` — wires all store beans +- `ingestion/MetricsFlushScheduler.java` — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only) +- `retention/RetentionScheduler.java` — scheduled job for drop_chunks and OpenSearch index deletion + +**Flyway migrations** (`cameleer3-server-app/src/main/resources/db/migration/`): +- `V1__extensions.sql` — CREATE EXTENSION timescaledb, timescaledb_toolkit +- `V2__executions.sql` — executions hypertable +- `V3__processor_executions.sql` — processor_executions hypertable +- `V4__agent_metrics.sql` — agent_metrics hypertable +- `V5__route_diagrams.sql` — route_diagrams table +- `V6__users.sql` — users table +- `V7__oidc_config.sql` — oidc_config table +- `V8__continuous_aggregates.sql` — all 4 continuous aggregates + refresh policies + +Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by `RetentionScheduler` at runtime with configurable intervals. 
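
Since retention runs outside Flyway, the `RetentionScheduler` mentioned above reduces to issuing TimescaleDB `drop_chunks` calls against each hypertable on a schedule. A minimal sketch of the statement construction — the helper name `RetentionSql` and the static table list are illustrative assumptions, not the final scheduler contract:

```java
import java.util.List;

// Hypothetical helper sketching what RetentionScheduler would execute.
// The table names mirror the hypertables created by the V2-V4 migrations.
class RetentionSql {

    static final List<String> HYPERTABLES =
            List.of("executions", "processor_executions", "agent_metrics");

    // One drop_chunks statement per hypertable; retentionDays is assumed to
    // come from the CAMELEER_RETENTION_DAYS-backed config property.
    static List<String> dropChunkStatements(int retentionDays) {
        return HYPERTABLES.stream()
                .map(t -> "SELECT drop_chunks('" + t + "', older_than => INTERVAL '"
                        + retentionDays + " days');")
                .toList();
    }
}
```

The scheduler would run these via `JdbcTemplate` on its configured interval; baking them into a migration instead would freeze the retention window at whatever value was first applied.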
**Test files** (`cameleer3-server-app/src/test/java/com/cameleer3/server/app/`):
- `AbstractPostgresIT.java` — replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
- `storage/PostgresExecutionStoreIT.java` — upsert, dedup, chunked arrival tests
- `storage/PostgresStatsStoreIT.java` — continuous aggregate query tests
- `storage/PostgresDiagramStoreIT.java` — content-hash dedup tests
- `storage/PostgresUserRepositoryIT.java` — CRUD tests
- `search/OpenSearchIndexIT.java` — index, search, wildcard tests

### Files to modify

- `pom.xml` (root) — no changes needed
- `cameleer3-server-app/pom.xml` — swap clickhouse-jdbc for postgresql + opensearch-java + flyway
- `cameleer3-server-core/.../core/search/SearchService.java` — split: search delegates to SearchIndex, stats/timeseries to StatsStore
- `cameleer3-server-core/.../core/detail/DetailService.java` — use ExecutionStore instead of ExecutionRepository
- `cameleer3-server-core/.../core/ingestion/IngestionService.java` — synchronous execution/diagram writes, keep buffer for metrics
- `cameleer3-server-app/.../app/config/SearchBeanConfig.java` — wire StatsStore into SearchService
- `cameleer3-server-app/.../app/config/IngestionBeanConfig.java` — update bean wiring
- `cameleer3-server-app/src/main/resources/application.yml` — PostgreSQL + OpenSearch config
- `cameleer3-server-app/src/test/resources/application-test.yml` — test config

### Files to delete

- `cameleer3-server-app/.../app/storage/ClickHouseExecutionRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseDiagramRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseMetricsRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseUserRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseOidcConfigRepository.java`
- 
`cameleer3-server-app/.../app/search/ClickHouseSearchEngine.java`
- `cameleer3-server-app/.../app/ingestion/ClickHouseFlushScheduler.java`
- `cameleer3-server-app/.../app/config/ClickHouseConfig.java`
- `cameleer3-server-core/.../core/storage/ExecutionRepository.java`
- `cameleer3-server-core/.../core/storage/DiagramRepository.java`
- `cameleer3-server-core/.../core/storage/MetricsRepository.java`
- `cameleer3-server-core/.../core/search/SearchEngine.java`
- `cameleer3-server-core/.../core/detail/RawExecutionRow.java`
- `cameleer3-server-app/src/main/resources/clickhouse/*.sql` (all 8 files)
- `cameleer3-server-app/src/test/.../app/AbstractClickHouseIT.java`

Note: `UserRepository` and `OidcConfigRepository` interfaces in `core.security` are **kept** — the new Postgres implementations implement these existing interfaces. No rename needed since their contracts are unchanged.

---

## Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure

### Task 1: Update Maven dependencies

**Files:**
- Modify: `cameleer3-server-app/pom.xml`

- [ ] **Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client**

In `cameleer3-server-app/pom.xml`, replace the ClickHouse dependency and add new ones:

Remove:
```xml
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.9.7</version>
    <classifier>all</classifier>
</dependency>
```

Add:
```xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-rest-client</artifactId>
    <version>2.19.0</version>
</dependency>
```

Replace the ClickHouse Testcontainer:
```xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>postgresql</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.opensearch</groupId>
    <artifactId>opensearch-testcontainers</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>
```

Note: `postgresql` driver and `flyway-core` versions are managed by the Spring Boot parent POM. The Testcontainers BOM version is `${testcontainers.version}` (2.0.3) from the root POM.
- [ ] **Step 2: Commit** (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)

```bash
git add cameleer3-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"
```

### Task 2: Write Flyway migrations

**Files:**
- Create: `cameleer3-server-app/src/main/resources/db/migration/V1__extensions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V2__executions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V3__processor_executions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V4__agent_metrics.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V5__route_diagrams.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V6__users.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V7__oidc_config.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql`

(No `V9__retention_policies.sql` — as noted in the file structure, retention is handled by `RetentionScheduler` at runtime, not by a migration.)

- [ ] **Step 1: Create V1__extensions.sql**

```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
```

- [ ] **Step 2: Create V2__executions.sql**

```sql
CREATE TABLE executions (
    execution_id TEXT NOT NULL,
    route_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    group_name TEXT NOT NULL,
    status TEXT NOT NULL,
    correlation_id TEXT,
    exchange_id TEXT,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    duration_ms BIGINT,
    error_message TEXT,
    error_stacktrace TEXT,
    diagram_content_hash TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (execution_id, start_time)
);

SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time
DESC); +CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC); +CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC); +CREATE INDEX idx_executions_correlation ON executions (correlation_id); +``` + +- [ ] **Step 3: Create V3__processor_executions.sql** + +```sql +CREATE TABLE processor_executions ( + id BIGSERIAL, + execution_id TEXT NOT NULL, + processor_id TEXT NOT NULL, + processor_type TEXT NOT NULL, + diagram_node_id TEXT, + group_name TEXT NOT NULL, + route_id TEXT NOT NULL, + depth INT NOT NULL, + parent_processor_id TEXT, + status TEXT NOT NULL, + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ, + duration_ms BIGINT, + error_message TEXT, + error_stacktrace TEXT, + input_body TEXT, + output_body TEXT, + input_headers JSONB, + output_headers JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (execution_id, processor_id, start_time) +); + +SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day'); + +CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id); +CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC); +``` + +- [ ] **Step 4: Create V4__agent_metrics.sql** + +```sql +CREATE TABLE agent_metrics ( + agent_id TEXT NOT NULL, + metric_name TEXT NOT NULL, + metric_value DOUBLE PRECISION NOT NULL, + tags JSONB, + collected_at TIMESTAMPTZ NOT NULL, + server_received_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day'); + +CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC); +``` + +- [ ] **Step 5: Create V5__route_diagrams.sql** + +```sql +CREATE TABLE route_diagrams ( + content_hash TEXT PRIMARY KEY, + route_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + definition TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE 
INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id); +``` + +- [ ] **Step 6: Create V6__users.sql** + +```sql +CREATE TABLE users ( + user_id TEXT PRIMARY KEY, + provider TEXT NOT NULL, + email TEXT, + display_name TEXT, + roles TEXT[] NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +- [ ] **Step 7: Create V7__oidc_config.sql** + +```sql +CREATE TABLE oidc_config ( + config_id TEXT PRIMARY KEY DEFAULT 'default', + enabled BOOLEAN NOT NULL DEFAULT false, + issuer_uri TEXT, + client_id TEXT, + client_secret TEXT, + roles_claim TEXT, + default_roles TEXT[] NOT NULL DEFAULT '{}', + auto_signup BOOLEAN DEFAULT false, + display_name_claim TEXT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +- [ ] **Step 8: Create V8__continuous_aggregates.sql** + +```sql +-- Global stats +CREATE MATERIALIZED VIEW stats_1m_all +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM executions +WHERE status IS NOT NULL +GROUP BY bucket; + +SELECT add_continuous_aggregate_policy('stats_1m_all', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute'); + +-- Per-application stats +CREATE MATERIALIZED VIEW stats_1m_app +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + group_name, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, 
percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM executions +WHERE status IS NOT NULL +GROUP BY bucket, group_name; + +SELECT add_continuous_aggregate_policy('stats_1m_app', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute'); + +-- Per-route stats +CREATE MATERIALIZED VIEW stats_1m_route +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + group_name, + route_id, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM executions +WHERE status IS NOT NULL +GROUP BY bucket, group_name, route_id; + +SELECT add_continuous_aggregate_policy('stats_1m_route', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute'); + +-- Per-processor stats (uses denormalized group_name/route_id on processor_executions) +CREATE MATERIALIZED VIEW stats_1m_processor +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + group_name, + route_id, + processor_type, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM processor_executions +GROUP BY bucket, group_name, route_id, processor_type; + +SELECT add_continuous_aggregate_policy('stats_1m_processor', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute'); +``` + +- [ ] **Step 9: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/db/migration/ +git commit -m "feat: add Flyway 
migrations for PostgreSQL/TimescaleDB schema"
```

### Task 3: Create test base class with TimescaleDB Testcontainer

**Files:**
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java`

- [ ] **Step 1: Write AbstractPostgresIT**

```java
package com.cameleer3.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    // The TimescaleDB image must be declared as a compatible substitute for
    // "postgres", otherwise PostgreSQLContainer rejects the image name.
    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("timescale/timescaledb:latest-pg16")
                            .asCompatibleSubstituteFor("postgres"))
                    .withDatabaseName("cameleer3")
                    .withUsername("cameleer")
                    .withPassword("test");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
```

- [ ] **Step 2: Write a smoke test to verify migrations run**

Create `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java`:

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.junit.jupiter.api.Assertions.*;

class FlywayMigrationIT
extends AbstractPostgresIT { + + @Autowired + JdbcTemplate jdbcTemplate; + + @Test + void allMigrationsApplySuccessfully() { + // Verify core tables exist + Integer execCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM executions", Integer.class); + assertEquals(0, execCount); + + Integer procCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM processor_executions", Integer.class); + assertEquals(0, procCount); + + Integer userCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM users", Integer.class); + assertEquals(0, userCount); + + // Verify continuous aggregates exist + Integer caggCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates", + Integer.class); + assertEquals(4, caggCount); + } +} +``` + +- [ ] **Step 3: Verify test passes** (this test will not compile until Task 16 deletes ClickHouse code. Run it after Task 16 is complete. Listed here for logical grouping.) + +Run: `mvn test -pl cameleer3-server-app -Dtest=FlywayMigrationIT -q` +Expected: PASS — all migrations apply, tables and continuous aggregates exist + +- [ ] **Step 4: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java +git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test" +``` + +### Task 4: Update application.yml for PostgreSQL + OpenSearch + +**Files:** +- Modify: `cameleer3-server-app/src/main/resources/application.yml` +- Modify: `cameleer3-server-app/src/test/resources/application-test.yml` + +- [ ] **Step 1: Update application.yml datasource section** + +Replace: +```yaml +spring: + datasource: + url: jdbc:ch://localhost:8123/cameleer3 + username: cameleer + password: cameleer_dev + driver-class-name: com.clickhouse.jdbc.ClickHouseDriver +``` + +With: +```yaml +spring: + datasource: + url: 
jdbc:postgresql://localhost:5432/cameleer3 + username: cameleer + password: ${CAMELEER_DB_PASSWORD:cameleer_dev} + driver-class-name: org.postgresql.Driver + flyway: + enabled: true + locations: classpath:db/migration +``` + +Add OpenSearch config section: +```yaml +opensearch: + url: ${OPENSEARCH_URL:http://localhost:9200} + queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000} + debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000} +``` + +Add body size limit: +```yaml +cameleer: + body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} + retention-days: ${CAMELEER_RETENTION_DAYS:30} +``` + +Remove the `clickhouse:` section. + +- [ ] **Step 2: Update application-test.yml** + +```yaml +spring: + flyway: + enabled: true +opensearch: + url: http://localhost:9200 +``` + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/application.yml +git add cameleer3-server-app/src/test/resources/application-test.yml +git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config" +``` + +--- + +## Chunk 2: Core Module Interfaces and Models + +### Task 5: Create new storage interfaces in core module + +**Files:** +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/StatsStore.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/SearchIndex.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsStore.java` + +- [ ] **Step 1: Create ExecutionStore interface** + +```java +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.detail.ProcessorNode; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +public interface ExecutionStore { + + void upsert(ExecutionRecord 
execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);

    record ExecutionRecord(
            String executionId, String routeId, String agentId, String groupName,
            String status, String correlationId, String exchangeId,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace, String diagramContentHash
    ) {}

    record ProcessorRecord(
            String executionId, String processorId, String processorType,
            String diagramNodeId, String groupName, String routeId,
            int depth, String parentProcessorId, String status,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody, String inputHeaders, String outputHeaders
    ) {}
}
```

- [ ] **Step 2: Create StatsStore interface**

Supports all 4 aggregation levels: global, per-app, per-route, per-processor.
```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;

import java.time.Instant;
import java.util.List;

public interface StatsStore {

    // Global stats (stats_1m_all)
    ExecutionStats stats(Instant from, Instant to);

    // Per-app stats (stats_1m_app)
    ExecutionStats statsForApp(Instant from, Instant to, String groupName);

    // Per-route stats (stats_1m_route), optionally scoped to specific agents
    ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);

    // Per-processor stats (stats_1m_processor)
    ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);

    // Global timeseries
    StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);

    // Per-app timeseries
    StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);

    // Per-route timeseries, optionally scoped to specific agents
    StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                       String routeId, List<String> agentIds);

    // Per-processor timeseries
    StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                           String routeId, String processorType);
}
```

- [ ] **Step 3: Create SearchIndex interface**

```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionDocument;

public interface SearchIndex {

    SearchResult search(SearchRequest request);

    long count(SearchRequest request);

    void index(ExecutionDocument document);

    void delete(String executionId);
}
```

- [ ] **Step 4: Create DiagramStore interface**

```java
package com.cameleer3.server.core.storage;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;

import java.util.List;
import java.util.Optional;

public interface DiagramStore {

    void store(TaggedDiagram diagram);

    Optional<RouteGraph> findByContentHash(String contentHash);

    Optional<String> findContentHashForRoute(String routeId, String agentId);

    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
```

- [ ] **Step 5: Create MetricsStore interface**

```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.storage.model.MetricsSnapshot;

import java.util.List;

public interface MetricsStore {

    void insertBatch(List<MetricsSnapshot> snapshots);
}
```

- [ ] **Step 6: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"
```

### Task 6: Create ExecutionDocument model and indexing event

**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/ExecutionUpdatedEvent.java`

- [ ] **Step 1: Create ExecutionDocument**

```java
package com.cameleer3.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record ExecutionDocument(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        List<ProcessorDoc> processors
) {
    public record ProcessorDoc(
            String processorId, String processorType, String status,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody,
            String inputHeaders, String outputHeaders
    ) {}
}
```

- [ ] **Step 2: Create ExecutionUpdatedEvent**

```java
package
com.cameleer3.server.core.indexing;

import java.time.Instant;

public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
```

- [ ] **Step 3: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"
```

### Task 7: Update SearchService to use StatsStore for stats/timeseries

**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java`

- [ ] **Step 1: Refactor SearchService to accept SearchIndex + StatsStore**

Replace the single `SearchEngine` dependency with two dependencies:

```java
package com.cameleer3.server.core.search;

import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.StatsStore;

import java.time.Instant;
import java.util.List;

public class SearchService {

    private final SearchIndex searchIndex;
    private final StatsStore statsStore;

    public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
        this.searchIndex = searchIndex;
        this.statsStore = statsStore;
    }

    public SearchResult search(SearchRequest request) {
        return searchIndex.search(request);
    }

    public long count(SearchRequest request) {
        return searchIndex.count(request);
    }

    public ExecutionStats stats(Instant from, Instant to) {
        return statsStore.stats(from, to);
    }

    public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
        return statsStore.statsForRoute(from, to, routeId, agentIds);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return statsStore.timeseries(from, to, bucketCount);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
                                      String routeId, List<String> agentIds) {
        return
statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"
```

### Task 8: Update DetailService to use ExecutionStore

**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java`

- [ ] **Step 1: Rewrite DetailService to use ExecutionStore**

The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with `parentProcessorId` for tree structure.

```java
package com.cameleer3.server.core.detail;

import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;

import java.util.*;

public class DetailService {

    private final ExecutionStore executionStore;

    public DetailService(ExecutionStore executionStore) {
        this.executionStore = executionStore;
    }

    public Optional<ExecutionDetail> getDetail(String executionId) {
        return executionStore.findById(executionId)
                .map(exec -> {
                    List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
                    List<ProcessorNode> roots = buildTree(processors);
                    return new ExecutionDetail(
                            exec.executionId(), exec.routeId(), exec.agentId(),
                            exec.status(), exec.startTime(), exec.endTime(),
                            exec.durationMs() != null ? exec.durationMs() : 0L,
                            exec.correlationId(), exec.exchangeId(),
                            exec.errorMessage(), exec.errorStacktrace(),
                            exec.diagramContentHash(), roots
                    );
                });
    }

    List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
        if (processors.isEmpty()) return List.of();

        Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
        for (ProcessorRecord p : processors) {
            nodeMap.put(p.processorId(), new ProcessorNode(
                    p.processorId(), p.processorType(), p.status(),
                    p.startTime(), p.endTime(),
                    p.durationMs() != null ?
p.durationMs() : 0L,
                    p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
            ));
        }

        List<ProcessorNode> roots = new ArrayList<>();
        for (ProcessorRecord p : processors) {
            ProcessorNode node = nodeMap.get(p.processorId());
            if (p.parentProcessorId() == null) {
                roots.add(node);
            } else {
                ProcessorNode parent = nodeMap.get(p.parentProcessorId());
                if (parent != null) {
                    parent.addChild(node);
                } else {
                    roots.add(node); // orphan safety
                }
            }
        }
        return roots;
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
```

### Task 9: Update IngestionService for synchronous writes

**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java`

- [ ] **Step 1: Rewrite IngestionService**

Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.
```java
package com.cameleer3.server.core.ingestion;

import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer3.server.core.storage.DiagramStore;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);

        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                    execution.getProcessors(), record.executionId(),
                    record.startTime(), groupName, execution.getRouteId(),
                    null, 0);
            executionStore.upsertProcessors(
                    record.executionId(), record.startTime(),
                    groupName, execution.getRouteId(), processors);
        }

        eventPublisher.accept(new ExecutionUpdatedEvent(
                record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
diagramStore.store(diagram);
+    }
+
+    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
+        return metricsBuffer.offerBatch(metrics);
+    }
+
+    public int getMetricsBufferDepth() {
+        return metricsBuffer.size();
+    }
+
+    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
+        return metricsBuffer;
+    }
+
+    private ExecutionRecord toExecutionRecord(String agentId, String groupName,
+                                              RouteExecution exec) {
+        return new ExecutionRecord(
+            exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
+            exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
+            exec.getCorrelationId(), exec.getExchangeId(),
+            exec.getStartTime(), exec.getEndTime(),
+            exec.getDurationMs(),
+            exec.getErrorMessage(), exec.getErrorStacktrace(),
+            null // diagramContentHash set separately
+        );
+    }
+
+    private List<ProcessorRecord> flattenProcessors(
+            List<ProcessorExecution> processors, String executionId,
+            java.time.Instant execStartTime, String groupName, String routeId,
+            String parentProcessorId, int depth) {
+        List<ProcessorRecord> flat = new ArrayList<>();
+        for (ProcessorExecution p : processors) {
+            flat.add(new ProcessorRecord(
+                executionId, p.getProcessorId(), p.getProcessorType(),
+                p.getDiagramNodeId(), groupName, routeId,
+                depth, parentProcessorId,
+                p.getStatus() != null ? p.getStatus().name() : "RUNNING",
+                p.getStartTime() != null ? p.getStartTime() : execStartTime,
+                p.getEndTime(),
+                p.getDurationMs(),
+                p.getErrorMessage(), p.getErrorStacktrace(),
+                truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
+                p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
+                p.getOutputHeaders() != null ?
p.getOutputHeaders().toString() : null + )); + if (p.getChildren() != null) { + flat.addAll(flattenProcessors( + p.getChildren(), executionId, execStartTime, + groupName, routeId, p.getProcessorId(), depth + 1)); + } + } + return flat; + } + + private String truncateBody(String body) { + if (body == null) return null; + if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit); + return body; + } +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java +git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing" +``` + +--- + +## Chunk 3: PostgreSQL Store Implementations + +### Task 10: Implement PostgresExecutionStore + +**Files:** +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java` +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; + +class PostgresExecutionStoreIT extends AbstractPostgresIT { + + @Autowired + ExecutionStore executionStore; + + @Test + void upsertAndFindById() { + Instant now = Instant.now(); + ExecutionRecord record = new ExecutionRecord( + "exec-1", "route-a", "agent-1", "app-1", + "COMPLETED", "corr-1", "exchange-1", + now, now.plusMillis(100), 100L, + null, null, null); + + 
executionStore.upsert(record);
+        Optional<ExecutionRecord> found = executionStore.findById("exec-1");
+
+        assertTrue(found.isPresent());
+        assertEquals("exec-1", found.get().executionId());
+        assertEquals("COMPLETED", found.get().status());
+    }
+
+    @Test
+    void upsertDeduplicatesByExecutionId() {
+        Instant now = Instant.now();
+        ExecutionRecord first = new ExecutionRecord(
+            "exec-dup", "route-a", "agent-1", "app-1",
+            "RUNNING", null, null, now, null, null, null, null, null);
+        ExecutionRecord second = new ExecutionRecord(
+            "exec-dup", "route-a", "agent-1", "app-1",
+            "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);
+
+        executionStore.upsert(first);
+        executionStore.upsert(second);
+
+        Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
+        assertTrue(found.isPresent());
+        assertEquals("COMPLETED", found.get().status());
+        assertEquals(200L, found.get().durationMs());
+    }
+
+    @Test
+    void upsertProcessorsAndFind() {
+        Instant now = Instant.now();
+        ExecutionRecord exec = new ExecutionRecord(
+            "exec-proc", "route-a", "agent-1", "app-1",
+            "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
+        executionStore.upsert(exec);
+
+        List<ProcessorRecord> processors = List.of(
+            new ProcessorRecord("exec-proc", "proc-1", "log", null,
+                "app-1", "route-a", 0, null, "COMPLETED",
+                now, now.plusMillis(10), 10L, null, null,
+                "input body", "output body", null, null),
+            new ProcessorRecord("exec-proc", "proc-2", "to", null,
+                "app-1", "route-a", 1, "proc-1", "COMPLETED",
+                now.plusMillis(10), now.plusMillis(30), 20L, null, null,
+                null, null, null, null)
+        );
+        executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);
+
+        List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
+        assertEquals(2, found.size());
+        assertEquals("proc-1", found.get(0).processorId());
+        assertEquals("proc-2", found.get(1).processorId());
+        assertEquals("proc-1", found.get(1).parentProcessorId());
+    }
+}
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q` +Expected: FAIL — `ExecutionStore` bean not found + +- [ ] **Step 3: Implement PostgresExecutionStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.ExecutionStore; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +@Repository +public class PostgresExecutionStore implements ExecutionStore { + + private final JdbcTemplate jdbc; + + public PostgresExecutionStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public void upsert(ExecutionRecord execution) { + jdbc.update(""" + INSERT INTO executions (execution_id, route_id, agent_id, group_name, + status, correlation_id, exchange_id, start_time, end_time, + duration_ms, error_message, error_stacktrace, diagram_content_hash, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now()) + ON CONFLICT (execution_id, start_time) DO UPDATE SET + status = CASE + WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED') + AND executions.status = 'RUNNING' + THEN EXCLUDED.status + WHEN EXCLUDED.status = executions.status THEN executions.status + ELSE EXCLUDED.status + END, + end_time = COALESCE(EXCLUDED.end_time, executions.end_time), + duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms), + error_message = COALESCE(EXCLUDED.error_message, executions.error_message), + error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace), + diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash), + updated_at = now() + """, + execution.executionId(), execution.routeId(), execution.agentId(), + execution.groupName(), 
execution.status(), execution.correlationId(),
+            execution.exchangeId(),
+            Timestamp.from(execution.startTime()),
+            execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
+            execution.durationMs(), execution.errorMessage(),
+            execution.errorStacktrace(), execution.diagramContentHash());
+    }
+
+    @Override
+    public void upsertProcessors(String executionId, Instant startTime,
+                                 String groupName, String routeId,
+                                 List<ProcessorRecord> processors) {
+        jdbc.batchUpdate("""
+            INSERT INTO processor_executions (execution_id, processor_id, processor_type,
+                diagram_node_id, group_name, route_id, depth, parent_processor_id,
+                status, start_time, end_time, duration_ms, error_message, error_stacktrace,
+                input_body, output_body, input_headers, output_headers)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
+            ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
+                status = EXCLUDED.status,
+                end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
+                duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
+                error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
+                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
+                input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
+                output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
+                input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
+                output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
+            """,
+            processors.stream().map(p -> new Object[]{
+                p.executionId(), p.processorId(), p.processorType(),
+                p.diagramNodeId(), p.groupName(), p.routeId(),
+                p.depth(), p.parentProcessorId(), p.status(),
+                Timestamp.from(p.startTime()),
+                p.endTime() != null ?
Timestamp.from(p.endTime()) : null,
+                p.durationMs(), p.errorMessage(), p.errorStacktrace(),
+                p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
+            }).toList());
+    }
+
+    @Override
+    public Optional<ExecutionRecord> findById(String executionId) {
+        List<ExecutionRecord> results = jdbc.query(
+            "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
+            EXECUTION_MAPPER, executionId);
+        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
+    }
+
+    @Override
+    public List<ProcessorRecord> findProcessors(String executionId) {
+        return jdbc.query(
+            "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
+            PROCESSOR_MAPPER, executionId);
+    }
+
+    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
+        new ExecutionRecord(
+            rs.getString("execution_id"), rs.getString("route_id"),
+            rs.getString("agent_id"), rs.getString("group_name"),
+            rs.getString("status"), rs.getString("correlation_id"),
+            rs.getString("exchange_id"),
+            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
+            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
+            rs.getString("error_message"), rs.getString("error_stacktrace"),
+            rs.getString("diagram_content_hash"));
+
+    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
+        new ProcessorRecord(
+            rs.getString("execution_id"), rs.getString("processor_id"),
+            rs.getString("processor_type"), rs.getString("diagram_node_id"),
+            rs.getString("group_name"), rs.getString("route_id"),
+            rs.getInt("depth"), rs.getString("parent_processor_id"),
+            rs.getString("status"),
+            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
+            rs.getObject("duration_ms") != null ?
rs.getLong("duration_ms") : null, + rs.getString("error_message"), rs.getString("error_stacktrace"), + rs.getString("input_body"), rs.getString("output_body"), + rs.getString("input_headers"), rs.getString("output_headers")); + + private static Instant toInstant(ResultSet rs, String column) throws SQLException { + Timestamp ts = rs.getTimestamp(column); + return ts != null ? ts.toInstant() : null; + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q` +Expected: PASS — all 3 tests green + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java +git commit -m "feat: implement PostgresExecutionStore with upsert and dedup" +``` + +### Task 11: Implement PostgresStatsStore + +**Files:** +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java` +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.storage.ExecutionStore; +import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord; +import com.cameleer3.server.core.storage.StatsStore; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import static org.junit.jupiter.api.Assertions.*; + +class PostgresStatsStoreIT extends AbstractPostgresIT { + + @Autowired StatsStore 
statsStore;
+    @Autowired ExecutionStore executionStore;
+    @Autowired JdbcTemplate jdbc;
+
+    @Test
+    void statsReturnsCountsForTimeWindow() {
+        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
+        insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
+        insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
+
+        // Force continuous aggregate refresh
+        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
+
+        ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
+        assertEquals(3, stats.totalCount());
+        assertEquals(1, stats.failedCount());
+    }
+
+    @Test
+    void timeseriesReturnsBuckets() {
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
+        for (int i = 0; i < 10; i++) {
+            insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
+                now.plusSeconds(i * 30), 100L + i);
+        }
+
+        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
+
+        // Instant has no plusMinutes/minusMinutes; use plus/minus with ChronoUnit
+        StatsTimeseries ts = statsStore.timeseries(
+            now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
+        assertNotNull(ts);
+        assertFalse(ts.buckets().isEmpty());
+    }
+
+    private void insertExecution(String id, String routeId, String groupName,
+                                 String status, Instant startTime, long durationMs) {
+        executionStore.upsert(new ExecutionRecord(
+            id, routeId, "agent-1", groupName, status, null, null,
+            startTime, startTime.plusMillis(durationMs), durationMs,
+            status.equals("FAILED") ?
"error" : null, null, null)); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q` +Expected: FAIL — `StatsStore` bean not found + +- [ ] **Step 3: Implement PostgresStatsStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.search.ExecutionStats; +import com.cameleer3.server.core.search.StatsTimeseries; +import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket; +import com.cameleer3.server.core.storage.StatsStore; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +@Repository +public class PostgresStatsStore implements StatsStore { + + private final JdbcTemplate jdbc; + + public PostgresStatsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public ExecutionStats stats(Instant from, Instant to) { + return queryStats("stats_1m_all", from, to, List.of()); + } + + @Override + public ExecutionStats statsForApp(Instant from, Instant to, String groupName) { + return queryStats("stats_1m_app", from, to, List.of( + new Filter("group_name", groupName))); + } + + @Override + public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List agentIds) { + // Note: agentIds is accepted for interface compatibility but not filterable + // on the continuous aggregate (it groups by route_id, not agent_id). + // All agents for the same route contribute to the same aggregate. 
+        return queryStats("stats_1m_route", from, to, List.of(
+            new Filter("route_id", routeId)));
+    }
+
+    @Override
+    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
+        return queryStats("stats_1m_processor", from, to, List.of(
+            new Filter("route_id", routeId),
+            new Filter("processor_type", processorType)));
+    }
+
+    @Override
+    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
+        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
+    }
+
+    @Override
+    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
+        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
+            new Filter("group_name", groupName)), true);
+    }
+
+    @Override
+    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
+                                              String routeId, List<String> agentIds) {
+        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
+            new Filter("route_id", routeId)), true);
+    }
+
+    @Override
+    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
+                                                  String routeId, String processorType) {
+        // stats_1m_processor does NOT have running_count column
+        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
+            new Filter("route_id", routeId),
+            new Filter("processor_type", processorType)), false);
+    }
+
+    private record Filter(String column, String value) {}
+
+    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
+        // running_count only exists on execution-level aggregates, not processor
+        boolean hasRunning = !view.equals("stats_1m_processor");
+        String runningCol = hasRunning ?
"COALESCE(SUM(running_count), 0)" : "0"; + + String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " + + "COALESCE(SUM(failed_count), 0) AS failed_count, " + + "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " + + "COALESCE(MAX(p99_duration), 0) AS p99_duration, " + + runningCol + " AS active_count " + + "FROM " + view + " WHERE bucket >= ? AND bucket < ?"; + + List params = new ArrayList<>(); + params.add(Timestamp.from(from)); + params.add(Timestamp.from(to)); + for (Filter f : filters) { + sql += " AND " + f.column() + " = ?"; + params.add(f.value()); + } + + long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0; + var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + rs.getLong("avg_duration"), rs.getLong("p99_duration"), + rs.getLong("active_count") + }, params.toArray()); + if (!currentResult.isEmpty()) { + long[] r = currentResult.get(0); + totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; + p99Duration = r[3]; activeCount = r[4]; + } + + // Previous period (shifted back 24h) + Instant prevFrom = from.minus(Duration.ofHours(24)); + Instant prevTo = to.minus(Duration.ofHours(24)); + List prevParams = new ArrayList<>(); + prevParams.add(Timestamp.from(prevFrom)); + prevParams.add(Timestamp.from(prevTo)); + for (Filter f : filters) prevParams.add(f.value()); + String prevSql = sql; // same shape, different time params + + long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0; + var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{ + rs.getLong("total_count"), rs.getLong("failed_count"), + rs.getLong("avg_duration"), rs.getLong("p99_duration") + }, prevParams.toArray()); + if (!prevResult.isEmpty()) { + long[] r = prevResult.get(0); + prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; + } + + // Today total (from midnight UTC) + Instant todayStart = 
Instant.now().truncatedTo(ChronoUnit.DAYS);
+        List<Object> todayParams = new ArrayList<>();
+        todayParams.add(Timestamp.from(todayStart));
+        todayParams.add(Timestamp.from(Instant.now()));
+        for (Filter f : filters) todayParams.add(f.value());
+        String todaySql = sql;
+
+        long totalToday = 0;
+        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
+            todayParams.toArray());
+        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
+
+        return new ExecutionStats(
+            totalCount, failedCount, avgDuration, p99Duration, activeCount,
+            totalToday, prevTotal, prevFailed, prevAvg, prevP99);
+    }
+
+    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
+                                            int bucketCount, List<Filter> filters,
+                                            boolean hasRunningCount) {
+        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
+        if (intervalSeconds < 60) intervalSeconds = 60;
+
+        String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";
+
+        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
+            "COALESCE(SUM(total_count), 0) AS total_count, " +
+            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
+            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
+            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
+            runningCol + " AS active_count " +
+            "FROM " + view + " WHERE bucket >= ?
AND bucket < ?";
+
+        List<Object> params = new ArrayList<>();
+        params.add(intervalSeconds);
+        params.add(Timestamp.from(from));
+        params.add(Timestamp.from(to));
+        for (Filter f : filters) {
+            sql += " AND " + f.column() + " = ?";
+            params.add(f.value());
+        }
+        sql += " GROUP BY period ORDER BY period";
+
+        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
+            new TimeseriesBucket(
+                rs.getTimestamp("period").toInstant(),
+                rs.getLong("total_count"), rs.getLong("failed_count"),
+                rs.getLong("avg_duration"), rs.getLong("p99_duration"),
+                rs.getLong("active_count")
+            ), params.toArray());
+
+        return new StatsTimeseries(buckets);
+    }
+}
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q`
+Expected: PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java
+git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java
+git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
+```
+
+### Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore
+
+**Files:**
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java`
+
+- [ ] **Step 1: Write failing test for PostgresDiagramStore**
+
+Create `PostgresDiagramStoreIT.java` with tests for:
+- `store()` + `findByContentHash()` roundtrip
+- Content-hash dedup (store same hash twice, verify single row)
+- `findContentHashForRoute()` returns latest for route+agent
+-
`findContentHashForRouteByAgents()` returns across agent list
+
+- [ ] **Step 2: Implement PostgresDiagramStore**
+
+Straightforward CRUD with `ON CONFLICT (content_hash) DO NOTHING`. Port the SHA-256 hashing logic from `ClickHouseDiagramRepository`. Use `JdbcTemplate` queries.
+
+- [ ] **Step 3: Run diagram tests, verify pass**
+
+- [ ] **Step 4: Implement PostgresUserRepository**
+
+Implements `UserRepository` interface (existing interface in `core.security`, unchanged).
+
+```java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.security.UserInfo;
+import com.cameleer3.server.core.security.UserRepository;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Array;
+import java.util.List;
+import java.util.Optional;
+
+@Repository
+public class PostgresUserRepository implements UserRepository {
+
+    private final JdbcTemplate jdbc;
+
+    public PostgresUserRepository(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    @Override
+    public Optional<UserInfo> findById(String userId) {
+        var results = jdbc.query(
+            "SELECT * FROM users WHERE user_id = ?",
+            (rs, rowNum) -> mapUser(rs), userId);
+        return results.isEmpty() ?
Optional.empty() : Optional.of(results.get(0));
+    }
+
+    @Override
+    public List<UserInfo> findAll() {
+        return jdbc.query("SELECT * FROM users ORDER BY user_id",
+            (rs, rowNum) -> mapUser(rs));
+    }
+
+    @Override
+    public void upsert(UserInfo user) {
+        jdbc.update("""
+            INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
+            VALUES (?, ?, ?, ?, ?, now(), now())
+            ON CONFLICT (user_id) DO UPDATE SET
+                provider = EXCLUDED.provider, email = EXCLUDED.email,
+                display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
+                updated_at = now()
+            """,
+            user.userId(), user.provider(), user.email(), user.displayName(),
+            user.roles().toArray(new String[0]));
+    }
+
+    @Override
+    public void updateRoles(String userId, List<String> roles) {
+        jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
+            roles.toArray(new String[0]), userId);
+    }
+
+    @Override
+    public void delete(String userId) {
+        jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
+    }
+
+    private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
+        Array rolesArray = rs.getArray("roles");
+        String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
+        return new UserInfo(
+            rs.getString("user_id"), rs.getString("provider"),
+            rs.getString("email"), rs.getString("display_name"),
+            List.of(roles));
+    }
+}
+```
+
+- [ ] **Step 5: Implement PostgresOidcConfigRepository**
+
+Implements `OidcConfigRepository` interface (existing interface in `core.security`).
+
+```java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.security.OidcConfig;
+import com.cameleer3.server.core.security.OidcConfigRepository;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Array;
+import java.util.List;
+import java.util.Optional;
+
+@Repository
+public class PostgresOidcConfigRepository implements OidcConfigRepository {
+
+    private final JdbcTemplate jdbc;
+
+    public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    @Override
+    public Optional<OidcConfig> find() {
+        var results = jdbc.query(
+            "SELECT * FROM oidc_config WHERE config_id = 'default'",
+            (rs, rowNum) -> {
+                Array arr = rs.getArray("default_roles");
+                String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
+                return new OidcConfig(
+                    rs.getBoolean("enabled"), rs.getString("issuer_uri"),
+                    rs.getString("client_id"), rs.getString("client_secret"),
+                    rs.getString("roles_claim"), List.of(roles),
+                    rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
+            });
+        return results.isEmpty() ?
Optional.empty() : Optional.of(results.get(0));
+    }
+
+    @Override
+    public void save(OidcConfig config) {
+        jdbc.update("""
+            INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
+                roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
+            VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
+            ON CONFLICT (config_id) DO UPDATE SET
+                enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
+                client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
+                roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
+                auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
+                updated_at = now()
+            """,
+            config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
+            config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
+            config.autoSignup(), config.displayNameClaim());
+    }
+
+    @Override
+    public void delete() {
+        jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
+    }
+}
+```
+
+- [ ] **Step 6: Implement PostgresMetricsStore**
+
+```java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.storage.MetricsStore;
+import com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.util.List;
+
+@Repository
+public class PostgresMetricsStore implements MetricsStore {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private final JdbcTemplate jdbc;
+
+    public PostgresMetricsStore(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    @Override
+    public void insertBatch(List<MetricsSnapshot> snapshots) {
+        jdbc.batchUpdate("""
+            INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
+                collected_at, server_received_at)
+            VALUES (?, ?, ?,
?::jsonb, ?, now())
+            """,
+            snapshots.stream().map(s -> new Object[]{
+                s.agentId(), s.metricName(), s.metricValue(),
+                tagsToJson(s.tags()),
+                Timestamp.from(s.collectedAt())
+            }).toList());
+    }
+
+    private String tagsToJson(java.util.Map<String, String> tags) {
+        if (tags == null || tags.isEmpty()) return null;
+        try { return MAPPER.writeValueAsString(tags); }
+        catch (JsonProcessingException e) { return null; }
+    }
+}
+```
+
+- [ ] **Step 7: Run all store tests, verify pass**
+
+Run: `mvn test -pl cameleer3-server-app -Dtest="Postgres*IT" -q`
+
+- [ ] **Step 8: Commit**
+
+```bash
+git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/Postgres*.java
+git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/Postgres*.java
+git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
+```
+
+---
+
+## Chunk 4: OpenSearch Integration
+
+### Task 13: Implement OpenSearchIndex
+
+**Files:**
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java`
+- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java`
+
+- [ ] **Step 1: Write failing test**
+
+```java
+package com.cameleer3.server.app.search;
+
+import com.cameleer3.server.app.AbstractPostgresIT;
+import com.cameleer3.server.core.search.ExecutionSummary;
+import com.cameleer3.server.core.search.SearchRequest;
+import com.cameleer3.server.core.search.SearchResult;
+import com.cameleer3.server.core.storage.SearchIndex;
+import com.cameleer3.server.core.storage.model.ExecutionDocument;
+import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
+import org.junit.jupiter.api.Test;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.springframework.beans.factory.annotation.Autowired;
+import
org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.junit.jupiter.Container;
+
+import java.time.Instant;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
+class OpenSearchIndexIT extends AbstractPostgresIT {
+
+    @Container
+    static final OpensearchContainer<?> opensearch =
+        new OpensearchContainer<>("opensearchproject/opensearch:2.19.0")
+            .withSecurityEnabled(false);
+
+    @DynamicPropertySource
+    static void configureOpenSearch(DynamicPropertyRegistry registry) {
+        registry.add("opensearch.url", opensearch::getHttpHostAddress);
+    }
+
+    @Autowired
+    SearchIndex searchIndex;
+
+    @Test
+    void indexAndSearchByText() throws Exception {
+        Instant now = Instant.now();
+        ExecutionDocument doc = new ExecutionDocument(
+            "search-1", "route-a", "agent-1", "app-1",
+            "FAILED", "corr-1", "exch-1",
+            now, now.plusMillis(100), 100L,
+            "OrderNotFoundException: order-12345 not found", null,
+            List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
+                null, null, "request body with customer-99", null, null, null)));
+
+        searchIndex.index(doc);
+        Thread.sleep(1500); // Allow OpenSearch refresh
+
+        SearchRequest request = new SearchRequest(
+            null, now.minusSeconds(60), now.plusSeconds(60),
+            null, null, null,
+            "OrderNotFoundException", null, null, null,
+            null, null, null, null, null,
+            0, 50, "startTime", "desc");
+
+        SearchResult result = searchIndex.search(request);
+        assertTrue(result.total() > 0);
+        assertEquals("search-1", result.items().get(0).executionId());
+    }
+
+    @Test
+    void wildcardSearchFindsSubstring() throws Exception {
+        Instant now = Instant.now();
+        ExecutionDocument doc = new ExecutionDocument(
+            "wild-1", "route-b", "agent-1", "app-1",
+            "COMPLETED", null, null,
+            now, now.plusMillis(50), 50L, null, null,
+            List.of(new ProcessorDoc("proc-1", "bean",
"COMPLETED",
                        null, null, "UniquePayloadIdentifier12345", null, null, null)));

        searchIndex.index(doc);
        Thread.sleep(1500);

        SearchRequest request = new SearchRequest(
                null, now.minusSeconds(60), now.plusSeconds(60),
                null, null, null,
                "PayloadIdentifier", null, null, null,
                null, null, null, null, null,
                0, 50, "start_time", "desc");

        SearchResult<ExecutionSummary> result = searchIndex.search(request);
        assertTrue(result.total() > 0);
    }
}
```

- [ ] **Step 2: Create OpenSearchConfig**

```java
package com.cameleer3.server.app.config;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URISyntaxException;

@Configuration
public class OpenSearchConfig {

    @Value("${opensearch.url:http://localhost:9200}")
    private String opensearchUrl;

    @Bean
    public OpenSearchClient openSearchClient() throws URISyntaxException {
        // HttpHost.create declares a checked URISyntaxException in httpcore5;
        // a malformed URL surfaces as a bean creation failure at startup.
        HttpHost host = HttpHost.create(opensearchUrl);
        var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
        return new OpenSearchClient(transport);
    }
}
```

- [ ] **Step 3: Implement OpenSearchIndex**

```java
package com.cameleer3.server.app.search;

import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import 
org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.analysis.TokenChar;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

@Repository
public class OpenSearchIndex implements SearchIndex {

    private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
    private static final String INDEX_PREFIX = "executions-";
    private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
            .withZone(ZoneOffset.UTC);

    private final OpenSearchClient client;

    public OpenSearchIndex(OpenSearchClient client) {
        this.client = client;
    }

    @PostConstruct
    void ensureIndexTemplate() {
        // Full template with ngram analyzer for infix wildcard search.
        // The template JSON matches the spec's OpenSearch index template definition.
        try {
            boolean exists = client.indices().existsIndexTemplate(
                    ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
            if (!exists) {
                client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
                        .name("executions-template")
                        .indexPatterns(List.of("executions-*"))
                        .template(t -> t
                                .settings(s -> s
                                        .numberOfShards("3")
                                        .numberOfReplicas("1")
                                        .analysis(a -> a
                                                .analyzer("ngram_analyzer", an -> an
                                                        .custom(c -> c
                                                                .tokenizer("ngram_tokenizer")
                                                                .filter("lowercase")))
                                                .tokenizer("ngram_tokenizer", tk -> tk
                                                        .definition(d -> d
                                                                .ngram(ng -> ng
                                                                        .minGram(3)
                                                                        .maxGram(4)
                                                                        .tokenChars(TokenChar.Letter,
                                                                                TokenChar.Digit,
                                                                                TokenChar.Punctuation,
                                                                                TokenChar.Symbol)))))))));
                log.info("OpenSearch index template created with ngram analyzer");
            }
        } catch (IOException e) {
            log.error("Failed to create index template", e);
        }
    }

    @Override
    public void index(ExecutionDocument doc) {
        String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
        try {
            client.index(IndexRequest.of(b -> b
                    .index(indexName)
                    .id(doc.executionId())
                    .document(toMap(doc))));
        } catch (IOException e) {
            log.error("Failed to index execution {}", doc.executionId(), e);
        }
    }

    @Override
    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        try {
            var searchReq = buildSearchRequest(request, request.limit());
            var response = client.search(searchReq, Map.class);

            List<ExecutionSummary> items = response.hits().hits().stream()
                    .map(this::hitToSummary)
                    .collect(Collectors.toList());

            long total = response.hits().total() != null ? 
response.hits().total().value() : 0;
            return new SearchResult<>(items, total);
        } catch (IOException e) {
            log.error("Search failed", e);
            return new SearchResult<>(List.of(), 0);
        }
    }

    @Override
    public long count(SearchRequest request) {
        try {
            var countReq = CountRequest.of(b -> b
                    .index(INDEX_PREFIX + "*")
                    .query(buildQuery(request)));
            return client.count(countReq).count();
        } catch (IOException e) {
            log.error("Count failed", e);
            return 0;
        }
    }

    @Override
    public void delete(String executionId) {
        try {
            client.deleteByQuery(DeleteByQueryRequest.of(b -> b
                    .index(List.of(INDEX_PREFIX + "*"))
                    .query(Query.of(q -> q.term(t -> t
                            .field("execution_id").value(FieldValue.of(executionId)))))));
        } catch (IOException e) {
            log.error("Failed to delete execution {}", executionId, e);
        }
    }

    private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
            SearchRequest request, int size) {
        return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
            b.index(INDEX_PREFIX + "*")
                    .query(buildQuery(request))
                    .size(size)
                    .from(request.offset())
                    .sort(s -> s.field(f -> f
                            .field(request.sortColumn())
                            .order("asc".equalsIgnoreCase(request.sortDir())
                                    ? 
SortOrder.Asc : SortOrder.Desc)));
            return b;
        });
    }

    private Query buildQuery(SearchRequest request) {
        List<Query> must = new ArrayList<>();
        List<Query> filter = new ArrayList<>();

        // Time range
        if (request.timeFrom() != null || request.timeTo() != null) {
            filter.add(Query.of(q -> q.range(r -> {
                r.field("start_time");
                if (request.timeFrom() != null)
                    r.gte(org.opensearch.client.json.JsonData.of(request.timeFrom().toString()));
                if (request.timeTo() != null)
                    r.lte(org.opensearch.client.json.JsonData.of(request.timeTo().toString()));
                return r;
            })));
        }

        // Keyword filters
        if (request.status() != null)
            filter.add(termQuery("status", request.status()));
        if (request.routeId() != null)
            filter.add(termQuery("route_id", request.routeId()));
        if (request.agentId() != null)
            filter.add(termQuery("agent_id", request.agentId()));
        if (request.correlationId() != null)
            filter.add(termQuery("correlation_id", request.correlationId()));

        // Full-text search across all fields + nested processor fields
        if (request.text() != null && !request.text().isBlank()) {
            String text = request.text();
            List<Query> textQueries = new ArrayList<>();

            // Search top-level text fields
            textQueries.add(Query.of(q -> q.multiMatch(m -> m
                    .query(text)
                    .fields("error_message", "error_stacktrace",
                            "error_message.ngram", "error_stacktrace.ngram"))));

            // Search nested processor fields
            textQueries.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(text)
                            .fields("processors.input_body", "processors.output_body",
                                    "processors.input_headers", "processors.output_headers",
                                    "processors.error_message", "processors.error_stacktrace",
                                    "processors.input_body.ngram", "processors.output_body.ngram",
                                    "processors.input_headers.ngram", "processors.output_headers.ngram",
                                    "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))));

            // Also try keyword fields for exact matches
            textQueries.add(Query.of(q -> 
q.multiMatch(m -> m
                    .query(text)
                    .fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));

            must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
        }

        // Scoped text searches
        if (request.textInBody() != null && !request.textInBody().isBlank()) {
            must.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(request.textInBody())
                            .fields("processors.input_body", "processors.output_body",
                                    "processors.input_body.ngram", "processors.output_body.ngram"))))));
        }
        if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
            must.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(request.textInHeaders())
                            .fields("processors.input_headers", "processors.output_headers",
                                    "processors.input_headers.ngram", "processors.output_headers.ngram"))))));
        }
        if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
            String errText = request.textInErrors();
            must.add(Query.of(q -> q.bool(b -> b.should(
                    Query.of(sq -> sq.multiMatch(m -> m
                            .query(errText)
                            .fields("error_message", "error_stacktrace",
                                    "error_message.ngram", "error_stacktrace.ngram"))),
                    Query.of(sq -> sq.nested(n -> n
                            .path("processors")
                            .query(nq -> nq.multiMatch(m -> m
                                    .query(errText)
                                    .fields("processors.error_message", "processors.error_stacktrace",
                                            "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
            ).minimumShouldMatch("1"))));
        }

        // Duration range
        if (request.durationMin() != null || request.durationMax() != null) {
            filter.add(Query.of(q -> q.range(r -> {
                r.field("duration_ms");
                if (request.durationMin() != null)
                    r.gte(org.opensearch.client.json.JsonData.of(request.durationMin()));
                if (request.durationMax() != null)
                    r.lte(org.opensearch.client.json.JsonData.of(request.durationMax()));
                return r;
            })));
        }

        return Query.of(q -> q.bool(b -> {
            if 
(!must.isEmpty()) b.must(must);
            if (!filter.isEmpty()) b.filter(filter);
            if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
            return b;
        }));
    }

    private Query termQuery(String field, String value) {
        return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
    }

    private Map<String, Object> toMap(ExecutionDocument doc) {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("execution_id", doc.executionId());
        map.put("route_id", doc.routeId());
        map.put("agent_id", doc.agentId());
        map.put("group_name", doc.groupName());
        map.put("status", doc.status());
        map.put("correlation_id", doc.correlationId());
        map.put("exchange_id", doc.exchangeId());
        map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
        map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
        map.put("duration_ms", doc.durationMs());
        map.put("error_message", doc.errorMessage());
        map.put("error_stacktrace", doc.errorStacktrace());
        if (doc.processors() != null) {
            map.put("processors", doc.processors().stream().map(p -> {
                Map<String, Object> pm = new LinkedHashMap<>();
                pm.put("processor_id", p.processorId());
                pm.put("processor_type", p.processorType());
                pm.put("status", p.status());
                pm.put("error_message", p.errorMessage());
                pm.put("error_stacktrace", p.errorStacktrace());
                pm.put("input_body", p.inputBody());
                pm.put("output_body", p.outputBody());
                pm.put("input_headers", p.inputHeaders());
                pm.put("output_headers", p.outputHeaders());
                return pm;
            }).toList());
        }
        return map;
    }

    @SuppressWarnings("unchecked")
    private ExecutionSummary hitToSummary(Hit<Map> hit) {
        Map<String, Object> src = hit.source();
        if (src == null) return null;
        return new ExecutionSummary(
                (String) src.get("execution_id"),
                (String) src.get("route_id"),
                (String) src.get("agent_id"),
                (String) src.get("status"),
                src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
                src.get("end_time") != null ? 
Instant.parse((String) src.get("end_time")) : null,
                src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
                (String) src.get("correlation_id"),
                (String) src.get("error_message"));
    }
}
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `mvn test -pl cameleer3-server-app -Dtest=OpenSearchIndexIT -q`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search"
```

### Task 14: Implement SearchIndexer (debounced event-driven indexer)

**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java`

- [ ] **Step 1: Implement SearchIndexer**

```java
package com.cameleer3.server.core.indexing;

import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class SearchIndexer {

    private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);

    private final ExecutionStore executionStore;
    private final SearchIndex searchIndex;
    private final long debounceMs;
    private final int queueCapacity;

    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final 
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });

    public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                         long debounceMs, int queueCapacity) {
        this.executionStore = executionStore;
        this.searchIndex = searchIndex;
        this.debounceMs = debounceMs;
        this.queueCapacity = queueCapacity;
    }

    public void onExecutionUpdated(ExecutionUpdatedEvent event) {
        if (pending.size() >= queueCapacity) {
            log.warn("Search indexer queue full, dropping event for {}", event.executionId());
            return;
        }

        ScheduledFuture<?> existing = pending.put(event.executionId(),
                scheduler.schedule(() -> indexExecution(event.executionId()),
                        debounceMs, TimeUnit.MILLISECONDS));
        if (existing != null) {
            existing.cancel(false);
        }
    }

    private void indexExecution(String executionId) {
        pending.remove(executionId);
        try {
            ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
            if (exec == null) return;

            List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
            List<ProcessorDoc> processorDocs = processors.stream()
                    .map(p -> new ProcessorDoc(
                            p.processorId(), p.processorType(), p.status(),
                            p.errorMessage(), p.errorStacktrace(),
                            p.inputBody(), p.outputBody(),
                            p.inputHeaders(), p.outputHeaders()))
                    .toList();

            searchIndex.index(new ExecutionDocument(
                    exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
                    exec.status(), exec.correlationId(), exec.exchangeId(),
                    exec.startTime(), exec.endTime(), exec.durationMs(),
                    exec.errorMessage(), exec.errorStacktrace(), processorDocs));
        } catch (Exception e) {
            log.error("Failed to index execution {}", executionId, e);
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java
git commit -m "feat: 
implement debounced SearchIndexer for async OpenSearch indexing"
```

---

## Chunk 5: Wiring, Cleanup, and Integration

### Task 15: Create bean configuration and wire everything

**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java`

- [ ] **Step 1: Create StorageBeanConfig**

Wire `DetailService`, `SearchIndexer`, `IngestionService` with new store beans:

```java
package com.cameleer3.server.app.config;

import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StorageBeanConfig {

    @Bean
    public DetailService detailService(ExecutionStore executionStore) {
        return new DetailService(executionStore);
    }

    @Bean(destroyMethod = "shutdown")
    public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                                       @Value("${opensearch.debounce-ms:2000}") long debounceMs,
                                       @Value("${opensearch.queue-size:10000}") int queueSize) {
        return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
    }

    @Bean
    public IngestionService 
ingestionService(ExecutionStore executionStore,
                                              DiagramStore diagramStore,
                                              WriteBuffer<MetricsSnapshot> metricsBuffer,
                                              SearchIndexer searchIndexer,
                                              @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
        return new IngestionService(executionStore, diagramStore, metricsBuffer,
                searchIndexer::onExecutionUpdated, bodySizeLimit);
    }
}
```

- [ ] **Step 2: Update SearchBeanConfig**

Wire `SearchService` with `SearchIndex` + `StatsStore`:

```java
@Bean
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
    return new SearchService(searchIndex, statsStore);
}
```

- [ ] **Step 3: Update IngestionBeanConfig**

Remove execution and diagram write buffers. Keep only metrics write buffer:

```java
@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
```

- [ ] **Step 4: Create MetricsFlushScheduler**

Create `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java`:

```java
package com.cameleer3.server.app.ingestion;

import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MetricsFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);

    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final MetricsStore metricsStore;
    private final int batchSize;
    private volatile boolean running = false;

    public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
                                 MetricsStore 
metricsStore,
                                 IngestionConfig config) {
        this.metricsBuffer = metricsBuffer;
        this.metricsStore = metricsStore;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        try {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                metricsStore.insertBatch(batch);
                log.debug("Flushed {} metrics to PostgreSQL", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush metrics", e);
        }
    }

    @Override public void start() { running = true; }
    @Override public void stop() {
        // Drain remaining on shutdown
        while (metricsBuffer.size() > 0) {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (batch.isEmpty()) break;
            try { metricsStore.insertBatch(batch); }
            catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
        }
        running = false;
    }
    @Override public boolean isRunning() { return running; }
    @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
```

- [ ] **Step 5: Create RetentionScheduler**

Create `cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java`:

```java
package com.cameleer3.server.app.retention;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RetentionScheduler {

    private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);

    private final JdbcTemplate jdbc;
    private final int retentionDays;

    public RetentionScheduler(JdbcTemplate jdbc,
                              @Value("${cameleer.retention-days:30}") int retentionDays) {
        this.jdbc = jdbc;
        this.retentionDays = retentionDays;
    }

    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
    public void 
dropExpiredChunks() {
        String interval = retentionDays + " days";
        try {
            // Raw data
            jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");

            // Continuous aggregates (keep 3x longer)
            String caggInterval = (retentionDays * 3) + " days";
            jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");

            log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
                    retentionDays, retentionDays * 3);
        } catch (Exception e) {
            log.error("Retention job failed", e);
        }
    }

    // Note: OpenSearch daily index deletion should be handled via ILM policy
    // configured at deployment time, not in application code.
}
```

- [ ] **Step 6: Commit**

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"
```

### Task 16: Delete ClickHouse code and old interfaces

**Files:**
- Delete all files listed in "Files to delete" section above

- [ ] **Step 1: Delete ClickHouse implementations**

```bash
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouse*.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
```

- [ ] **Step 2: Delete old core interfaces replaced by new ones**

`UserRepository` and `OidcConfigRepository` in `core.security` are **kept** — the new Postgres implementations implement them. Only interfaces replaced by new storage interfaces are deleted.
```bash
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
```

- [ ] **Step 3: Delete ClickHouse SQL migrations**

```bash
rm -r cameleer3-server-app/src/main/resources/clickhouse/
```

- [ ] **Step 4: Delete old test base class**

```bash
rm cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
```

- [ ] **Step 5: Fix compilation errors in specific files**

Each file and what to change:
- `DiagramRenderController.java` — change `DiagramRepository` to `DiagramStore`
- `DiagramController.java` — change `DiagramRepository` to `DiagramStore`
- `SearchController.java` — already uses `SearchService`, verify no direct `SearchEngine` refs
- `DetailController.java` — already uses `DetailService`, verify no direct `ExecutionRepository` refs
- `TreeReconstructionTest.java` — rewrite to test `DetailService.buildTree()` with `ProcessorRecord` list input
- `ExecutionController.java` — update to call `IngestionService.ingestExecution()` (synchronous, catches exceptions for 503)
- `DiagramController.java` — update to call `IngestionService.ingestDiagram()` (synchronous)
- `MetricsController.java` — update to call `IngestionService.acceptMetrics()` (still buffered)
- `SearchBeanConfig.java` — wire `SearchService(SearchIndex, StatsStore)` instead of `SearchService(SearchEngine)`
- `IngestionBeanConfig.java` — remove execution/diagram buffer beans, add `bodySizeLimit` to `IngestionService` constructor
- All ITs extending `AbstractClickHouseIT` — change to extend `AbstractPostgresIT`

- [ ] **Step 
6: Verify compilation**

Run: `mvn compile -q`
Expected: BUILD SUCCESS

- [ ] **Step 7: Commit**

```bash
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"
```

### Task 17: Update existing integration tests

**Files:**
- Modify: all IT files under `cameleer3-server-app/src/test/`

- [ ] **Step 1: Update all ITs to extend AbstractPostgresIT**

Every IT that currently extends `AbstractClickHouseIT` must extend `AbstractPostgresIT` instead. Also add OpenSearch Testcontainer where search tests are needed.

- [ ] **Step 2: Update ExecutionControllerIT**

Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success, 503 on DB write failure.

- [ ] **Step 3: Update SearchControllerIT**

Search now hits OpenSearch. Add OpenSearch Testcontainer. Allow time for async indexing (use Awaitility to poll search results rather than `Thread.sleep`).

- [ ] **Step 4: Update DetailControllerIT**

Detail now reads from `PostgresExecutionStore` directly. Simpler setup — just insert execution + processors.

- [ ] **Step 5: Run full test suite**

Run: `mvn verify -q`
Expected: BUILD SUCCESS, all tests pass

- [ ] **Step 6: Commit**

```bash
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"
```

### Task 18: Update Dockerfile and K8s manifests

**Files:**
- Modify: `Dockerfile`
- Modify: `deploy/*.yaml` (K8s manifests)

- [ ] **Step 1: Update Dockerfile**

No JDBC driver changes needed in Dockerfile (drivers are in the fat JAR). Just verify the `REGISTRY_TOKEN` build arg still works for `cameleer3-common` resolution.
- [ ] **Step 2: Update K8s manifests**

- Replace ClickHouse StatefulSet with PostgreSQL/TimescaleDB StatefulSet
- Add OpenSearch StatefulSet (single-node for dev, clustered for prod)
- Update server Deployment env vars: `SPRING_DATASOURCE_URL`, `OPENSEARCH_URL`
- Update secrets: replace `clickhouse-credentials` with `postgres-credentials`
- Update health check probes: PostgreSQL uses standard port 5432, OpenSearch uses 9200

- [ ] **Step 3: Commit**

```bash
git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"
```

### Task 19: Update CI workflow

**Files:**
- Modify: `.gitea/workflows/ci.yml`

- [ ] **Step 1: Update CI workflow**

Changes needed:
- Docker build: no ClickHouse references in the image (it's a fat JAR, driver is bundled)
- Deploy step: update K8s secret names from `clickhouse-credentials` to `postgres-credentials`
- Deploy step: add OpenSearch deployment manifests
- Verify `REGISTRY_TOKEN` build arg still works for `cameleer3-common`
- Integration tests still skipped in CI (`-DskipITs`) — Testcontainers needs Docker-in-Docker

- [ ] **Step 2: Commit**

```bash
git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"
```

### Task 20: Update OpenAPI spec

- [ ] **Step 1: Regenerate openapi.json**

Run the server locally (or via test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend.

Run: `mvn spring-boot:run` (briefly, to generate spec), then fetch `/api/v1/api-docs`.
- [ ] **Step 2: Commit**

```bash
git add cameleer3-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"
```

### Task 21: Final verification

- [ ] **Step 1: Full build**

Run: `mvn clean verify -q`
Expected: BUILD SUCCESS

- [ ] **Step 2: Manual smoke test**

Start server against local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):
1. POST an execution → verify 202
2. GET `/api/v1/search?text=...` → verify search works
3. GET `/api/v1/stats` → verify stats return
4. GET `/api/v1/detail/{id}` → verify detail with processor tree

- [ ] **Step 3: Commit any remaining fixes**

```bash
git add -A
git commit -m "fix: final adjustments from smoke testing"
```
diff --git a/docs/superpowers/specs/2026-03-16-storage-layer-design.md b/docs/superpowers/specs/2026-03-16-storage-layer-design.md
new file mode 100644
index 00000000..46014b73
--- /dev/null
+++ b/docs/superpowers/specs/2026-03-16-storage-layer-design.md
@@ -0,0 +1,586 @@
# Storage Layer Refactor: PostgreSQL + TimescaleDB + OpenSearch

**Date:** 2026-03-16
**Status:** Draft
**Scope:** Replace ClickHouse with PostgreSQL/TimescaleDB (source of truth, analytics) and OpenSearch (full-text search index)

## Motivation

The current all-in-ClickHouse storage layer has structural problems at scale:

- **OOM on batch inserts**: Wide rows with parallel arrays of variable-length blobs (exchange bodies, headers, stacktraces) exhaust ClickHouse server memory during batch insert processing.
- **CRUD misfit**: Users, OIDC config, and diagrams use ReplacingMergeTree, requiring `FINAL` on every read and workarounds to prevent version row accumulation.
- **Weak full-text search**: LIKE patterns with tokenbf_v1 skip indexes provide no ranking, stemming, fuzzy matching, or infix wildcard support.
- **Rigid data model**: Parallel arrays for processor executions prevent chunked/streaming ingestion and require ~100 lines of array type conversion workarounds.

## Requirements

- Hundreds of applications, thousands of routes, billions of records
- Statistics at four levels: all, application, route, processor
- Arbitrary time bucket sizes for statistics queries
- Full-text wildcard (infix) search across all fields
- P99 response time < 2000ms
- Support chunked/streaming execution ingestion (partial updates for long-running routes)
- Idempotent inserts (deduplication on execution_id)
- All software must be free (Apache 2.0, MIT, BSD, PostgreSQL License)
- Deployment target: Kubernetes (k3s)
- Data expired per day by dropping partitions/indexes — no row-level deletes

## Architecture

Two backends:

1. **PostgreSQL + TimescaleDB** — source of truth for all data, analytics via continuous aggregates
2. **OpenSearch** — asynchronous search index for full-text and wildcard queries

OpenSearch is a derived index, not a source of truth. If it goes down, writes and detail views continue via PostgreSQL. If an index corrupts, it is rebuilt from PostgreSQL.
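The idempotent-insert and status-progression requirements above reduce to a conditional upsert keyed on `(execution_id, start_time)`. A minimal sketch, illustrative only (the column list is abridged, and the real guard lives in the `PostgresExecutionStore` upsert):

```sql
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                        status, start_time, end_time, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
ON CONFLICT (execution_id, start_time) DO UPDATE
SET status     = CASE
                     -- RUNNING may progress to COMPLETED/FAILED;
                     -- terminal statuses never regress to RUNNING
                     WHEN executions.status = 'RUNNING' THEN EXCLUDED.status
                     ELSE executions.status
                 END,
    end_time   = COALESCE(EXCLUDED.end_time, executions.end_time),
    updated_at = now();
```

Re-sending the same execution is then a no-op for rows already in a terminal state, which is what makes agent retries safe.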
+ +### Data Flow + +``` +Agent POST -> Server + |-- ExecutionStore.upsert() -> PostgreSQL (immediate) + |-- ExecutionStore.upsertProcessors() -> PostgreSQL (immediate) + |-- DiagramStore.store() -> PostgreSQL (immediate, idempotent) + '-- publish(ExecutionUpdatedEvent) -> in-process event queue + +MetricsController -> WriteBuffer -> MetricsFlushScheduler -> MetricsStore (batched) + +SearchIndexer (event listener) + |-- debounce by execution_id (1-2s window) + |-- read consolidated state from ExecutionStore + '-- SearchIndex.index(document) -> OpenSearch + +Stats query -> PostgreSQL continuous aggregate (sub-100ms) +Wildcard search -> OpenSearch -> execution_ids -> PostgreSQL for detail +Detail lookup -> PostgreSQL by execution_id +CRUD -> PostgreSQL +``` + +## PostgreSQL Data Model + +### Hypertables (TimescaleDB, partitioned by day) + +#### `executions` + +| Column | Type | Notes | +|---|---|---| +| execution_id | TEXT | NOT NULL, natural key from agent (dedup) | +| route_id | TEXT | NOT NULL | +| agent_id | TEXT | NOT NULL | +| group_name | TEXT | NOT NULL, application name | +| status | TEXT | NOT NULL: RUNNING, COMPLETED, FAILED | +| correlation_id | TEXT | | +| exchange_id | TEXT | | +| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key | +| end_time | TIMESTAMPTZ | | +| duration_ms | BIGINT | | +| error_message | TEXT | | +| error_stacktrace | TEXT | | +| diagram_content_hash | TEXT | | +| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() | +| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() | + +- Hypertable chunk interval: 1 day +- Primary key: `(execution_id, start_time)` — TimescaleDB requires the partition column in unique constraints +- `ON CONFLICT (execution_id, start_time) DO UPDATE` for dedup and status progression (RUNNING -> COMPLETED/FAILED guard: only update if new status supersedes old) +- Indexes: `(agent_id, start_time)`, `(route_id, start_time)`, `(group_name, start_time)`, `(correlation_id)` + +#### `processor_executions` + +| 
Column | Type | Notes |
+|---|---|---|
+| id | BIGSERIAL | |
+| execution_id | TEXT | NOT NULL, references executions (logical reference only; TimescaleDB does not support foreign keys referencing a hypertable) |
+| processor_id | TEXT | NOT NULL |
+| processor_type | TEXT | NOT NULL |
+| diagram_node_id | TEXT | |
+| group_name | TEXT | NOT NULL, denormalized from execution |
+| route_id | TEXT | NOT NULL, denormalized from execution |
+| depth | INT | NOT NULL |
+| parent_processor_id | TEXT | Self-reference for tree structure |
+| status | TEXT | NOT NULL |
+| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key |
+| end_time | TIMESTAMPTZ | |
+| duration_ms | BIGINT | |
+| error_message | TEXT | |
+| error_stacktrace | TEXT | |
+| input_body | TEXT | Size-limited at ingestion (configurable) |
+| output_body | TEXT | Size-limited at ingestion (configurable) |
+| input_headers | JSONB | |
+| output_headers | JSONB | |
+| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+
+- Hypertable chunk interval: 1 day
+- Unique constraint: `(execution_id, processor_id, start_time)` — TimescaleDB requires partition column in unique constraints
+- `ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` for re-sent processors
+- `group_name` and `route_id` denormalized from the parent execution (immutable per execution, set at ingestion) to enable JOIN-free continuous aggregates
+- Indexes: `(execution_id)`, `(processor_type, start_time)`
+
+#### `agent_metrics`
+
+| Column | Type | Notes |
+|---|---|---|
+| agent_id | TEXT | NOT NULL |
+| metric_name | TEXT | NOT NULL |
+| metric_value | DOUBLE PRECISION | NOT NULL |
+| tags | JSONB | Arbitrary key-value pairs |
+| collected_at | TIMESTAMPTZ | NOT NULL, hypertable partition key |
+| server_received_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+
+- Hypertable chunk interval: 1 day
+- Buffered ingestion (write buffer retained for metrics)
+
+### Regular tables
+
+#### `route_diagrams`
+
+| Column | Type | Notes |
+|---|---|---|
+| content_hash | TEXT | PK, SHA-256 of definition |
+| route_id | TEXT | NOT NULL |
+| agent_id | TEXT | NOT NULL |
+| definition | TEXT | JSON of RouteGraph |
+| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+
+- `ON CONFLICT (content_hash) DO NOTHING` for content-addressed dedup
+
+#### `users`
+
+| Column | Type | Notes |
+|---|---|---|
+| user_id | TEXT | PK |
+| provider | TEXT | NOT NULL |
+| email | TEXT | |
+| display_name | TEXT | |
+| roles | TEXT[] | AGENT, VIEWER, OPERATOR, ADMIN |
+| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+
+#### `oidc_config`
+
+| Column | Type | Notes |
+|---|---|---|
+| config_id | TEXT | PK, always 'default' |
+| enabled | BOOLEAN | NOT NULL |
+| issuer_uri | TEXT | |
+| client_id | TEXT | |
+| client_secret | TEXT | |
+| roles_claim | TEXT | |
+| default_roles | TEXT[] | |
+| auto_signup | BOOLEAN | |
+| display_name_claim | TEXT | |
+| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
+
+### Schema migrations
+
+Flyway replaces the current custom `ClickHouseSchemaInitializer`. Versioned migrations in `src/main/resources/db/migration/`.
+
+### Required extensions
+
+The following PostgreSQL extensions must be created in the initial Flyway migration:
+
+- `timescaledb` — hypertables, continuous aggregates, chunk management
+- `timescaledb_toolkit` — `percentile_agg()` and `approx_percentile()` for P99 calculations
+
+The TimescaleDB HA Docker image (`timescale/timescaledb-ha`) includes both extensions; the slim `timescale/timescaledb` image may not bundle `timescaledb_toolkit`, so verify the chosen image before relying on it. The Flyway V1 migration must `CREATE EXTENSION IF NOT EXISTS` for both before creating hypertables.
+
+## Continuous Aggregates (Statistics)
+
+Four continuous aggregates at 1-minute resolution, one per aggregation level.
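+
+One caveat that applies to the aggregate definitions in this section: a stored `p99_duration` value cannot be re-aggregated into coarser buckets later (percentiles do not sum). If arbitrary-bucket P99 is required, a hedged alternative using timescaledb_toolkit's two-step aggregation pattern is to store the `percentile_agg` sketch in the view and extract the percentile at query time:
+
+```sql
+-- In the continuous aggregate, keep the sketch instead of the finished value:
+--   percentile_agg(duration_ms::DOUBLE PRECISION) AS duration_pctl
+-- Then combine sketches with rollup() at any bucket size:
+SELECT time_bucket('30 minutes', bucket) AS period,
+       approx_percentile(0.99, rollup(duration_pctl)) AS p99_duration
+FROM stats_1m_route
+WHERE bucket >= now() - interval '24 hours'
+GROUP BY period
+ORDER BY period;
+```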
+ +### `stats_1m_all` — global + +```sql +CREATE MATERIALIZED VIEW stats_1m_all +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM executions +WHERE status IS NOT NULL +GROUP BY bucket; +``` + +### `stats_1m_app` — per application + +Group by: `bucket, group_name` + +### `stats_1m_route` — per route + +Group by: `bucket, group_name, route_id` + +### `stats_1m_processor` — per processor type within a route + +```sql +-- No JOIN needed: group_name and route_id are denormalized on processor_executions +CREATE MATERIALIZED VIEW stats_1m_processor +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', start_time) AS bucket, + group_name, + route_id, + processor_type, + COUNT(*) AS total_count, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count, + SUM(duration_ms) AS duration_sum, + MAX(duration_ms) AS duration_max, + approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration +FROM processor_executions +GROUP BY bucket, group_name, route_id, processor_type; +``` + +Note: TimescaleDB continuous aggregates only support single-hypertable queries (no JOINs). This is why `group_name` and `route_id` are denormalized onto `processor_executions`. + +### Query pattern for arbitrary buckets + +```sql +SELECT time_bucket('30 minutes', bucket) AS period, + SUM(total_count) AS total_count, + SUM(failed_count) AS failed_count, + SUM(duration_sum) / NULLIF(SUM(total_count), 0) AS avg_duration +FROM stats_1m_route +WHERE route_id = ? 
AND bucket >= now() - interval '24 hours' +GROUP BY period +ORDER BY period; +``` + +### Refresh policy + +```sql +SELECT add_continuous_aggregate_policy('stats_1m_all', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute'); +``` + +### Retention + +- Raw data (`executions`, `processor_executions`): configurable, default 30 days +- 1-minute rollups: configurable, default 90 days +- All retention via `drop_chunks()` — drops entire daily partitions + +## OpenSearch Index + +### Index template + +Index pattern: `executions-YYYY-MM-DD` (daily rollover, aligned with PostgreSQL daily partitions for synchronized expiry). + +```json +{ + "index_patterns": ["executions-*"], + "template": { + "settings": { + "number_of_shards": 3, + "number_of_replicas": 1, + "analysis": { + "analyzer": { + "ngram_analyzer": { + "type": "custom", + "tokenizer": "ngram_tokenizer", + "filter": ["lowercase"] + } + }, + "tokenizer": { + "ngram_tokenizer": { + "type": "ngram", + "min_gram": 3, + "max_gram": 4, + "token_chars": ["letter", "digit", "punctuation", "symbol"] + } + } + } + }, + "mappings": { + "properties": { + "execution_id": { "type": "keyword" }, + "route_id": { "type": "keyword" }, + "agent_id": { "type": "keyword" }, + "group_name": { "type": "keyword" }, + "status": { "type": "keyword" }, + "correlation_id": { "type": "keyword" }, + "exchange_id": { "type": "keyword" }, + "start_time": { "type": "date" }, + "end_time": { "type": "date" }, + "duration_ms": { "type": "long" }, + "error_message": { + "type": "text", + "analyzer": "standard", + "fields": { + "ngram": { "type": "text", "analyzer": "ngram_analyzer" } + } + }, + "error_stacktrace": { + "type": "text", + "analyzer": "standard", + "fields": { + "ngram": { "type": "text", "analyzer": "ngram_analyzer" } + } + }, + "processors": { + "type": "nested", + "properties": { + "processor_id": { "type": "keyword" }, + "processor_type": { "type": "keyword" }, + 
"status": { "type": "keyword" }, + "error_message": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }, + "error_stacktrace": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }, + "input_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }, + "output_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }, + "input_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }, + "output_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } } + } + } + } + } + } +} +``` + +### Indexing strategy + +- Asynchronous, event-driven from PostgreSQL writes +- Debounced by `execution_id` with a 1-2 second window to batch rapid processor updates +- Document ID = `execution_id` (OpenSearch replaces existing document on re-index) +- Daily indexes: `executions-2026-03-16`, `executions-2026-03-17`, etc. 
+
+- Index selected by `start_time` of the execution
+
+### Search strategy
+
+- Unified search box: `multi_match` across all text fields + nested processor fields
+- Infix wildcard: ngram sub-fields (3-4 character grams) for fast substring matching
+- Keyword filters: `term` queries on status, route_id, agent_id, group_name, correlation_id
+- Time range: `range` query on start_time
+- Duration range: `range` query on duration_ms
+- Pagination: `search_after` (no OFFSET scaling problem)
+- Returns: execution_ids with highlights, detail fetched from PostgreSQL
+
+### Index lifecycle
+
+- Daily rollover aligned with PostgreSQL chunk intervals
+- ISM policy (OpenSearch Index State Management): hot (current day) -> delete (after N days, matching PostgreSQL retention)
+- Index deletion is instant — no row-level deletes
+
+## Java Interfaces
+
+### Core module interfaces
+
+```java
+public interface ExecutionStore {
+    void upsert(Execution execution);
+    void upsertProcessors(String executionId, List<ProcessorExecution> processors);
+    Optional<Execution> findById(String executionId);
+    List<ProcessorExecution> findProcessors(String executionId);
+}
+
+public interface StatsStore {
+    Stats getStats(StatsRequest request);
+    List<TimeSeriesPoint> getTimeSeries(TimeSeriesRequest request);
+}
+
+public interface SearchIndex {
+    SearchResult search(SearchRequest request);
+    long count(SearchRequest request);
+    void index(ExecutionDocument document);
+    void delete(String executionId);
+}
+
+// Note: stats() and timeseries() methods move from the old SearchEngine interface
+// to StatsStore, since they now query PostgreSQL continuous aggregates instead of
+// ClickHouse. Callers (SearchService, controllers) must be updated to use StatsStore.
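+
+// Hedged sketch of the StatsRequest / TimeSeriesRequest DTOs listed in the file
+// structure. The spec fixes only their contents (level/scope/time range, plus
+// bucket size); these exact field names and types are illustrative:
+public record StatsRequest(String level, String scope, Instant from, Instant to) {}
+public record TimeSeriesRequest(String level, String scope, Instant from, Instant to, Duration bucketSize) {}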
+
+public interface DiagramStore {
+    void store(String contentHash, String routeId, String agentId, String definition);
+    Optional<String> findByContentHash(String contentHash);
+    Optional<String> findContentHashForRoute(String routeId, String agentId);
+    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
+}
+
+public interface UserStore {
+    Optional<UserInfo> findById(String userId);
+    List<UserInfo> findAll();
+    void upsert(UserInfo user);
+    void updateRoles(String userId, List<String> roles);
+    void delete(String userId);
+}
+
+public interface OidcConfigStore {
+    Optional<OidcConfig> find();
+    void save(OidcConfig config);
+    void delete();
+}
+
+public interface MetricsStore {
+    void insertBatch(List<MetricSnapshot> snapshots);
+}
+```
+
+### App module implementations
+
+| Interface | Implementation | Backend |
+|---|---|---|
+| ExecutionStore | PostgresExecutionStore | PostgreSQL/TimescaleDB |
+| StatsStore | PostgresStatsStore | PostgreSQL continuous aggregates |
+| SearchIndex | OpenSearchIndex | OpenSearch |
+| DiagramStore | PostgresDiagramStore | PostgreSQL |
+| UserStore | PostgresUserStore | PostgreSQL |
+| OidcConfigStore | PostgresOidcConfigStore | PostgreSQL |
+| MetricsStore | PostgresMetricsStore | PostgreSQL/TimescaleDB |
+
+### Ingestion flow
+
+```
+ExecutionController (HTTP POST)
+  |-- IngestionService.ingestExecution(agentId, execution)
+  |-- ExecutionStore.upsert(execution)
+  |-- ExecutionStore.upsertProcessors(executionId, processors)
+  |-- eventPublisher.publish(new ExecutionUpdatedEvent(executionId))
+  '-- return success/backpressure
+
+SearchIndexer (Spring @EventListener or in-process queue)
+  |-- debounce by execution_id (configurable, default 1-2s)
+  |-- ExecutionStore.findById(executionId) + findProcessors(executionId)
+  |-- build ExecutionDocument
+  '-- SearchIndex.index(document)
+```
+
+The `IngestionService` interface changes from buffer-based to synchronous for executions:
+
+```java
+// New IngestionService (core module)
+public interface IngestionService {
+    // Executions: synchronous write to PostgreSQL, returns void, throws on failure.
+    // Controller catches exceptions and returns 503 with Retry-After.
+    // This replaces the boolean return / WriteBuffer pattern for executions.
+    void ingestExecution(String agentId, RouteExecution execution);
+
+    // Metrics: retains write buffer pattern (high-volume, low-urgency)
+    boolean acceptMetrics(List<MetricSnapshot> metrics);
+
+    // Diagrams: synchronous write (low-volume, idempotent)
+    void ingestDiagram(String agentId, TaggedDiagram diagram);
+}
+```
+
+Metrics ingestion retains the write buffer pattern:
+
+```
+MetricsController (HTTP POST)
+  |-- WriteBuffer.offer(batch)
+
+MetricsFlushScheduler (@Scheduled)
+  |-- drain(batchSize)
+  '-- MetricsStore.insertBatch(batch)
+```
+
+### What gets deleted
+
+- `ClickHouseExecutionRepository` — replaced by `PostgresExecutionStore`
+- `ClickHouseSearchEngine` — replaced by `OpenSearchIndex`
+- `ClickHouseFlushScheduler` — replaced by `MetricsFlushScheduler` (metrics only)
+- `ClickHouseDiagramRepository` — replaced by `PostgresDiagramStore`
+- `ClickHouseUserRepository` — replaced by `PostgresUserStore`
+- `ClickHouseOidcConfigRepository` — replaced by `PostgresOidcConfigStore`
+- `ClickHouseMetricsRepository` — replaced by `PostgresMetricsStore`
+- `ClickHouseSchemaInitializer` — replaced by Flyway
+- All `clickhouse/*.sql` migration files — replaced by Flyway migrations
+- Array type conversion helpers, `FINAL` workarounds, `ifNotFinite()` guards
+
+## Error Handling and Resilience
+
+### PostgreSQL (source of truth)
+
+- **Write failure**: Return 503 to agent with `Retry-After` header.
+- **Connection pool exhaustion**: HikariCP handles queueing. Sustained exhaustion triggers backpressure via 503.
+- **Schema migrations**: Flyway with versioned migrations. Validates on startup.
+
+### OpenSearch (search index)
+
+- **Unavailable at write time**: Events accumulate in a bounded in-memory queue. If the queue fills, new index events are dropped with a logged warning. 
No data loss — PostgreSQL has the data. +- **Unavailable at search time**: Search endpoint returns 503. Stats and detail endpoints still work via PostgreSQL. +- **Index corruption/drift**: Rebuild via admin API endpoint (`POST /api/v1/admin/search/rebuild?from=&to=`) that re-indexes from PostgreSQL, scoped by time range. Processes in background, returns job status. +- **Document staleness**: Debouncer provides eventual consistency with 1-2 second typical lag. + +### Partial execution handling + +1. Execution arrives with `status=RUNNING` -> `INSERT ... ON CONFLICT (execution_id, start_time) DO UPDATE` with status progression guard (only update if new status supersedes old: RUNNING < COMPLETED/FAILED) +2. Processors arrive in chunks -> `INSERT ... ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` per processor +3. Completion signal -> same upsert as step 1, with `status='COMPLETED'`, `duration_ms`, `end_time` — the progression guard allows this update +4. Each mutation publishes `ExecutionUpdatedEvent` -> debounced OpenSearch re-index +5. 
Timeout: configurable threshold marks executions stuck in RUNNING as STALE
+
+### Data lifecycle
+
+- All time-series data partitioned by day
+- Expiry by dropping entire daily partitions/indexes — no row-level deletes
+- PostgreSQL: `SELECT drop_chunks('<hypertable>', older_than => INTERVAL 'N days')` on each hypertable (the hypertable is a required first argument)
+- OpenSearch: ISM delete action on daily indexes
+- Retention periods (configurable):
+  - Raw data (executions, processor_executions, metrics): default 30 days
+  - 1-minute continuous aggregates: default 90 days
+  - Users, diagrams, OIDC config: no expiry
+
+## Testing Strategy
+
+### Unit tests
+
+- Store interface tests using Testcontainers `PostgreSQLContainer` with TimescaleDB image
+- SearchIndex tests using Testcontainers `OpensearchContainer`
+- Dedup: insert same execution_id twice, verify single row
+- Chunked arrival: insert execution, then processors in separate calls, verify consolidated state
+- Upsert: update execution status from RUNNING to COMPLETED, verify single row with updated fields
+
+### Integration tests
+
+- Full ingestion flow: POST execution -> verify PostgreSQL row -> verify OpenSearch document
+- Partial execution: POST RUNNING -> POST processors -> POST COMPLETED -> verify state transitions and OpenSearch updates
+- Search -> detail roundtrip: index document, search by text, fetch detail by returned execution_id
+- Stats accuracy: insert known executions, query continuous aggregate, verify counts/durations
+- Arbitrary bucket re-aggregation: insert data spanning 1 hour, query with 15-minute buckets, verify grouping
+
+### Resilience tests
+
+- OpenSearch down: verify PostgreSQL writes succeed, search returns 503, detail still works
+- Re-index: delete OpenSearch index, trigger rebuild from PostgreSQL, verify search recovery
+
+### Test infrastructure
+
+- Local Docker for Testcontainers (PostgreSQL + TimescaleDB, OpenSearch)
+- Remote k3s for deployment testing
+- Flyway migrations tested via Testcontainers on every build
+
+## Deployment (Kubernetes)
+
+### New components
+
+- **PostgreSQL + TimescaleDB**: StatefulSet with persistent volume (replaces ClickHouse StatefulSet)
+- **OpenSearch**: StatefulSet (single node for small deployments, cluster for production)
+
+### Removed components
+
+- ClickHouse StatefulSet and associated ConfigMaps/Secrets
+
+### Configuration
+
+Environment variables (existing pattern):
+- `SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/cameleer3`
+- `SPRING_DATASOURCE_USERNAME`, `SPRING_DATASOURCE_PASSWORD`
+- `OPENSEARCH_URL=http://opensearch:9200`
+- `CAMELEER_RETENTION_DAYS=30` (applies to both PostgreSQL and OpenSearch)
+- `CAMELEER_BODY_SIZE_LIMIT=16384` (configurable body size limit in bytes)
+- `CAMELEER_OPENSEARCH_QUEUE_SIZE=10000` (bounded in-memory queue for async indexing)
+
+### Health checks
+
+- PostgreSQL: Spring Boot actuator datasource health
+- OpenSearch: cluster health endpoint
+- Combined health at `/api/v1/health`
+
+## Migration Strategy
+
+This is a clean cutover, not a live migration. No data migration from ClickHouse to PostgreSQL/OpenSearch is planned. Existing ClickHouse data will be abandoned (it has a 30-day TTL anyway). The refactor is implemented on a separate git branch and deployed as a replacement.