Files
cameleer-server/docs/superpowers/specs/2026-03-16-storage-layer-design.md

587 lines
23 KiB
Markdown
Raw Normal View History

# Storage Layer Refactor: PostgreSQL + TimescaleDB + OpenSearch
**Date:** 2026-03-16
**Status:** Draft
**Scope:** Replace ClickHouse with PostgreSQL/TimescaleDB (source of truth, analytics) and OpenSearch (full-text search index)
## Motivation
The current all-in-ClickHouse storage layer has structural problems at scale:
- **OOM on batch inserts**: Wide rows with parallel arrays of variable-length blobs (exchange bodies, headers, stacktraces) exhaust ClickHouse server memory during batch insert processing.
- **CRUD misfit**: Users, OIDC config, and diagrams use ReplacingMergeTree, requiring `FINAL` on every read and workarounds to prevent version row accumulation.
- **Weak full-text search**: LIKE patterns with tokenbf_v1 skip indexes provide no ranking, stemming, fuzzy matching, or infix wildcard support.
- **Rigid data model**: Parallel arrays for processor executions prevent chunked/streaming ingestion and require ~100 lines of array type conversion workarounds.
## Requirements
- Hundreds of applications, thousands of routes, billions of records
- Statistics at four levels: all, application, route, processor
- Arbitrary time bucket sizes for statistics queries
- Full-text wildcard (infix) search across all fields
- P99 response time < 2000ms
- Support chunked/streaming execution ingestion (partial updates for long-running routes)
- Idempotent inserts (deduplication on execution_id)
- All software must be free (Apache 2.0, MIT, BSD, PostgreSQL License)
- Deployment target: Kubernetes (k3s)
- Data expired per day by dropping partitions/indexes — no row-level deletes
## Architecture
Two backends:
1. **PostgreSQL + TimescaleDB** — source of truth for all data, analytics via continuous aggregates
2. **OpenSearch** — asynchronous search index for full-text and wildcard queries
OpenSearch is a derived index, not a source of truth. If it goes down, writes and detail views continue via PostgreSQL. If an index corrupts, it is rebuilt from PostgreSQL.
### Data Flow
```
Agent POST -> Server
|-- ExecutionStore.upsert() -> PostgreSQL (immediate)
|-- ExecutionStore.upsertProcessors() -> PostgreSQL (immediate)
|-- DiagramStore.store() -> PostgreSQL (immediate, idempotent)
'-- publish(ExecutionUpdatedEvent) -> in-process event queue
MetricsController -> WriteBuffer -> MetricsFlushScheduler -> MetricsStore (batched)
SearchIndexer (event listener)
|-- debounce by execution_id (1-2s window)
|-- read consolidated state from ExecutionStore
'-- SearchIndex.index(document) -> OpenSearch
Stats query -> PostgreSQL continuous aggregate (sub-100ms)
Wildcard search -> OpenSearch -> execution_ids -> PostgreSQL for detail
Detail lookup -> PostgreSQL by execution_id
CRUD -> PostgreSQL
```
## PostgreSQL Data Model
### Hypertables (TimescaleDB, partitioned by day)
#### `executions`
| Column | Type | Notes |
|---|---|---|
| execution_id | TEXT | NOT NULL, natural key from agent (dedup) |
| route_id | TEXT | NOT NULL |
| agent_id | TEXT | NOT NULL |
| group_name | TEXT | NOT NULL, application name |
| status | TEXT | NOT NULL: RUNNING, COMPLETED, FAILED |
| correlation_id | TEXT | |
| exchange_id | TEXT | |
| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| end_time | TIMESTAMPTZ | |
| duration_ms | BIGINT | |
| error_message | TEXT | |
| error_stacktrace | TEXT | |
| diagram_content_hash | TEXT | |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Primary key: `(execution_id, start_time)` — TimescaleDB requires the partition column in unique constraints
- `ON CONFLICT (execution_id, start_time) DO UPDATE` for dedup and status progression (RUNNING -> COMPLETED/FAILED guard: only update if new status supersedes old)
- Indexes: `(agent_id, start_time)`, `(route_id, start_time)`, `(group_name, start_time)`, `(correlation_id)`
#### `processor_executions`
| Column | Type | Notes |
|---|---|---|
| id | BIGSERIAL | |
| execution_id | TEXT | NOT NULL, FK to executions |
| processor_id | TEXT | NOT NULL |
| processor_type | TEXT | NOT NULL |
| diagram_node_id | TEXT | |
| group_name | TEXT | NOT NULL, denormalized from execution |
| route_id | TEXT | NOT NULL, denormalized from execution |
| depth | INT | NOT NULL |
| parent_processor_id | TEXT | Self-reference for tree structure |
| status | TEXT | NOT NULL |
| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| end_time | TIMESTAMPTZ | |
| duration_ms | BIGINT | |
| error_message | TEXT | |
| error_stacktrace | TEXT | |
| input_body | TEXT | Size-limited at ingestion (configurable) |
| output_body | TEXT | Size-limited at ingestion (configurable) |
| input_headers | JSONB | |
| output_headers | JSONB | |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Unique constraint: `(execution_id, processor_id, start_time)` — TimescaleDB requires partition column in unique constraints
- `ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` for re-sent processors
- `group_name` and `route_id` denormalized from the parent execution (immutable per execution, set at ingestion) to enable JOIN-free continuous aggregates
- Indexes: `(execution_id)`, `(processor_type, start_time)`
#### `agent_metrics`
| Column | Type | Notes |
|---|---|---|
| agent_id | TEXT | NOT NULL |
| metric_name | TEXT | NOT NULL |
| metric_value | DOUBLE PRECISION | NOT NULL |
| tags | JSONB | Arbitrary key-value pairs |
| collected_at | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| server_received_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Buffered ingestion (write buffer retained for metrics)
### Regular tables
#### `route_diagrams`
| Column | Type | Notes |
|---|---|---|
| content_hash | TEXT | PK, SHA-256 of definition |
| route_id | TEXT | NOT NULL |
| agent_id | TEXT | NOT NULL |
| definition | TEXT | JSON of RouteGraph |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- `ON CONFLICT (content_hash) DO NOTHING` for content-addressed dedup
#### `users`
| Column | Type | Notes |
|---|---|---|
| user_id | TEXT | PK |
| provider | TEXT | NOT NULL |
| email | TEXT | |
| display_name | TEXT | |
| roles | TEXT[] | AGENT, VIEWER, OPERATOR, ADMIN |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
#### `oidc_config`
| Column | Type | Notes |
|---|---|---|
| config_id | TEXT | PK, always 'default' |
| enabled | BOOLEAN | NOT NULL |
| issuer_uri | TEXT | |
| client_id | TEXT | |
| client_secret | TEXT | |
| roles_claim | TEXT | |
| default_roles | TEXT[] | |
| auto_signup | BOOLEAN | |
| display_name_claim | TEXT | |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
### Schema migrations
Flyway replaces the current custom `ClickHouseSchemaInitializer`. Versioned migrations in `src/main/resources/db/migration/`.
### Required extensions
The following PostgreSQL extensions must be created in the initial Flyway migration:
- `timescaledb` — hypertables, continuous aggregates, chunk management
- `timescaledb_toolkit``percentile_agg()` and `approx_percentile()` for P99 calculations
The TimescaleDB Docker image (`timescale/timescaledb`) includes both extensions. The Flyway V1 migration must `CREATE EXTENSION IF NOT EXISTS` for both before creating hypertables.
## Continuous Aggregates (Statistics)
Four continuous aggregates at 1-minute resolution, one per aggregation level.
### `stats_1m_all` — global
```sql
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;
```
### `stats_1m_app` — per application
Group by: `bucket, group_name`
### `stats_1m_route` — per route
Group by: `bucket, group_name, route_id`
### `stats_1m_processor` — per processor type within a route
```sql
-- No JOIN needed: group_name and route_id are denormalized on processor_executions
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
group_name,
route_id,
processor_type,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;
```
Note: TimescaleDB continuous aggregates only support single-hypertable queries (no JOINs). This is why `group_name` and `route_id` are denormalized onto `processor_executions`.
### Query pattern for arbitrary buckets
```sql
SELECT time_bucket('30 minutes', bucket) AS period,
SUM(total_count) AS total_count,
SUM(failed_count) AS failed_count,
SUM(duration_sum) / NULLIF(SUM(total_count), 0) AS avg_duration
FROM stats_1m_route
WHERE route_id = ? AND bucket >= now() - interval '24 hours'
GROUP BY period
ORDER BY period;
```
### Refresh policy
```sql
SELECT add_continuous_aggregate_policy('stats_1m_all',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
```
### Retention
- Raw data (`executions`, `processor_executions`): configurable, default 30 days
- 1-minute rollups: configurable, default 90 days
- All retention via `drop_chunks()` — drops entire daily partitions
## OpenSearch Index
### Index template
Index pattern: `executions-YYYY-MM-DD` (daily rollover, aligned with PostgreSQL daily partitions for synchronized expiry).
```json
{
"index_patterns": ["executions-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"ngram_analyzer": {
"type": "custom",
"tokenizer": "ngram_tokenizer",
"filter": ["lowercase"]
}
},
"tokenizer": {
"ngram_tokenizer": {
"type": "ngram",
"min_gram": 3,
"max_gram": 4,
"token_chars": ["letter", "digit", "punctuation", "symbol"]
}
}
}
},
"mappings": {
"properties": {
"execution_id": { "type": "keyword" },
"route_id": { "type": "keyword" },
"agent_id": { "type": "keyword" },
"group_name": { "type": "keyword" },
"status": { "type": "keyword" },
"correlation_id": { "type": "keyword" },
"exchange_id": { "type": "keyword" },
"start_time": { "type": "date" },
"end_time": { "type": "date" },
"duration_ms": { "type": "long" },
"error_message": {
"type": "text",
"analyzer": "standard",
"fields": {
"ngram": { "type": "text", "analyzer": "ngram_analyzer" }
}
},
"error_stacktrace": {
"type": "text",
"analyzer": "standard",
"fields": {
"ngram": { "type": "text", "analyzer": "ngram_analyzer" }
}
},
"processors": {
"type": "nested",
"properties": {
"processor_id": { "type": "keyword" },
"processor_type": { "type": "keyword" },
"status": { "type": "keyword" },
"error_message": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"error_stacktrace": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"input_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"output_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"input_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"output_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }
}
}
}
}
}
}
```
### Indexing strategy
- Asynchronous, event-driven from PostgreSQL writes
- Debounced by `execution_id` with a 1-2 second window to batch rapid processor updates
- Document ID = `execution_id` (OpenSearch replaces existing document on re-index)
- Daily indexes: `executions-2026-03-16`, `executions-2026-03-17`, etc.
- Index selected by `start_time` of the execution
### Search strategy
- Unified search box: `multi_match` across all text fields + nested processor fields
- Infix wildcard: ngram sub-fields (3-4 character grams) for fast substring matching
- Keyword filters: `term` queries on status, route_id, agent_id, group_name, correlation_id
- Time range: `range` query on start_time
- Duration range: `range` query on duration_ms
- Pagination: `search_after` (no OFFSET scaling problem)
- Returns: execution_ids with highlights, detail fetched from PostgreSQL
### Index lifecycle
- Daily rollover aligned with PostgreSQL chunk intervals
- ILM policy: hot (current day) -> delete (after N days, matching PostgreSQL retention)
- Index deletion is instant — no row-level deletes
## Java Interfaces
### Core module interfaces
```java
public interface ExecutionStore {
void upsert(Execution execution);
void upsertProcessors(String executionId, List<ProcessorExecution> processors);
Optional<Execution> findById(String executionId);
List<ProcessorExecution> findProcessors(String executionId);
}
public interface StatsStore {
Stats getStats(StatsRequest request);
List<TimeBucket> getTimeSeries(TimeSeriesRequest request);
}
public interface SearchIndex {
SearchResult search(SearchRequest request);
long count(SearchRequest request);
void index(ExecutionDocument document);
void delete(String executionId);
}
// Note: stats() and timeseries() methods move from the old SearchEngine interface
// to StatsStore, since they now query PostgreSQL continuous aggregates instead of
// ClickHouse. Callers (SearchService, controllers) must be updated to use StatsStore.
public interface DiagramStore {
void store(String contentHash, String routeId, String agentId, String definition);
Optional<String> findByContentHash(String contentHash);
Optional<String> findContentHashForRoute(String routeId, String agentId);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
public interface UserStore {
Optional<UserInfo> findById(String userId);
List<UserInfo> findAll();
void upsert(UserInfo user);
void updateRoles(String userId, List<String> roles);
void delete(String userId);
}
public interface OidcConfigStore {
Optional<OidcConfig> find();
void save(OidcConfig config);
void delete();
}
public interface MetricsStore {
void insertBatch(List<MetricsSnapshot> snapshots);
}
```
### App module implementations
| Interface | Implementation | Backend |
|---|---|---|
| ExecutionStore | PostgresExecutionStore | PostgreSQL/TimescaleDB |
| StatsStore | PostgresStatsStore | PostgreSQL continuous aggregates |
| SearchIndex | OpenSearchIndex | OpenSearch |
| DiagramStore | PostgresDiagramStore | PostgreSQL |
| UserStore | PostgresUserStore | PostgreSQL |
| OidcConfigStore | PostgresOidcConfigStore | PostgreSQL |
| MetricsStore | PostgresMetricsStore | PostgreSQL/TimescaleDB |
### Ingestion flow
```
ExecutionController (HTTP POST)
|-- IngestionService.acceptExecution(tagged)
|-- ExecutionStore.upsert(execution)
|-- ExecutionStore.upsertProcessors(executionId, processors)
|-- eventPublisher.publish(new ExecutionUpdatedEvent(executionId))
'-- return success/backpressure
SearchIndexer (Spring @EventListener or in-process queue)
|-- debounce by execution_id (configurable, default 1-2s)
|-- ExecutionStore.findById(executionId) + findProcessors(executionId)
|-- build ExecutionDocument
'-- SearchIndex.index(document)
```
The `IngestionService` interface changes from buffer-based to synchronous for executions:
```java
// New IngestionService (core module)
public class IngestionService {
// Executions: synchronous write to PostgreSQL, returns void, throws on failure.
// Controller catches exceptions and returns 503 with Retry-After.
// This replaces the boolean return / WriteBuffer pattern for executions.
void ingestExecution(String agentId, RouteExecution execution);
// Metrics: retains write buffer pattern (high-volume, low-urgency)
boolean acceptMetrics(List<MetricsSnapshot> metrics);
// Diagrams: synchronous write (low-volume, idempotent)
void ingestDiagram(String agentId, TaggedDiagram diagram);
}
```
Metrics ingestion retains the write buffer pattern:
```
MetricsController (HTTP POST)
|-- WriteBuffer<MetricsSnapshot>.offer(batch)
MetricsFlushScheduler (@Scheduled)
|-- drain(batchSize)
'-- MetricsStore.insertBatch(batch)
```
### What gets deleted
- `ClickHouseExecutionRepository` — replaced by `PostgresExecutionStore`
- `ClickHouseSearchEngine` — replaced by `OpenSearchIndex`
- `ClickHouseFlushScheduler` — simplified, only retained for metrics
- `ClickHouseDiagramRepository` — replaced by `PostgresDiagramStore`
- `ClickHouseUserRepository` — replaced by `PostgresUserStore`
- `ClickHouseOidcConfigRepository` — replaced by `PostgresOidcConfigStore`
- `ClickHouseMetricsRepository` — replaced by `PostgresMetricsStore`
- `ClickHouseSchemaInitializer` — replaced by Flyway
- All `clickhouse/*.sql` migration files — replaced by Flyway migrations
- Array type conversion helpers, `FINAL` workarounds, `ifNotFinite()` guards
## Error Handling and Resilience
### PostgreSQL (source of truth)
- **Write failure**: Return 503 to agent with `Retry-After` header.
- **Connection pool exhaustion**: HikariCP handles queueing. Sustained exhaustion triggers backpressure via 503.
- **Schema migrations**: Flyway with versioned migrations. Validates on startup.
### OpenSearch (search index)
- **Unavailable at write time**: Events accumulate in bounded in-memory queue. If queue fills, new index events are dropped silently (log warning). No data loss — PostgreSQL has the data.
- **Unavailable at search time**: Search endpoint returns 503. Stats and detail endpoints still work via PostgreSQL.
- **Index corruption/drift**: Rebuild via admin API endpoint (`POST /api/v1/admin/search/rebuild?from=&to=`) that re-indexes from PostgreSQL, scoped by time range. Processes in background, returns job status.
- **Document staleness**: Debouncer provides eventual consistency with 1-2 second typical lag.
### Partial execution handling
1. Execution arrives with `status=RUNNING` -> `INSERT ... ON CONFLICT (execution_id, start_time) DO UPDATE` with status progression guard (only update if new status supersedes old: RUNNING < COMPLETED/FAILED)
2. Processors arrive in chunks -> `INSERT ... ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` per processor
3. Completion signal -> same upsert as step 1, with `status='COMPLETED'`, `duration_ms`, `end_time` — the progression guard allows this update
4. Each mutation publishes `ExecutionUpdatedEvent` -> debounced OpenSearch re-index
5. Timeout: configurable threshold marks executions stuck in RUNNING as STALE
### Data lifecycle
- All time-series data partitioned by day
- Expiry by dropping entire daily partitions/indexes — no row-level deletes
- PostgreSQL: `SELECT drop_chunks(older_than => INTERVAL 'N days')` on each hypertable
- OpenSearch: ILM delete action on daily indexes
- Retention periods (configurable):
- Raw data (executions, processor_executions, metrics): default 30 days
- 1-minute continuous aggregates: default 90 days
- Users, diagrams, OIDC config: no expiry
## Testing Strategy
### Unit tests
- Store interface tests using Testcontainers `PostgreSQLContainer` with TimescaleDB image
- SearchIndex tests using Testcontainers `OpensearchContainer`
- Dedup: insert same execution_id twice, verify single row
- Chunked arrival: insert execution, then processors in separate calls, verify consolidated state
- Upsert: update execution status from RUNNING to COMPLETED, verify single row with updated fields
### Integration tests
- Full ingestion flow: POST execution -> verify PostgreSQL row -> verify OpenSearch document
- Partial execution: POST RUNNING -> POST processors -> POST COMPLETED -> verify state transitions and OpenSearch updates
- Search -> detail roundtrip: index document, search by text, fetch detail by returned execution_id
- Stats accuracy: insert known executions, query continuous aggregate, verify counts/durations
- Arbitrary bucket re-aggregation: insert data spanning 1 hour, query with 15-minute buckets, verify grouping
### Resilience tests
- OpenSearch down: verify PostgreSQL writes succeed, search returns 503, detail still works
- Re-index: delete OpenSearch index, trigger rebuild from PostgreSQL, verify search recovery
### Test infrastructure
- Local Docker for Testcontainers (PostgreSQL + TimescaleDB, OpenSearch)
- Remote k3s for deployment testing
- Flyway migrations tested via Testcontainers on every build
## Deployment (Kubernetes)
### New components
- **PostgreSQL + TimescaleDB**: StatefulSet with persistent volume (replaces ClickHouse StatefulSet)
- **OpenSearch**: StatefulSet (single node for small deployments, cluster for production)
### Removed components
- ClickHouse StatefulSet and associated ConfigMaps/Secrets
### Configuration
Environment variables (existing pattern):
- `SPRING_DATASOURCE_URL=jdbc:postgresql://cameleer-postgres:5432/cameleer3`
- `SPRING_DATASOURCE_USERNAME`, `SPRING_DATASOURCE_PASSWORD`
- `OPENSEARCH_URL=http://opensearch:9200`
- `CAMELEER_RETENTION_DAYS=30` (applies to both PostgreSQL and OpenSearch)
- `CAMELEER_BODY_SIZE_LIMIT=16384` (configurable body size limit in bytes)
- `CAMELEER_OPENSEARCH_QUEUE_SIZE=10000` (bounded in-memory queue for async indexing)
### Health checks
- PostgreSQL: Spring Boot actuator datasource health
- OpenSearch: cluster health endpoint
- Combined health at `/api/v1/health`
## Migration Strategy
This is a clean cutover, not a live migration. No data migration from ClickHouse to PostgreSQL/OpenSearch is planned. Existing ClickHouse data will be abandoned (it has a 30-day TTL anyway). The refactor is implemented on a separate git branch and deployed as a replacement.