Design to replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch. PostgreSQL as source of truth with continuous aggregates for analytics, OpenSearch for full-text wildcard search. 21-task implementation plan. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Storage Layer Refactor Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends.
Architecture: PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations.
Tech Stack: Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client
Spec: docs/superpowers/specs/2026-03-16-storage-layer-design.md
File Structure
New files
Core module (cameleer3-server-core/src/main/java/com/cameleer3/server/core/):
- storage/ExecutionStore.java: new interface replacing ExecutionRepository
- storage/StatsStore.java: new interface for stats from continuous aggregates
- storage/SearchIndex.java: new interface for OpenSearch operations
- storage/DiagramStore.java: new interface replacing DiagramRepository
- storage/MetricsStore.java: new interface replacing MetricsRepository
- storage/model/ExecutionDocument.java: document model for OpenSearch indexing
- search/StatsRequest.java: request DTO for stats queries (level, scope, time range)
- search/TimeSeriesRequest.java: request DTO for time-series queries (bucket size)
- indexing/SearchIndexer.java: debounced event listener for OpenSearch indexing
- indexing/ExecutionUpdatedEvent.java: event published after execution write
App module (cameleer3-server-app/src/main/java/com/cameleer3/server/app/):
- storage/PostgresExecutionStore.java: ExecutionStore impl with upsert
- storage/PostgresStatsStore.java: StatsStore impl querying continuous aggregates
- storage/PostgresDiagramStore.java: DiagramStore impl
- storage/PostgresUserRepository.java: UserRepository impl (keeps existing core interface)
- storage/PostgresOidcConfigRepository.java: OidcConfigRepository impl (keeps existing core interface)
- storage/PostgresMetricsStore.java: MetricsStore impl
- search/OpenSearchIndex.java: SearchIndex impl
- config/OpenSearchConfig.java: OpenSearch client bean
- config/StorageBeanConfig.java: wires all store beans
- ingestion/MetricsFlushScheduler.java: scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only)
- retention/RetentionScheduler.java: scheduled job for drop_chunks and OpenSearch index deletion
Flyway migrations (cameleer3-server-app/src/main/resources/db/migration/):
- V1__extensions.sql: CREATE EXTENSION timescaledb, timescaledb_toolkit
- V2__executions.sql: executions hypertable
- V3__processor_executions.sql: processor_executions hypertable
- V4__agent_metrics.sql: agent_metrics hypertable
- V5__route_diagrams.sql: route_diagrams table
- V6__users.sql: users table
- V7__oidc_config.sql: oidc_config table
- V8__continuous_aggregates.sql: all 4 continuous aggregates + refresh policies
Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by RetentionScheduler at runtime with configurable intervals.
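As a rough illustration of the runtime retention described in the note above, the sketch below builds one TimescaleDB drop_chunks statement per hypertable. The class name RetentionSql and its method are hypothetical and not part of the plan; the table names come from the V2 to V4 migrations, and the retention period is assumed to arrive as a whole number of days (for example from cameleer.retention-days).

```java
import java.util.List;

/** Hypothetical helper: builds the statements a retention job might run. */
final class RetentionSql {
    // Hypertables created by the V2, V3 and V4 migrations.
    static final List<String> HYPERTABLES =
            List.of("executions", "processor_executions", "agent_metrics");

    /** One drop_chunks call per hypertable, removing chunks older than retentionDays. */
    static List<String> dropChunkStatements(int retentionDays) {
        if (retentionDays <= 0) {
            throw new IllegalArgumentException("retentionDays must be positive");
        }
        // retentionDays is an int, so concatenating it cannot inject SQL.
        return HYPERTABLES.stream()
                .map(t -> "SELECT drop_chunks('" + t + "', INTERVAL '" + retentionDays + " days')")
                .toList();
    }
}
```

A scheduler could pass each returned statement to a JdbcTemplate on a fixed interval. Deleting the matching OpenSearch indices is a separate step that this sketch does not cover.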
Test files (cameleer3-server-app/src/test/java/com/cameleer3/server/app/):
- AbstractPostgresIT.java: replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
- storage/PostgresExecutionStoreIT.java: upsert, dedup, chunked arrival tests
- storage/PostgresStatsStoreIT.java: continuous aggregate query tests
- storage/PostgresDiagramStoreIT.java: content-hash dedup tests
- storage/PostgresUserRepositoryIT.java: CRUD tests
- search/OpenSearchIndexIT.java: index, search, wildcard tests
Files to modify
- pom.xml (root): no changes needed
- cameleer3-server-app/pom.xml: swap clickhouse-jdbc for postgresql + opensearch-java + flyway
- cameleer3-server-core/.../core/search/SearchService.java: split: search delegates to SearchIndex, stats/timeseries to StatsStore
- cameleer3-server-core/.../core/detail/DetailService.java: use ExecutionStore instead of ExecutionRepository
- cameleer3-server-core/.../core/detail/RawExecutionRow.java: remove (replaced by normalized model)
- cameleer3-server-core/.../core/ingestion/IngestionService.java: synchronous execution/diagram writes, keep buffer for metrics
- cameleer3-server-app/.../app/config/SearchBeanConfig.java: wire StatsStore into SearchService
- cameleer3-server-app/.../app/config/IngestionBeanConfig.java: update bean wiring
- cameleer3-server-app/src/main/resources/application.yml: PostgreSQL + OpenSearch config
- cameleer3-server-app/src/test/resources/application-test.yml: test config
Files to delete
- cameleer3-server-app/.../app/storage/ClickHouseExecutionRepository.java
- cameleer3-server-app/.../app/storage/ClickHouseDiagramRepository.java
- cameleer3-server-app/.../app/storage/ClickHouseMetricsRepository.java
- cameleer3-server-app/.../app/storage/ClickHouseUserRepository.java
- cameleer3-server-app/.../app/storage/ClickHouseOidcConfigRepository.java
- cameleer3-server-app/.../app/search/ClickHouseSearchEngine.java
- cameleer3-server-app/.../app/ingestion/ClickHouseFlushScheduler.java
- cameleer3-server-app/.../app/config/ClickHouseConfig.java
- cameleer3-server-core/.../core/storage/ExecutionRepository.java
- cameleer3-server-core/.../core/storage/DiagramRepository.java
- cameleer3-server-core/.../core/storage/MetricsRepository.java
- cameleer3-server-core/.../core/search/SearchEngine.java
- cameleer3-server-core/.../core/detail/RawExecutionRow.java
- cameleer3-server-app/src/main/resources/clickhouse/*.sql (all 8 files)
- cameleer3-server-app/src/test/.../app/AbstractClickHouseIT.java
Note: The UserRepository and OidcConfigRepository interfaces in core.security are kept; the new Postgres implementations implement these existing interfaces. No rename is needed since their contracts are unchanged.
Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure
Task 1: Update Maven dependencies
Files:
- Modify: cameleer3-server-app/pom.xml
- Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client
In cameleer3-server-app/pom.xml, replace the ClickHouse dependency and add new ones:
Remove:
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.9.7</version>
<classifier>all</classifier>
</dependency>
Add:
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-java</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client</artifactId>
<version>2.19.0</version>
</dependency>
Replace the ClickHouse Testcontainer:
<!-- Remove testcontainers-clickhouse -->
<!-- Add: -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.opensearch</groupId>
<artifactId>opensearch-testcontainers</artifactId>
<version>2.1.1</version>
<scope>test</scope>
</dependency>
Note: postgresql driver and flyway-core versions are managed by Spring Boot parent POM. Testcontainers BOM version is ${testcontainers.version} (2.0.3) from root POM.
- Step 2: Commit (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)
git add cameleer3-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"
Task 2: Write Flyway migrations
Files:
- Create: cameleer3-server-app/src/main/resources/db/migration/V1__extensions.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V2__executions.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V3__processor_executions.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V4__agent_metrics.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V5__route_diagrams.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V6__users.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V7__oidc_config.sql
- Create: cameleer3-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql
- Step 1: Create V1__extensions.sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
- Step 2: Create V2__executions.sql
CREATE TABLE executions (
execution_id TEXT NOT NULL,
route_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
group_name TEXT NOT NULL,
status TEXT NOT NULL,
correlation_id TEXT,
exchange_id TEXT,
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ,
duration_ms BIGINT,
error_message TEXT,
error_stacktrace TEXT,
diagram_content_hash TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (execution_id, start_time)
);
SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC);
CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC);
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC);
CREATE INDEX idx_executions_correlation ON executions (correlation_id);
- Step 3: Create V3__processor_executions.sql
CREATE TABLE processor_executions (
id BIGSERIAL,
execution_id TEXT NOT NULL,
processor_id TEXT NOT NULL,
processor_type TEXT NOT NULL,
diagram_node_id TEXT,
group_name TEXT NOT NULL,
route_id TEXT NOT NULL,
depth INT NOT NULL,
parent_processor_id TEXT,
status TEXT NOT NULL,
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ,
duration_ms BIGINT,
error_message TEXT,
error_stacktrace TEXT,
input_body TEXT,
output_body TEXT,
input_headers JSONB,
output_headers JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (execution_id, processor_id, start_time)
);
SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id);
CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC);
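The input_headers and output_headers columns above are JSONB, so whatever writes them has to supply valid JSON text. The sketch below shows one dependency-free way to render a flat header map as a JSON object. It assumes headers are available as a Map<String, String> with non-null keys, which the plan does not state; HeaderJson is a hypothetical name, and a project that already uses a JSON library such as Jackson would more likely use that instead.

```java
import java.util.Map;

/** Hypothetical helper: renders a flat header map as a JSON object string. */
final class HeaderJson {
    /** Returns null for a null map so the column can stay NULL. Keys are assumed non-null. */
    static String toJson(Map<String, String> headers) {
        if (headers == null) return null;
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(quote(e.getKey())).append(':');
            sb.append(e.getValue() == null ? "null" : quote(e.getValue()));
        }
        return sb.append('}').toString();
    }

    /** Escapes quotes, backslashes and control characters as JSON requires. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.append('"').toString();
    }
}
```

With plain JDBC the resulting string typically still needs a cast on the SQL side (for example `?::jsonb`) or a driver-specific JSON parameter type.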
- Step 4: Create V4__agent_metrics.sql
CREATE TABLE agent_metrics (
agent_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
metric_value DOUBLE PRECISION NOT NULL,
tags JSONB,
collected_at TIMESTAMPTZ NOT NULL,
server_received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC);
- Step 5: Create V5__route_diagrams.sql
CREATE TABLE route_diagrams (
content_hash TEXT PRIMARY KEY,
route_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
definition TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id);
- Step 6: Create V6__users.sql
CREATE TABLE users (
user_id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
email TEXT,
display_name TEXT,
roles TEXT[] NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
- Step 7: Create V7__oidc_config.sql
CREATE TABLE oidc_config (
config_id TEXT PRIMARY KEY DEFAULT 'default',
enabled BOOLEAN NOT NULL DEFAULT false,
issuer_uri TEXT,
client_id TEXT,
client_secret TEXT,
roles_claim TEXT,
default_roles TEXT[] NOT NULL DEFAULT '{}',
auto_signup BOOLEAN DEFAULT false,
display_name_claim TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
- Step 8: Create V8__continuous_aggregates.sql
-- Global stats
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;
SELECT add_continuous_aggregate_policy('stats_1m_all',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
-- Per-application stats
CREATE MATERIALIZED VIEW stats_1m_app
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
group_name,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name;
SELECT add_continuous_aggregate_policy('stats_1m_app',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
-- Per-route stats
CREATE MATERIALIZED VIEW stats_1m_route
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
group_name,
route_id,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name, route_id;
SELECT add_continuous_aggregate_policy('stats_1m_route',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
-- Per-processor stats (uses denormalized group_name/route_id on processor_executions)
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', start_time) AS bucket,
group_name,
route_id,
processor_type,
COUNT(*) AS total_count,
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
SUM(duration_ms) AS duration_sum,
MAX(duration_ms) AS duration_max,
approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;
SELECT add_continuous_aggregate_policy('stats_1m_processor',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
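Because each view stores per-minute counts, sums and maxima, a stats query over a longer range has to combine several bucket rows. The sketch below shows that roll-up under assumed names (BucketRollup, Bucket and Rollup are hypothetical; the fields mirror the view columns). Averages are derived from duration_sum and total_count rather than stored. The p99_duration column is left out on purpose: percentiles of individual buckets cannot be merged exactly by simple arithmetic.

```java
import java.util.List;

/** Hypothetical roll-up of 1-minute aggregate rows into range-level figures. */
final class BucketRollup {
    /** One row of a stats_1m_* view; fields mirror the view columns. */
    record Bucket(long totalCount, long failedCount, long durationSum, long durationMax) {}

    /** Combined figures for the whole queried range. */
    record Rollup(long totalCount, long failedCount, double avgDurationMs, long maxDurationMs) {}

    static Rollup combine(List<Bucket> buckets) {
        long total = 0, failed = 0, sum = 0, max = 0;
        for (Bucket b : buckets) {
            total += b.totalCount();
            failed += b.failedCount();
            sum += b.durationSum();
            max = Math.max(max, b.durationMax());
        }
        // Weighted average: total duration divided by total executions.
        double avg = total == 0 ? 0.0 : (double) sum / total;
        return new Rollup(total, failed, avg, max);
    }
}
```

The same arithmetic can be expressed directly in SQL with SUM and MAX over the view; the Java form is only meant to make the combination rule explicit.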
- Step 9: Commit
git add cameleer3-server-app/src/main/resources/db/migration/
git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema"
Task 3: Create test base class with TimescaleDB Testcontainer
Files:
- Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
- Step 1: Write AbstractPostgresIT
package com.cameleer3.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    // PostgreSQLContainer only accepts images declared compatible with "postgres",
    // so the TimescaleDB image must be marked as a substitute explicitly.
    // NOTE: V1__extensions.sql also creates timescaledb_toolkit; confirm the chosen image ships it.
    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("timescale/timescaledb:latest-pg16")
                            .asCompatibleSubstituteFor("postgres"))
                    .withDatabaseName("cameleer3")
                    .withUsername("cameleer")
                    .withPassword("test");

    // Point Spring's datasource at the container and make sure Flyway runs.
    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
- Step 2: Write a smoke test to verify migrations run
Create cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java:
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import static org.junit.jupiter.api.Assertions.*;
class FlywayMigrationIT extends AbstractPostgresIT {
@Autowired
JdbcTemplate jdbcTemplate;
@Test
void allMigrationsApplySuccessfully() {
// Verify core tables exist
Integer execCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM executions", Integer.class);
assertEquals(0, execCount);
Integer procCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processor_executions", Integer.class);
assertEquals(0, procCount);
Integer userCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM users", Integer.class);
assertEquals(0, userCount);
// Verify continuous aggregates exist
Integer caggCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates",
Integer.class);
assertEquals(4, caggCount);
}
}
- Step 3: Verify test passes (this test will not compile until Task 16 deletes ClickHouse code. Run it after Task 16 is complete. Listed here for logical grouping.)
Run: mvn test -pl cameleer3-server-app -Dtest=FlywayMigrationIT -q
Expected: PASS — all migrations apply, tables and continuous aggregates exist
- Step 4: Commit
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java
git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test"
Task 4: Update application.yml for PostgreSQL + OpenSearch
Files:
- Modify: cameleer3-server-app/src/main/resources/application.yml
- Modify: cameleer3-server-app/src/test/resources/application-test.yml
- Step 1: Update application.yml datasource section
Replace:
spring:
  datasource:
    url: jdbc:ch://localhost:8123/cameleer3
    username: cameleer
    password: cameleer_dev
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
With:
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/cameleer3
    username: cameleer
    password: ${CAMELEER_DB_PASSWORD:cameleer_dev}
    driver-class-name: org.postgresql.Driver
  flyway:
    enabled: true
    locations: classpath:db/migration
Add OpenSearch config section:
opensearch:
  url: ${OPENSEARCH_URL:http://localhost:9200}
  queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
  debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}
Add body size limit and retention settings:
cameleer:
  body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
  retention-days: ${CAMELEER_RETENTION_DAYS:30}
Remove the clickhouse: section.
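To make the opensearch.queue-size and opensearch.debounce-ms settings above more concrete, here is a minimal sketch of a debounce buffer that coalesces repeated updates for the same execution and releases an ID only after it has been quiet for the debounce window. DebounceBuffer is a hypothetical class, not the planned SearchIndexer; it assumes the caller passes the current time in milliseconds and polls drainDue periodically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical debounce buffer: coalesces repeated updates per execution ID. */
final class DebounceBuffer {
    private final int capacity;     // corresponds to queue-size
    private final long debounceMs;  // corresponds to debounce-ms
    private final Map<String, Long> lastSeen = new LinkedHashMap<>();

    DebounceBuffer(int capacity, long debounceMs) {
        this.capacity = capacity;
        this.debounceMs = debounceMs;
    }

    /** Records an update; returns false if the buffer is full and the ID is new. */
    synchronized boolean offer(String executionId, long nowMs) {
        if (!lastSeen.containsKey(executionId) && lastSeen.size() >= capacity) {
            return false;
        }
        lastSeen.put(executionId, nowMs); // a later update pushes the deadline out
        return true;
    }

    /** Removes and returns the IDs that have been quiet for at least debounceMs. */
    synchronized List<String> drainDue(long nowMs) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= debounceMs) {
                due.add(e.getKey());
                it.remove();
            }
        }
        return due;
    }
}
```

Passing the clock in as a parameter keeps the buffer deterministic in tests. What should happen when offer returns false (drop, log, or index immediately) is a design decision the sketch leaves open.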
- Step 2: Update application-test.yml
spring:
  flyway:
    enabled: true
opensearch:
  url: http://localhost:9200
- Step 3: Commit
git add cameleer3-server-app/src/main/resources/application.yml
git add cameleer3-server-app/src/test/resources/application-test.yml
git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config"
Chunk 2: Core Module Interfaces and Models
Task 5: Create new storage interfaces in core module
Files:
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/StatsStore.java
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/SearchIndex.java
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsStore.java
- Step 1: Create ExecutionStore interface
package com.cameleer3.server.core.storage;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
public interface ExecutionStore {
void upsert(ExecutionRecord execution);
void upsertProcessors(String executionId, Instant startTime,
String groupName, String routeId,
List<ProcessorRecord> processors);
Optional<ExecutionRecord> findById(String executionId);
List<ProcessorRecord> findProcessors(String executionId);
record ExecutionRecord(
String executionId, String routeId, String agentId, String groupName,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String diagramContentHash
) {}
record ProcessorRecord(
String executionId, String processorId, String processorType,
String diagramNodeId, String groupName, String routeId,
int depth, String parentProcessorId, String status,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,
String inputBody, String outputBody, String inputHeaders, String outputHeaders
) {}
}
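The upsert contract above is what allows an execution to arrive in chunks: a first write may carry only the RUNNING state, and a later one fills in end_time and duration_ms. One possible shape for the SQL behind an implementation is sketched below. ExecutionUpsertSql is a hypothetical holder class; the column list and the conflict target follow V2__executions.sql, while the COALESCE merge rules are an assumption about the desired behaviour, not something the plan specifies.

```java
/** Hypothetical SQL holder for a PostgresExecutionStore-style upsert. */
final class ExecutionUpsertSql {
    // The conflict target matches the primary key declared in V2__executions.sql.
    static final String UPSERT = """
            INSERT INTO executions (
                execution_id, route_id, agent_id, group_name, status,
                correlation_id, exchange_id, start_time, end_time, duration_ms,
                error_message, error_stacktrace, diagram_content_hash)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (execution_id, start_time) DO UPDATE SET
                status               = EXCLUDED.status,
                end_time             = COALESCE(EXCLUDED.end_time, executions.end_time),
                duration_ms          = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                error_message        = COALESCE(EXCLUDED.error_message, executions.error_message),
                error_stacktrace     = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                updated_at           = now()
            """;

    /** Number of positional parameters a caller must bind. */
    static int parameterCount() {
        return (int) UPSERT.chars().filter(c -> c == '?').count();
    }
}
```

Note that status is overwritten unconditionally here, so a late RUNNING chunk would replace a terminal status. If chunks can arrive out of order, the update would need a guard for that case.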
- Step 2: Create StatsStore interface
Supports all 4 aggregation levels: global, per-app, per-route, per-processor.
package com.cameleer3.server.core.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import java.time.Instant;
import java.util.List;
public interface StatsStore {
// Global stats (stats_1m_all)
ExecutionStats stats(Instant from, Instant to);
// Per-app stats (stats_1m_app)
ExecutionStats statsForApp(Instant from, Instant to, String groupName);
// Per-route stats (stats_1m_route), optionally scoped to specific agents
ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);
// Per-processor stats (stats_1m_processor)
ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);
// Global timeseries
StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);
// Per-app timeseries
StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);
// Per-route timeseries, optionally scoped to specific agents
StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds);
// Per-processor timeseries
StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
String routeId, String processorType);
}
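The timeseries methods take a bucketCount rather than a bucket width, so an implementation has to derive the width from the time range. A minimal sketch of that arithmetic follows; BucketMath is a hypothetical helper, and rounding up to whole minutes is an assumption made because the underlying continuous aggregates use 1-minute buckets.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: derives a bucket width from a time range and a bucket count. */
final class BucketMath {
    static Duration bucketWidth(Instant from, Instant to, int bucketCount) {
        if (bucketCount <= 0 || !to.isAfter(from)) {
            throw new IllegalArgumentException("need from < to and bucketCount > 0");
        }
        long rangeSeconds = Duration.between(from, to).getSeconds();
        // Round up so that bucketCount buckets always cover the whole range.
        long widthSeconds = (rangeSeconds + bucketCount - 1) / bucketCount;
        // Round up to whole minutes and never go below the 1-minute source resolution.
        long minutes = Math.max(1, (widthSeconds + 59) / 60);
        return Duration.ofMinutes(minutes);
    }
}
```

The resulting width could then be passed to time_bucket when re-grouping rows from the 1-minute views. For short ranges the caller may receive fewer buckets than requested, since the width cannot drop below one minute.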
- Step 3: Create SearchIndex interface
package com.cameleer3.server.core.storage;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
public interface SearchIndex {
SearchResult<ExecutionSummary> search(SearchRequest request);
long count(SearchRequest request);
void index(ExecutionDocument document);
void delete(String executionId);
}
- Step 4: Create DiagramStore interface
package com.cameleer3.server.core.storage;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import java.util.List;
import java.util.Optional;
public interface DiagramStore {
void store(TaggedDiagram diagram);
Optional<RouteGraph> findByContentHash(String contentHash);
Optional<String> findContentHashForRoute(String routeId, String agentId);
Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
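DiagramStore is keyed by a content hash, which is what makes storing the same diagram twice a no-op. The plan does not say how the hash is computed; the sketch below assumes SHA-256 over the UTF-8 bytes of the serialized definition, rendered as lowercase hex. ContentHash is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

/** Hypothetical helper: derives a stable content hash for a diagram definition. */
final class ContentHash {
    static String sha256Hex(String definition) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(definition.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest); // lowercase hex, 64 characters
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Combined with the content_hash primary key on route_diagrams, an insert that ignores conflicts would then deduplicate identical definitions. This only works if the serialized definition is deterministic, for example with stable key ordering.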
- Step 5: Create MetricsStore interface
package com.cameleer3.server.core.storage;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.List;
public interface MetricsStore {
void insertBatch(List<MetricsSnapshot> snapshots);
}
- Step 6: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"
Task 6: Create ExecutionDocument model and indexing event
Files:
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/ExecutionUpdatedEvent.java
- Step 1: Create ExecutionDocument
package com.cameleer3.server.core.storage.model;
import java.time.Instant;
import java.util.List;
public record ExecutionDocument(
String executionId, String routeId, String agentId, String groupName,
String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace,
List<ProcessorDoc> processors
) {
public record ProcessorDoc(
String processorId, String processorType, String status,
String errorMessage, String errorStacktrace,
String inputBody, String outputBody,
String inputHeaders, String outputHeaders
) {}
}
- Step 2: Create ExecutionUpdatedEvent
package com.cameleer3.server.core.indexing;
import java.time.Instant;
public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
- Step 3: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"
Task 7: Update SearchService to use StatsStore for stats/timeseries
Files:
- Modify: cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java
- Step 1: Refactor SearchService to accept SearchIndex + StatsStore
Replace the single SearchEngine dependency with two dependencies:
package com.cameleer3.server.core.search;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.StatsStore;
import java.time.Instant;
import java.util.List;
public class SearchService {
private final SearchIndex searchIndex;
private final StatsStore statsStore;
public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
this.searchIndex = searchIndex;
this.statsStore = statsStore;
}
public SearchResult<ExecutionSummary> search(SearchRequest request) {
return searchIndex.search(request);
}
public long count(SearchRequest request) {
return searchIndex.count(request);
}
public ExecutionStats stats(Instant from, Instant to) {
return statsStore.stats(from, to);
}
public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
return statsStore.statsForRoute(from, to, routeId, agentIds);
}
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return statsStore.timeseries(from, to, bucketCount);
}
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
}
}
- Step 2: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"
Task 8: Update DetailService to use ExecutionStore
Files:
- Modify: cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java
- Step 1: Rewrite DetailService to use ExecutionStore
The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with parentProcessorId for tree structure.
package com.cameleer3.server.core.detail;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import java.util.*;
public class DetailService {
private final ExecutionStore executionStore;
public DetailService(ExecutionStore executionStore) {
this.executionStore = executionStore;
}
public Optional<ExecutionDetail> getDetail(String executionId) {
return executionStore.findById(executionId)
.map(exec -> {
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
List<ProcessorNode> roots = buildTree(processors);
return new ExecutionDetail(
exec.executionId(), exec.routeId(), exec.agentId(),
exec.status(), exec.startTime(), exec.endTime(),
exec.durationMs() != null ? exec.durationMs() : 0L,
exec.correlationId(), exec.exchangeId(),
exec.errorMessage(), exec.errorStacktrace(),
exec.diagramContentHash(), roots
);
});
}
List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
if (processors.isEmpty()) return List.of();
Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
for (ProcessorRecord p : processors) {
nodeMap.put(p.processorId(), new ProcessorNode(
p.processorId(), p.processorType(), p.status(),
p.startTime(), p.endTime(),
p.durationMs() != null ? p.durationMs() : 0L,
p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
));
}
List<ProcessorNode> roots = new ArrayList<>();
for (ProcessorRecord p : processors) {
ProcessorNode node = nodeMap.get(p.processorId());
if (p.parentProcessorId() == null) {
roots.add(node);
} else {
ProcessorNode parent = nodeMap.get(p.parentProcessorId());
if (parent != null) {
parent.addChild(node);
} else {
roots.add(node); // orphan safety
}
}
}
return roots;
}
}
- Step 2: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
Task 9: Update IngestionService for synchronous writes
Files:
- Modify: cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
- Step 1: Rewrite IngestionService
Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.
package com.cameleer3.server.core.ingestion;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer3.server.core.storage.DiagramStore;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class IngestionService {
private final ExecutionStore executionStore;
private final DiagramStore diagramStore;
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
private final Consumer<ExecutionUpdatedEvent> eventPublisher;
private final int bodySizeLimit;
public IngestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
Consumer<ExecutionUpdatedEvent> eventPublisher,
int bodySizeLimit) {
this.executionStore = executionStore;
this.diagramStore = diagramStore;
this.metricsBuffer = metricsBuffer;
this.eventPublisher = eventPublisher;
this.bodySizeLimit = bodySizeLimit;
}
public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
executionStore.upsert(record);
if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
List<ProcessorRecord> processors = flattenProcessors(
execution.getProcessors(), record.executionId(),
record.startTime(), groupName, execution.getRouteId(),
null, 0);
executionStore.upsertProcessors(
record.executionId(), record.startTime(),
groupName, execution.getRouteId(), processors);
}
eventPublisher.accept(new ExecutionUpdatedEvent(
record.executionId(), record.startTime()));
}
public void ingestDiagram(TaggedDiagram diagram) {
diagramStore.store(diagram);
}
public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
return metricsBuffer.offerBatch(metrics);
}
public int getMetricsBufferDepth() {
return metricsBuffer.size();
}
public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
return metricsBuffer;
}
private ExecutionRecord toExecutionRecord(String agentId, String groupName,
RouteExecution exec) {
return new ExecutionRecord(
exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
exec.getCorrelationId(), exec.getExchangeId(),
exec.getStartTime(), exec.getEndTime(),
exec.getDurationMs(),
exec.getErrorMessage(), exec.getErrorStacktrace(),
null // diagramContentHash set separately
);
}
private List<ProcessorRecord> flattenProcessors(
List<ProcessorExecution> processors, String executionId,
java.time.Instant execStartTime, String groupName, String routeId,
String parentProcessorId, int depth) {
List<ProcessorRecord> flat = new ArrayList<>();
for (ProcessorExecution p : processors) {
flat.add(new ProcessorRecord(
executionId, p.getProcessorId(), p.getProcessorType(),
p.getDiagramNodeId(), groupName, routeId,
depth, parentProcessorId,
p.getStatus() != null ? p.getStatus().name() : "RUNNING",
p.getStartTime() != null ? p.getStartTime() : execStartTime,
p.getEndTime(),
p.getDurationMs(),
p.getErrorMessage(), p.getErrorStacktrace(),
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
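// NOTE: the store casts these two values to jsonb, so they must be valid JSON.
// If the headers are a Map, toString() yields {k=v}, which is not JSON;
// serialize with a JSON mapper before relying on this in production.
p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null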
));
if (p.getChildren() != null) {
flat.addAll(flattenProcessors(
p.getChildren(), executionId, execStartTime,
groupName, routeId, p.getProcessorId(), depth + 1));
}
}
return flat;
}
private String truncateBody(String body) {
if (body == null) return null;
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
return body;
}
}
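The depth-first flattening done by flattenProcessors can be looked at in isolation. The sketch below is a minimal, self-contained version under stated assumptions: Node and FlatRow are hypothetical stand-ins for ProcessorExecution and ProcessorRecord, and only the id, parent, and depth fields are modelled.

```java
import java.util.ArrayList;
import java.util.List;

class FlattenSketch {
    // Hypothetical stand-in for ProcessorExecution: an id plus nested children.
    record Node(String id, List<Node> children) {}

    // Hypothetical stand-in for ProcessorRecord: only the tree-shape columns.
    record FlatRow(String id, String parentId, int depth) {}

    // Pre-order walk: a parent row is emitted before any of its descendants,
    // and each row remembers its parent's id and its own depth.
    static List<FlatRow> flatten(List<Node> nodes, String parentId, int depth) {
        List<FlatRow> flat = new ArrayList<>();
        for (Node n : nodes) {
            flat.add(new FlatRow(n.id(), parentId, depth));
            if (n.children() != null) {
                flat.addAll(flatten(n.children(), n.id(), depth + 1));
            }
        }
        return flat;
    }

    // Mirrors truncateBody: null stays null, long strings are cut at the limit.
    static String truncate(String body, int limit) {
        if (body == null) return null;
        return body.length() > limit ? body.substring(0, limit) : body;
    }

    public static void main(String[] args) {
        Node tree = new Node("a", List.of(
                new Node("b", List.of(new Node("c", List.of()))),
                new Node("d", List.of())));
        flatten(List.of(tree), null, 0).forEach(System.out::println);
    }
}
```

Because parents always precede their children in the flat list, a reader ordering rows by depth and start time can rebuild the tree from parentId alone.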
- Step 2: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
Chunk 3: PostgreSQL Store Implementations
Task 10: Implement PostgresExecutionStore
Files:
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java
- Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java
- Step 1: Write the failing test
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.*;
class PostgresExecutionStoreIT extends AbstractPostgresIT {
@Autowired
ExecutionStore executionStore;
@Test
void upsertAndFindById() {
Instant now = Instant.now();
ExecutionRecord record = new ExecutionRecord(
"exec-1", "route-a", "agent-1", "app-1",
"COMPLETED", "corr-1", "exchange-1",
now, now.plusMillis(100), 100L,
null, null, null);
executionStore.upsert(record);
Optional<ExecutionRecord> found = executionStore.findById("exec-1");
assertTrue(found.isPresent());
assertEquals("exec-1", found.get().executionId());
assertEquals("COMPLETED", found.get().status());
}
@Test
void upsertDeduplicatesByExecutionId() {
Instant now = Instant.now();
ExecutionRecord first = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"RUNNING", null, null, now, null, null, null, null, null);
ExecutionRecord second = new ExecutionRecord(
"exec-dup", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);
executionStore.upsert(first);
executionStore.upsert(second);
Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
assertTrue(found.isPresent());
assertEquals("COMPLETED", found.get().status());
assertEquals(200L, found.get().durationMs());
}
@Test
void upsertProcessorsAndFind() {
Instant now = Instant.now();
ExecutionRecord exec = new ExecutionRecord(
"exec-proc", "route-a", "agent-1", "app-1",
"COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
executionStore.upsert(exec);
List<ProcessorRecord> processors = List.of(
new ProcessorRecord("exec-proc", "proc-1", "log", null,
"app-1", "route-a", 0, null, "COMPLETED",
now, now.plusMillis(10), 10L, null, null,
"input body", "output body", null, null),
new ProcessorRecord("exec-proc", "proc-2", "to", null,
"app-1", "route-a", 1, "proc-1", "COMPLETED",
now.plusMillis(10), now.plusMillis(30), 20L, null, null,
null, null, null, null)
);
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);
List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
assertEquals(2, found.size());
assertEquals("proc-1", found.get(0).processorId());
assertEquals("proc-2", found.get(1).processorId());
assertEquals("proc-1", found.get(1).parentProcessorId());
}
}
- Step 2: Run test to verify it fails
Run: mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q
Expected: FAIL — ExecutionStore bean not found
- Step 3: Implement PostgresExecutionStore
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresExecutionStore implements ExecutionStore {
private final JdbcTemplate jdbc;
public PostgresExecutionStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public void upsert(ExecutionRecord execution) {
jdbc.update("""
INSERT INTO executions (execution_id, route_id, agent_id, group_name,
status, correlation_id, exchange_id, start_time, end_time,
duration_ms, error_message, error_stacktrace, diagram_content_hash,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
ON CONFLICT (execution_id, start_time) DO UPDATE SET
status = CASE
-- A terminal status is never overwritten by a late RUNNING update
WHEN executions.status IN ('COMPLETED', 'FAILED')
AND EXCLUDED.status = 'RUNNING'
THEN executions.status
ELSE EXCLUDED.status
END,
end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
updated_at = now()
""",
execution.executionId(), execution.routeId(), execution.agentId(),
execution.groupName(), execution.status(), execution.correlationId(),
execution.exchangeId(),
Timestamp.from(execution.startTime()),
execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
execution.durationMs(), execution.errorMessage(),
execution.errorStacktrace(), execution.diagramContentHash());
}
@Override
public void upsertProcessors(String executionId, Instant startTime,
String groupName, String routeId,
List<ProcessorRecord> processors) {
jdbc.batchUpdate("""
INSERT INTO processor_executions (execution_id, processor_id, processor_type,
diagram_node_id, group_name, route_id, depth, parent_processor_id,
status, start_time, end_time, duration_ms, error_message, error_stacktrace,
input_body, output_body, input_headers, output_headers)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
status = EXCLUDED.status,
end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
""",
processors.stream().map(p -> new Object[]{
p.executionId(), p.processorId(), p.processorType(),
p.diagramNodeId(), p.groupName(), p.routeId(),
p.depth(), p.parentProcessorId(), p.status(),
Timestamp.from(p.startTime()),
p.endTime() != null ? Timestamp.from(p.endTime()) : null,
p.durationMs(), p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
}).toList());
}
@Override
public Optional<ExecutionRecord> findById(String executionId) {
List<ExecutionRecord> results = jdbc.query(
"SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
EXECUTION_MAPPER, executionId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<ProcessorRecord> findProcessors(String executionId) {
return jdbc.query(
"SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
PROCESSOR_MAPPER, executionId);
}
private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
new ExecutionRecord(
rs.getString("execution_id"), rs.getString("route_id"),
rs.getString("agent_id"), rs.getString("group_name"),
rs.getString("status"), rs.getString("correlation_id"),
rs.getString("exchange_id"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("diagram_content_hash"));
private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
new ProcessorRecord(
rs.getString("execution_id"), rs.getString("processor_id"),
rs.getString("processor_type"), rs.getString("diagram_node_id"),
rs.getString("group_name"), rs.getString("route_id"),
rs.getInt("depth"), rs.getString("parent_processor_id"),
rs.getString("status"),
toInstant(rs, "start_time"), toInstant(rs, "end_time"),
rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
rs.getString("error_message"), rs.getString("error_stacktrace"),
rs.getString("input_body"), rs.getString("output_body"),
rs.getString("input_headers"), rs.getString("output_headers"));
private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column);
return ts != null ? ts.toInstant() : null;
}
}
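The ON CONFLICT clause in upsert merges a second write into the existing row rather than replacing it. The sketch below is a rough in-memory model of that merge, assuming the intended rule is that a terminal status (COMPLETED or FAILED) is not regressed by a late RUNNING update and that null fields never erase stored values (the COALESCE columns). Row and merge are hypothetical names and not part of ExecutionStore.

```java
import java.util.Set;

class UpsertMergeSketch {
    // Hypothetical subset of the executions row.
    record Row(String status, Long durationMs, String errorMessage) {}

    private static final Set<String> TERMINAL = Set.of("COMPLETED", "FAILED");

    // SQL COALESCE(incoming, existing): keep the stored value when the new one is null.
    static <T> T coalesce(T incoming, T existing) {
        return incoming != null ? incoming : existing;
    }

    static Row merge(Row existing, Row incoming) {
        // Assumption: a terminal status wins over a late RUNNING update.
        boolean wouldRegress = TERMINAL.contains(existing.status())
                && "RUNNING".equals(incoming.status());
        String status = wouldRegress ? existing.status() : incoming.status();
        return new Row(status,
                coalesce(incoming.durationMs(), existing.durationMs()),
                coalesce(incoming.errorMessage(), existing.errorMessage()));
    }

    public static void main(String[] args) {
        Row running = new Row("RUNNING", null, null);
        Row done = new Row("COMPLETED", 200L, null);
        System.out.println(merge(running, done)); // normal order of arrival
        System.out.println(merge(done, running)); // out-of-order arrival
    }
}
```

Under this rule both arrival orders converge on the same row, which is the property an at-least-once ingestion path would want from the upsert.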
- Step 4: Run tests to verify they pass
Run: mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q
Expected: PASS — all 3 tests green
- Step 5: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
Task 11: Implement PostgresStatsStore
Files:
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java
- Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java
- Step 1: Write the failing test
package com.cameleer3.server.app.storage;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import static org.junit.jupiter.api.Assertions.*;
class PostgresStatsStoreIT extends AbstractPostgresIT {
@Autowired StatsStore statsStore;
@Autowired ExecutionStore executionStore;
@Autowired JdbcTemplate jdbc;
@Test
void statsReturnsCountsForTimeWindow() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
// Force continuous aggregate refresh
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
assertEquals(3, stats.totalCount());
assertEquals(1, stats.failedCount());
}
@Test
void timeseriesReturnsBuckets() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
for (int i = 0; i < 10; i++) {
insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
now.plusSeconds(i * 30), 100L + i);
}
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");
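// Instant has no minusMinutes/plusMinutes; use the TemporalUnit overloads
StatsTimeseries ts = statsStore.timeseries(
now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);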
assertNotNull(ts);
assertFalse(ts.buckets().isEmpty());
}
private void insertExecution(String id, String routeId, String groupName,
String status, Instant startTime, long durationMs) {
executionStore.upsert(new ExecutionRecord(
id, routeId, "agent-1", groupName, status, null, null,
startTime, startTime.plusMillis(durationMs), durationMs,
status.equals("FAILED") ? "error" : null, null, null));
}
}
- Step 2: Run test to verify it fails
Run: mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q
Expected: FAIL — StatsStore bean not found
- Step 3: Implement PostgresStatsStore
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
@Repository
public class PostgresStatsStore implements StatsStore {
private final JdbcTemplate jdbc;
public PostgresStatsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public ExecutionStats stats(Instant from, Instant to) {
return queryStats("stats_1m_all", from, to, List.of());
}
@Override
public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
return queryStats("stats_1m_app", from, to, List.of(
new Filter("group_name", groupName)));
}
@Override
public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
// Note: agentIds is accepted for interface compatibility but not filterable
// on the continuous aggregate (it groups by route_id, not agent_id).
// All agents for the same route contribute to the same aggregate.
return queryStats("stats_1m_route", from, to, List.of(
new Filter("route_id", routeId)));
}
@Override
public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
return queryStats("stats_1m_processor", from, to, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)));
}
@Override
public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
}
@Override
public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
new Filter("group_name", groupName)), true);
}
@Override
public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
String routeId, List<String> agentIds) {
return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
new Filter("route_id", routeId)), true);
}
@Override
public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
String routeId, String processorType) {
// stats_1m_processor does NOT have running_count column
return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
new Filter("route_id", routeId),
new Filter("processor_type", processorType)), false);
}
private record Filter(String column, String value) {}
private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
// running_count only exists on execution-level aggregates, not processor
boolean hasRunning = !view.equals("stats_1m_processor");
String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
}, params.toArray());
if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
p99Duration = r[3]; activeCount = r[4];
}
// Previous period (shifted back 24h)
Instant prevFrom = from.minus(Duration.ofHours(24));
Instant prevTo = to.minus(Duration.ofHours(24));
List<Object> prevParams = new ArrayList<>();
prevParams.add(Timestamp.from(prevFrom));
prevParams.add(Timestamp.from(prevTo));
for (Filter f : filters) prevParams.add(f.value());
String prevSql = sql; // same shape, different time params
long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration")
}, prevParams.toArray());
if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
}
// Today total (from midnight UTC)
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
List<Object> todayParams = new ArrayList<>();
todayParams.add(Timestamp.from(todayStart));
todayParams.add(Timestamp.from(Instant.now()));
for (Filter f : filters) todayParams.add(f.value());
String todaySql = sql;
long totalToday = 0;
var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
todayParams.toArray());
if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats(
totalCount, failedCount, avgDuration, p99Duration, activeCount,
totalToday, prevTotal, prevFailed, prevAvg, prevP99);
}
private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
int bucketCount, List<Filter> filters,
boolean hasRunningCount) {
long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
if (intervalSeconds < 60) intervalSeconds = 60;
String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";
String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
"COALESCE(SUM(total_count), 0) AS total_count, " +
"COALESCE(SUM(failed_count), 0) AS failed_count, " +
"CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
"COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
runningCol + " AS active_count " +
"FROM " + view + " WHERE bucket >= ? AND bucket < ?";
List<Object> params = new ArrayList<>();
params.add(intervalSeconds);
params.add(Timestamp.from(from));
params.add(Timestamp.from(to));
for (Filter f : filters) {
sql += " AND " + f.column() + " = ?";
params.add(f.value());
}
sql += " GROUP BY period ORDER BY period";
List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
new TimeseriesBucket(
rs.getTimestamp("period").toInstant(),
rs.getLong("total_count"), rs.getLong("failed_count"),
rs.getLong("avg_duration"), rs.getLong("p99_duration"),
rs.getLong("active_count")
), params.toArray());
return new StatsTimeseries(buckets);
}
}
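Two small calculations in queryStats and queryTimeseries are easy to get wrong: the bucket width, which is floored at 60 seconds, and the comparison window, which is the same range shifted back 24 hours. The standalone sketch below reproduces both with hypothetical helper names; it assumes the stats_1m_* views hold one-minute buckets, as their names suggest.

```java
import java.time.Duration;
import java.time.Instant;

class StatsWindowSketch {
    // Width of one time_bucket in seconds: range / bucketCount, floored at 60s
    // so a bucket is never narrower than the assumed 1-minute aggregate rows.
    static long intervalSeconds(Instant from, Instant to, int bucketCount) {
        long seconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        return Math.max(seconds, 60);
    }

    // Previous-period window: same length, shifted back 24 hours.
    static Instant[] previousWindow(Instant from, Instant to) {
        Duration day = Duration.ofHours(24);
        return new Instant[]{from.minus(day), to.minus(day)};
    }

    public static void main(String[] args) {
        Instant from = Instant.parse("2026-03-16T10:00:00Z");
        Instant to = Instant.parse("2026-03-16T11:00:00Z");
        System.out.println(intervalSeconds(from, to, 12));
        System.out.println(previousWindow(from, to)[0]);
    }
}
```

One consequence of the 60-second floor is that a request for more buckets than there are minutes in the range returns fewer buckets than asked for, so callers should not assume the result has exactly bucketCount entries.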
- Step 4: Run tests to verify they pass
Run: mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q
Expected: PASS
- Step 5: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java
git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore
Files:
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java
- Step 1: Write failing test for PostgresDiagramStore
Create PostgresDiagramStoreIT.java with tests for:
- store() + findByContentHash() roundtrip
- Content-hash dedup (store same hash twice, verify single row)
- findContentHashForRoute() returns latest for route+agent
- findContentHashForRouteByAgents() returns across agent list
- Step 2: Implement PostgresDiagramStore
Straightforward CRUD with ON CONFLICT (content_hash) DO NOTHING. Port the SHA-256 hashing logic from ClickHouseDiagramRepository. Use JdbcTemplate queries.
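The content hash itself can be computed with the JDK alone. The sketch below is a minimal version, assuming the diagram is first serialized to a canonical string and that the hash is stored as lowercase hex. The serialization used by ClickHouseDiagramRepository is not shown in this plan, so the input in main is a placeholder, and sha256Hex is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

class ContentHashSketch {
    static String sha256Hex(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash); // lowercase hex, 64 characters
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder input; the real canonical form comes from the ported logic.
        System.out.println(sha256Hex("{\"routeId\":\"route-a\"}"));
    }
}
```

Because identical content always yields the same hash, ON CONFLICT (content_hash) DO NOTHING makes repeated store() calls for an unchanged diagram idempotent.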
- Step 3: Run diagram tests, verify pass
- Step 4: Implement PostgresUserRepository
Implements UserRepository interface (existing interface in core.security, unchanged).
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.security.UserInfo;
import com.cameleer3.server.core.security.UserRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Array;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresUserRepository implements UserRepository {
private final JdbcTemplate jdbc;
public PostgresUserRepository(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Optional<UserInfo> findById(String userId) {
var results = jdbc.query(
"SELECT * FROM users WHERE user_id = ?",
(rs, rowNum) -> mapUser(rs), userId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<UserInfo> findAll() {
return jdbc.query("SELECT * FROM users ORDER BY user_id",
(rs, rowNum) -> mapUser(rs));
}
@Override
public void upsert(UserInfo user) {
jdbc.update("""
INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, now(), now())
ON CONFLICT (user_id) DO UPDATE SET
provider = EXCLUDED.provider, email = EXCLUDED.email,
display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
updated_at = now()
""",
user.userId(), user.provider(), user.email(), user.displayName(),
user.roles().toArray(new String[0]));
}
@Override
public void updateRoles(String userId, List<String> roles) {
jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
roles.toArray(new String[0]), userId);
}
@Override
public void delete(String userId) {
jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
}
private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
Array rolesArray = rs.getArray("roles");
String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
return new UserInfo(
rs.getString("user_id"), rs.getString("provider"),
rs.getString("email"), rs.getString("display_name"),
List.of(roles));
}
}
- Step 5: Implement PostgresOidcConfigRepository
Implements OidcConfigRepository interface (existing interface in core.security).
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.security.OidcConfig;
import com.cameleer3.server.core.security.OidcConfigRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Array;
import java.util.List;
import java.util.Optional;
@Repository
public class PostgresOidcConfigRepository implements OidcConfigRepository {
private final JdbcTemplate jdbc;
public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Optional<OidcConfig> find() {
var results = jdbc.query(
"SELECT * FROM oidc_config WHERE config_id = 'default'",
(rs, rowNum) -> {
Array arr = rs.getArray("default_roles");
String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
return new OidcConfig(
rs.getBoolean("enabled"), rs.getString("issuer_uri"),
rs.getString("client_id"), rs.getString("client_secret"),
rs.getString("roles_claim"), List.of(roles),
rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
});
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public void save(OidcConfig config) {
jdbc.update("""
INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
ON CONFLICT (config_id) DO UPDATE SET
enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
updated_at = now()
""",
config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
config.autoSignup(), config.displayNameClaim());
}
@Override
public void delete() {
jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
}
}
- Step 6: Implement PostgresMetricsStore
package com.cameleer3.server.app.storage;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.util.List;
@Repository
public class PostgresMetricsStore implements MetricsStore {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final JdbcTemplate jdbc;
public PostgresMetricsStore(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public void insertBatch(List<MetricsSnapshot> snapshots) {
jdbc.batchUpdate("""
INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
collected_at, server_received_at)
VALUES (?, ?, ?, ?::jsonb, ?, now())
""",
snapshots.stream().map(s -> new Object[]{
s.agentId(), s.metricName(), s.metricValue(),
tagsToJson(s.tags()),
Timestamp.from(s.collectedAt())
}).toList());
}
private String tagsToJson(java.util.Map<String, String> tags) {
if (tags == null || tags.isEmpty()) return null;
try { return MAPPER.writeValueAsString(tags); }
catch (JsonProcessingException e) { return null; }
}
}
- Step 7: Run all store tests, verify pass
Run: mvn test -pl cameleer3-server-app -Dtest="Postgres*IT" -q
- Step 8: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/Postgres*.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/Postgres*.java
git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
Chunk 4: OpenSearch Integration
Task 13: Implement OpenSearchIndex
Files:
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
- Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
- Step 1: Write failing test
package com.cameleer3.server.app.search;
import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import java.time.Instant;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
class OpenSearchIndexIT extends AbstractPostgresIT {
@Container
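// The security plugin is off by default in opensearch-testcontainers
// (withSecurityEnabled() takes no argument and turns it on), so plain HTTP works.
static final OpensearchContainer<?> opensearch =
new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");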
@DynamicPropertySource
static void configureOpenSearch(DynamicPropertyRegistry registry) {
registry.add("opensearch.url", opensearch::getHttpHostAddress);
}
@Autowired
SearchIndex searchIndex;
@Test
void indexAndSearchByText() throws Exception {
Instant now = Instant.now();
ExecutionDocument doc = new ExecutionDocument(
"search-1", "route-a", "agent-1", "app-1",
"FAILED", "corr-1", "exch-1",
now, now.plusMillis(100), 100L,
"OrderNotFoundException: order-12345 not found", null,
List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
null, null, "request body with customer-99", null, null, null)));
searchIndex.index(doc);
Thread.sleep(1500); // Allow OpenSearch refresh
SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60),
null, null, null,
"OrderNotFoundException", null, null, null,
null, null, null, null, null,
0, 50, "startTime", "desc");
SearchResult<ExecutionSummary> result = searchIndex.search(request);
assertTrue(result.total() > 0);
assertEquals("search-1", result.items().get(0).executionId());
}
@Test
void wildcardSearchFindsSubstring() throws Exception {
Instant now = Instant.now();
ExecutionDocument doc = new ExecutionDocument(
"wild-1", "route-b", "agent-1", "app-1",
"COMPLETED", null, null,
now, now.plusMillis(50), 50L, null, null,
List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
null, null, "UniquePayloadIdentifier12345", null, null, null)));
searchIndex.index(doc);
Thread.sleep(1500);
SearchRequest request = new SearchRequest(
null, now.minusSeconds(60), now.plusSeconds(60),
null, null, null,
"PayloadIdentifier", null, null, null,
null, null, null, null, null,
0, 50, "startTime", "desc");
SearchResult<ExecutionSummary> result = searchIndex.search(request);
assertTrue(result.total() > 0);
}
}
- Step 2: Create OpenSearchConfig
package com.cameleer3.server.app.config;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OpenSearchConfig {
@Value("${opensearch.url:http://localhost:9200}")
private String opensearchUrl;
@Bean
public OpenSearchClient openSearchClient() {
HttpHost host = HttpHost.create(opensearchUrl);
var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
return new OpenSearchClient(transport);
}
}
- Step 3: Implement OpenSearchIndex
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.analysis.TokenChar;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@Repository
public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
private static final String INDEX_PREFIX = "executions-";
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private final OpenSearchClient client;
public OpenSearchIndex(OpenSearchClient client) {
this.client = client;
}
@PostConstruct
void ensureIndexTemplate() {
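// Index template with an ngram analyzer for infix wildcard search.
// The template must match the spec's OpenSearch index template definition.
// NOTE: only the settings are built here. buildQuery() also depends on the mappings
// (nested "processors" path, ".ngram" sub-fields), so add them to this template as well.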
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name("executions-template")
.indexPatterns(List.of("executions-*"))
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
.analysis(a -> a
.analyzer("ngram_analyzer", an -> an
.custom(c -> c
.tokenizer("ngram_tokenizer")
.filter("lowercase")))
.tokenizer("ngram_tokenizer", tk -> tk
.definition(d -> d
.ngram(ng -> ng
.minGram(3)
.maxGram(4)
.tokenChars(TokenChar.Letter,
TokenChar.Digit,
TokenChar.Punctuation,
TokenChar.Symbol)))))))));
log.info("OpenSearch index template created with ngram analyzer");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
}
}
@Override
public void index(ExecutionDocument doc) {
String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
try {
client.index(IndexRequest.of(b -> b
.index(indexName)
.id(doc.executionId())
.document(toMap(doc))));
} catch (IOException e) {
log.error("Failed to index execution {}", doc.executionId(), e);
}
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
try {
var searchReq = buildSearchRequest(request, request.limit());
var response = client.search(searchReq, Map.class);
List<ExecutionSummary> items = response.hits().hits().stream()
.map(this::hitToSummary)
.filter(Objects::nonNull) // hitToSummary returns null when a hit has no _source
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
return new SearchResult<>(items, total);
} catch (IOException e) {
log.error("Search failed", e);
return new SearchResult<>(List.of(), 0);
}
}
@Override
public long count(SearchRequest request) {
try {
var countReq = CountRequest.of(b -> b
.index(INDEX_PREFIX + "*")
.query(buildQuery(request)));
return client.count(countReq).count();
} catch (IOException e) {
log.error("Count failed", e);
return 0;
}
}
@Override
public void delete(String executionId) {
try {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(INDEX_PREFIX + "*"))
.query(Query.of(q -> q.term(t -> t
.field("execution_id").value(FieldValue.of(executionId)))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
}
private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
SearchRequest request, int size) {
return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
b.index(INDEX_PREFIX + "*")
.query(buildQuery(request))
.size(size)
.from(request.offset())
.sort(s -> s.field(f -> f
.field(request.sortColumn())
.order("asc".equalsIgnoreCase(request.sortDir())
? SortOrder.Asc : SortOrder.Desc)));
return b;
});
}
private Query buildQuery(SearchRequest request) {
List<Query> must = new ArrayList<>();
List<Query> filter = new ArrayList<>();
// Time range
if (request.timeFrom() != null || request.timeTo() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
r.gte(org.opensearch.client.json.JsonData.of(request.timeFrom().toString()));
if (request.timeTo() != null)
r.lte(org.opensearch.client.json.JsonData.of(request.timeTo().toString()));
return r;
})));
}
// Keyword filters
if (request.status() != null)
filter.add(termQuery("status", request.status()));
if (request.routeId() != null)
filter.add(termQuery("route_id", request.routeId()));
if (request.agentId() != null)
filter.add(termQuery("agent_id", request.agentId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id", request.correlationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
String text = request.text();
List<Query> textQueries = new ArrayList<>();
// Search top-level text fields
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))));
// Search nested processor fields
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace",
"processors.input_body.ngram", "processors.output_body.ngram",
"processors.input_headers.ngram", "processors.output_headers.ngram",
"processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
// Scoped text searches
if (request.textInBody() != null && !request.textInBody().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInBody())
.fields("processors.input_body", "processors.output_body",
"processors.input_body.ngram", "processors.output_body.ngram"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(request.textInHeaders())
.fields("processors.input_headers", "processors.output_headers",
"processors.input_headers.ngram", "processors.output_headers.ngram"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace",
"error_message.ngram", "error_stacktrace.ngram"))),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace",
"processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
).minimumShouldMatch("1"))));
}
// Duration range
if (request.durationMin() != null || request.durationMax() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
r.gte(org.opensearch.client.json.JsonData.of(request.durationMin()));
if (request.durationMax() != null)
r.lte(org.opensearch.client.json.JsonData.of(request.durationMax()));
return r;
})));
}
return Query.of(q -> q.bool(b -> {
if (!must.isEmpty()) b.must(must);
if (!filter.isEmpty()) b.filter(filter);
if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
return b;
}));
}
private Query termQuery(String field, String value) {
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());
map.put("route_id", doc.routeId());
map.put("agent_id", doc.agentId());
map.put("group_name", doc.groupName());
map.put("status", doc.status());
map.put("correlation_id", doc.correlationId());
map.put("exchange_id", doc.exchangeId());
map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
map.put("duration_ms", doc.durationMs());
map.put("error_message", doc.errorMessage());
map.put("error_stacktrace", doc.errorStacktrace());
if (doc.processors() != null) {
map.put("processors", doc.processors().stream().map(p -> {
Map<String, Object> pm = new LinkedHashMap<>();
pm.put("processor_id", p.processorId());
pm.put("processor_type", p.processorType());
pm.put("status", p.status());
pm.put("error_message", p.errorMessage());
pm.put("error_stacktrace", p.errorStacktrace());
pm.put("input_body", p.inputBody());
pm.put("output_body", p.outputBody());
pm.put("input_headers", p.inputHeaders());
pm.put("output_headers", p.outputHeaders());
return pm;
}).toList());
}
return map;
}
@SuppressWarnings("unchecked")
private ExecutionSummary hitToSummary(Hit<Map> hit) {
Map<String, Object> src = hit.source();
if (src == null) return null;
return new ExecutionSummary(
(String) src.get("execution_id"),
(String) src.get("route_id"),
(String) src.get("agent_id"),
(String) src.get("status"),
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
(String) src.get("error_message"));
}
}
- Step 4: Run tests to verify they pass
Run: mvn test -pl cameleer3-server-app -Dtest=OpenSearchIndexIT -q
Expected: PASS
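To see why the 3-4 character ngram analyzer lets wildcardSearchFindsSubstring match an infix, the following is a minimal sketch of the idea in plain Java. It is not the OpenSearch tokenizer: the class and method names are hypothetical, the whole string is treated as a single token, and real match queries score on partial gram overlap rather than requiring every gram.

```java
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical illustration only; OpenSearch performs this inside the analyzer.
class NgramSketch {

    // Emits every lowercase substring of length minGram..maxGram, roughly what an
    // ngram tokenizer followed by a lowercase filter produces for one token.
    static Set<String> ngrams(String text, int minGram, int maxGram) {
        String lower = text.toLowerCase(Locale.ROOT);
        Set<String> grams = new LinkedHashSet<>();
        for (int n = minGram; n <= maxGram; n++) {
            for (int i = 0; i + n <= lower.length(); i++) {
                grams.add(lower.substring(i, i + n));
            }
        }
        return grams;
    }

    // True when every gram of the query also occurs among the grams of the indexed value.
    static boolean allQueryGramsIndexed(String indexed, String query) {
        return ngrams(indexed, 3, 4).containsAll(ngrams(query, 3, 4));
    }

    public static void main(String[] args) {
        String indexed = "UniquePayloadIdentifier12345";
        System.out.println(allQueryGramsIndexed(indexed, "PayloadIdentifier")); // infix of the value
        System.out.println(allQueryGramsIndexed(indexed, "OrderNotFound"));     // unrelated text
    }
}
```

Because any substring of the indexed value shares all of its grams with that value, infix searches can be answered from the inverted index without a leading-wildcard scan. The trade-off is a larger index.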
- Step 5: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search"
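OpenSearchIndex writes each document to a daily index derived from the execution's start time in UTC. The sketch below isolates that naming rule; the class name is hypothetical, while the prefix and date pattern are the ones used in the implementation above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that isolates the daily index naming rule.
class DailyIndexNameSketch {

    // Pinning the formatter to UTC keeps the index name independent of the JVM's default zone.
    private static final DateTimeFormatter DAY_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String indexNameFor(Instant startTime) {
        return "executions-" + DAY_FMT.format(startTime);
    }

    public static void main(String[] args) {
        System.out.println(indexNameFor(Instant.parse("2026-03-16T23:59:59Z"))); // executions-2026-03-16
        System.out.println(indexNameFor(Instant.parse("2026-03-17T00:00:00Z"))); // executions-2026-03-17
    }
}
```

Since the day cannot be recovered from an execution ID alone, delete() has to query across executions-* rather than target a single index.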
Task 14: Implement SearchIndexer (debounced event-driven indexer)
Files:
- Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java
- Step 1: Implement SearchIndexer
package com.cameleer3.server.core.indexing;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class SearchIndexer {
private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);
private final ExecutionStore executionStore;
private final SearchIndex searchIndex;
private final long debounceMs;
private final int queueCapacity;
private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });
public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
long debounceMs, int queueCapacity) {
this.executionStore = executionStore;
this.searchIndex = searchIndex;
this.debounceMs = debounceMs;
this.queueCapacity = queueCapacity;
}
public void onExecutionUpdated(ExecutionUpdatedEvent event) {
if (pending.size() >= queueCapacity) {
log.warn("Search indexer queue full, dropping event for {}", event.executionId());
return;
}
ScheduledFuture<?> existing = pending.put(event.executionId(),
scheduler.schedule(() -> indexExecution(event.executionId()),
debounceMs, TimeUnit.MILLISECONDS));
if (existing != null) {
existing.cancel(false);
}
}
private void indexExecution(String executionId) {
pending.remove(executionId);
try {
ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
if (exec == null) return;
List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
List<ProcessorDoc> processorDocs = processors.stream()
.map(p -> new ProcessorDoc(
p.processorId(), p.processorType(), p.status(),
p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(),
p.inputHeaders(), p.outputHeaders()))
.toList();
searchIndex.index(new ExecutionDocument(
exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs));
} catch (Exception e) {
log.error("Failed to index execution {}", executionId, e);
}
}
public void shutdown() {
scheduler.shutdown();
}
}
- Step 2: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java
git commit -m "feat: implement debounced SearchIndexer for async OpenSearch indexing"
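The debounce in SearchIndexer means that several updates to the same execution within the debounce window should result in a single index operation. The following self-contained sketch reproduces only that mechanism. DebounceSketch and runsAfterBurst are hypothetical names, and the action is a plain callback rather than a store lookup plus an OpenSearch write.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified version of the per-key debounce used by SearchIndexer.
class DebounceSketch {
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final Consumer<String> action;

    DebounceSketch(long delayMs, Consumer<String> action) {
        this.delayMs = delayMs;
        this.action = action;
    }

    // Schedules the action for this key and cancels any earlier, not-yet-run schedule.
    void submit(String key) {
        ScheduledFuture<?> previous = pending.put(key, scheduler.schedule(() -> {
            pending.remove(key);
            action.accept(key);
        }, delayMs, TimeUnit.MILLISECONDS));
        if (previous != null) {
            previous.cancel(false);
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }

    // Fires a burst of updates for one key and reports how often the action actually ran.
    static int runsAfterBurst(int burstSize, long delayMs) {
        AtomicInteger runs = new AtomicInteger();
        DebounceSketch debouncer = new DebounceSketch(delayMs, key -> runs.incrementAndGet());
        for (int i = 0; i < burstSize; i++) {
            debouncer.submit("exec-1");
        }
        try {
            Thread.sleep(delayMs * 4); // wait well past the debounce window
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            debouncer.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs after a burst of 5: " + runsAfterBurst(5, 300));
    }
}
```

As long as the burst completes within the delay, only the last scheduled task survives, so the count is expected to be one. Re-reading the execution from the store when the task fires, as SearchIndexer does, means the indexed document reflects the latest state rather than the state at the time of the first event.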
Chunk 5: Wiring, Cleanup, and Integration
Task 15: Create bean configuration and wire everything
Files:
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
- Create: cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
- Modify: cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
- Modify: cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
- Step 1: Create StorageBeanConfig
Wire DetailService, SearchIndexer, IngestionService with new store beans:
package com.cameleer3.server.app.config;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class StorageBeanConfig {
@Bean
public DetailService detailService(ExecutionStore executionStore) {
return new DetailService(executionStore);
}
@Bean(destroyMethod = "shutdown")
public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
@Value("${opensearch.debounce-ms:2000}") long debounceMs,
@Value("${opensearch.queue-size:10000}") int queueSize) {
return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
}
@Bean
public IngestionService ingestionService(ExecutionStore executionStore,
DiagramStore diagramStore,
WriteBuffer<MetricsSnapshot> metricsBuffer,
SearchIndexer searchIndexer,
@Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
return new IngestionService(executionStore, diagramStore, metricsBuffer,
searchIndexer::onExecutionUpdated, bodySizeLimit);
}
}
- Step 2: Update SearchBeanConfig
Wire SearchService with SearchIndex + StatsStore:
@Bean
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
return new SearchService(searchIndex, statsStore);
}
- Step 3: Update IngestionBeanConfig
Remove execution and diagram write buffers. Keep only metrics write buffer:
@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
- Step 4: Create MetricsFlushScheduler
Create cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java:
package com.cameleer3.server.app.ingestion;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class MetricsFlushScheduler implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
private final MetricsStore metricsStore;
private final int batchSize;
private volatile boolean running = false;
public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
MetricsStore metricsStore,
IngestionConfig config) {
this.metricsBuffer = metricsBuffer;
this.metricsStore = metricsStore;
this.batchSize = config.getBatchSize();
}
@Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
public void flush() {
try {
List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
if (!batch.isEmpty()) {
metricsStore.insertBatch(batch);
log.debug("Flushed {} metrics to PostgreSQL", batch.size());
}
} catch (Exception e) {
log.error("Failed to flush metrics", e);
}
}
@Override public void start() { running = true; }
@Override public void stop() {
// Drain remaining on shutdown
while (metricsBuffer.size() > 0) {
List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
if (batch.isEmpty()) break;
try { metricsStore.insertBatch(batch); }
catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
}
running = false;
}
@Override public boolean isRunning() { return running; }
@Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
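The flush and shutdown logic in MetricsFlushScheduler relies on the buffer handing out at most batchSize items per drain. The sketch below shows that contract with a hypothetical stand-in for WriteBuffer; the real class lives in the core module and may differ in its API and overflow behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BufferDrainSketch {

    // Hypothetical stand-in for WriteBuffer<T>: bounded, non-blocking offer, batch drain.
    static final class Buffer<T> {
        private final BlockingQueue<T> queue;

        Buffer(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        // Returns false instead of blocking when the buffer is full.
        boolean offer(T item) {
            return queue.offer(item);
        }

        // Removes and returns up to max items.
        List<T> drain(int max) {
            List<T> batch = new ArrayList<>(max);
            queue.drainTo(batch, max);
            return batch;
        }

        int size() {
            return queue.size();
        }
    }

    // Mirrors the shutdown loop: drain fixed-size batches until the buffer is empty.
    // Returns the size of each batch; a real flush would call the store's insertBatch here.
    static <T> List<Integer> flushAll(Buffer<T> buffer, int batchSize) {
        List<Integer> batchSizes = new ArrayList<>();
        while (buffer.size() > 0) {
            List<T> batch = buffer.drain(batchSize);
            if (batch.isEmpty()) {
                break;
            }
            batchSizes.add(batch.size());
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        Buffer<Integer> buffer = new Buffer<>(10);
        for (int i = 0; i < 7; i++) {
            buffer.offer(i);
        }
        System.out.println(flushAll(buffer, 3)); // [3, 3, 1]
    }
}
```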
- Step 5: Create RetentionScheduler
Create cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java:
package com.cameleer3.server.app.retention;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class RetentionScheduler {
private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);
private final JdbcTemplate jdbc;
private final int retentionDays;
public RetentionScheduler(JdbcTemplate jdbc,
@Value("${cameleer.retention-days:30}") int retentionDays) {
this.jdbc = jdbc;
this.retentionDays = retentionDays;
}
@Scheduled(cron = "0 0 2 * * *", zone = "UTC") // Daily at 02:00 UTC (without zone, Spring uses the server's default time zone)
public void dropExpiredChunks() {
String interval = retentionDays + " days";
try {
// Raw data
jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");
// Continuous aggregates (keep 3x longer)
String caggInterval = (retentionDays * 3) + " days";
jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");
log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
retentionDays, retentionDays * 3);
} catch (Exception e) {
log.error("Retention job failed", e);
}
}
// Note: OpenSearch daily index deletion should be handled via an ISM (Index State
// Management) policy configured at deployment time, not in application code.
}
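To make the retention rule concrete (raw hypertables at retentionDays, continuous aggregates at three times that), the sketch below only builds the SQL strings and does not execute them. The class name and the positive-value guard are additions for illustration; the relation names and the drop_chunks call shape are taken from RetentionScheduler above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the statements RetentionScheduler would issue.
class RetentionSqlSketch {
    static final List<String> RAW_TABLES =
            List.of("executions", "processor_executions", "agent_metrics");
    static final List<String> AGGREGATES =
            List.of("stats_1m_all", "stats_1m_app", "stats_1m_route", "stats_1m_processor");

    static List<String> statements(int retentionDays) {
        // Guard added for the sketch: zero or negative retention would drop current data.
        if (retentionDays <= 0) {
            throw new IllegalArgumentException("retentionDays must be positive");
        }
        List<String> sql = new ArrayList<>();
        for (String table : RAW_TABLES) {
            sql.add(dropChunks(table, retentionDays));
        }
        for (String aggregate : AGGREGATES) {
            sql.add(dropChunks(aggregate, retentionDays * 3)); // aggregates are kept 3x longer
        }
        return sql;
    }

    // The interval is built from an int, so the concatenation cannot inject SQL.
    private static String dropChunks(String relation, int days) {
        return "SELECT drop_chunks('" + relation + "', INTERVAL '" + days + " days')";
    }

    public static void main(String[] args) {
        statements(30).forEach(System.out::println);
    }
}
```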
- Step 6: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"
Task 16: Delete ClickHouse code and old interfaces
Files:
- Delete all files listed in "Files to delete" section above
- Step 1: Delete ClickHouse implementations
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouse*.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
- Step 2: Delete old core interfaces replaced by new ones
UserRepository and OidcConfigRepository in core.security are kept — the new Postgres implementations implement them. Only interfaces replaced by new storage interfaces are deleted.
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
- Step 3: Delete ClickHouse SQL migrations
rm -r cameleer3-server-app/src/main/resources/clickhouse/
- Step 4: Delete old test base class
rm cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
- Step 5: Fix compilation errors in specific files
Each file and what to change:
- DiagramRenderController.java: change DiagramRepository to DiagramStore
- DiagramController.java: change DiagramRepository to DiagramStore
- SearchController.java: already uses SearchService, verify no direct SearchEngine refs
- DetailController.java: already uses DetailService, verify no direct ExecutionRepository refs
- TreeReconstructionTest.java: rewrite to test DetailService.buildTree() with ProcessorRecord list input
- ExecutionController.java: update to call IngestionService.ingestExecution() (synchronous, catches exceptions for 503)
- DiagramController.java: update to call IngestionService.ingestDiagram() (synchronous)
- MetricsController.java: update to call IngestionService.acceptMetrics() (still buffered)
- SearchBeanConfig.java: wire SearchService(SearchIndex, StatsStore) instead of SearchService(SearchEngine)
- IngestionBeanConfig.java: remove execution/diagram buffer beans, add bodySizeLimit to IngestionService constructor
- All ITs extending AbstractClickHouseIT: change to extend AbstractPostgresIT
- Step 6: Verify compilation
Run: mvn compile -q
Expected: BUILD SUCCESS
- Step 7: Commit
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"
Task 17: Update existing integration tests
Files:
- Modify: all IT files under cameleer3-server-app/src/test/
- Step 1: Update all ITs to extend AbstractPostgresIT
Every IT that currently extends AbstractClickHouseIT must extend AbstractPostgresIT instead. Also add OpenSearch Testcontainer where search tests are needed.
- Step 2: Update ExecutionControllerIT
Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success, 503 on DB write failure.
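The status mapping that the updated test should expect can be sketched as follows. This is a simplified illustration: the class name is hypothetical and the write is modelled as a plain Runnable instead of the real IngestionService call inside a Spring controller.

```java
// Hypothetical illustration of the synchronous-write status contract.
class IngestStatusSketch {

    // Runs the write synchronously and maps the outcome to the HTTP status the test should see.
    static int statusFor(Runnable write) {
        try {
            write.run();
            return 202; // accepted: the write has already reached the store
        } catch (RuntimeException e) {
            return 503; // store unavailable: the agent can retry later
        }
    }

    public static void main(String[] args) {
        System.out.println(statusFor(() -> { }));
        System.out.println(statusFor(() -> { throw new IllegalStateException("db down"); }));
    }
}
```

Because the write completes before the response is sent, the test can read the execution back immediately after a 202 without waiting for a buffer flush.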
- Step 3: Update SearchControllerIT
Search now hits OpenSearch. Add OpenSearch Testcontainer. Allow time for async indexing (use Awaitility to poll search results rather than Thread.sleep).
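The polling approach can be sketched without extra dependencies as shown below. In the actual tests Awaitility would normally replace this helper; PollSketch and awaitTrue are hypothetical names used only to show the pattern of retrying a condition until a deadline instead of sleeping for a fixed time.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only stand-in for an Awaitility-style wait.
class PollSketch {

    // Re-evaluates the condition every interval until it holds or the timeout elapses.
    static boolean awaitTrue(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition; a search test would check that the result total is greater than zero.
        boolean ok = awaitTrue(() -> System.currentTimeMillis() - start > 50,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(ok);
    }
}
```

Polling returns as soon as the document becomes searchable, so it is both faster on average and less flaky than a fixed sleep that must cover the debounce delay plus the OpenSearch refresh interval.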
- Step 4: Update DetailControllerIT
Detail now reads from PostgresExecutionStore directly. Simpler setup — just insert execution + processors.
- Step 5: Run full test suite
Run: mvn verify -q
Expected: BUILD SUCCESS, all tests pass
- Step 6: Commit
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"
Task 18: Update Dockerfile and K8s manifests
Files:
- Modify: Dockerfile
- Modify: deploy/*.yaml (K8s manifests)
- Step 1: Update Dockerfile
No JDBC driver changes needed in Dockerfile (drivers are in the fat JAR). Just verify the REGISTRY_TOKEN build arg still works for cameleer3-common resolution.
- Step 2: Update K8s manifests
  - Replace ClickHouse StatefulSet with PostgreSQL/TimescaleDB StatefulSet
  - Add OpenSearch StatefulSet (single-node for dev, clustered for prod)
  - Update server Deployment env vars: SPRING_DATASOURCE_URL, OPENSEARCH_URL
  - Update secrets: replace clickhouse-credentials with postgres-credentials
  - Update health check probes: PostgreSQL uses standard port 5432, OpenSearch uses 9200
- Step 3: Commit
git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"
Task 19: Update CI workflow
Files:
- Modify: .gitea/workflows/ci.yml
- Step 1: Update CI workflow
Changes needed:
  - Docker build: no ClickHouse references in the image (it's a fat JAR, driver is bundled)
  - Deploy step: update K8s secret names from clickhouse-credentials to postgres-credentials
  - Deploy step: add OpenSearch deployment manifests
  - Verify REGISTRY_TOKEN build arg still works for cameleer3-common
  - Integration tests still skipped in CI (-DskipITs): Testcontainers needs Docker-in-Docker
- Step 2: Commit
git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"
Task 20: Update OpenAPI spec
- Step 1: Regenerate openapi.json
Run the server locally (or via test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend.
Run: mvn spring-boot:run (briefly, to generate spec), then fetch /api/v1/api-docs.
- Step 2: Commit
git add cameleer3-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"
Task 21: Final verification
- Step 1: Full build
Run: mvn clean verify -q
Expected: BUILD SUCCESS
- Step 2: Manual smoke test
Start server against local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):
- POST an execution → verify 202
- GET /api/v1/search?text=... → verify search works
- GET /api/v1/stats → verify stats return
- GET /api/v1/detail/{id} → verify detail with processor tree
- Step 3: Commit any remaining fixes
git add -A
git commit -m "fix: final adjustments from smoke testing"