# Storage Layer Refactor Implementation Plan
> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends.
**Architecture:** PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations.
**Tech Stack:** Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client
**Spec:** `docs/superpowers/specs/2026-03-16-storage-layer-design.md`
---
## File Structure
### New files
**Core module** (`cameleer-server-core/src/main/java/com/cameleer/server/core/`):
- `storage/ExecutionStore.java` — new interface replacing ExecutionRepository
- `storage/StatsStore.java` — new interface for stats from continuous aggregates
- `storage/SearchIndex.java` — new interface for OpenSearch operations
- `storage/DiagramStore.java` — new interface replacing DiagramRepository
- `storage/MetricsStore.java` — new interface replacing MetricsRepository
- `storage/model/ExecutionDocument.java` — document model for OpenSearch indexing
- `search/StatsRequest.java` — request DTO for stats queries (level, scope, time range)
- `search/TimeSeriesRequest.java` — request DTO for time-series queries (bucket size)
- `indexing/SearchIndexer.java` — debounced event listener for OpenSearch indexing
- `indexing/ExecutionUpdatedEvent.java` — event published after execution write
**App module** (`cameleer-server-app/src/main/java/com/cameleer/server/app/`):
- `storage/PostgresExecutionStore.java` — ExecutionStore impl with upsert
- `storage/PostgresStatsStore.java` — StatsStore impl querying continuous aggregates
- `storage/PostgresDiagramStore.java` — DiagramStore impl
- `storage/PostgresUserRepository.java` — UserRepository impl (keeps existing core interface)
- `storage/PostgresOidcConfigRepository.java` — OidcConfigRepository impl (keeps existing core interface)
- `storage/PostgresMetricsStore.java` — MetricsStore impl
- `search/OpenSearchIndex.java` — SearchIndex impl
- `config/OpenSearchConfig.java` — OpenSearch client bean
- `config/StorageBeanConfig.java` — wires all store beans
- `ingestion/MetricsFlushScheduler.java` — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only)
- `retention/RetentionScheduler.java` — scheduled job for drop_chunks and OpenSearch index deletion
**Flyway migrations** (`cameleer-server-app/src/main/resources/db/migration/`):
- `V1__extensions.sql` — CREATE EXTENSION timescaledb, timescaledb_toolkit
- `V2__executions.sql` — executions hypertable
- `V3__processor_executions.sql` — processor_executions hypertable
- `V4__agent_metrics.sql` — agent_metrics hypertable
- `V5__route_diagrams.sql` — route_diagrams table
- `V6__users.sql` — users table
- `V7__oidc_config.sql` — oidc_config table
- `V8__continuous_aggregates.sql` — all 4 continuous aggregates + refresh policies
Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by `RetentionScheduler` at runtime with configurable intervals.
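At runtime the scheduler's database cleanup reduces to one TimescaleDB `drop_chunks` call per hypertable. A sketch of the SQL it would issue, assuming the default `retention-days: 30` from the application config:

```sql
-- Drop every chunk whose data lies entirely outside the retention window.
SELECT drop_chunks('executions', now() - INTERVAL '30 days');
SELECT drop_chunks('processor_executions', now() - INTERVAL '30 days');
SELECT drop_chunks('agent_metrics', now() - INTERVAL '30 days');
```

Dropping whole chunks is metadata-only and avoids the bloat a row-level `DELETE` would create.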
**Test files** (`cameleer-server-app/src/test/java/com/cameleer/server/app/`):
- `AbstractPostgresIT.java` — replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
- `storage/PostgresExecutionStoreIT.java` — upsert, dedup, chunked arrival tests
- `storage/PostgresStatsStoreIT.java` — continuous aggregate query tests
- `storage/PostgresDiagramStoreIT.java` — content-hash dedup tests
- `storage/PostgresUserRepositoryIT.java` — CRUD tests
- `search/OpenSearchIndexIT.java` — index, search, wildcard tests
### Files to modify
- `pom.xml` (root) — no changes needed
- `cameleer-server-app/pom.xml` — swap clickhouse-jdbc for postgresql + opensearch-java + flyway
- `cameleer-server-core/.../core/search/SearchService.java` — split: search delegates to SearchIndex, stats/timeseries to StatsStore
- `cameleer-server-core/.../core/detail/DetailService.java` — use ExecutionStore instead of ExecutionRepository
- `cameleer-server-core/.../core/ingestion/IngestionService.java` — synchronous execution/diagram writes, keep buffer for metrics
- `cameleer-server-app/.../app/config/SearchBeanConfig.java` — wire StatsStore into SearchService
- `cameleer-server-app/.../app/config/IngestionBeanConfig.java` — update bean wiring
- `cameleer-server-app/src/main/resources/application.yml` — PostgreSQL + OpenSearch config
- `cameleer-server-app/src/test/resources/application-test.yml` — test config
### Files to delete
- `cameleer-server-app/.../app/storage/ClickHouseExecutionRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseDiagramRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseMetricsRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseUserRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseOidcConfigRepository.java`
- `cameleer-server-app/.../app/search/ClickHouseSearchEngine.java`
- `cameleer-server-app/.../app/ingestion/ClickHouseFlushScheduler.java`
- `cameleer-server-app/.../app/config/ClickHouseConfig.java`
- `cameleer-server-core/.../core/storage/ExecutionRepository.java`
- `cameleer-server-core/.../core/storage/DiagramRepository.java`
- `cameleer-server-core/.../core/storage/MetricsRepository.java`
- `cameleer-server-core/.../core/search/SearchEngine.java`
- `cameleer-server-core/.../core/detail/RawExecutionRow.java`
- `cameleer-server-app/src/main/resources/clickhouse/*.sql` (all 8 files)
- `cameleer-server-app/src/test/.../app/AbstractClickHouseIT.java`

Note: `UserRepository` and `OidcConfigRepository` interfaces in `core.security` are **kept** — the new Postgres implementations implement these existing interfaces. No rename needed since their contracts are unchanged.
---
## Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure
### Task 1: Update Maven dependencies
**Files:**
- Modify: `cameleer-server-app/pom.xml`
- [ ] **Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client**
In `cameleer-server-app/pom.xml`, replace the ClickHouse dependency and add new ones:
Remove:
```xml
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.9.7</version>
    <classifier>all</classifier>
</dependency>
```
Add:
```xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-rest-client</artifactId>
    <version>2.19.0</version>
</dependency>
```
Replace the ClickHouse Testcontainer:
```xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>postgresql</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.opensearch</groupId>
    <artifactId>opensearch-testcontainers</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>
```
Note: `postgresql` driver and `flyway-core` versions are managed by Spring Boot parent POM. Testcontainers BOM version is `${testcontainers.version}` (2.0.3) from root POM.
- [ ] **Step 2: Commit** (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)
```bash
git add cameleer-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"
```
### Task 2: Write Flyway migrations
**Files:**
- Create: `cameleer-server-app/src/main/resources/db/migration/V1__extensions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V2__executions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V3__processor_executions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V4__agent_metrics.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V5__route_diagrams.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V6__users.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V7__oidc_config.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql`
- [ ] **Step 1: Create V1__extensions.sql**
```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
```
- [ ] **Step 2: Create V2__executions.sql**
```sql
CREATE TABLE executions (
    execution_id TEXT NOT NULL,
    route_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    group_name TEXT NOT NULL,
    status TEXT NOT NULL,
    correlation_id TEXT,
    exchange_id TEXT,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    duration_ms BIGINT,
    error_message TEXT,
    error_stacktrace TEXT,
    diagram_content_hash TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (execution_id, start_time)
);

SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC);
CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC);
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC);
CREATE INDEX idx_executions_correlation ON executions (correlation_id);
```
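The composite primary key above is what the later `PostgresExecutionStore` upsert keys on, letting chunked or out-of-order arrivals converge on one row. A sketch of the statement shape (illustrative, not the final implementation — the real one updates every mutable column):

```sql
INSERT INTO executions (execution_id, route_id, agent_id, group_name, status,
                        start_time, end_time, duration_ms, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
    status      = EXCLUDED.status,
    end_time    = EXCLUDED.end_time,
    duration_ms = EXCLUDED.duration_ms,
    updated_at  = now();
```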
- [ ] **Step 3: Create V3__processor_executions.sql**
```sql
CREATE TABLE processor_executions (
    id BIGSERIAL,
    execution_id TEXT NOT NULL,
    processor_id TEXT NOT NULL,
    processor_type TEXT NOT NULL,
    diagram_node_id TEXT,
    group_name TEXT NOT NULL,
    route_id TEXT NOT NULL,
    depth INT NOT NULL,
    parent_processor_id TEXT,
    status TEXT NOT NULL,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    duration_ms BIGINT,
    error_message TEXT,
    error_stacktrace TEXT,
    input_body TEXT,
    output_body TEXT,
    input_headers JSONB,
    output_headers JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (execution_id, processor_id, start_time)
);

SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id);
CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC);
```
- [ ] **Step 4: Create V4__agent_metrics.sql**
```sql
CREATE TABLE agent_metrics (
    agent_id TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    metric_value DOUBLE PRECISION NOT NULL,
    tags JSONB,
    collected_at TIMESTAMPTZ NOT NULL,
    server_received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC);
```
- [ ] **Step 5: Create V5__route_diagrams.sql**
```sql
CREATE TABLE route_diagrams (
    content_hash TEXT PRIMARY KEY,
    route_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    definition TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id);
```
- [ ] **Step 6: Create V6__users.sql**
```sql
CREATE TABLE users (
    user_id TEXT PRIMARY KEY,
    provider TEXT NOT NULL,
    email TEXT,
    display_name TEXT,
    roles TEXT[] NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
- [ ] **Step 7: Create V7__oidc_config.sql**
```sql
CREATE TABLE oidc_config (
    config_id TEXT PRIMARY KEY DEFAULT 'default',
    enabled BOOLEAN NOT NULL DEFAULT false,
    issuer_uri TEXT,
    client_id TEXT,
    client_secret TEXT,
    roles_claim TEXT,
    default_roles TEXT[] NOT NULL DEFAULT '{}',
    auto_signup BOOLEAN DEFAULT false,
    display_name_claim TEXT,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
- [ ] **Step 8: Create V8__continuous_aggregates.sql**
```sql
-- Global stats
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket
WITH NO DATA; -- NO DATA so creation can run inside Flyway's transaction; the policy backfills

SELECT add_continuous_aggregate_policy('stats_1m_all',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-application stats
CREATE MATERIALIZED VIEW stats_1m_app
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name
WITH NO DATA;

SELECT add_continuous_aggregate_policy('stats_1m_app',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-route stats
CREATE MATERIALIZED VIEW stats_1m_route
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name, route_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('stats_1m_route',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-processor stats (uses denormalized group_name/route_id on processor_executions)
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    processor_type,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type
WITH NO DATA;

SELECT add_continuous_aggregate_policy('stats_1m_processor',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');
```
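Reads in the later `PostgresStatsStore` roll these 1-minute buckets up to the requested range. A sketch of a global stats query against `stats_1m_all` (counts and sums roll up exactly; the stored per-bucket `p99_duration` does not, so a range-level p99 needs separate treatment, e.g. keeping the raw `percentile_agg` sketch in the view and rolling it up at read time):

```sql
SELECT
    SUM(total_count)   AS total,
    SUM(failed_count)  AS failed,
    SUM(running_count) AS running,
    SUM(duration_sum)::DOUBLE PRECISION / NULLIF(SUM(total_count), 0) AS avg_duration_ms,
    MAX(duration_max)  AS max_duration_ms
FROM stats_1m_all
WHERE bucket >= ? AND bucket < ?;
```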
- [ ] **Step 9: Commit**
```bash
git add cameleer-server-app/src/main/resources/db/migration/
git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema"
```
### Task 3: Create test base class with TimescaleDB Testcontainer
**Files:**
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java`
- [ ] **Step 1: Write AbstractPostgresIT**
```java
package com.cameleer.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>("timescale/timescaledb:latest-pg16")
                    .withDatabaseName("cameleer")
                    .withUsername("cameleer")
                    .withPassword("test");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
```
- [ ] **Step 2: Write a smoke test to verify migrations run**
Create `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java`:
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.junit.jupiter.api.Assertions.*;

class FlywayMigrationIT extends AbstractPostgresIT {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Test
    void allMigrationsApplySuccessfully() {
        // Verify core tables exist
        Integer execCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM executions", Integer.class);
        assertEquals(0, execCount);
        Integer procCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processor_executions", Integer.class);
        assertEquals(0, procCount);
        Integer userCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM users", Integer.class);
        assertEquals(0, userCount);
        // Verify continuous aggregates exist
        Integer caggCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates",
                Integer.class);
        assertEquals(4, caggCount);
    }
}
```
- [ ] **Step 3: Verify test passes** (this test will not compile until Task 16 deletes ClickHouse code. Run it after Task 16 is complete. Listed here for logical grouping.)
Run: `mvn test -pl cameleer-server-app -Dtest=FlywayMigrationIT -q`
Expected: PASS — all migrations apply, tables and continuous aggregates exist
- [ ] **Step 4: Commit**
```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java
git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test"
```
### Task 4: Update application.yml for PostgreSQL + OpenSearch
**Files:**
- Modify: `cameleer-server-app/src/main/resources/application.yml`
- Modify: `cameleer-server-app/src/test/resources/application-test.yml`
- [ ] **Step 1: Update application.yml datasource section**
Replace:
```yaml
spring:
  datasource:
    url: jdbc:ch://localhost:8123/cameleer
    username: cameleer
    password: cameleer_dev
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
```
With:
```yaml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/cameleer
    username: cameleer
    password: ${CAMELEER_DB_PASSWORD:cameleer_dev}
    driver-class-name: org.postgresql.Driver
  flyway:
    enabled: true
    locations: classpath:db/migration
```
Add OpenSearch config section:
```yaml
opensearch:
  url: ${OPENSEARCH_URL:http://localhost:9200}
  queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
  debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}
```
Add body size limit:
```yaml
cameleer:
  body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
  retention-days: ${CAMELEER_RETENTION_DAYS:30}
```
Remove the `clickhouse:` section.
- [ ] **Step 2: Update application-test.yml**
```yaml
spring:
  flyway:
    enabled: true
opensearch:
  url: http://localhost:9200
```
- [ ] **Step 3: Commit**
```bash
git add cameleer-server-app/src/main/resources/application.yml
git add cameleer-server-app/src/test/resources/application-test.yml
git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config"
```
---
## Chunk 2: Core Module Interfaces and Models
### Task 5: Create new storage interfaces in core module
**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/SearchIndex.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/DiagramStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsStore.java`
- [ ] **Step 1: Create ExecutionStore interface**
```java
package com.cameleer.server.core.storage;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

public interface ExecutionStore {

    void upsert(ExecutionRecord execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);

    record ExecutionRecord(
            String executionId, String routeId, String agentId, String groupName,
            String status, String correlationId, String exchangeId,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace, String diagramContentHash
    ) {}

    record ProcessorRecord(
            String executionId, String processorId, String processorType,
            String diagramNodeId, String groupName, String routeId,
            int depth, String parentProcessorId, String status,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody, String inputHeaders, String outputHeaders
    ) {}
}
```
- [ ] **Step 2: Create StatsStore interface**
Supports all 4 aggregation levels: global, per-app, per-route, per-processor.
```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import java.time.Instant;
import java.util.List;

public interface StatsStore {

    // Global stats (stats_1m_all)
    ExecutionStats stats(Instant from, Instant to);

    // Per-app stats (stats_1m_app)
    ExecutionStats statsForApp(Instant from, Instant to, String groupName);

    // Per-route stats (stats_1m_route), optionally scoped to specific agents
    ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);

    // Per-processor stats (stats_1m_processor)
    ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);

    // Global timeseries
    StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);

    // Per-app timeseries
    StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);

    // Per-route timeseries, optionally scoped to specific agents
    StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                       String routeId, List<String> agentIds);

    // Per-processor timeseries
    StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                           String routeId, String processorType);
}
```
- [ ] **Step 3: Create SearchIndex interface**
```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.model.ExecutionDocument;

public interface SearchIndex {

    SearchResult search(SearchRequest request);

    long count(SearchRequest request);

    void index(ExecutionDocument document);

    void delete(String executionId);
}
```
- [ ] **Step 4: Create DiagramStore interface**
```java
package com.cameleer.server.core.storage;

import com.cameleer.common.graph.RouteGraph;
import com.cameleer.server.core.ingestion.TaggedDiagram;
import java.util.List;
import java.util.Optional;

public interface DiagramStore {

    void store(TaggedDiagram diagram);

    Optional<RouteGraph> findByContentHash(String contentHash);

    Optional<String> findContentHashForRoute(String routeId, String agentId);

    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
```
- [ ] **Step 5: Create MetricsStore interface**
```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.storage.model.MetricsSnapshot;
import java.util.List;

public interface MetricsStore {

    void insertBatch(List<MetricsSnapshot> snapshots);
}
```
- [ ] **Step 6: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"
```
### Task 6: Create ExecutionDocument model and indexing event
**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/ExecutionUpdatedEvent.java`
- [ ] **Step 1: Create ExecutionDocument**
```java
package com.cameleer.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record ExecutionDocument(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        List<ProcessorDoc> processors
) {
    public record ProcessorDoc(
            String processorId, String processorType, String status,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody,
            String inputHeaders, String outputHeaders
    ) {}
}
```
- [ ] **Step 2: Create ExecutionUpdatedEvent**
```java
package com.cameleer.server.core.indexing;
import java.time.Instant;
public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
```
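The `SearchIndexer` listed earlier consumes these events with debouncing before bulk-indexing into OpenSearch (see the `debounce-ms` setting in the config task). A minimal sketch of the accumulate-and-flush idea — `DebounceBuffer` and its method names are hypothetical, not classes this plan mandates:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch: accumulate updated execution IDs; a scheduler calls flush() every
// debounce-ms, so repeated updates to the same execution inside one window
// collapse into a single bulk index operation.
class DebounceBuffer {
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final Consumer<Set<String>> flusher; // e.g. bulk-index into OpenSearch

    DebounceBuffer(Consumer<Set<String>> flusher) {
        this.flusher = flusher;
    }

    void onExecutionUpdated(String executionId) {
        pending.add(executionId); // duplicates collapse via Set semantics
    }

    void flush() {
        if (pending.isEmpty()) return;
        Set<String> batch = Set.copyOf(pending);
        pending.removeAll(batch); // keep IDs that arrived during the copy
        flusher.accept(batch);
    }
}
```

The real implementation must also re-read the current row from PostgreSQL at flush time, so the index always reflects the latest upserted state rather than the event payload.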
- [ ] **Step 3: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java
git add cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"
```
### Task 7: Update SearchService to use StatsStore for stats/timeseries
**Files:**
- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java`
- [ ] **Step 1: Refactor SearchService to accept SearchIndex + StatsStore**
Replace the single `SearchEngine` dependency with two dependencies:
```java
package com.cameleer.server.core.search;

import com.cameleer.server.core.storage.SearchIndex;
import com.cameleer.server.core.storage.StatsStore;
import java.time.Instant;
import java.util.List;

public class SearchService {

    private final SearchIndex searchIndex;
    private final StatsStore statsStore;

    public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
        this.searchIndex = searchIndex;
        this.statsStore = statsStore;
    }

    public SearchResult search(SearchRequest request) {
        return searchIndex.search(request);
    }

    public long count(SearchRequest request) {
        return searchIndex.count(request);
    }

    public ExecutionStats stats(Instant from, Instant to) {
        return statsStore.stats(from, to);
    }

    public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
        return statsStore.statsForRoute(from, to, routeId, agentIds);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return statsStore.timeseries(from, to, bucketCount);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
                                      String routeId, List<String> agentIds) {
        return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
    }
}
```
- [ ] **Step 2: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"
```
### Task 8: Update DetailService to use ExecutionStore
**Files:**
- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java`
- [ ] **Step 1: Rewrite DetailService to use ExecutionStore**
The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with `parentProcessorId` for tree structure.
```java
package com.cameleer.server.core.detail;

import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import java.util.*;

public class DetailService {

    private final ExecutionStore executionStore;

    public DetailService(ExecutionStore executionStore) {
        this.executionStore = executionStore;
    }

    public Optional<ExecutionDetail> getDetail(String executionId) {
        return executionStore.findById(executionId)
                .map(exec -> {
                    List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
                    List<ProcessorNode> roots = buildTree(processors);
                    return new ExecutionDetail(
                            exec.executionId(), exec.routeId(), exec.agentId(),
                            exec.status(), exec.startTime(), exec.endTime(),
                            exec.durationMs() != null ? exec.durationMs() : 0L,
                            exec.correlationId(), exec.exchangeId(),
                            exec.errorMessage(), exec.errorStacktrace(),
                            exec.diagramContentHash(), roots
                    );
                });
    }

    List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
        if (processors.isEmpty()) return List.of();
        Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
        for (ProcessorRecord p : processors) {
            nodeMap.put(p.processorId(), new ProcessorNode(
                    p.processorId(), p.processorType(), p.status(),
                    p.startTime(), p.endTime(),
                    p.durationMs() != null ? p.durationMs() : 0L,
                    p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
            ));
        }
        List<ProcessorNode> roots = new ArrayList<>();
        for (ProcessorRecord p : processors) {
            ProcessorNode node = nodeMap.get(p.processorId());
            if (p.parentProcessorId() == null) {
                roots.add(node);
            } else {
                ProcessorNode parent = nodeMap.get(p.parentProcessorId());
                if (parent != null) {
                    parent.addChild(node);
                } else {
                    roots.add(node); // orphan safety
                }
            }
        }
        return roots;
    }
}
```
- [ ] **Step 2: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
```
### Task 9: Update IngestionService for synchronous writes
**Files:**
- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java`
- [ ] **Step 1: Rewrite IngestionService**
Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.
```java
package com.cameleer.server.core.ingestion;

import com.cameleer.common.model.ProcessorExecution;
import com.cameleer.common.model.RouteExecution;
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer.server.core.storage.DiagramStore;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer.server.core.storage.model.MetricsSnapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);
        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                    execution.getProcessors(), record.executionId(),
                    record.startTime(), groupName, execution.getRouteId(),
                    null, 0);
            executionStore.upsertProcessors(
                    record.executionId(), record.startTime(),
                    groupName, execution.getRouteId(), processors);
        }
        eventPublisher.accept(new ExecutionUpdatedEvent(
                record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
        diagramStore.store(diagram);
    }

    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }

    private ExecutionRecord toExecutionRecord(String agentId, String groupName,
                                              RouteExecution exec) {
        return new ExecutionRecord(
                exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
                exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
                exec.getCorrelationId(), exec.getExchangeId(),
                exec.getStartTime(), exec.getEndTime(),
                exec.getDurationMs(),
                exec.getErrorMessage(), exec.getErrorStacktrace(),
                null // diagramContentHash set separately
        );
    }

    private List<ProcessorRecord> flattenProcessors(
            List<ProcessorExecution> processors, String executionId,
            java.time.Instant execStartTime, String groupName, String routeId,
            String parentProcessorId, int depth) {
        List<ProcessorRecord> flat = new ArrayList<>();
        for (ProcessorExecution p : processors) {
            flat.add(new ProcessorRecord(
                    executionId, p.getProcessorId(), p.getProcessorType(),
                    p.getDiagramNodeId(), groupName, routeId,
                    depth, parentProcessorId,
                    p.getStatus() != null ? p.getStatus().name() : "RUNNING",
                    p.getStartTime() != null ? p.getStartTime() : execStartTime,
                    p.getEndTime(),
                    p.getDurationMs(),
                    p.getErrorMessage(), p.getErrorStacktrace(),
                    truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
                    p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
                    p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
            ));
            if (p.getChildren() != null) {
                flat.addAll(flattenProcessors(
                        p.getChildren(), executionId, execStartTime,
                        groupName, routeId, p.getProcessorId(), depth + 1));
            }
        }
        return flat;
    }

    private String truncateBody(String body) {
        if (body == null) return null;
        if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
        return body;
    }
}
```
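To make the parent/depth bookkeeping in `flattenProcessors` concrete, here is a minimal, self-contained sketch of the same recursion over a two-level tree. The `Node` and `Flat` records are illustrative stand-ins for `ProcessorExecution` and `ProcessorRecord`, not part of the codebase:

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenSketch {
    // Illustrative stand-ins for ProcessorExecution / ProcessorRecord.
    record Node(String id, List<Node> children) {}
    record Flat(String id, String parentId, int depth) {}

    // Same shape as IngestionService.flattenProcessors: a depth-first walk
    // where each child records its parent's id and depth + 1.
    static List<Flat> flatten(List<Node> nodes, String parentId, int depth) {
        List<Flat> out = new ArrayList<>();
        for (Node n : nodes) {
            out.add(new Flat(n.id(), parentId, depth));
            if (n.children() != null) {
                out.addAll(flatten(n.children(), n.id(), depth + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A "split" processor with two children, as Camel might report it.
        List<Node> tree = List.of(
            new Node("split", List.of(
                new Node("log", null),
                new Node("to", null))));
        for (Flat f : flatten(tree, null, 0)) {
            System.out.println(f);
        }
    }
}
```

The flattened list preserves insertion order, so a parent always precedes its children — which is what lets `upsertProcessors` write the rows in a single batch without forward references.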
- [ ] **Step 2: Commit**
```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
```
---
## Chunk 3: PostgreSQL Store Implementations
### Task 10: Implement PostgresExecutionStore
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java`
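The test and implementation below code against the core `ExecutionStore` interface defined in Chunk 1. For orientation, this is its assumed shape, reconstructed from how the records are constructed and read in this task (field order matters for the record constructors; treat this as a sketch, the Chunk 1 file is authoritative):

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Assumed shape of the core storage interface from Chunk 1.
public interface ExecutionStore {

    record ExecutionRecord(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace, String diagramContentHash) {}

    record ProcessorRecord(
        String executionId, String processorId, String processorType,
        String diagramNodeId, String groupName, String routeId,
        int depth, String parentProcessorId, String status,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        String inputBody, String outputBody,
        String inputHeaders, String outputHeaders) {}

    // Idempotent write: re-delivery of the same execution must not duplicate rows.
    void upsert(ExecutionRecord execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);
}
```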
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.*;

class PostgresExecutionStoreIT extends AbstractPostgresIT {

    @Autowired
    ExecutionStore executionStore;

    @Test
    void upsertAndFindById() {
        Instant now = Instant.now();
        ExecutionRecord record = new ExecutionRecord(
            "exec-1", "route-a", "agent-1", "app-1",
            "COMPLETED", "corr-1", "exchange-1",
            now, now.plusMillis(100), 100L,
            null, null, null);
        executionStore.upsert(record);

        Optional<ExecutionRecord> found = executionStore.findById("exec-1");
        assertTrue(found.isPresent());
        assertEquals("exec-1", found.get().executionId());
        assertEquals("COMPLETED", found.get().status());
    }

    @Test
    void upsertDeduplicatesByExecutionId() {
        Instant now = Instant.now();
        ExecutionRecord first = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1",
            "RUNNING", null, null, now, null, null, null, null, null);
        ExecutionRecord second = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1",
            "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);
        executionStore.upsert(first);
        executionStore.upsert(second);

        Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
        assertTrue(found.isPresent());
        assertEquals("COMPLETED", found.get().status());
        assertEquals(200L, found.get().durationMs());
    }

    @Test
    void upsertProcessorsAndFind() {
        Instant now = Instant.now();
        ExecutionRecord exec = new ExecutionRecord(
            "exec-proc", "route-a", "agent-1", "app-1",
            "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
        executionStore.upsert(exec);

        List<ProcessorRecord> processors = List.of(
            new ProcessorRecord("exec-proc", "proc-1", "log", null,
                "app-1", "route-a", 0, null, "COMPLETED",
                now, now.plusMillis(10), 10L, null, null,
                "input body", "output body", null, null),
            new ProcessorRecord("exec-proc", "proc-2", "to", null,
                "app-1", "route-a", 1, "proc-1", "COMPLETED",
                now.plusMillis(10), now.plusMillis(30), 20L, null, null,
                null, null, null, null)
        );
        executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

        List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
        assertEquals(2, found.size());
        assertEquals("proc-1", found.get(0).processorId());
        assertEquals("proc-2", found.get(1).processorId());
        assertEquals("proc-1", found.get(1).parentProcessorId());
    }
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: FAIL — `ExecutionStore` bean not found
- [ ] **Step 3: Implement PostgresExecutionStore**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresExecutionStore implements ExecutionStore {

    private final JdbcTemplate jdbc;

    public PostgresExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void upsert(ExecutionRecord execution) {
        jdbc.update("""
            INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                status, correlation_id, exchange_id, start_time, end_time,
                duration_ms, error_message, error_stacktrace, diagram_content_hash,
                created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
            ON CONFLICT (execution_id, start_time) DO UPDATE SET
                status = CASE
                    -- never downgrade a terminal status with a late RUNNING update
                    WHEN executions.status IN ('COMPLETED', 'FAILED')
                         AND EXCLUDED.status = 'RUNNING'
                    THEN executions.status
                    ELSE EXCLUDED.status
                END,
                end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                updated_at = now()
            """,
            execution.executionId(), execution.routeId(), execution.agentId(),
            execution.groupName(), execution.status(), execution.correlationId(),
            execution.exchangeId(),
            Timestamp.from(execution.startTime()),
            execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
            execution.durationMs(), execution.errorMessage(),
            execution.errorStacktrace(), execution.diagramContentHash());
    }

    @Override
    public void upsertProcessors(String executionId, Instant startTime,
                                 String groupName, String routeId,
                                 List<ProcessorRecord> processors) {
        jdbc.batchUpdate("""
            INSERT INTO processor_executions (execution_id, processor_id, processor_type,
                diagram_node_id, group_name, route_id, depth, parent_processor_id,
                status, start_time, end_time, duration_ms, error_message, error_stacktrace,
                input_body, output_body, input_headers, output_headers)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
            ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
                status = EXCLUDED.status,
                end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
                input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
                output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
                input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
                output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
            """,
            processors.stream().map(p -> new Object[]{
                p.executionId(), p.processorId(), p.processorType(),
                p.diagramNodeId(), p.groupName(), p.routeId(),
                p.depth(), p.parentProcessorId(), p.status(),
                Timestamp.from(p.startTime()),
                p.endTime() != null ? Timestamp.from(p.endTime()) : null,
                p.durationMs(), p.errorMessage(), p.errorStacktrace(),
                p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
            }).toList());
    }

    @Override
    public Optional<ExecutionRecord> findById(String executionId) {
        List<ExecutionRecord> results = jdbc.query(
            "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
            EXECUTION_MAPPER, executionId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<ProcessorRecord> findProcessors(String executionId) {
        return jdbc.query(
            "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
            PROCESSOR_MAPPER, executionId);
    }

    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
        new ExecutionRecord(
            rs.getString("execution_id"), rs.getString("route_id"),
            rs.getString("agent_id"), rs.getString("group_name"),
            rs.getString("status"), rs.getString("correlation_id"),
            rs.getString("exchange_id"),
            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            rs.getString("error_message"), rs.getString("error_stacktrace"),
            rs.getString("diagram_content_hash"));

    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
        new ProcessorRecord(
            rs.getString("execution_id"), rs.getString("processor_id"),
            rs.getString("processor_type"), rs.getString("diagram_node_id"),
            rs.getString("group_name"), rs.getString("route_id"),
            rs.getInt("depth"), rs.getString("parent_processor_id"),
            rs.getString("status"),
            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            rs.getString("error_message"), rs.getString("error_stacktrace"),
            rs.getString("input_body"), rs.getString("output_body"),
            rs.getString("input_headers"), rs.getString("output_headers"));

    private static Instant toInstant(ResultSet rs, String column) throws SQLException {
        Timestamp ts = rs.getTimestamp(column);
        return ts != null ? ts.toInstant() : null;
    }
}
```
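The intended semantics of the `ON CONFLICT` clause — a re-delivered or out-of-order write must never lose data — can be read as a pure merge function. This sketch models those semantics for the two kinds of columns involved (a terminal status survives a late `RUNNING` update; `COALESCE` lets a non-null incoming value win). It is an illustration of the merge rules, not the implementation:

```java
public class UpsertMergeSketch {

    // Models the status merge: once an execution reaches a terminal
    // status, a late-arriving RUNNING update must not overwrite it.
    static String mergeStatus(String existing, String incoming) {
        boolean terminal = existing.equals("COMPLETED") || existing.equals("FAILED");
        if (terminal && incoming.equals("RUNNING")) {
            return existing;
        }
        return incoming;
    }

    // Models COALESCE(EXCLUDED.x, executions.x): the incoming value
    // wins only when it is non-null.
    static <T> T coalesce(T incoming, T existing) {
        return incoming != null ? incoming : existing;
    }

    public static void main(String[] args) {
        System.out.println(mergeStatus("RUNNING", "COMPLETED"));  // normal completion
        System.out.println(mergeStatus("COMPLETED", "RUNNING"));  // late duplicate, keep COMPLETED
        System.out.println(coalesce(null, 200L));                 // keep existing duration
    }
}
```

This is also why `end_time` and `duration_ms` use `COALESCE` rather than plain assignment: the second write in the dedup test carries them, but a redundant third `RUNNING` write would carry nulls and must not erase them.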
- [ ] **Step 4: Run tests to verify they pass**
Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: PASS — all 3 tests green
- [ ] **Step 5: Commit**
```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
```
### Task 11: Implement PostgresStatsStore
**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresStatsStoreIT.java`
- [ ] **Step 1: Write the failing test**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.junit.jupiter.api.Assertions.*;

class PostgresStatsStoreIT extends AbstractPostgresIT {

    @Autowired StatsStore statsStore;
    @Autowired ExecutionStore executionStore;
    @Autowired JdbcTemplate jdbc;

    @Test
    void statsReturnsCountsForTimeWindow() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
        insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
        insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
        // Force a continuous aggregate refresh so the window query sees the rows
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
        assertEquals(3, stats.totalCount());
        assertEquals(1, stats.failedCount());
    }

    @Test
    void timeseriesReturnsBuckets() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
        for (int i = 0; i < 10; i++) {
            insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
                now.plusSeconds(i * 30), 100L + i);
        }
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        // Instant has no minusMinutes/plusMinutes; use ChronoUnit arithmetic
        StatsTimeseries ts = statsStore.timeseries(
            now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
        assertNotNull(ts);
        assertFalse(ts.buckets().isEmpty());
    }

    private void insertExecution(String id, String routeId, String groupName,
                                 String status, Instant startTime, long durationMs) {
        executionStore.upsert(new ExecutionRecord(
            id, routeId, "agent-1", groupName, status, null, null,
            startTime, startTime.plusMillis(durationMs), durationMs,
            status.equals("FAILED") ? "error" : null, null, null));
    }
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `mvn test -pl cameleer-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: FAIL — `StatsStore` bean not found
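A note on the aggregation math before the implementation: the continuous aggregates store per-minute `total_count`, `failed_count`, and `duration_sum`, so a window average must be recomputed as `SUM(duration_sum) / SUM(total_count)` — averaging the per-bucket averages would over-weight sparse buckets. A small illustrative sketch (the bucket values are made up):

```java
public class RollupSketch {

    // A pre-aggregated 1-minute bucket, as stored by the continuous aggregate.
    record Bucket(long totalCount, long failedCount, long durationSum) {}

    // Correct rollup: merge the sums first, divide once.
    static double windowAvg(Bucket... buckets) {
        long count = 0, sum = 0;
        for (Bucket b : buckets) {
            count += b.totalCount();
            sum += b.durationSum();
        }
        return count > 0 ? (double) sum / count : 0.0;
    }

    public static void main(String[] args) {
        Bucket busy = new Bucket(3, 0, 300);   // 3 executions averaging 100 ms
        Bucket sparse = new Bucket(1, 1, 500); // 1 slow failure at 500 ms
        // (300 + 500) / (3 + 1) = 200.0, not the naive (100 + 500) / 2 = 300.0
        System.out.println(windowAvg(busy, sparse));
    }
}
```

The same reasoning is why the SQL in the store below sums counts across buckets; percentiles, by contrast, cannot be merged exactly from per-bucket values, so the p99 is only approximated.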
- [ ] **Step 3: Implement PostgresStatsStore**
```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

@Repository
public class PostgresStatsStore implements StatsStore {

    private final JdbcTemplate jdbc;

    public PostgresStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of());
    }

    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
        return queryStats("stats_1m_app", from, to, List.of(
            new Filter("group_name", groupName)));
    }

    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // Note: agentIds is accepted for interface compatibility but is not filterable
        // on the continuous aggregate (it groups by route_id, not agent_id).
        // All agents for the same route contribute to the same aggregate.
        return queryStats("stats_1m_route", from, to, List.of(
            new Filter("route_id", routeId)));
    }

    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryStats("stats_1m_processor", from, to, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)));
    }

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
            new Filter("group_name", groupName)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                              String routeId, List<String> agentIds) {
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
            new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                  String routeId, String processorType) {
        // stats_1m_processor does NOT have a running_count column
        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)), false);
    }

    private record Filter(String column, String value) {}

    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
        // running_count only exists on execution-level aggregates, not processor;
        // averages are recomputed from sums, and p99 is approximated by the
        // MAX of the per-bucket p99 values.
        boolean hasRunning = !view.equals("stats_1m_processor");
        String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";
        String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
            runningCol + " AS active_count " +
            "FROM " + view + " WHERE bucket >= ? AND bucket < ?";
        List