cameleer-server/docs/superpowers/plans/2026-03-16-storage-layer-refactor.md
# Storage Layer Refactor Implementation Plan
> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends.
**Architecture:** PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations.
**Tech Stack:** Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client
**Spec:** `docs/superpowers/specs/2026-03-16-storage-layer-design.md`
---
## File Structure
### New files
**Core module** (`cameleer3-server-core/src/main/java/com/cameleer3/server/core/`):
- `storage/ExecutionStore.java` — new interface replacing ExecutionRepository
- `storage/StatsStore.java` — new interface for stats from continuous aggregates
- `storage/SearchIndex.java` — new interface for OpenSearch operations
- `storage/DiagramStore.java` — new interface replacing DiagramRepository
- `storage/MetricsStore.java` — new interface replacing MetricsRepository
- `storage/model/ExecutionDocument.java` — document model for OpenSearch indexing
- `search/StatsRequest.java` — request DTO for stats queries (level, scope, time range)
- `search/TimeSeriesRequest.java` — request DTO for time-series queries (bucket size)
- `indexing/SearchIndexer.java` — debounced event listener for OpenSearch indexing
- `indexing/ExecutionUpdatedEvent.java` — event published after execution write
**App module** (`cameleer3-server-app/src/main/java/com/cameleer3/server/app/`):
- `storage/PostgresExecutionStore.java` — ExecutionStore impl with upsert
- `storage/PostgresStatsStore.java` — StatsStore impl querying continuous aggregates
- `storage/PostgresDiagramStore.java` — DiagramStore impl
- `storage/PostgresUserRepository.java` — UserRepository impl (keeps existing core interface)
- `storage/PostgresOidcConfigRepository.java` — OidcConfigRepository impl (keeps existing core interface)
- `storage/PostgresMetricsStore.java` — MetricsStore impl
- `search/OpenSearchIndex.java` — SearchIndex impl
- `config/OpenSearchConfig.java` — OpenSearch client bean
- `config/StorageBeanConfig.java` — wires all store beans
- `ingestion/MetricsFlushScheduler.java` — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only)
- `retention/RetentionScheduler.java` — scheduled job for drop_chunks and OpenSearch index deletion
**Flyway migrations** (`cameleer3-server-app/src/main/resources/db/migration/`):
- `V1__extensions.sql` — CREATE EXTENSION timescaledb, timescaledb_toolkit
- `V2__executions.sql` — executions hypertable
- `V3__processor_executions.sql` — processor_executions hypertable
- `V4__agent_metrics.sql` — agent_metrics hypertable
- `V5__route_diagrams.sql` — route_diagrams table
- `V6__users.sql` — users table
- `V7__oidc_config.sql` — oidc_config table
- `V8__continuous_aggregates.sql` — all 4 continuous aggregates + refresh policies
Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by `RetentionScheduler` at runtime with configurable intervals.
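The runtime retention call can be sketched briefly. Below is a minimal illustration of the `drop_chunks` statement a `RetentionScheduler` might issue; the class and method names are assumptions, and a real bean would hand each statement to `JdbcTemplate.execute(...)` on a schedule driven by `cameleer.retention-days`.

```java
import java.util.List;

// Illustrative sketch only; names are not part of the plan's committed API.
public class RetentionSqlSketch {
    // The three hypertables created by V2-V4. drop_chunks removes whole
    // chunks older than the cutoff, which is far cheaper than a row-level
    // DELETE on a hypertable.
    static final List<String> HYPERTABLES =
            List.of("executions", "processor_executions", "agent_metrics");

    static String dropChunksSql(String table, int retentionDays) {
        return "SELECT drop_chunks('" + table + "', now() - INTERVAL '"
                + retentionDays + " days')";
    }

    public static void main(String[] args) {
        // A @Scheduled method would execute each of these via JdbcTemplate.
        HYPERTABLES.forEach(t -> System.out.println(dropChunksSql(t, 30)));
    }
}
```

The OpenSearch half of the job would delete aged indices through the client API rather than SQL.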
**Test files** (`cameleer3-server-app/src/test/java/com/cameleer3/server/app/`):
- `AbstractPostgresIT.java` — replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
- `storage/PostgresExecutionStoreIT.java` — upsert, dedup, chunked arrival tests
- `storage/PostgresStatsStoreIT.java` — continuous aggregate query tests
- `storage/PostgresDiagramStoreIT.java` — content-hash dedup tests
- `storage/PostgresUserRepositoryIT.java` — CRUD tests
- `search/OpenSearchIndexIT.java` — index, search, wildcard tests
### Files to modify
- `pom.xml` (root) — no changes needed
- `cameleer3-server-app/pom.xml` — swap clickhouse-jdbc for postgresql + opensearch-java + flyway
- `cameleer3-server-core/.../core/search/SearchService.java` — split: search delegates to SearchIndex, stats/timeseries to StatsStore
- `cameleer3-server-core/.../core/detail/DetailService.java` — use ExecutionStore instead of ExecutionRepository
- `cameleer3-server-core/.../core/ingestion/IngestionService.java` — synchronous execution/diagram writes, keep buffer for metrics
- `cameleer3-server-app/.../app/config/SearchBeanConfig.java` — wire StatsStore into SearchService
- `cameleer3-server-app/.../app/config/IngestionBeanConfig.java` — update bean wiring
- `cameleer3-server-app/src/main/resources/application.yml` — PostgreSQL + OpenSearch config
- `cameleer3-server-app/src/test/resources/application-test.yml` — test config
### Files to delete
- `cameleer3-server-app/.../app/storage/ClickHouseExecutionRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseDiagramRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseMetricsRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseUserRepository.java`
- `cameleer3-server-app/.../app/storage/ClickHouseOidcConfigRepository.java`
- `cameleer3-server-app/.../app/search/ClickHouseSearchEngine.java`
- `cameleer3-server-app/.../app/ingestion/ClickHouseFlushScheduler.java`
- `cameleer3-server-app/.../app/config/ClickHouseConfig.java`
- `cameleer3-server-core/.../core/storage/ExecutionRepository.java`
- `cameleer3-server-core/.../core/storage/DiagramRepository.java`
- `cameleer3-server-core/.../core/storage/MetricsRepository.java`
- `cameleer3-server-core/.../core/search/SearchEngine.java`
- `cameleer3-server-core/.../core/detail/RawExecutionRow.java` (replaced by the normalized ExecutionStore records)
- `cameleer3-server-app/src/main/resources/clickhouse/*.sql` (all 8 files)
- `cameleer3-server-app/src/test/.../app/AbstractClickHouseIT.java`
Note: `UserRepository` and `OidcConfigRepository` interfaces in `core.security` are **kept** — the new Postgres implementations implement these existing interfaces. No rename needed since their contracts are unchanged.
---
## Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure
### Task 1: Update Maven dependencies
**Files:**
- Modify: `cameleer3-server-app/pom.xml`
- [ ] **Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client**
In `cameleer3-server-app/pom.xml`, replace the ClickHouse dependency and add new ones:
Remove:
```xml
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.9.7</version>
  <classifier>all</classifier>
</dependency>
```
Add:
```xml
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
</dependency>
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-core</artifactId>
</dependency>
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
  <groupId>org.opensearch.client</groupId>
  <artifactId>opensearch-java</artifactId>
  <version>2.19.0</version>
</dependency>
<dependency>
  <groupId>org.opensearch.client</groupId>
  <artifactId>opensearch-rest-client</artifactId>
  <version>2.19.0</version>
</dependency>
```
Replace the ClickHouse Testcontainer:
```xml
<!-- Remove testcontainers-clickhouse -->
<!-- Add: -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.opensearch</groupId>
  <artifactId>opensearch-testcontainers</artifactId>
  <version>2.1.1</version>
  <scope>test</scope>
</dependency>
```
Note: `postgresql` driver and `flyway-core` versions are managed by Spring Boot parent POM. Testcontainers BOM version is `${testcontainers.version}` (2.0.3) from root POM.
- [ ] **Step 2: Commit** (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)
```bash
git add cameleer3-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"
```
### Task 2: Write Flyway migrations
**Files:**
- Create: `cameleer3-server-app/src/main/resources/db/migration/V1__extensions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V2__executions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V3__processor_executions.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V4__agent_metrics.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V5__route_diagrams.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V6__users.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V7__oidc_config.sql`
- Create: `cameleer3-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql`
- [ ] **Step 1: Create V1__extensions.sql**
```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
```
- [ ] **Step 2: Create V2__executions.sql**
```sql
CREATE TABLE executions (
  execution_id TEXT NOT NULL,
  route_id TEXT NOT NULL,
  agent_id TEXT NOT NULL,
  group_name TEXT NOT NULL,
  status TEXT NOT NULL,
  correlation_id TEXT,
  exchange_id TEXT,
  start_time TIMESTAMPTZ NOT NULL,
  end_time TIMESTAMPTZ,
  duration_ms BIGINT,
  error_message TEXT,
  error_stacktrace TEXT,
  diagram_content_hash TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  PRIMARY KEY (execution_id, start_time)
);
SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC);
CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC);
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC);
CREATE INDEX idx_executions_correlation ON executions (correlation_id);
```
- [ ] **Step 3: Create V3__processor_executions.sql**
```sql
CREATE TABLE processor_executions (
  id BIGSERIAL,
  execution_id TEXT NOT NULL,
  processor_id TEXT NOT NULL,
  processor_type TEXT NOT NULL,
  diagram_node_id TEXT,
  group_name TEXT NOT NULL,
  route_id TEXT NOT NULL,
  depth INT NOT NULL,
  parent_processor_id TEXT,
  status TEXT NOT NULL,
  start_time TIMESTAMPTZ NOT NULL,
  end_time TIMESTAMPTZ,
  duration_ms BIGINT,
  error_message TEXT,
  error_stacktrace TEXT,
  input_body TEXT,
  output_body TEXT,
  input_headers JSONB,
  output_headers JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (execution_id, processor_id, start_time)
);
SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id);
CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC);
```
- [ ] **Step 4: Create V4__agent_metrics.sql**
```sql
CREATE TABLE agent_metrics (
  agent_id TEXT NOT NULL,
  metric_name TEXT NOT NULL,
  metric_value DOUBLE PRECISION NOT NULL,
  tags JSONB,
  collected_at TIMESTAMPTZ NOT NULL,
  server_received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC);
```
- [ ] **Step 5: Create V5__route_diagrams.sql**
```sql
CREATE TABLE route_diagrams (
  content_hash TEXT PRIMARY KEY,
  route_id TEXT NOT NULL,
  agent_id TEXT NOT NULL,
  definition TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id);
```
- [ ] **Step 6: Create V6__users.sql**
```sql
CREATE TABLE users (
  user_id TEXT PRIMARY KEY,
  provider TEXT NOT NULL,
  email TEXT,
  display_name TEXT,
  roles TEXT[] NOT NULL DEFAULT '{}',
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
- [ ] **Step 7: Create V7__oidc_config.sql**
```sql
CREATE TABLE oidc_config (
  config_id TEXT PRIMARY KEY DEFAULT 'default',
  enabled BOOLEAN NOT NULL DEFAULT false,
  issuer_uri TEXT,
  client_id TEXT,
  client_secret TEXT,
  roles_claim TEXT,
  default_roles TEXT[] NOT NULL DEFAULT '{}',
  auto_signup BOOLEAN DEFAULT false,
  display_name_claim TEXT,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
- [ ] **Step 8: Create V8__continuous_aggregates.sql**
```sql
-- Global stats
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 minute', start_time) AS bucket,
  COUNT(*) AS total_count,
  COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
  COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
  SUM(duration_ms) AS duration_sum,
  MAX(duration_ms) AS duration_max,
  approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;

SELECT add_continuous_aggregate_policy('stats_1m_all',
  start_offset => INTERVAL '1 hour',
  end_offset => INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');

-- Per-application stats
CREATE MATERIALIZED VIEW stats_1m_app
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 minute', start_time) AS bucket,
  group_name,
  COUNT(*) AS total_count,
  COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
  COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
  SUM(duration_ms) AS duration_sum,
  MAX(duration_ms) AS duration_max,
  approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name;

SELECT add_continuous_aggregate_policy('stats_1m_app',
  start_offset => INTERVAL '1 hour',
  end_offset => INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');

-- Per-route stats
CREATE MATERIALIZED VIEW stats_1m_route
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 minute', start_time) AS bucket,
  group_name,
  route_id,
  COUNT(*) AS total_count,
  COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
  COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
  SUM(duration_ms) AS duration_sum,
  MAX(duration_ms) AS duration_max,
  approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name, route_id;

SELECT add_continuous_aggregate_policy('stats_1m_route',
  start_offset => INTERVAL '1 hour',
  end_offset => INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');

-- Per-processor stats (uses denormalized group_name/route_id on processor_executions)
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 minute', start_time) AS bucket,
  group_name,
  route_id,
  processor_type,
  COUNT(*) AS total_count,
  COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
  SUM(duration_ms) AS duration_sum,
  MAX(duration_ms) AS duration_max,
  approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;

SELECT add_continuous_aggregate_policy('stats_1m_processor',
  start_offset => INTERVAL '1 hour',
  end_offset => INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');
```
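One reason the views above store `duration_sum` and `total_count` rather than a per-bucket average: averages cannot be re-aggregated across buckets, while sums and counts can. A small self-contained illustration (the `Bucket` record here is illustrative, not part of the plan):

```java
import java.util.List;

public class RollupSketch {
    // Mirrors the duration_sum / total_count columns of stats_1m_all.
    record Bucket(long totalCount, long durationSum) {}

    // Correct average over a range: divide combined sum by combined count.
    static double overallAvgMs(List<Bucket> buckets) {
        long count = buckets.stream().mapToLong(Bucket::totalCount).sum();
        long sum = buckets.stream().mapToLong(Bucket::durationSum).sum();
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        // 1 slow execution vs 3 fast ones: averaging the per-bucket averages
        // would give (100 + 10) / 2 = 55, but the true mean is 130 / 4 = 32.5.
        List<Bucket> buckets = List.of(new Bucket(1, 100), new Bucket(3, 30));
        System.out.println(overallAvgMs(buckets));
    }
}
```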
- [ ] **Step 9: Commit**
```bash
git add cameleer3-server-app/src/main/resources/db/migration/
git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema"
```
### Task 3: Create test base class with TimescaleDB Testcontainer
**Files:**
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java`
- [ ] **Step 1: Write AbstractPostgresIT**
```java
package com.cameleer3.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    // timescaledb-ha is the Timescale image that bundles timescaledb_toolkit,
    // which V1__extensions.sql requires (the plain timescale/timescaledb image
    // does not include the toolkit). asCompatibleSubstituteFor lets
    // PostgreSQLContainer accept a non-"postgres" image name.
    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("timescale/timescaledb-ha:pg16")
                            .asCompatibleSubstituteFor("postgres"))
                    .withDatabaseName("cameleer3")
                    .withUsername("cameleer")
                    .withPassword("test");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
```
- [ ] **Step 2: Write a smoke test to verify migrations run**
Create `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java`:
```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import static org.junit.jupiter.api.Assertions.*;

class FlywayMigrationIT extends AbstractPostgresIT {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Test
    void allMigrationsApplySuccessfully() {
        // Verify core tables exist
        Integer execCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM executions", Integer.class);
        assertEquals(0, execCount);
        Integer procCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processor_executions", Integer.class);
        assertEquals(0, procCount);
        Integer userCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM users", Integer.class);
        assertEquals(0, userCount);
        // Verify continuous aggregates exist
        Integer caggCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates",
                Integer.class);
        assertEquals(4, caggCount);
    }
}
```
- [ ] **Step 3: Verify test passes** (this test will not compile until Task 16 deletes ClickHouse code. Run it after Task 16 is complete. Listed here for logical grouping.)
Run: `mvn test -pl cameleer3-server-app -Dtest=FlywayMigrationIT -q`
Expected: PASS — all migrations apply, tables and continuous aggregates exist
- [ ] **Step 4: Commit**
```bash
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java
git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test"
```
### Task 4: Update application.yml for PostgreSQL + OpenSearch
**Files:**
- Modify: `cameleer3-server-app/src/main/resources/application.yml`
- Modify: `cameleer3-server-app/src/test/resources/application-test.yml`
- [ ] **Step 1: Update application.yml datasource section**
Replace:
```yaml
spring:
  datasource:
    url: jdbc:ch://localhost:8123/cameleer3
    username: cameleer
    password: cameleer_dev
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
```
With:
```yaml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/cameleer3
    username: cameleer
    password: ${CAMELEER_DB_PASSWORD:cameleer_dev}
    driver-class-name: org.postgresql.Driver
  flyway:
    enabled: true
    locations: classpath:db/migration
```
Add OpenSearch config section:
```yaml
opensearch:
  url: ${OPENSEARCH_URL:http://localhost:9200}
  queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
  debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}
```
Add body size limit:
```yaml
cameleer:
  body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
  retention-days: ${CAMELEER_RETENTION_DAYS:30}
```
Remove the `clickhouse:` section.
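The `body-size-limit` value is consumed by IngestionService, which Task 9 wires up with a `bodySizeLimit` field. How the cap is enforced lies outside this excerpt; a plausible sketch, assuming a byte-length cap with a marker suffix (the helper name and suffix are illustrative only):

```java
import java.nio.charset.StandardCharsets;

public class BodyLimitSketch {
    // Truncate on a byte budget (the limit is configured in bytes, not chars).
    // Decoding a partial UTF-8 sequence yields a replacement character at the
    // cut point, which is acceptable for display-only body previews.
    static String truncate(String body, int limitBytes) {
        if (body == null) return null;
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= limitBytes) return body;
        return new String(bytes, 0, limitBytes, StandardCharsets.UTF_8)
                + "...[truncated]";
    }
}
```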
- [ ] **Step 2: Update application-test.yml**
```yaml
spring:
flyway:
enabled: true
opensearch:
url: http://localhost:9200
```
- [ ] **Step 3: Commit**
```bash
git add cameleer3-server-app/src/main/resources/application.yml
git add cameleer3-server-app/src/test/resources/application-test.yml
git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config"
```
---
## Chunk 2: Core Module Interfaces and Models
### Task 5: Create new storage interfaces in core module
**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/StatsStore.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/SearchIndex.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsStore.java`
- [ ] **Step 1: Create ExecutionStore interface**
```java
package com.cameleer3.server.core.storage;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

public interface ExecutionStore {

    void upsert(ExecutionRecord execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);

    record ExecutionRecord(
            String executionId, String routeId, String agentId, String groupName,
            String status, String correlationId, String exchangeId,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace, String diagramContentHash
    ) {}

    record ProcessorRecord(
            String executionId, String processorId, String processorType,
            String diagramNodeId, String groupName, String routeId,
            int depth, String parentProcessorId, String status,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody, String inputHeaders, String outputHeaders
    ) {}
}
```
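A PostgreSQL implementation of `upsert` (written in a later chunk) would most likely lean on `INSERT ... ON CONFLICT` against the composite primary key `(execution_id, start_time)` defined in `V2__executions.sql`. A hedged sketch of the statement; the update column list is an assumption:

```java
public class UpsertSqlSketch {
    // ON CONFLICT targets the hypertable's composite primary key, so a
    // repeated or chunked arrival of the same execution updates the existing
    // row in place rather than inserting a duplicate.
    static final String UPSERT_EXECUTION = """
        INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                                status, correlation_id, exchange_id, start_time,
                                end_time, duration_ms, error_message,
                                error_stacktrace, diagram_content_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT (execution_id, start_time) DO UPDATE SET
            status = EXCLUDED.status,
            end_time = EXCLUDED.end_time,
            duration_ms = EXCLUDED.duration_ms,
            error_message = EXCLUDED.error_message,
            error_stacktrace = EXCLUDED.error_stacktrace,
            updated_at = now()
        """;
}
```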
- [ ] **Step 2: Create StatsStore interface**
Supports all 4 aggregation levels: global, per-app, per-route, per-processor.
```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import java.time.Instant;
import java.util.List;

public interface StatsStore {

    // Global stats (stats_1m_all)
    ExecutionStats stats(Instant from, Instant to);

    // Per-app stats (stats_1m_app)
    ExecutionStats statsForApp(Instant from, Instant to, String groupName);

    // Per-route stats (stats_1m_route), optionally scoped to specific agents
    ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);

    // Per-processor stats (stats_1m_processor)
    ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);

    // Global timeseries
    StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);

    // Per-app timeseries
    StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);

    // Per-route timeseries, optionally scoped to specific agents
    StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                       String routeId, List<String> agentIds);

    // Per-processor timeseries
    StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                           String routeId, String processorType);
}
```
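The timeseries methods accept a `bucketCount` rather than a bucket width; an implementation has to derive an interval from the requested range, and should never go finer than the 1-minute granularity of the continuous aggregates. One plausible derivation (illustrative only, not the plan's committed API):

```java
import java.time.Duration;
import java.time.Instant;

public class BucketSizeSketch {
    // Derive a bucket width from (from, to, bucketCount), snapped down to
    // whole minutes because the source aggregates are materialized per minute.
    static Duration bucketSize(Instant from, Instant to, int bucketCount) {
        long spanSeconds = Duration.between(from, to).toSeconds();
        long raw = Math.max(60, spanSeconds / Math.max(1, bucketCount));
        return Duration.ofSeconds(raw - (raw % 60));
    }
}
```

The resulting interval would feed a `time_bucket(bucket_size, bucket)` rollup over the 1-minute aggregate rows.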
- [ ] **Step 3: Create SearchIndex interface**
```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionDocument;

public interface SearchIndex {

    SearchResult<ExecutionSummary> search(SearchRequest request);

    long count(SearchRequest request);

    void index(ExecutionDocument document);

    void delete(String executionId);
}
```
- [ ] **Step 4: Create DiagramStore interface**
```java
package com.cameleer3.server.core.storage;

import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import java.util.List;
import java.util.Optional;

public interface DiagramStore {

    void store(TaggedDiagram diagram);

    Optional<RouteGraph> findByContentHash(String contentHash);

    Optional<String> findContentHashForRoute(String routeId, String agentId);

    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
```
- [ ] **Step 5: Create MetricsStore interface**
```java
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.List;

public interface MetricsStore {

    void insertBatch(List<MetricsSnapshot> snapshots);
}
```
- [ ] **Step 6: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"
```
### Task 6: Create ExecutionDocument model and indexing event
**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java`
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/ExecutionUpdatedEvent.java`
- [ ] **Step 1: Create ExecutionDocument**
```java
package com.cameleer3.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record ExecutionDocument(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        List<ProcessorDoc> processors
) {
    public record ProcessorDoc(
            String processorId, String processorType, String status,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody,
            String inputHeaders, String outputHeaders
    ) {}
}
```
- [ ] **Step 2: Create ExecutionUpdatedEvent**
```java
package com.cameleer3.server.core.indexing;
import java.time.Instant;
public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
```
- [ ] **Step 3: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"
```
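`ExecutionUpdatedEvent` is consumed by the debounced `SearchIndexer` named in the file structure, implemented in a later chunk. Its essential mechanic, coalescing repeated updates to the same execution within the debounce window into a single index operation, can be sketched as follows; the class shape and names are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class DebounceSketch {
    // Pending executionIds; a Set collapses duplicate events for the same id.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final Consumer<String> reindex; // would re-read the row and call SearchIndex.index(...)

    public DebounceSketch(Consumer<String> reindex) {
        this.reindex = reindex;
    }

    // Called for each ExecutionUpdatedEvent; cheap, no I/O on the write path.
    public void onExecutionUpdated(String executionId) {
        pending.add(executionId);
    }

    // The real SearchIndexer would run this on a schedule every debounce-ms.
    public void flush() {
        List<String> batch = new ArrayList<>(pending);
        batch.forEach(pending::remove);
        batch.forEach(reindex);
    }
}
```

Re-reading the row at flush time (instead of indexing the event payload) guarantees the index always reflects the latest PostgreSQL state, which is what makes PostgreSQL the source of truth.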
### Task 7: Update SearchService to use StatsStore for stats/timeseries
**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java`
- [ ] **Step 1: Refactor SearchService to accept SearchIndex + StatsStore**
Replace the single `SearchEngine` dependency with two dependencies:
```java
package com.cameleer3.server.core.search;

import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.StatsStore;
import java.time.Instant;
import java.util.List;

public class SearchService {

    private final SearchIndex searchIndex;
    private final StatsStore statsStore;

    public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
        this.searchIndex = searchIndex;
        this.statsStore = statsStore;
    }

    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        return searchIndex.search(request);
    }

    public long count(SearchRequest request) {
        return searchIndex.count(request);
    }

    public ExecutionStats stats(Instant from, Instant to) {
        return statsStore.stats(from, to);
    }

    public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
        return statsStore.statsForRoute(from, to, routeId, agentIds);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return statsStore.timeseries(from, to, bucketCount);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
                                      String routeId, List<String> agentIds) {
        return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
    }
}
```
- [ ] **Step 2: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"
```
### Task 8: Update DetailService to use ExecutionStore
**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java`
- [ ] **Step 1: Rewrite DetailService to use ExecutionStore**
The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with `parentProcessorId` for tree structure.
```java
package com.cameleer3.server.core.detail;

import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import java.util.*;

public class DetailService {

    private final ExecutionStore executionStore;

    public DetailService(ExecutionStore executionStore) {
        this.executionStore = executionStore;
    }

    public Optional<ExecutionDetail> getDetail(String executionId) {
        return executionStore.findById(executionId)
                .map(exec -> {
                    List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
                    List<ProcessorNode> roots = buildTree(processors);
                    return new ExecutionDetail(
                            exec.executionId(), exec.routeId(), exec.agentId(),
                            exec.status(), exec.startTime(), exec.endTime(),
                            exec.durationMs() != null ? exec.durationMs() : 0L,
                            exec.correlationId(), exec.exchangeId(),
                            exec.errorMessage(), exec.errorStacktrace(),
                            exec.diagramContentHash(), roots
                    );
                });
    }

    List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
        if (processors.isEmpty()) return List.of();
        Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
        for (ProcessorRecord p : processors) {
            nodeMap.put(p.processorId(), new ProcessorNode(
                    p.processorId(), p.processorType(), p.status(),
                    p.startTime(), p.endTime(),
                    p.durationMs() != null ? p.durationMs() : 0L,
                    p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
            ));
        }
        List<ProcessorNode> roots = new ArrayList<>();
        for (ProcessorRecord p : processors) {
            ProcessorNode node = nodeMap.get(p.processorId());
            if (p.parentProcessorId() == null) {
                roots.add(node);
            } else {
                ProcessorNode parent = nodeMap.get(p.parentProcessorId());
                if (parent != null) {
                    parent.addChild(node);
                } else {
                    roots.add(node); // orphan safety
                }
            }
        }
        return roots;
    }
}
```
- [ ] **Step 2: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
```
### Task 9: Update IngestionService for synchronous writes
**Files:**
- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java`
- [ ] **Step 1: Rewrite IngestionService**
Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.
```java
package com.cameleer3.server.core.ingestion;

import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer3.server.core.storage.DiagramStore;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);
        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                    execution.getProcessors(), record.executionId(),
                    record.startTime(), groupName, execution.getRouteId(),
                    null, 0);
            executionStore.upsertProcessors(
                    record.executionId(), record.startTime(),
                    groupName, execution.getRouteId(), processors);
        }
        eventPublisher.accept(new ExecutionUpdatedEvent(
                record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
        diagramStore.store(diagram);
    }

    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }

    private ExecutionRecord toExecutionRecord(String agentId, String groupName,
                                              RouteExecution exec) {
        return new ExecutionRecord(
                exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
                exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
                exec.getCorrelationId(), exec.getExchangeId(),
                exec.getStartTime(), exec.getEndTime(),
                exec.getDurationMs(),
                exec.getErrorMessage(), exec.getErrorStacktrace(),
                null // diagramContentHash set separately
        );
    }

    private List<ProcessorRecord> flattenProcessors(
            List<ProcessorExecution> processors, String executionId,
            java.time.Instant execStartTime, String groupName, String routeId,
            String parentProcessorId, int depth) {
        List<ProcessorRecord> flat = new ArrayList<>();
for (ProcessorExecution p : processors) {
flat.add(new ProcessorRecord(
executionId, p.getProcessorId(), p.getProcessorType(),
p.getDiagramNodeId(), groupName, routeId,
depth, parentProcessorId,
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
p.getStartTime() != null ? p.getStartTime() : execStartTime,
p.getEndTime(),
p.getDurationMs(),
p.getErrorMessage(), p.getErrorStacktrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
p.getInputHeaders() != null ? headersToJson(p.getInputHeaders()) : null,
p.getOutputHeaders() != null ? headersToJson(p.getOutputHeaders()) : null
));
if (p.getChildren() != null) {
flat.addAll(flattenProcessors(
p.getChildren(), executionId, execStartTime,
groupName, routeId, p.getProcessorId(), depth + 1));
}
}
return flat;
}
private String headersToJson(Object headers) {
// Map.toString() is not valid JSON and would fail the ?::jsonb cast
// in the executions store, so serialize with Jackson instead.
try {
return new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(headers);
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
return null;
}
}
private String truncateBody(String body) {
if (body == null) return null;
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
return body;
}
}
```
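`flattenProcessors` is the inverse of the tree build in DetailService: a pre-order walk that records each node's depth and parent id so the hierarchy can be stored as flat rows. A self-contained sketch with simplified stand-in types (hypothetical `Proc`/`Flat`, not the real models):

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenDemo {
    record Proc(String id, List<Proc> children) {}
    record Flat(String id, String parentId, int depth) {}

    // Pre-order walk: emit the node, then recurse into children with
    // this node as parent and depth + 1.
    static List<Flat> flatten(List<Proc> procs, String parentId, int depth) {
        List<Flat> out = new ArrayList<>();
        for (Proc p : procs) {
            out.add(new Flat(p.id(), parentId, depth));
            if (p.children() != null)
                out.addAll(flatten(p.children(), p.id(), depth + 1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Flat> flat = flatten(List.of(
                new Proc("root", List.of(
                        new Proc("child", List.of(new Proc("grandchild", null)))))),
                null, 0);
        if (flat.size() != 3) throw new AssertionError("expected 3 flat rows");
        Flat grandchild = flat.get(2);
        if (grandchild.depth() != 2 || !"child".equals(grandchild.parentId()))
            throw new AssertionError("grandchild should be depth 2 under child");
        System.out.println("ok");
    }
}
```

The flat rows round-trip back into the original tree through the DetailService build, which is why both sides key on the same `parentProcessorId`.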
- [ ] **Step 2: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
```
---
## Chunk 3: PostgreSQL Store Implementations
### Task 10: Implement PostgresExecutionStore
**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java`
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.*;
class PostgresExecutionStoreIT extends AbstractPostgresIT {
@Autowired
ExecutionStore executionStore;
@Test
void upsertAndFindById() {
Instant now = Instant.now();
ExecutionRecord record = new ExecutionRecord(
"exec-1", "route-a", "agent-1", "app-1",
"COMPLETED", "corr-1", "exchange-1",
now, now.plusMillis(100), 100L,
null, null, null);
executionStore.upsert(record);
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
assertTrue(found.isPresent());
assertEquals("exec-1", found.get().executionId());
assertEquals("COMPLETED", found.get().status());
}
@Test
void upsertDeduplicatesByExecutionId() {
Instant now = Instant.now();
ExecutionRecord first = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"RUNNING", null, null, now, null, null, null, null, null);
ExecutionRecord second = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);
executionStore.upsert(first);
executionStore.upsert(second);
Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
assertTrue(found.isPresent());
assertEquals("COMPLETED", found.get().status());
assertEquals(200L, found.get().durationMs());
}
@Test
void upsertProcessorsAndFind() {
Instant now = Instant.now();
ExecutionRecord exec = new ExecutionRecord(
"exec-proc", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
executionStore.upsert(exec);
List<ProcessorRecord> processors = List.of(
new ProcessorRecord("exec-proc", "proc-1", "log", null,
"app-1", "route-a", 0, null, "COMPLETED",
now, now.plusMillis(10), 10L, null, null,
"input body", "output body", null, null),
new ProcessorRecord("exec-proc", "proc-2", "to", null,
"app-1", "route-a", 1, "proc-1", "COMPLETED",
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
null, null, null, null)
);
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);
List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
assertEquals(2, found.size());
assertEquals("proc-1", found.get(0).processorId());
assertEquals("proc-2", found.get(1).processorId());
assertEquals("proc-1", found.get(1).parentProcessorId());
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: FAIL — `ExecutionStore` bean not found
- [ ] **Step 3: Implement PostgresExecutionStore**
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresExecutionStore implements ExecutionStore {
private final JdbcTemplate jdbc;
public PostgresExecutionStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public void upsert(ExecutionRecord execution) {
jdbc.update("""
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
WHEN executions.status IN ('COMPLETED', 'FAILED')
AND EXCLUDED.status = 'RUNNING'
THEN executions.status -- never demote a terminal status
ELSE EXCLUDED.status
END,
end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
execution.groupName(), execution.status(), execution.correlationId(),
execution.exchangeId(),
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
execution.durationMs(), execution.errorMessage(),
execution.errorStacktrace(), execution.diagramContentHash());
}
@Override
public void upsertProcessors(String executionId, Instant startTime,
String groupName, String routeId,
List<ProcessorRecord> processors) {
jdbc.batchUpdate("""
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
diagram_node_id, group_name, route_id, depth, parent_processor_id,
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
status = EXCLUDED.status,
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
p.diagramNodeId(), p.groupName(), p.routeId(),
p.depth(), p.parentProcessorId(), p.status(),
Timestamp.from(p.startTime()),
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
}).toList());
}
@Override
public Optional<ExecutionRecord> findById(String executionId) {
List<ExecutionRecord> results = jdbc.query(
"SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
EXECUTION_MAPPER, executionId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<ProcessorRecord> findProcessors(String executionId) {
return jdbc.query(
"SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
PROCESSOR_MAPPER, executionId);
}
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
new ExecutionRecord(
rs.getString("execution_id"), rs.getString("route_id"),
rs.getString("agent_id"), rs.getString("group_name"),
rs.getString("status"), rs.getString("correlation_id"),
rs.getString("exchange_id"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(
rs.getString("execution_id"), rs.getString("processor_id"),
rs.getString("processor_type"), rs.getString("diagram_node_id"),
rs.getString("group_name"), rs.getString("route_id"),
rs.getInt("depth"), rs.getString("parent_processor_id"),
rs.getString("status"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"));
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);
return ts != null ? ts.toInstant() : null;
}
}
```
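The dedup test above expects the second write to win; equally important is that an out-of-order RUNNING write must not demote an already-terminal status. A plain-Java sketch of the intended precedence rule (hypothetical helper for reasoning only; the SQL `CASE` in the upsert is what actually runs and should be checked against this):

```java
public class StatusPrecedence {
    // Intended rule: a terminal status (COMPLETED/FAILED) is never demoted
    // by a late RUNNING write; any other incoming status overwrites the
    // stored one.
    static String merge(String existing, String incoming) {
        boolean existingTerminal =
                existing.equals("COMPLETED") || existing.equals("FAILED");
        if (existingTerminal && incoming.equals("RUNNING")) {
            return existing; // RUNNING arrived after completion: keep terminal
        }
        return incoming;
    }

    public static void main(String[] args) {
        if (!merge("RUNNING", "COMPLETED").equals("COMPLETED")) throw new AssertionError();
        if (!merge("RUNNING", "FAILED").equals("FAILED")) throw new AssertionError();
        // Out-of-order RUNNING must not win over a terminal status.
        if (!merge("COMPLETED", "RUNNING").equals("COMPLETED")) throw new AssertionError();
        System.out.println("ok");
    }
}
```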
- [ ] **Step 4: Run tests to verify they pass**
Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: PASS — all 3 tests green
- [ ] **Step 5: Commit**
```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
```
### Task 11: Implement PostgresStatsStore
**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java`
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import static org.junit.jupiter.api.Assertions.*;
class PostgresStatsStoreIT extends AbstractPostgresIT {
@Autowired StatsStore statsStore;
@Autowired ExecutionStore executionStore;
@Autowired JdbcTemplate jdbc;
@Test
void statsReturnsCountsForTimeWindow() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
// Force continuous aggregate refresh
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
assertEquals(3, stats.totalCount());
assertEquals(1, stats.failedCount());
}
@Test
void timeseriesReturnsBuckets() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
for (int i = 0; i < 10; i++) {
insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
now.plusSeconds(i * 30), 100L + i);
}
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
assertNotNull(ts);
assertFalse(ts.buckets().isEmpty());
}
private void insertExecution(String id, String routeId, String groupName,
String status, Instant startTime, long durationMs) {
executionStore.upsert(new ExecutionRecord(
id, routeId, "agent-1", groupName, status, null, null,
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null));
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: FAIL — `StatsStore` bean not found
- [ ] **Step 3: Implement PostgresStatsStore**
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
@Repository
public class PostgresStatsStore implements StatsStore {
private final JdbcTemplate jdbc;
public PostgresStatsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public ExecutionStats stats(Instant from, Instant to) {
return queryStats("stats_1m_all", from, to, List.of());
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("group_name", groupName)));
}
@Override
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
// Note: agentIds is accepted for interface compatibility but not filterable
// on the continuous aggregate (it groups by route_id, not agent_id).
// All agents for the same route contribute to the same aggregate.
return queryStats("stats_1m_route", from, to, List.of(
new Filter("route_id", routeId)));
}
@Override
public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
return queryStats("stats_1m_processor", from, to, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)));
}
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("group_name", groupName)), true);
}
@Override
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
new Filter("route_id", routeId)), true);
}
@Override
public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
String routeId, String processorType) {
// stats_1m_processor does NOT have running_count column
return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)), false);
}
private record Filter(String column, String value) {}
private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
// running_count only exists on execution-level aggregates, not processor
boolean hasRunning = !view.equals("stats_1m_processor");
String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
}, params.toArray());
if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
p99Duration = r[3]; activeCount = r[4];
}
// Previous period (shifted back 24h)
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = to.minus(Duration.ofHours(24));
List<Object> prevParams = new ArrayList<>();
prevParams.add(Timestamp.from(prevFrom));
prevParams.add(Timestamp.from(prevTo));
for (Filter f : filters) prevParams.add(f.value());
String prevSql = sql; // same shape, different time params
long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration")
}, prevParams.toArray());
if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
}
// Today total (from midnight UTC)
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
List<Object> todayParams = new ArrayList<>();
todayParams.add(Timestamp.from(todayStart));
todayParams.add(Timestamp.from(Instant.now()));
for (Filter f : filters) todayParams.add(f.value());
String todaySql = sql;
long totalToday = 0;
var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
todayParams.toArray());
if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats(
totalCount, failedCount, avgDuration, p99Duration, activeCount,
totalToday, prevTotal, prevFailed, prevAvg, prevP99);
}
private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
int bucketCount, List<Filter> filters,
boolean hasRunningCount) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
"COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(intervalSeconds);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
sql += " GROUP BY period ORDER BY period";
List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
), params.toArray());
return new StatsTimeseries(buckets);
}
}
```
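The bucket sizing in `queryTimeseries` divides the requested window by the bucket count but never drops below 60 seconds, since the `stats_1m_*` aggregates only have 1-minute granularity. The clamp can be checked in isolation (a sketch mirroring that arithmetic):

```java
import java.time.Duration;
import java.time.Instant;

public class BucketIntervalDemo {
    // Window length divided by requested bucket count, clamped to the
    // 1-minute granularity of the continuous aggregates.
    static long intervalSeconds(Instant from, Instant to, int bucketCount) {
        long seconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        return Math.max(seconds, 60);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2026-03-16T00:00:00Z");
        // 1 hour / 12 buckets -> 300 s buckets
        if (intervalSeconds(t0, t0.plusSeconds(3600), 12) != 300) throw new AssertionError();
        // 10 minutes / 60 buckets -> clamped to the 60 s floor
        if (intervalSeconds(t0, t0.plusSeconds(600), 60) != 60) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Callers asking for more buckets than the window can support therefore get fewer, wider buckets back rather than empty 1-minute slices.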
- [ ] **Step 4: Run tests to verify they pass**
Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java
git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
```
### Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore
**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java`
- [ ] **Step 1: Write failing test for PostgresDiagramStore**
Create `PostgresDiagramStoreIT.java` with tests for:
- `store()` + `findByContentHash()` roundtrip
- Content-hash dedup (store same hash twice, verify single row)
- `findContentHashForRoute()` returns latest for route+agent
- `findContentHashForRouteByAgents()` returns across agent list
- [ ] **Step 2: Implement PostgresDiagramStore**
Straightforward CRUD with `ON CONFLICT (content_hash) DO NOTHING`. Port the SHA-256 hashing logic from `ClickHouseDiagramRepository`. Use `JdbcTemplate` queries.
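The step above references the SHA-256 hashing logic without showing it. A minimal sketch of a content-hash helper, assuming hex-encoded SHA-256 over the diagram content (verify the encoding against the existing `ClickHouseDiagramRepository` before porting):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class ContentHashDemo {
    // Hex-encoded SHA-256 over the diagram content: identical content yields
    // an identical hash, so ON CONFLICT (content_hash) DO NOTHING dedups
    // re-sent diagrams.
    static String contentHash(String diagramContent) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(diagramContent.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        String a = contentHash("<route id=\"r1\"/>");
        String b = contentHash("<route id=\"r1\"/>");
        if (!a.equals(b)) throw new AssertionError("same content must hash identically");
        if (a.length() != 64) throw new AssertionError("SHA-256 hex is 64 chars");
        System.out.println("ok");
    }
}
```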
- [ ] **Step 3: Run diagram tests, verify pass**
- [ ] **Step 4: Implement PostgresUserRepository**
Implements `UserRepository` interface (existing interface in `core.security`, unchanged).
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.security.UserInfo;
import com.cameleer3.server.core.security.UserRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Array;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresUserRepository implements UserRepository {
private final JdbcTemplate jdbc;
public PostgresUserRepository(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Optional<UserInfo> findById(String userId) {
var results = jdbc.query(
"SELECT * FROM users WHERE user_id = ?",
(rs, rowNum) -> mapUser(rs), userId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<UserInfo> findAll() {
return jdbc.query("SELECT * FROM users ORDER BY user_id",
(rs, rowNum) -> mapUser(rs));
}
@Override
public void upsert(UserInfo user) {
jdbc.update("""
INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, now(), now())
ON CONFLICT (user_id) DO UPDATE SET
provider = EXCLUDED.provider, email = EXCLUDED.email,
display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
updated_at = now()
""",
user.userId(), user.provider(), user.email(), user.displayName(),
user.roles().toArray(new String[0]));
}
@Override
public void updateRoles(String userId, List<String> roles) {
jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
roles.toArray(new String[0]), userId);
}
@Override
public void delete(String userId) {
jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
}
private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
Array rolesArray = rs.getArray("roles");
String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
return new UserInfo(
rs.getString("user_id"), rs.getString("provider"),
rs.getString("email"), rs.getString("display_name"),
List.of(roles));
}
}
```
- [ ] **Step 5: Implement PostgresOidcConfigRepository**
Implements `OidcConfigRepository` interface (existing interface in `core.security`).
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.security.OidcConfig;
import com.cameleer3.server.core.security.OidcConfigRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Array;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresOidcConfigRepository implements OidcConfigRepository {
private final JdbcTemplate jdbc;
public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Optional<OidcConfig> find() {
var results = jdbc.query(
"SELECT * FROM oidc_config WHERE config_id = 'default'",
(rs, rowNum) -> {
Array arr = rs.getArray("default_roles");
String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
return new OidcConfig(
rs.getBoolean("enabled"), rs.getString("issuer_uri"),
rs.getString("client_id"), rs.getString("client_secret"),
rs.getString("roles_claim"), List.of(roles),
rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
});
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public void save(OidcConfig config) {
jdbc.update("""
INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
ON CONFLICT (config_id) DO UPDATE SET
enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
updated_at = now()
""",
config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
config.autoSignup(), config.displayNameClaim());
}
@Override
public void delete() {
jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
}
}
```
- [ ] **Step 6: Implement PostgresMetricsStore**
```java
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.util.List;
@Repository
public class PostgresMetricsStore implements MetricsStore {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final JdbcTemplate jdbc;
public PostgresMetricsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public void insertBatch(List<MetricsSnapshot> snapshots) {
jdbc.batchUpdate("""
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
collected_at, server_received_at)
VALUES (?, ?, ?, ?::jsonb, ?, now())
""",
snapshots.stream().map(s -> new Object[]{
s.agentId(), s.metricName(), s.metricValue(),
tagsToJson(s.tags()),
Timestamp.from(s.collectedAt())
}).toList());
}
private String tagsToJson(java.util.Map<String, String> tags) {
if (tags == null || tags.isEmpty()) return null;
try { return MAPPER.writeValueAsString(tags); }
catch (JsonProcessingException e) { return null; }
}
}
```
- [ ] **Step 7: Run all store tests, verify pass**
Run: `mvn test -pl cameleer3-server-app -Dtest="Postgres*IT" -q`
- [ ] **Step 8: Commit**
```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/Postgres*.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/Postgres*.java
git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
```
---
## Chunk 4: OpenSearch Integration
### Task 13: Implement OpenSearchIndex
**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java`
- [ ] **Step 1: Write failing test**
```java
package com.cameleer3.server.app.search;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import java.time.Instant;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
class OpenSearchIndexIT extends AbstractPostgresIT {
@Container
static final OpensearchContainer<?> opensearch =
new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); // security plugin disabled by default
@DynamicPropertySource
static void configureOpenSearch(DynamicPropertyRegistry registry) {
registry.add("opensearch.url", opensearch::getHttpHostAddress);
}
@Autowired
SearchIndex searchIndex;
@Test
void indexAndSearchByText() throws Exception {
Instant now = Instant.now();
ExecutionDocument doc = new ExecutionDocument(
"search-1", "route-a", "agent-1", "app-1",
"FAILED", "corr-1", "exch-1",
now, now.plusMillis(100), 100L,
"OrderNotFoundException: order-12345 not found", null,
List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
null, null, "request body with customer-99", null, null, null)));
searchIndex.index(doc);
Thread.sleep(1500); // Allow OpenSearch refresh
SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60),
null, null, null,
"OrderNotFoundException", null, null, null,
null, null, null, null, null,
0, 50, "startTime", "desc");
SearchResult<ExecutionSummary> result = searchIndex.search(request);
assertTrue(result.total() > 0);
assertEquals("search-1", result.items().get(0).executionId());
}
@Test
void wildcardSearchFindsSubstring() throws Exception {
Instant now = Instant.now();
ExecutionDocument doc = new ExecutionDocument(
"wild-1", "route-b", "agent-1", "app-1",
"COMPLETED", null, null,
now, now.plusMillis(50), 50L, null, null,
List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
null, null, "UniquePayloadIdentifier12345", null, null, null)));
searchIndex.index(doc);
Thread.sleep(1500);
SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60),
null, null, null,
"PayloadIdentifier", null, null, null,
null, null, null, null, null,
0, 50, "startTime", "desc");
SearchResult<ExecutionSummary> result = searchIndex.search(request);
assertTrue(result.total() > 0);
}
}
```
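The `wildcardSearchFindsSubstring` test only passes because the index template created by `ensureIndexTemplate` in Step 3 tokenizes text fields into 3–4 character ngrams. A plain-Java approximation of that tokenization (lowercase filter included, `token_chars` restrictions ignored) shows why a mid-string query term matches:
```java
import java.util.HashSet;
import java.util.Set;

// Approximates the template's ngram_tokenizer (min_gram=3, max_gram=4) plus the
// lowercase filter. The real tokenizer also restricts token_chars; ignored here.
public class NgramDemo {
    static Set<String> ngrams(String text, int min, int max) {
        String lower = text.toLowerCase();
        Set<String> grams = new HashSet<>();
        for (int len = min; len <= max; len++) {
            for (int i = 0; i + len <= lower.length(); i++) {
                grams.add(lower.substring(i, i + len));
            }
        }
        return grams;
    }

    public static void main(String[] args) {
        Set<String> indexed = ngrams("UniquePayloadIdentifier12345", 3, 4);
        Set<String> query = ngrams("PayloadIdentifier", 3, 4);
        // Every ngram of the query term occurs in the indexed value, so a match
        // query against the .ngram subfield finds the infix without wildcards.
        System.out.println(indexed.containsAll(query)); // → true
    }
}
```
This is why the test queries the `.ngram` subfields rather than using OpenSearch `wildcard` queries, which are much slower at scale.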
- [ ] **Step 2: Create OpenSearchConfig**
```java
package com.cameleer3.server.app.config;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OpenSearchConfig {
@Value("${opensearch.url:http://localhost:9200}")
private String opensearchUrl;
@Bean
public OpenSearchClient openSearchClient() {
HttpHost host = HttpHost.create(opensearchUrl);
var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
return new OpenSearchClient(transport);
}
}
```
- [ ] **Step 3: Implement OpenSearchIndex**
```java
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.analysis.TokenChar;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@Repository
public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
private static final String INDEX_PREFIX = "executions-";
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private final OpenSearchClient client;
public OpenSearchIndex(OpenSearchClient client) {
this.client = client;
}
@PostConstruct
void ensureIndexTemplate() {
// Full template with ngram analyzer for infix wildcard search.
// The template JSON matches the spec's OpenSearch index template definition.
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name("executions-template")
.indexPatterns(List.of("executions-*"))
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
.analysis(a -> a
.analyzer("ngram_analyzer", an -> an
.custom(c -> c
.tokenizer("ngram_tokenizer")
.filter("lowercase")))
.tokenizer("ngram_tokenizer", tk -> tk
.definition(d -> d
.ngram(ng -> ng
.minGram(3)
.maxGram(4)
.tokenChars(TokenChar.Letter,
TokenChar.Digit,
TokenChar.Punctuation,
TokenChar.Symbol)))))))));
log.info("OpenSearch index template created with ngram analyzer");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
}
}
@Override
public void index(ExecutionDocument doc) {
String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
try {
client.index(IndexRequest.of(b -> b
.index(indexName)
.id(doc.executionId())
.document(toMap(doc))));
} catch (IOException e) {
log.error("Failed to index execution {}", doc.executionId(), e);
}
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
try {
var searchReq = buildSearchRequest(request, request.limit());
var response = client.search(searchReq, Map.class);
List<ExecutionSummary> items = response.hits().hits().stream()
.map(this::hitToSummary)
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
return new SearchResult<>(items, total);
} catch (IOException e) {
log.error("Search failed", e);
return new SearchResult<>(List.of(), 0);
}
}
@Override
public long count(SearchRequest request) {
try {
var countReq = CountRequest.of(b -> b
.index(INDEX_PREFIX + "*")
.query(buildQuery(request)));
return client.count(countReq).count();
} catch (IOException e) {
log.error("Count failed", e);
return 0;
}
}
@Override
public void delete(String executionId) {
try {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(INDEX_PREFIX + "*"))
.query(Query.of(q -> q.term(t -> t
.field("execution_id").value(FieldValue.of(executionId))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
}
private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
SearchRequest request, int size) {
return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
b.index(INDEX_PREFIX + "*")
.query(buildQuery(request))
.size(size)
.from(request.offset())
.sort(s -> s.field(f -> f
.field(request.sortColumn())
.order("asc".equalsIgnoreCase(request.sortDir())
? SortOrder.Asc : SortOrder.Desc)));
return b;
});
}
private Query buildQuery(SearchRequest request) {
List<Query> must = new ArrayList<>();
List<Query> filter = new ArrayList<>();
// Time range
if (request.timeFrom() != null || request.timeTo() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
r.gte(org.opensearch.client.json.JsonData.of(request.timeFrom().toString()));
if (request.timeTo() != null)
r.lte(org.opensearch.client.json.JsonData.of(request.timeTo().toString()));
return r;
})));
}
// Keyword filters
if (request.status() != null)
filter.add(termQuery("status", request.status()));
if (request.routeId() != null)
filter.add(termQuery("route_id", request.routeId()));
if (request.agentId() != null)
filter.add(termQuery("agent_id", request.agentId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id", request.correlationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
String text = request.text();
List<Query> textQueries = new ArrayList<>();
// Search top-level text fields
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))));
// Search nested processor fields
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace",
"processors.input_body.ngram", "processors.output_body.ngram",
"processors.input_headers.ngram", "processors.output_headers.ngram",
"processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
// Scoped text searches
if (request.textInBody() != null && !request.textInBody().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
.fields("processors.input_body", "processors.output_body",
"processors.input_body.ngram", "processors.output_body.ngram"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
.fields("processors.input_headers", "processors.output_headers",
"processors.input_headers.ngram", "processors.output_headers.ngram"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace",
"processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
).minimumShouldMatch("1"))));
}
// Duration range
if (request.durationMin() != null || request.durationMax() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
r.gte(org.opensearch.client.json.JsonData.of(request.durationMin()));
if (request.durationMax() != null)
r.lte(org.opensearch.client.json.JsonData.of(request.durationMax()));
return r;
})));
}
return Query.of(q -> q.bool(b -> {
if (!must.isEmpty()) b.must(must);
if (!filter.isEmpty()) b.filter(filter);
if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
return b;
}));
}
private Query termQuery(String field, String value) {
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());
map.put("route_id", doc.routeId());
map.put("agent_id", doc.agentId());
map.put("group_name", doc.groupName());
map.put("status", doc.status());
map.put("correlation_id", doc.correlationId());
map.put("exchange_id", doc.exchangeId());
map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
map.put("duration_ms", doc.durationMs());
map.put("error_message", doc.errorMessage());
map.put("error_stacktrace", doc.errorStacktrace());
if (doc.processors() != null) {
map.put("processors", doc.processors().stream().map(p -> {
Map<String, Object> pm = new LinkedHashMap<>();
pm.put("processor_id", p.processorId());
pm.put("processor_type", p.processorType());
pm.put("status", p.status());
pm.put("error_message", p.errorMessage());
pm.put("error_stacktrace", p.errorStacktrace());
pm.put("input_body", p.inputBody());
pm.put("output_body", p.outputBody());
pm.put("input_headers", p.inputHeaders());
pm.put("output_headers", p.outputHeaders());
return pm;
}).toList());
}
return map;
}
@SuppressWarnings("unchecked")
private ExecutionSummary hitToSummary(Hit<Map> hit) {
Map<String, Object> src = hit.source();
if (src == null) return null;
return new ExecutionSummary(
(String) src.get("execution_id"),
(String) src.get("route_id"),
(String) src.get("agent_id"),
(String) src.get("status"),
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
(String) src.get("error_message"));
}
}
```
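One detail of `index()` worth pinning down: documents are routed to a daily index derived from the execution's start time in UTC, so agents and servers in different timezones agree on the index name. A standalone check of that naming:
```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Computes the daily index name the same way OpenSearchIndex.index() does:
// "executions-" + the UTC calendar day of the execution's start time.
public class IndexNameDemo {
    static final DateTimeFormatter DAY_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String indexName(Instant startTime) {
        return "executions-" + DAY_FMT.format(startTime);
    }

    public static void main(String[] args) {
        // 2026-03-16T23:30:00Z is still March 16 in UTC, regardless of server timezone.
        System.out.println(indexName(Instant.parse("2026-03-16T23:30:00Z")));
        // → executions-2026-03-16
    }
}
```
Because queries always hit `executions-*`, searches span all days; the daily split exists only to make retention (dropping whole indices) cheap.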
- [ ] **Step 4: Run tests to verify they pass**
Run: `mvn test -pl cameleer3-server-app -Dtest=OpenSearchIndexIT -q`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search"
```
### Task 14: Implement SearchIndexer (debounced event-driven indexer)
**Files:**
- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java`
- [ ] **Step 1: Implement SearchIndexer**
```java
package com.cameleer3.server.core.indexing;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class SearchIndexer {
private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);
private final ExecutionStore executionStore;
private final SearchIndex searchIndex;
private final long debounceMs;
private final int queueCapacity;
private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });
public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
long debounceMs, int queueCapacity) {
this.executionStore = executionStore;
this.searchIndex = searchIndex;
this.debounceMs = debounceMs;
this.queueCapacity = queueCapacity;
}
public void onExecutionUpdated(ExecutionUpdatedEvent event) {
if (pending.size() >= queueCapacity) {
log.warn("Search indexer queue full, dropping event for {}", event.executionId());
return;
}
ScheduledFuture<?> existing = pending.put(event.executionId(),
scheduler.schedule(() -> indexExecution(event.executionId()),
debounceMs, TimeUnit.MILLISECONDS));
if (existing != null) {
existing.cancel(false);
}
}
private void indexExecution(String executionId) {
pending.remove(executionId);
try {
ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
if (exec == null) return;
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
List<ProcessorDoc> processorDocs = processors.stream()
.map(p -> new ProcessorDoc(
p.processorId(), p.processorType(), p.status(),
p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(),
p.inputHeaders(), p.outputHeaders()))
.toList();
searchIndex.index(new ExecutionDocument(
exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs));
} catch (Exception e) {
log.error("Failed to index execution {}", executionId, e);
}
}
public void shutdown() {
scheduler.shutdown();
}
}
```
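The debounce contract above (N rapid updates to one execution collapse into a single index call after the quiet period) can be verified without OpenSearch or Spring. A minimal standalone model, with a counter standing in for `SearchIndex`:
```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone model of SearchIndexer's debounce: re-scheduling cancels the
// pending task, so N quick updates to one execution yield a single flush.
public class DebounceDemo {
    static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    static final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    static final AtomicInteger indexCalls = new AtomicInteger();

    static void onUpdated(String id, long debounceMs) {
        ScheduledFuture<?> prev = pending.put(id,
                scheduler.schedule(() -> { pending.remove(id); indexCalls.incrementAndGet(); },
                        debounceMs, TimeUnit.MILLISECONDS));
        if (prev != null) prev.cancel(false);
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) onUpdated("exec-1", 200); // 10 bursts, one id
        Thread.sleep(500); // wait past the debounce window
        System.out.println(indexCalls.get()); // → 1
        scheduler.shutdown();
    }
}
```
The trade-off is the same as in `SearchIndexer`: an update arriving exactly as the task fires may produce one extra index call, which is harmless because indexing by `executionId` is idempotent.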
- [ ] **Step 2: Commit**
```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java
git commit -m "feat: implement debounced SearchIndexer for async OpenSearch indexing"
```
---
## Chunk 5: Wiring, Cleanup, and Integration
### Task 15: Create bean configuration and wire everything
**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java`
- [ ] **Step 1: Create StorageBeanConfig**
Wire `DetailService`, `SearchIndexer`, `IngestionService` with new store beans:
```java
package com.cameleer3.server.app.config;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class StorageBeanConfig {
@Bean
public DetailService detailService(ExecutionStore executionStore) {
return new DetailService(executionStore);
}
@Bean(destroyMethod = "shutdown")
public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
@Value("${opensearch.debounce-ms:2000}") long debounceMs,
@Value("${opensearch.queue-size:10000}") int queueSize) {
return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
}
@Bean
public IngestionService ingestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
SearchIndexer searchIndexer,
@Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
return new IngestionService(executionStore, diagramStore, metricsBuffer,
searchIndexer::onExecutionUpdated, bodySizeLimit);
}
}
```
- [ ] **Step 2: Update SearchBeanConfig**
Wire `SearchService` with `SearchIndex` + `StatsStore`:
```java
@Bean
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
return new SearchService(searchIndex, statsStore);
}
```
- [ ] **Step 3: Update IngestionBeanConfig**
Remove execution and diagram write buffers. Keep only metrics write buffer:
```java
@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
```
- [ ] **Step 4: Create MetricsFlushScheduler**
Create `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java`:
```java
package com.cameleer3.server.app.ingestion;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class MetricsFlushScheduler implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
private final MetricsStore metricsStore;
private final int batchSize;
private volatile boolean running = false;
public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
MetricsStore metricsStore,
IngestionConfig config) {
this.metricsBuffer = metricsBuffer;
this.metricsStore = metricsStore;
this.batchSize = config.getBatchSize();
}
@Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
public void flush() {
try {
List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
if (!batch.isEmpty()) {
metricsStore.insertBatch(batch);
log.debug("Flushed {} metrics to PostgreSQL", batch.size());
}
} catch (Exception e) {
log.error("Failed to flush metrics", e);
}
}
@Override public void start() { running = true; }
@Override public void stop() {
// Drain remaining on shutdown
while (metricsBuffer.size() > 0) {
List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
if (batch.isEmpty()) break;
try { metricsStore.insertBatch(batch); }
catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
}
running = false;
}
@Override public boolean isRunning() { return running; }
@Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
```
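The shutdown drain in `stop()` assumes `WriteBuffer.drain(n)` returns at most `n` items per call and an empty list once the buffer is exhausted (the project's `WriteBuffer` API is assumed here, not shown in this chunk). A standalone model of the same loop, using `ArrayBlockingQueue.drainTo` as the stand-in:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Models MetricsFlushScheduler's drain loop: pull at most batchSize items per
// pass until the buffer is empty. ArrayBlockingQueue stands in for WriteBuffer
// (assumed drain-with-cap semantics). Returns {itemsFlushed, batchCount}.
public class FlushLoopDemo {
    static int[] flushAll(ArrayBlockingQueue<Integer> buffer, int batchSize) {
        int flushed = 0, batches = 0;
        while (!buffer.isEmpty()) {
            List<Integer> batch = new ArrayList<>();
            buffer.drainTo(batch, batchSize);
            if (batch.isEmpty()) break;
            flushed += batch.size(); // the real scheduler calls metricsStore.insertBatch(batch)
            batches++;
        }
        return new int[]{flushed, batches};
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1000);
        for (int i = 0; i < 250; i++) buffer.add(i);
        int[] result = flushAll(buffer, 100); // batches of 100, 100, 50
        System.out.println(result[0] + " items in " + result[1] + " batches"); // → 250 items in 3 batches
    }
}
```
The `if (batch.isEmpty()) break;` guard matters: it prevents a spin if another thread races the drain, which is why `stop()` includes the same check.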
- [ ] **Step 5: Create RetentionScheduler**
Create `cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java`:
```java
package com.cameleer3.server.app.retention;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class RetentionScheduler {
private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);
private final JdbcTemplate jdbc;
private final int retentionDays;
public RetentionScheduler(JdbcTemplate jdbc,
@Value("${cameleer.retention-days:30}") int retentionDays) {
this.jdbc = jdbc;
this.retentionDays = retentionDays;
}
@Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
public void dropExpiredChunks() {
String interval = retentionDays + " days";
try {
// Raw data
jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");
// Continuous aggregates (keep 3x longer)
String caggInterval = (retentionDays * 3) + " days";
jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");
log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
retentionDays, retentionDays * 3);
} catch (Exception e) {
log.error("Retention job failed", e);
}
}
// Note: deletion of expired daily OpenSearch indices should be handled via an
// ISM (Index State Management) policy configured at deployment time, not in
// application code.
}
```
- [ ] **Step 6: Commit**
```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"
```
### Task 16: Delete ClickHouse code and old interfaces
**Files:**
- Delete all files listed in "Files to delete" section above
- [ ] **Step 1: Delete ClickHouse implementations**
```bash
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouse*.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
```
- [ ] **Step 2: Delete old core interfaces replaced by new ones**
`UserRepository` and `OidcConfigRepository` in `core.security` are **kept** — the new Postgres implementations implement them. Only interfaces replaced by new storage interfaces are deleted.
```bash
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
```
- [ ] **Step 3: Delete ClickHouse SQL migrations**
```bash
rm -r cameleer3-server-app/src/main/resources/clickhouse/
```
- [ ] **Step 4: Delete old test base class**
```bash
rm cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
```
- [ ] **Step 5: Fix compilation errors in specific files**
Each file and what to change:
- `DiagramRenderController.java` — change `DiagramRepository` to `DiagramStore`
- `DiagramController.java` — change `DiagramRepository` to `DiagramStore` and call `IngestionService.ingestDiagram()` (synchronous)
- `SearchController.java` — already uses `SearchService`; verify no direct `SearchEngine` refs
- `DetailController.java` — already uses `DetailService`; verify no direct `ExecutionRepository` refs
- `TreeReconstructionTest.java` — rewrite to test `DetailService.buildTree()` with `ProcessorRecord` list input
- `ExecutionController.java` — update to call `IngestionService.ingestExecution()` (synchronous; catches exceptions and returns 503)
- `MetricsController.java` — update to call `IngestionService.acceptMetrics()` (still buffered)
- `SearchBeanConfig.java` — wire `SearchService(SearchIndex, StatsStore)` instead of `SearchService(SearchEngine)`
- `IngestionBeanConfig.java` — remove execution/diagram buffer beans, add `bodySizeLimit` to `IngestionService` constructor
- All ITs extending `AbstractClickHouseIT` — change to extend `AbstractPostgresIT`
- [ ] **Step 6: Verify compilation**
Run: `mvn compile -q`
Expected: BUILD SUCCESS
- [ ] **Step 7: Commit**
```bash
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"
```
### Task 17: Update existing integration tests
**Files:**
- Modify: all IT files under `cameleer3-server-app/src/test/`
- [ ] **Step 1: Update all ITs to extend AbstractPostgresIT**
Every IT that currently extends `AbstractClickHouseIT` must extend `AbstractPostgresIT` instead. Also add an OpenSearch Testcontainer to ITs that exercise search.
- [ ] **Step 2: Update ExecutionControllerIT**
Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success, 503 on DB write failure.
- [ ] **Step 3: Update SearchControllerIT**
Search now hits OpenSearch. Add an OpenSearch Testcontainer. Allow time for async indexing (use Awaitility to poll search results rather than `Thread.sleep`).
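If Awaitility is not already on the test classpath, the same poll-until-indexed idea is a few lines of plain Java (a hypothetical helper, not part of the codebase):
```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for ITs: retries a condition until it holds or
// the timeout elapses, instead of a fixed Thread.sleep before every search.
public class WaitUntil {
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(interval.toMillis());
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long readyAt = System.currentTimeMillis() + 300; // stands in for async indexing latency
        boolean ok = waitUntil(() -> System.currentTimeMillis() >= readyAt,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println(ok); // → true
    }
}
```
In a test the condition would be something like `() -> searchIndex.search(request).total() > 0`, so the IT finishes as soon as the debounced indexer and the OpenSearch refresh have caught up, rather than always waiting the worst case.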
- [ ] **Step 4: Update DetailControllerIT**
Detail now reads from `PostgresExecutionStore` directly. Simpler setup — just insert execution + processors.
- [ ] **Step 5: Run full test suite**
Run: `mvn verify -q`
Expected: BUILD SUCCESS, all tests pass
- [ ] **Step 6: Commit**
```bash
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"
```
### Task 18: Update Dockerfile and K8s manifests
**Files:**
- Modify: `Dockerfile`
- Modify: `deploy/*.yaml` (K8s manifests)
- [ ] **Step 1: Update Dockerfile**
No JDBC driver changes needed in Dockerfile (drivers are in the fat JAR). Just verify the `REGISTRY_TOKEN` build arg still works for `cameleer3-common` resolution.
- [ ] **Step 2: Update K8s manifests**
- Replace ClickHouse StatefulSet with PostgreSQL/TimescaleDB StatefulSet
- Add OpenSearch StatefulSet (single-node for dev, clustered for prod)
- Update server Deployment env vars: `SPRING_DATASOURCE_URL`, `OPENSEARCH_URL`
- Update secrets: replace `clickhouse-credentials` with `postgres-credentials`
- Update health check probes: PostgreSQL uses standard port 5432, OpenSearch uses 9200
- [ ] **Step 3: Commit**
```bash
git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"
```
### Task 19: Update CI workflow
**Files:**
- Modify: `.gitea/workflows/ci.yml`
- [ ] **Step 1: Update CI workflow**
Changes needed:
- Docker build: no ClickHouse references in the image (it's a fat JAR, driver is bundled)
- Deploy step: update K8s secret names from `clickhouse-credentials` to `postgres-credentials`
- Deploy step: add OpenSearch deployment manifests
- Verify `REGISTRY_TOKEN` build arg still works for `cameleer3-common`
- Integration tests still skipped in CI (`-DskipITs`) — Testcontainers needs Docker-in-Docker
- [ ] **Step 2: Commit**
```bash
git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"
```
### Task 20: Update OpenAPI spec
- [ ] **Step 1: Regenerate openapi.json**
Run the server locally (or via test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend.
Run: `mvn spring-boot:run` (briefly, to generate spec), then fetch `/api/v1/api-docs`.
- [ ] **Step 2: Commit**
```bash
git add cameleer3-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"
```
### Task 21: Final verification
- [ ] **Step 1: Full build**
Run: `mvn clean verify -q`
Expected: BUILD SUCCESS
- [ ] **Step 2: Manual smoke test**
Start server against local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):
1. POST an execution → verify 202
2. GET `/api/v1/search?text=...` → verify search works
3. GET `/api/v1/stats` → verify stats return
4. GET `/api/v1/detail/{id}` → verify detail with processor tree
- [ ] **Step 3: Commit any remaining fixes**
```bash
git add -A
git commit -m "fix: final adjustments from smoke testing"
```