Storage Layer Refactor: PostgreSQL + TimescaleDB + OpenSearch
Date: 2026-03-16
Status: Draft
Scope: Replace ClickHouse with PostgreSQL/TimescaleDB (source of truth, analytics) and OpenSearch (full-text search index)
Motivation
The current all-in-ClickHouse storage layer has structural problems at scale:
- OOM on batch inserts: Wide rows with parallel arrays of variable-length blobs (exchange bodies, headers, stacktraces) exhaust ClickHouse server memory during batch insert processing.
- CRUD misfit: Users, OIDC config, and diagrams use ReplacingMergeTree, requiring `FINAL` on every read and workarounds to prevent version-row accumulation.
- Weak full-text search: `LIKE` patterns with `tokenbf_v1` skip indexes provide no ranking, stemming, fuzzy matching, or infix wildcard support.
- Rigid data model: Parallel arrays for processor executions prevent chunked/streaming ingestion and require ~100 lines of array type conversion workarounds.
Requirements
- Hundreds of applications, thousands of routes, billions of records
- Statistics at four levels: all, application, route, processor
- Arbitrary time bucket sizes for statistics queries
- Full-text wildcard (infix) search across all fields
- P99 response time < 2000ms
- Support chunked/streaming execution ingestion (partial updates for long-running routes)
- Idempotent inserts (deduplication on execution_id)
- All software must be free (Apache 2.0, MIT, BSD, PostgreSQL License)
- Deployment target: Kubernetes (k3s)
- Data expired per day by dropping partitions/indexes — no row-level deletes
Architecture
Two backends:
- PostgreSQL + TimescaleDB — source of truth for all data, analytics via continuous aggregates
- OpenSearch — asynchronous search index for full-text and wildcard queries
OpenSearch is a derived index, not a source of truth. If it goes down, writes and detail views continue via PostgreSQL. If an index corrupts, it is rebuilt from PostgreSQL.
Data Flow
Agent POST -> Server
|-- ExecutionStore.upsert() -> PostgreSQL (immediate)
|-- ExecutionStore.upsertProcessors() -> PostgreSQL (immediate)
|-- DiagramStore.store() -> PostgreSQL (immediate, idempotent)
'-- publish(ExecutionUpdatedEvent) -> in-process event queue
MetricsController -> WriteBuffer -> MetricsFlushScheduler -> MetricsStore (batched)
SearchIndexer (event listener)
|-- debounce by execution_id (1-2s window)
|-- read consolidated state from ExecutionStore
'-- SearchIndex.index(document) -> OpenSearch
Stats query -> PostgreSQL continuous aggregate (sub-100ms)
Wildcard search -> OpenSearch -> execution_ids -> PostgreSQL for detail
Detail lookup -> PostgreSQL by execution_id
CRUD -> PostgreSQL
PostgreSQL Data Model
Hypertables (TimescaleDB, partitioned by day)
executions
| Column | Type | Notes |
|---|---|---|
| execution_id | TEXT | NOT NULL, natural key from agent (dedup) |
| route_id | TEXT | NOT NULL |
| agent_id | TEXT | NOT NULL |
| group_name | TEXT | NOT NULL, application name |
| status | TEXT | NOT NULL: RUNNING, COMPLETED, FAILED |
| correlation_id | TEXT | |
| exchange_id | TEXT | |
| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| end_time | TIMESTAMPTZ | |
| duration_ms | BIGINT | |
| error_message | TEXT | |
| error_stacktrace | TEXT | |
| diagram_content_hash | TEXT | |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Primary key: `(execution_id, start_time)` — TimescaleDB requires the partition column in unique constraints
- `ON CONFLICT (execution_id, start_time) DO UPDATE` for dedup and status progression (RUNNING -> COMPLETED/FAILED guard: only update if the new status supersedes the old)
- Indexes: `(agent_id, start_time)`, `(route_id, start_time)`, `(group_name, start_time)`, `(correlation_id)`
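The bullets above can be sketched as DDL. A minimal, non-authoritative sketch (column list abbreviated; index names left to PostgreSQL defaults):

```sql
-- Sketch: executions hypertable, partitioned by day on start_time.
CREATE TABLE executions (
    execution_id   TEXT NOT NULL,
    route_id       TEXT NOT NULL,
    agent_id       TEXT NOT NULL,
    group_name     TEXT NOT NULL,
    status         TEXT NOT NULL,
    correlation_id TEXT,
    start_time     TIMESTAMPTZ NOT NULL,
    end_time       TIMESTAMPTZ,
    duration_ms    BIGINT,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- TimescaleDB requires the partition column in unique constraints:
    PRIMARY KEY (execution_id, start_time)
);

-- Convert to a hypertable with 1-day chunks.
SELECT create_hypertable('executions', 'start_time',
                         chunk_time_interval => INTERVAL '1 day');

CREATE INDEX ON executions (agent_id, start_time);
CREATE INDEX ON executions (route_id, start_time);
CREATE INDEX ON executions (group_name, start_time);
CREATE INDEX ON executions (correlation_id);
```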
processor_executions
| Column | Type | Notes |
|---|---|---|
| id | BIGSERIAL | |
| execution_id | TEXT | NOT NULL, FK to executions |
| processor_id | TEXT | NOT NULL |
| processor_type | TEXT | NOT NULL |
| diagram_node_id | TEXT | |
| group_name | TEXT | NOT NULL, denormalized from execution |
| route_id | TEXT | NOT NULL, denormalized from execution |
| depth | INT | NOT NULL |
| parent_processor_id | TEXT | Self-reference for tree structure |
| status | TEXT | NOT NULL |
| start_time | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| end_time | TIMESTAMPTZ | |
| duration_ms | BIGINT | |
| error_message | TEXT | |
| error_stacktrace | TEXT | |
| input_body | TEXT | Size-limited at ingestion (configurable) |
| output_body | TEXT | Size-limited at ingestion (configurable) |
| input_headers | JSONB | |
| output_headers | JSONB | |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Unique constraint: `(execution_id, processor_id, start_time)` — TimescaleDB requires the partition column in unique constraints
- `ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` for re-sent processors
- `group_name` and `route_id` denormalized from the parent execution (immutable per execution, set at ingestion) to enable JOIN-free continuous aggregates
- Indexes: `(execution_id)`, `(processor_type, start_time)`
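The per-processor upsert implied by the unique constraint could look like this (a sketch; column list abbreviated, positional parameters assumed):

```sql
-- Sketch: idempotent upsert for re-sent processor chunks.
-- The conflict target matches the unique constraint, so a chunk
-- arriving twice updates the existing row instead of duplicating it.
INSERT INTO processor_executions
    (execution_id, processor_id, processor_type, group_name, route_id,
     depth, status, start_time, end_time, duration_ms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE
SET status      = EXCLUDED.status,
    end_time    = EXCLUDED.end_time,
    duration_ms = EXCLUDED.duration_ms;
```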
agent_metrics
| Column | Type | Notes |
|---|---|---|
| agent_id | TEXT | NOT NULL |
| metric_name | TEXT | NOT NULL |
| metric_value | DOUBLE PRECISION | NOT NULL |
| tags | JSONB | Arbitrary key-value pairs |
| collected_at | TIMESTAMPTZ | NOT NULL, hypertable partition key |
| server_received_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- Hypertable chunk interval: 1 day
- Buffered ingestion (write buffer retained for metrics)
Regular tables
route_diagrams
| Column | Type | Notes |
|---|---|---|
| content_hash | TEXT | PK, SHA-256 of definition |
| route_id | TEXT | NOT NULL |
| agent_id | TEXT | NOT NULL |
| definition | TEXT | JSON of RouteGraph |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
- `ON CONFLICT (content_hash) DO NOTHING` for content-addressed dedup
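The content-addressed insert is a one-liner; a sketch:

```sql
-- Sketch: content-addressed diagram store.
-- Re-submitting a diagram with an already-known SHA-256 is a no-op.
INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
VALUES ($1, $2, $3, $4)
ON CONFLICT (content_hash) DO NOTHING;
```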
users
| Column | Type | Notes |
|---|---|---|
| user_id | TEXT | PK |
| provider | TEXT | NOT NULL |
| email | TEXT | |
| display_name | TEXT | |
| roles | TEXT[] | AGENT, VIEWER, OPERATOR, ADMIN |
| created_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
oidc_config
| Column | Type | Notes |
|---|---|---|
| config_id | TEXT | PK, always 'default' |
| enabled | BOOLEAN | NOT NULL |
| issuer_uri | TEXT | |
| client_id | TEXT | |
| client_secret | TEXT | |
| roles_claim | TEXT | |
| default_roles | TEXT[] | |
| auto_signup | BOOLEAN | |
| display_name_claim | TEXT | |
| updated_at | TIMESTAMPTZ | NOT NULL DEFAULT now() |
Schema migrations
Flyway replaces the current custom ClickHouseSchemaInitializer. Versioned migrations in src/main/resources/db/migration/.
Required extensions
The following PostgreSQL extensions must be created in the initial Flyway migration:
- `timescaledb` — hypertables, continuous aggregates, chunk management
- `timescaledb_toolkit` — `percentile_agg()` and `approx_percentile()` for P99 calculations
The TimescaleDB Docker image (timescale/timescaledb) includes both extensions. The Flyway V1 migration must CREATE EXTENSION IF NOT EXISTS for both before creating hypertables.
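The start of the V1 migration could look like this (the file name `V1__init.sql` is an assumption following Flyway's naming convention):

```sql
-- Sketch: beginning of V1__init.sql.
-- Extensions must exist before any hypertable or continuous aggregate DDL.
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;

-- ... table definitions, create_hypertable() calls, and
-- continuous aggregates follow in this and later migrations.
```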
Continuous Aggregates (Statistics)
Four continuous aggregates at 1-minute resolution, one per aggregation level.
stats_1m_all — global
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;
stats_1m_app — per application
Group by: bucket, group_name
stats_1m_route — per route
Group by: bucket, group_name, route_id
stats_1m_processor — per processor type within a route
-- No JOIN needed: group_name and route_id are denormalized on processor_executions
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
group_name,
route_id,
processor_type,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;
Note: TimescaleDB continuous aggregates only support single-hypertable queries (no JOINs). This is why group_name and route_id are denormalized onto processor_executions.
Query pattern for arbitrary buckets
SELECT time_bucket('30 minutes', bucket) AS period,
SUM(total_count) AS total_count,
SUM(failed_count) AS failed_count,
SUM(duration_sum) / NULLIF(SUM(total_count), 0) AS avg_duration
FROM stats_1m_route
WHERE route_id = ? AND bucket >= now() - interval '24 hours'
GROUP BY period
ORDER BY period;
Refresh policy
SELECT add_continuous_aggregate_policy('stats_1m_all',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
Retention
- Raw data (`executions`, `processor_executions`): configurable, default 30 days
- 1-minute rollups: configurable, default 90 days
- All retention via `drop_chunks()` — drops entire daily partitions
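The retention rules above can be sketched with TimescaleDB's built-in policies (intervals shown are the configurable defaults; applying `add_retention_policy` to each hypertable and aggregate is an assumption about the implementation):

```sql
-- Sketch: scheduled retention via chunk drops, no row-level deletes.
SELECT add_retention_policy('executions',           INTERVAL '30 days');
SELECT add_retention_policy('processor_executions', INTERVAL '30 days');
SELECT add_retention_policy('stats_1m_all',         INTERVAL '90 days');

-- Equivalent one-off drop for a single hypertable:
SELECT drop_chunks('executions', older_than => INTERVAL '30 days');
```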
OpenSearch Index
Index template
Index pattern: executions-YYYY-MM-DD (daily rollover, aligned with PostgreSQL daily partitions for synchronized expiry).
{
"index_patterns": ["executions-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"ngram_analyzer": {
"type": "custom",
"tokenizer": "ngram_tokenizer",
"filter": ["lowercase"]
}
},
"tokenizer": {
"ngram_tokenizer": {
"type": "ngram",
"min_gram": 3,
"max_gram": 4,
"token_chars": ["letter", "digit", "punctuation", "symbol"]
}
}
}
},
"mappings": {
"properties": {
"execution_id": { "type": "keyword" },
"route_id": { "type": "keyword" },
"agent_id": { "type": "keyword" },
"group_name": { "type": "keyword" },
"status": { "type": "keyword" },
"correlation_id": { "type": "keyword" },
"exchange_id": { "type": "keyword" },
"start_time": { "type": "date" },
"end_time": { "type": "date" },
"duration_ms": { "type": "long" },
"error_message": {
"type": "text",
"analyzer": "standard",
"fields": {
"ngram": { "type": "text", "analyzer": "ngram_analyzer" }
}
},
"error_stacktrace": {
"type": "text",
"analyzer": "standard",
"fields": {
"ngram": { "type": "text", "analyzer": "ngram_analyzer" }
}
},
"processors": {
"type": "nested",
"properties": {
"processor_id": { "type": "keyword" },
"processor_type": { "type": "keyword" },
"status": { "type": "keyword" },
"error_message": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"error_stacktrace": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"input_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"output_body": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"input_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } },
"output_headers": { "type": "text", "fields": { "ngram": { "type": "text", "analyzer": "ngram_analyzer" } } }
}
}
}
}
}
}
Indexing strategy
- Asynchronous, event-driven from PostgreSQL writes
- Debounced by `execution_id` with a 1-2 second window to batch rapid processor updates
- Document ID = `execution_id` (OpenSearch replaces existing document on re-index)
- Daily indexes: `executions-2026-03-16`, `executions-2026-03-17`, etc.
- Index selected by `start_time` of the execution
Search strategy
- Unified search box: `multi_match` across all text fields + nested processor fields
- Infix wildcard: ngram sub-fields (3-4 character grams) for fast substring matching
- Keyword filters: `term` queries on status, route_id, agent_id, group_name, correlation_id
- Time range: `range` query on start_time
- Duration range: `range` query on duration_ms
- Pagination: `search_after` (no OFFSET scaling problem)
- Returns: execution_ids with highlights, detail fetched from PostgreSQL
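A hedged sketch of one search request combining these pieces (field list abbreviated, the nested processor-field clause omitted, and the query text, filters, and sort values purely illustrative):

```json
{
  "query": {
    "bool": {
      "must": {
        "multi_match": {
          "query": "timeout",
          "fields": ["error_message.ngram", "error_stacktrace.ngram"]
        }
      },
      "filter": [
        { "term": { "status": "FAILED" } },
        { "range": { "start_time": { "gte": "now-24h" } } }
      ]
    }
  },
  "sort": [ { "start_time": "desc" }, { "execution_id": "asc" } ],
  "_source": ["execution_id"],
  "highlight": { "fields": { "error_message": {} } }
}
```

Subsequent pages pass the last hit's sort values via `search_after`; the response supplies only execution_ids and highlights, with full detail fetched from PostgreSQL.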
Index lifecycle
- Daily rollover aligned with PostgreSQL chunk intervals
- ISM (Index State Management) policy: hot (current day) -> delete (after N days, matching PostgreSQL retention)
- Index deletion is instant — no row-level deletes
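The lifecycle could be expressed as an OpenSearch ISM policy along these lines (a sketch: state names and the 30d threshold are assumptions; the real threshold would track `CAMELEER_RETENTION_DAYS`):

```json
{
  "policy": {
    "description": "Delete daily execution indexes after retention period",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [ { "delete": {} } ],
        "transitions": []
      }
    ],
    "ism_template": { "index_patterns": ["executions-*"] }
  }
}
```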
Java Interfaces
Core module interfaces
public interface ExecutionStore {
void upsert(Execution execution);
void upsertProcessors(String executionId, List<ProcessorExecution> processors);
Optional<Execution> findById(String executionId);
List<ProcessorExecution> findProcessors(String executionId);
}
public interface StatsStore {
Stats getStats(StatsRequest request);
List<TimeBucket> getTimeSeries(TimeSeriesRequest request);
}
public interface SearchIndex {
SearchResult search(SearchRequest request);
long count(SearchRequest request);
void index(ExecutionDocument document);
void delete(String executionId);
}
// Note: stats() and timeseries() methods move from the old SearchEngine interface
// to StatsStore, since they now query PostgreSQL continuous aggregates instead of
// ClickHouse. Callers (SearchService, controllers) must be updated to use StatsStore.
public interface DiagramStore {
void store(String contentHash, String routeId, String agentId, String definition);
Optional<String> findByContentHash(String contentHash);
Optional<String> findContentHashForRoute(String routeId, String agentId);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
public interface UserStore {
Optional<UserInfo> findById(String userId);
List<UserInfo> findAll();
void upsert(UserInfo user);
void updateRoles(String userId, List<String> roles);
void delete(String userId);
}
public interface OidcConfigStore {
Optional<OidcConfig> find();
void save(OidcConfig config);
void delete();
}
public interface MetricsStore {
void insertBatch(List<MetricsSnapshot> snapshots);
}
App module implementations
| Interface | Implementation | Backend |
|---|---|---|
| ExecutionStore | PostgresExecutionStore | PostgreSQL/TimescaleDB |
| StatsStore | PostgresStatsStore | PostgreSQL continuous aggregates |
| SearchIndex | OpenSearchIndex | OpenSearch |
| DiagramStore | PostgresDiagramStore | PostgreSQL |
| UserStore | PostgresUserStore | PostgreSQL |
| OidcConfigStore | PostgresOidcConfigStore | PostgreSQL |
| MetricsStore | PostgresMetricsStore | PostgreSQL/TimescaleDB |
Ingestion flow
ExecutionController (HTTP POST)
|-- IngestionService.ingestExecution(agentId, execution)
|-- ExecutionStore.upsert(execution)
|-- ExecutionStore.upsertProcessors(executionId, processors)
|-- eventPublisher.publish(new ExecutionUpdatedEvent(executionId))
'-- return success/backpressure
SearchIndexer (Spring @EventListener or in-process queue)
|-- debounce by execution_id (configurable, default 1-2s)
|-- ExecutionStore.findById(executionId) + findProcessors(executionId)
|-- build ExecutionDocument
'-- SearchIndex.index(document)
The IngestionService interface changes from buffer-based to synchronous for executions:
// New IngestionService (core module)
public interface IngestionService {
    // Executions: synchronous write to PostgreSQL; throws on failure.
    // The controller catches exceptions and returns 503 with Retry-After.
    // This replaces the boolean return / WriteBuffer pattern for executions.
    void ingestExecution(String agentId, RouteExecution execution);

    // Metrics: retains write buffer pattern (high-volume, low-urgency)
    boolean acceptMetrics(List<MetricsSnapshot> metrics);

    // Diagrams: synchronous write (low-volume, idempotent)
    void ingestDiagram(String agentId, TaggedDiagram diagram);
}
Metrics ingestion retains the write buffer pattern:
MetricsController (HTTP POST)
|-- WriteBuffer<MetricsSnapshot>.offer(batch)
MetricsFlushScheduler (@Scheduled)
|-- drain(batchSize)
'-- MetricsStore.insertBatch(batch)
What gets deleted
- `ClickHouseExecutionRepository` — replaced by `PostgresExecutionStore`
- `ClickHouseSearchEngine` — replaced by `OpenSearchIndex`
- `ClickHouseFlushScheduler` — simplified, only retained for metrics
- `ClickHouseDiagramRepository` — replaced by `PostgresDiagramStore`
- `ClickHouseUserRepository` — replaced by `PostgresUserStore`
- `ClickHouseOidcConfigRepository` — replaced by `PostgresOidcConfigStore`
- `ClickHouseMetricsRepository` — replaced by `PostgresMetricsStore`
- `ClickHouseSchemaInitializer` — replaced by Flyway
- All `clickhouse/*.sql` migration files — replaced by Flyway migrations
- Array type conversion helpers, `FINAL` workarounds, `ifNotFinite()` guards
Error Handling and Resilience
PostgreSQL (source of truth)
- Write failure: Return 503 to agent with `Retry-After` header.
- Connection pool exhaustion: HikariCP handles queueing. Sustained exhaustion triggers backpressure via 503.
- Schema migrations: Flyway with versioned migrations. Validates on startup.
OpenSearch (search index)
- Unavailable at write time: Events accumulate in bounded in-memory queue. If queue fills, new index events are dropped silently (log warning). No data loss — PostgreSQL has the data.
- Unavailable at search time: Search endpoint returns 503. Stats and detail endpoints still work via PostgreSQL.
- Index corruption/drift: Rebuild via admin API endpoint (`POST /api/v1/admin/search/rebuild?from=&to=`) that re-indexes from PostgreSQL, scoped by time range. Processes in background, returns job status.
- Document staleness: Debouncer provides eventual consistency with 1-2 second typical lag.
Partial execution handling
- Execution arrives with `status=RUNNING` -> `INSERT ... ON CONFLICT (execution_id, start_time) DO UPDATE` with status progression guard (only update if the new status supersedes the old: RUNNING < COMPLETED/FAILED)
- Processors arrive in chunks -> `INSERT ... ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE` per processor
- Completion signal -> same upsert as step 1, with `status='COMPLETED'`, `duration_ms`, `end_time` — the progression guard allows this update
- Each mutation publishes `ExecutionUpdatedEvent` -> debounced OpenSearch re-index
- Timeout: configurable threshold marks executions stuck in RUNNING as STALE
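The progression guard can be expressed directly in the upsert's `WHERE` clause. A sketch (the CASE-based ranking is an assumption about how "supersedes" is implemented; column list abbreviated):

```sql
-- Sketch: upsert with status progression guard.
-- RUNNING ranks below COMPLETED/FAILED, so a late or duplicate
-- RUNNING packet can never regress a terminal status.
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                        status, start_time, end_time, duration_ms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (execution_id, start_time) DO UPDATE
SET status      = EXCLUDED.status,
    end_time    = EXCLUDED.end_time,
    duration_ms = EXCLUDED.duration_ms,
    updated_at  = now()
WHERE (CASE executions.status WHEN 'RUNNING' THEN 0 ELSE 1 END)
   <= (CASE EXCLUDED.status   WHEN 'RUNNING' THEN 0 ELSE 1 END);
```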
Data lifecycle
- All time-series data partitioned by day
- Expiry by dropping entire daily partitions/indexes — no row-level deletes
- PostgreSQL: `SELECT drop_chunks(<hypertable>, older_than => INTERVAL 'N days')` on each hypertable
- OpenSearch: ISM delete action on daily indexes
- Retention periods (configurable):
- Raw data (executions, processor_executions, metrics): default 30 days
- 1-minute continuous aggregates: default 90 days
- Users, diagrams, OIDC config: no expiry
Testing Strategy
Unit tests
- Store interface tests using Testcontainers `PostgreSQLContainer` with TimescaleDB image
- SearchIndex tests using Testcontainers `OpensearchContainer`
- Dedup: insert same execution_id twice, verify single row
- Chunked arrival: insert execution, then processors in separate calls, verify consolidated state
- Upsert: update execution status from RUNNING to COMPLETED, verify single row with updated fields
Integration tests
- Full ingestion flow: POST execution -> verify PostgreSQL row -> verify OpenSearch document
- Partial execution: POST RUNNING -> POST processors -> POST COMPLETED -> verify state transitions and OpenSearch updates
- Search -> detail roundtrip: index document, search by text, fetch detail by returned execution_id
- Stats accuracy: insert known executions, query continuous aggregate, verify counts/durations
- Arbitrary bucket re-aggregation: insert data spanning 1 hour, query with 15-minute buckets, verify grouping
Resilience tests
- OpenSearch down: verify PostgreSQL writes succeed, search returns 503, detail still works
- Re-index: delete OpenSearch index, trigger rebuild from PostgreSQL, verify search recovery
Test infrastructure
- Local Docker for Testcontainers (PostgreSQL + TimescaleDB, OpenSearch)
- Remote k3s for deployment testing
- Flyway migrations tested via Testcontainers on every build
Deployment (Kubernetes)
New components
- PostgreSQL + TimescaleDB: StatefulSet with persistent volume (replaces ClickHouse StatefulSet)
- OpenSearch: StatefulSet (single node for small deployments, cluster for production)
Removed components
- ClickHouse StatefulSet and associated ConfigMaps/Secrets
Configuration
Environment variables (existing pattern):
- `SPRING_DATASOURCE_URL=jdbc:postgresql://cameleer-postgres:5432/cameleer`
- `SPRING_DATASOURCE_USERNAME`, `SPRING_DATASOURCE_PASSWORD`
- `OPENSEARCH_URL=http://opensearch:9200`
- `CAMELEER_RETENTION_DAYS=30` (applies to both PostgreSQL and OpenSearch)
- `CAMELEER_BODY_SIZE_LIMIT=16384` (configurable body size limit in bytes)
- `CAMELEER_OPENSEARCH_QUEUE_SIZE=10000` (bounded in-memory queue for async indexing)
Health checks
- PostgreSQL: Spring Boot actuator datasource health
- OpenSearch: cluster health endpoint
- Combined health at `/api/v1/health`
Migration Strategy
This is a clean cutover, not a live migration. No data migration from ClickHouse to PostgreSQL/OpenSearch is planned. Existing ClickHouse data will be abandoned (it has a 30-day TTL anyway). The refactor is implemented on a separate git branch and deployed as a replacement.