cameleer-server/docs/superpowers/plans/2026-03-16-storage-layer-refactor.md
hsiegeln ccc3f9fd92
Add storage layer refactor spec and implementation plan
Design to replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch.
PostgreSQL as source of truth with continuous aggregates for analytics,
OpenSearch for full-text wildcard search. 21-task implementation plan.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 18:05:16 +01:00


Storage Layer Refactor Implementation Plan

For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends.

Architecture: PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations.

Tech Stack: Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client

Spec: docs/superpowers/specs/2026-03-16-storage-layer-design.md


File Structure

New files

Core module (cameleer3-server-core/src/main/java/com/cameleer3/server/core/):

  • storage/ExecutionStore.java — new interface replacing ExecutionRepository
  • storage/StatsStore.java — new interface for stats from continuous aggregates
  • storage/SearchIndex.java — new interface for OpenSearch operations
  • storage/DiagramStore.java — new interface replacing DiagramRepository
  • storage/MetricsStore.java — new interface replacing MetricsRepository
  • storage/model/ExecutionDocument.java — document model for OpenSearch indexing
  • search/StatsRequest.java — request DTO for stats queries (level, scope, time range)
  • search/TimeSeriesRequest.java — request DTO for time-series queries (bucket size)
  • indexing/SearchIndexer.java — debounced event listener for OpenSearch indexing
  • indexing/ExecutionUpdatedEvent.java — event published after execution write

App module (cameleer3-server-app/src/main/java/com/cameleer3/server/app/):

  • storage/PostgresExecutionStore.java — ExecutionStore impl with upsert
  • storage/PostgresStatsStore.java — StatsStore impl querying continuous aggregates
  • storage/PostgresDiagramStore.java — DiagramStore impl
  • storage/PostgresUserRepository.java — UserRepository impl (keeps existing core interface)
  • storage/PostgresOidcConfigRepository.java — OidcConfigRepository impl (keeps existing core interface)
  • storage/PostgresMetricsStore.java — MetricsStore impl
  • search/OpenSearchIndex.java — SearchIndex impl
  • config/OpenSearchConfig.java — OpenSearch client bean
  • config/StorageBeanConfig.java — wires all store beans
  • ingestion/MetricsFlushScheduler.java — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only)
  • retention/RetentionScheduler.java — scheduled job for drop_chunks and OpenSearch index deletion
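The upsert in PostgresExecutionStore is what makes chunked arrival work: a start event inserts the row, and the completion event updates it in place, keyed on the (execution_id, start_time) primary key. A sketch of the statement it might issue (column list abbreviated; the exact SET list is an implementation choice, not specified by this plan):

```sql
INSERT INTO executions (execution_id, route_id, agent_id, group_name, status,
                        start_time, end_time, duration_ms, error_message)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (execution_id, start_time) DO UPDATE
SET status        = EXCLUDED.status,
    end_time      = EXCLUDED.end_time,
    duration_ms   = EXCLUDED.duration_ms,
    error_message = EXCLUDED.error_message,
    updated_at    = now();
```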

Flyway migrations (cameleer3-server-app/src/main/resources/db/migration/):

  • V1__extensions.sql — CREATE EXTENSION timescaledb, timescaledb_toolkit
  • V2__executions.sql — executions hypertable
  • V3__processor_executions.sql — processor_executions hypertable
  • V4__agent_metrics.sql — agent_metrics hypertable
  • V5__route_diagrams.sql — route_diagrams table
  • V6__users.sql — users table
  • V7__oidc_config.sql — oidc_config table
  • V8__continuous_aggregates.sql — all 4 continuous aggregates + refresh policies

Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by RetentionScheduler at runtime with configurable intervals.
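The Postgres side of that scheduled job reduces to drop_chunks calls per hypertable; a sketch, assuming the interval comes from the retention-days configuration (the OpenSearch side would delete aged indices through the client, not SQL):

```sql
-- Drop TimescaleDB chunks older than the configured retention window
SELECT drop_chunks('executions',           older_than => INTERVAL '30 days');
SELECT drop_chunks('processor_executions', older_than => INTERVAL '30 days');
SELECT drop_chunks('agent_metrics',        older_than => INTERVAL '30 days');
```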

Test files (cameleer3-server-app/src/test/java/com/cameleer3/server/app/):

  • AbstractPostgresIT.java — replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
  • storage/PostgresExecutionStoreIT.java — upsert, dedup, chunked arrival tests
  • storage/PostgresStatsStoreIT.java — continuous aggregate query tests
  • storage/PostgresDiagramStoreIT.java — content-hash dedup tests
  • storage/PostgresUserRepositoryIT.java — CRUD tests
  • search/OpenSearchIndexIT.java — index, search, wildcard tests

Files to modify

  • pom.xml (root) — no changes needed
  • cameleer3-server-app/pom.xml — swap clickhouse-jdbc for postgresql + opensearch-java + flyway
  • cameleer3-server-core/.../core/search/SearchService.java — split: search delegates to SearchIndex, stats/timeseries to StatsStore
  • cameleer3-server-core/.../core/detail/DetailService.java — use ExecutionStore instead of ExecutionRepository
  • cameleer3-server-core/.../core/detail/RawExecutionRow.java — remove (replaced by normalized model)
  • cameleer3-server-core/.../core/ingestion/IngestionService.java — synchronous execution/diagram writes, keep buffer for metrics
  • cameleer3-server-app/.../app/config/SearchBeanConfig.java — wire StatsStore into SearchService
  • cameleer3-server-app/.../app/config/IngestionBeanConfig.java — update bean wiring
  • cameleer3-server-app/src/main/resources/application.yml — PostgreSQL + OpenSearch config
  • cameleer3-server-app/src/test/resources/application-test.yml — test config

Files to delete

  • cameleer3-server-app/.../app/storage/ClickHouseExecutionRepository.java
  • cameleer3-server-app/.../app/storage/ClickHouseDiagramRepository.java
  • cameleer3-server-app/.../app/storage/ClickHouseMetricsRepository.java
  • cameleer3-server-app/.../app/storage/ClickHouseUserRepository.java
  • cameleer3-server-app/.../app/storage/ClickHouseOidcConfigRepository.java
  • cameleer3-server-app/.../app/search/ClickHouseSearchEngine.java
  • cameleer3-server-app/.../app/ingestion/ClickHouseFlushScheduler.java
  • cameleer3-server-app/.../app/config/ClickHouseConfig.java
  • cameleer3-server-core/.../core/storage/ExecutionRepository.java
  • cameleer3-server-core/.../core/storage/DiagramRepository.java
  • cameleer3-server-core/.../core/storage/MetricsRepository.java
  • cameleer3-server-core/.../core/search/SearchEngine.java
  • cameleer3-server-core/.../core/detail/RawExecutionRow.java

  • cameleer3-server-app/src/main/resources/clickhouse/*.sql (all 8 files)
  • cameleer3-server-app/src/test/.../app/AbstractClickHouseIT.java

Note: UserRepository and OidcConfigRepository interfaces in core.security are kept — the new Postgres implementations implement these existing interfaces. No rename needed since their contracts are unchanged.

Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure

Task 1: Update Maven dependencies

Files:

  • Modify: cameleer3-server-app/pom.xml

  • Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client

In cameleer3-server-app/pom.xml, replace the ClickHouse dependency and add new ones:

Remove:

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.9.7</version>
    <classifier>all</classifier>
</dependency>

Add:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
</dependency>
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-rest-client</artifactId>
    <version>2.19.0</version>
</dependency>

Replace the ClickHouse Testcontainer:

<!-- Remove testcontainers-clickhouse -->
<!-- Add: -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>postgresql</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.opensearch</groupId>
    <artifactId>opensearch-testcontainers</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>

Note: postgresql driver and flyway-core versions are managed by Spring Boot parent POM. Testcontainers BOM version is ${testcontainers.version} (2.0.3) from root POM.

  • Step 2: Commit (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)
git add cameleer3-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"

Task 2: Write Flyway migrations

Files:

  • Create: cameleer3-server-app/src/main/resources/db/migration/V1__extensions.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V2__executions.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V3__processor_executions.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V4__agent_metrics.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V5__route_diagrams.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V6__users.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V7__oidc_config.sql

  • Create: cameleer3-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql

  • Step 1: Create V1__extensions.sql

CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
  • Step 2: Create V2__executions.sql
CREATE TABLE executions (
    execution_id    TEXT        NOT NULL,
    route_id        TEXT        NOT NULL,
    agent_id        TEXT        NOT NULL,
    group_name      TEXT        NOT NULL,
    status          TEXT        NOT NULL,
    correlation_id  TEXT,
    exchange_id     TEXT,
    start_time      TIMESTAMPTZ NOT NULL,
    end_time        TIMESTAMPTZ,
    duration_ms     BIGINT,
    error_message   TEXT,
    error_stacktrace TEXT,
    diagram_content_hash TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (execution_id, start_time)
);

SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC);
CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC);
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC);
CREATE INDEX idx_executions_correlation ON executions (correlation_id);
  • Step 3: Create V3__processor_executions.sql
CREATE TABLE processor_executions (
    id                  BIGSERIAL,
    execution_id        TEXT        NOT NULL,
    processor_id        TEXT        NOT NULL,
    processor_type      TEXT        NOT NULL,
    diagram_node_id     TEXT,
    group_name          TEXT        NOT NULL,
    route_id            TEXT        NOT NULL,
    depth               INT         NOT NULL,
    parent_processor_id TEXT,
    status              TEXT        NOT NULL,
    start_time          TIMESTAMPTZ NOT NULL,
    end_time            TIMESTAMPTZ,
    duration_ms         BIGINT,
    error_message       TEXT,
    error_stacktrace    TEXT,
    input_body          TEXT,
    output_body         TEXT,
    input_headers       JSONB,
    output_headers      JSONB,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (execution_id, processor_id, start_time)
);

SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id);
CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC);
  • Step 4: Create V4__agent_metrics.sql
CREATE TABLE agent_metrics (
    agent_id          TEXT             NOT NULL,
    metric_name       TEXT             NOT NULL,
    metric_value      DOUBLE PRECISION NOT NULL,
    tags              JSONB,
    collected_at      TIMESTAMPTZ      NOT NULL,
    server_received_at TIMESTAMPTZ     NOT NULL DEFAULT now()
);

SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC);
  • Step 5: Create V5__route_diagrams.sql
CREATE TABLE route_diagrams (
    content_hash TEXT        PRIMARY KEY,
    route_id     TEXT        NOT NULL,
    agent_id     TEXT        NOT NULL,
    definition   TEXT        NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id);
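Because content_hash is the primary key, diagram deduplication falls out of an idempotent insert: the DiagramStore impl can hash the definition and write with DO NOTHING, so re-registering an unchanged route is a no-op. A sketch (column binding order is illustrative):

```sql
INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
VALUES (?, ?, ?, ?)
ON CONFLICT (content_hash) DO NOTHING;
```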
  • Step 6: Create V6__users.sql
CREATE TABLE users (
    user_id      TEXT        PRIMARY KEY,
    provider     TEXT        NOT NULL,
    email        TEXT,
    display_name TEXT,
    roles        TEXT[]      NOT NULL DEFAULT '{}',
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
  • Step 7: Create V7__oidc_config.sql
CREATE TABLE oidc_config (
    config_id          TEXT        PRIMARY KEY DEFAULT 'default',
    enabled            BOOLEAN     NOT NULL DEFAULT false,
    issuer_uri         TEXT,
    client_id          TEXT,
    client_secret      TEXT,
    roles_claim        TEXT,
    default_roles      TEXT[]      NOT NULL DEFAULT '{}',
    auto_signup        BOOLEAN     DEFAULT false,
    display_name_claim TEXT,
    updated_at         TIMESTAMPTZ NOT NULL DEFAULT now()
);
  • Step 8: Create V8__continuous_aggregates.sql
-- Global stats
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;

SELECT add_continuous_aggregate_policy('stats_1m_all',
    start_offset => INTERVAL '1 hour',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-application stats
CREATE MATERIALIZED VIEW stats_1m_app
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name;

SELECT add_continuous_aggregate_policy('stats_1m_app',
    start_offset => INTERVAL '1 hour',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-route stats
CREATE MATERIALIZED VIEW stats_1m_route
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name, route_id;

SELECT add_continuous_aggregate_policy('stats_1m_route',
    start_offset => INTERVAL '1 hour',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-processor stats (uses denormalized group_name/route_id on processor_executions)
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    processor_type,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;

SELECT add_continuous_aggregate_policy('stats_1m_processor',
    start_offset => INTERVAL '1 hour',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');
  • Step 9: Commit
git add cameleer3-server-app/src/main/resources/db/migration/
git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema"

Task 3: Create test base class with TimescaleDB Testcontainer

Files:

  • Create: cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java

  • Step 1: Write AbstractPostgresIT

package com.cameleer3.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>("timescale/timescaledb:latest-pg16")
                    .withDatabaseName("cameleer3")
                    .withUsername("cameleer")
                    .withPassword("test");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
  • Step 2: Write a smoke test to verify migrations run

Create cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java:

package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.junit.jupiter.api.Assertions.*;

class FlywayMigrationIT extends AbstractPostgresIT {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Test
    void allMigrationsApplySuccessfully() {
        // Verify core tables exist
        Integer execCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM executions", Integer.class);
        assertEquals(0, execCount);

        Integer procCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processor_executions", Integer.class);
        assertEquals(0, procCount);

        Integer userCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM users", Integer.class);
        assertEquals(0, userCount);

        // Verify continuous aggregates exist
        Integer caggCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates",
                Integer.class);
        assertEquals(4, caggCount);
    }
}
  • Step 3: Verify the test passes (note: this test will not compile until Task 16 deletes the ClickHouse code; run it after Task 16 is complete. It is listed here for logical grouping.)

Run: mvn test -pl cameleer3-server-app -Dtest=FlywayMigrationIT -q
Expected: PASS — all migrations apply, tables and continuous aggregates exist

  • Step 4: Commit
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/FlywayMigrationIT.java
git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test"

Task 4: Update application.yml for PostgreSQL + OpenSearch

Files:

  • Modify: cameleer3-server-app/src/main/resources/application.yml

  • Modify: cameleer3-server-app/src/test/resources/application-test.yml

  • Step 1: Update application.yml datasource section

Replace:

spring:
  datasource:
    url: jdbc:ch://localhost:8123/cameleer3
    username: cameleer
    password: cameleer_dev
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver

With:

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/cameleer3
    username: cameleer
    password: ${CAMELEER_DB_PASSWORD:cameleer_dev}
    driver-class-name: org.postgresql.Driver
  flyway:
    enabled: true
    locations: classpath:db/migration

Add OpenSearch config section:

opensearch:
  url: ${OPENSEARCH_URL:http://localhost:9200}
  queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
  debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}

Add body size limit:

cameleer:
  body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
  retention-days: ${CAMELEER_RETENTION_DAYS:30}

Remove the clickhouse: section.

  • Step 2: Update application-test.yml
spring:
  flyway:
    enabled: true
opensearch:
  url: http://localhost:9200
  • Step 3: Commit
git add cameleer3-server-app/src/main/resources/application.yml
git add cameleer3-server-app/src/test/resources/application-test.yml
git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config"

Chunk 2: Core Module Interfaces and Models

Task 5: Create new storage interfaces in core module

Files:

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionStore.java

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/StatsStore.java

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/SearchIndex.java

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramStore.java

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsStore.java

  • Step 1: Create ExecutionStore interface

package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.detail.ProcessorNode;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

public interface ExecutionStore {

    void upsert(ExecutionRecord execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);

    record ExecutionRecord(
            String executionId, String routeId, String agentId, String groupName,
            String status, String correlationId, String exchangeId,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace, String diagramContentHash
    ) {}

    record ProcessorRecord(
            String executionId, String processorId, String processorType,
            String diagramNodeId, String groupName, String routeId,
            int depth, String parentProcessorId, String status,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody, String inputHeaders, String outputHeaders
    ) {}
}
  • Step 2: Create StatsStore interface

Supports all 4 aggregation levels: global, per-app, per-route, per-processor.

package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;

import java.time.Instant;
import java.util.List;

public interface StatsStore {

    // Global stats (stats_1m_all)
    ExecutionStats stats(Instant from, Instant to);

    // Per-app stats (stats_1m_app)
    ExecutionStats statsForApp(Instant from, Instant to, String groupName);

    // Per-route stats (stats_1m_route), optionally scoped to specific agents
    ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);

    // Per-processor stats (stats_1m_processor)
    ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);

    // Global timeseries
    StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);

    // Per-app timeseries
    StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);

    // Per-route timeseries, optionally scoped to specific agents
    StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                        String routeId, List<String> agentIds);

    // Per-processor timeseries
    StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                            String routeId, String processorType);
}
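Implementations of these methods are rollups over the pre-computed minute buckets. As an illustration, stats(from, to) might run something like the following against stats_1m_all (a sketch, not the plan's actual query; note that the stored per-bucket p99 values cannot be merged by summing or averaging, so cross-bucket percentile handling needs its own strategy):

```sql
SELECT COALESCE(SUM(total_count), 0)   AS total,
       COALESCE(SUM(failed_count), 0)  AS failed,
       COALESCE(SUM(duration_sum), 0)  AS duration_sum,
       MAX(duration_max)               AS duration_max
FROM stats_1m_all
WHERE bucket >= ? AND bucket < ?;
```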
  • Step 3: Create SearchIndex interface
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.model.ExecutionDocument;

public interface SearchIndex {

    SearchResult<ExecutionSummary> search(SearchRequest request);

    long count(SearchRequest request);

    void index(ExecutionDocument document);

    void delete(String executionId);
}
  • Step 4: Create DiagramStore interface
package com.cameleer3.server.core.storage;

import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;

import java.util.List;
import java.util.Optional;

public interface DiagramStore {

    void store(TaggedDiagram diagram);

    Optional<RouteGraph> findByContentHash(String contentHash);

    Optional<String> findContentHashForRoute(String routeId, String agentId);

    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
  • Step 5: Create MetricsStore interface
package com.cameleer3.server.core.storage;

import com.cameleer3.server.core.storage.model.MetricsSnapshot;

import java.util.List;

public interface MetricsStore {

    void insertBatch(List<MetricsSnapshot> snapshots);
}
  • Step 6: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"

Task 6: Create ExecutionDocument model and indexing event

Files:

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java

  • Create: cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/ExecutionUpdatedEvent.java

  • Step 1: Create ExecutionDocument

package com.cameleer3.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record ExecutionDocument(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        List<ProcessorDoc> processors
) {
    public record ProcessorDoc(
            String processorId, String processorType, String status,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody,
            String inputHeaders, String outputHeaders
    ) {}
}
  • Step 2: Create ExecutionUpdatedEvent
package com.cameleer3.server.core.indexing;

import java.time.Instant;

public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
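SearchIndexer consumes these events with a per-execution debounce, so a burst of processor updates for one execution collapses into a single OpenSearch index call after the quiet period. A minimal, framework-free sketch of that debounce (class name, method names, and the Consumer callback are illustrative assumptions, not the plan's actual API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative debounce: each event (re)schedules an index task for that
// executionId; only the last event within the window fires the callback.
public class DebouncedIndexer {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final long debounceMs;
    private final Consumer<String> indexExecution; // would load the execution and index it

    public DebouncedIndexer(long debounceMs, Consumer<String> indexExecution) {
        this.debounceMs = debounceMs;
        this.indexExecution = indexExecution;
    }

    public void onExecutionUpdated(String executionId) {
        pending.compute(executionId, (id, prev) -> {
            if (prev != null) prev.cancel(false); // restart the debounce window
            return scheduler.schedule(() -> {
                pending.remove(id);
                indexExecution.accept(id);
            }, debounceMs, TimeUnit.MILLISECONDS);
        });
    }
}
```

In the real SearchIndexer this would sit behind an @EventListener for ExecutionUpdatedEvent, with the window driven by the opensearch.debounce-ms property.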
  • Step 3: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"

Task 7: Update SearchService to use StatsStore for stats/timeseries

Files:

  • Modify: cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java

  • Step 1: Refactor SearchService to accept SearchIndex + StatsStore

Replace the single SearchEngine dependency with two dependencies:

package com.cameleer3.server.core.search;

import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.StatsStore;

import java.time.Instant;
import java.util.List;

public class SearchService {

    private final SearchIndex searchIndex;
    private final StatsStore statsStore;

    public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
        this.searchIndex = searchIndex;
        this.statsStore = statsStore;
    }

    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        return searchIndex.search(request);
    }

    public long count(SearchRequest request) {
        return searchIndex.count(request);
    }

    public ExecutionStats stats(Instant from, Instant to) {
        return statsStore.stats(from, to);
    }

    public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
        return statsStore.statsForRoute(from, to, routeId, agentIds);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return statsStore.timeseries(from, to, bucketCount);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
                                       String routeId, List<String> agentIds) {
        return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
    }
}
  • Step 2: Commit
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"

Task 8: Update DetailService to use ExecutionStore

Files:

  • Modify: cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java

  • Step 1: Rewrite DetailService to use ExecutionStore

The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with parentProcessorId for tree structure.

package com.cameleer3.server.core.detail;

import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;

import java.util.*;

public class DetailService {

    private final ExecutionStore executionStore;

    public DetailService(ExecutionStore executionStore) {
        this.executionStore = executionStore;
    }

    public Optional<ExecutionDetail> getDetail(String executionId) {
        return executionStore.findById(executionId)
                .map(exec -> {
                    List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
                    List<ProcessorNode> roots = buildTree(processors);
                    return new ExecutionDetail(
                            exec.executionId(), exec.routeId(), exec.agentId(),
                            exec.status(), exec.startTime(), exec.endTime(),
                            exec.durationMs() != null ? exec.durationMs() : 0L,
                            exec.correlationId(), exec.exchangeId(),
                            exec.errorMessage(), exec.errorStacktrace(),
                            exec.diagramContentHash(), roots
                    );
                });
    }

    List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
        if (processors.isEmpty()) return List.of();

        Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
        for (ProcessorRecord p : processors) {
            nodeMap.put(p.processorId(), new ProcessorNode(
                    p.processorId(), p.processorType(), p.status(),
                    p.startTime(), p.endTime(),
                    p.durationMs() != null ? p.durationMs() : 0L,
                    p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
            ));
        }

        List<ProcessorNode> roots = new ArrayList<>();
        for (ProcessorRecord p : processors) {
            ProcessorNode node = nodeMap.get(p.processorId());
            if (p.parentProcessorId() == null) {
                roots.add(node);
            } else {
                ProcessorNode parent = nodeMap.get(p.parentProcessorId());
                if (parent != null) {
                    parent.addChild(node);
                } else {
                    roots.add(node); // orphan safety
                }
            }
        }
        return roots;
    }
}
```

- [ ] Step 2: Commit

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
```

### Task 9: Update IngestionService for synchronous writes

**Files:**

- Modify: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java`

- [ ] Step 1: Rewrite IngestionService

Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.

```java
package com.cameleer3.server.core.ingestion;

import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer3.server.core.storage.DiagramStore;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);

        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                    execution.getProcessors(), record.executionId(),
                    record.startTime(), groupName, execution.getRouteId(),
                    null, 0);
            executionStore.upsertProcessors(
                    record.executionId(), record.startTime(),
                    groupName, execution.getRouteId(), processors);
        }

        eventPublisher.accept(new ExecutionUpdatedEvent(
                record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
        diagramStore.store(diagram);
    }

    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }

    private ExecutionRecord toExecutionRecord(String agentId, String groupName,
                                               RouteExecution exec) {
        return new ExecutionRecord(
                exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
                exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
                exec.getCorrelationId(), exec.getExchangeId(),
                exec.getStartTime(), exec.getEndTime(),
                exec.getDurationMs(),
                exec.getErrorMessage(), exec.getErrorStacktrace(),
                null // diagramContentHash set separately
        );
    }

    private List<ProcessorRecord> flattenProcessors(
            List<ProcessorExecution> processors, String executionId,
            java.time.Instant execStartTime, String groupName, String routeId,
            String parentProcessorId, int depth) {
        List<ProcessorRecord> flat = new ArrayList<>();
        for (ProcessorExecution p : processors) {
            flat.add(new ProcessorRecord(
                    executionId, p.getProcessorId(), p.getProcessorType(),
                    p.getDiagramNodeId(), groupName, routeId,
                    depth, parentProcessorId,
                    p.getStatus() != null ? p.getStatus().name() : "RUNNING",
                    p.getStartTime() != null ? p.getStartTime() : execStartTime,
                    p.getEndTime(),
                    p.getDurationMs(),
                    p.getErrorMessage(), p.getErrorStacktrace(),
                    truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
                    toJson(p.getInputHeaders()), toJson(p.getOutputHeaders())
            ));
            if (p.getChildren() != null) {
                flat.addAll(flattenProcessors(
                        p.getChildren(), executionId, execStartTime,
                        groupName, routeId, p.getProcessorId(), depth + 1));
            }
        }
        return flat;
    }

    private String truncateBody(String body) {
        if (body == null) return null;
        if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
        return body;
    }

    // Headers are bound to jsonb columns (?::jsonb), so they must be valid JSON;
    // Map.toString() produces "{key=value}", which is not.
    private String toJson(Object headers) {
        if (headers == null) return null;
        try {
            return JSON_MAPPER.writeValueAsString(headers);
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            return null; // headers are best-effort diagnostics; drop if unserializable
        }
    }

    private static final com.fasterxml.jackson.databind.ObjectMapper JSON_MAPPER =
            new com.fasterxml.jackson.databind.ObjectMapper();
}
```

- [ ] Step 2: Commit

```bash
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
```

## Chunk 3: PostgreSQL Store Implementations

### Task 10: Implement PostgresExecutionStore

**Files:**

- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java`

- [ ] Step 1: Write the failing test

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.*;

class PostgresExecutionStoreIT extends AbstractPostgresIT {

    @Autowired
    ExecutionStore executionStore;

    @Test
    void upsertAndFindById() {
        Instant now = Instant.now();
        ExecutionRecord record = new ExecutionRecord(
                "exec-1", "route-a", "agent-1", "app-1",
                "COMPLETED", "corr-1", "exchange-1",
                now, now.plusMillis(100), 100L,
                null, null, null);

        executionStore.upsert(record);
        Optional<ExecutionRecord> found = executionStore.findById("exec-1");

        assertTrue(found.isPresent());
        assertEquals("exec-1", found.get().executionId());
        assertEquals("COMPLETED", found.get().status());
    }

    @Test
    void upsertDeduplicatesByExecutionId() {
        Instant now = Instant.now();
        ExecutionRecord first = new ExecutionRecord(
                "exec-dup", "route-a", "agent-1", "app-1",
                "RUNNING", null, null, now, null, null, null, null, null);
        ExecutionRecord second = new ExecutionRecord(
                "exec-dup", "route-a", "agent-1", "app-1",
                "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);

        executionStore.upsert(first);
        executionStore.upsert(second);

        Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
        assertTrue(found.isPresent());
        assertEquals("COMPLETED", found.get().status());
        assertEquals(200L, found.get().durationMs());
    }

    @Test
    void upsertProcessorsAndFind() {
        Instant now = Instant.now();
        ExecutionRecord exec = new ExecutionRecord(
                "exec-proc", "route-a", "agent-1", "app-1",
                "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
        executionStore.upsert(exec);

        List<ProcessorRecord> processors = List.of(
                new ProcessorRecord("exec-proc", "proc-1", "log", null,
                        "app-1", "route-a", 0, null, "COMPLETED",
                        now, now.plusMillis(10), 10L, null, null,
                        "input body", "output body", null, null),
                new ProcessorRecord("exec-proc", "proc-2", "to", null,
                        "app-1", "route-a", 1, "proc-1", "COMPLETED",
                        now.plusMillis(10), now.plusMillis(30), 20L, null, null,
                        null, null, null, null)
        );
        executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

        List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
        assertEquals(2, found.size());
        assertEquals("proc-1", found.get(0).processorId());
        assertEquals("proc-2", found.get(1).processorId());
        assertEquals("proc-1", found.get(1).parentProcessorId());
    }
}
```

- [ ] Step 2: Run test to verify it fails

Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: FAIL — ExecutionStore bean not found

- [ ] Step 3: Implement PostgresExecutionStore

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresExecutionStore implements ExecutionStore {

    private final JdbcTemplate jdbc;

    public PostgresExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void upsert(ExecutionRecord execution) {
        jdbc.update("""
                INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                    status, correlation_id, exchange_id, start_time, end_time,
                    duration_ms, error_message, error_stacktrace, diagram_content_hash,
                    created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
                ON CONFLICT (execution_id, start_time) DO UPDATE SET
                    -- Never downgrade a terminal status back to RUNNING
                    -- (late or out-of-order upserts must not regress it)
                    status = CASE
                        WHEN executions.status IN ('COMPLETED', 'FAILED')
                            AND EXCLUDED.status = 'RUNNING'
                        THEN executions.status
                        ELSE EXCLUDED.status
                    END,
                    end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
                    duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                    error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
                    error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                    diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                    updated_at = now()
                """,
                execution.executionId(), execution.routeId(), execution.agentId(),
                execution.groupName(), execution.status(), execution.correlationId(),
                execution.exchangeId(),
                Timestamp.from(execution.startTime()),
                execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
                execution.durationMs(), execution.errorMessage(),
                execution.errorStacktrace(), execution.diagramContentHash());
    }

    @Override
    public void upsertProcessors(String executionId, Instant startTime,
                                  String groupName, String routeId,
                                  List<ProcessorRecord> processors) {
        jdbc.batchUpdate("""
                INSERT INTO processor_executions (execution_id, processor_id, processor_type,
                    diagram_node_id, group_name, route_id, depth, parent_processor_id,
                    status, start_time, end_time, duration_ms, error_message, error_stacktrace,
                    input_body, output_body, input_headers, output_headers)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
                ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
                    status = EXCLUDED.status,
                    end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
                    duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
                    error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
                    error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
                    input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
                    output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
                    input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
                    output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
                """,
                processors.stream().map(p -> new Object[]{
                        p.executionId(), p.processorId(), p.processorType(),
                        p.diagramNodeId(), p.groupName(), p.routeId(),
                        p.depth(), p.parentProcessorId(), p.status(),
                        Timestamp.from(p.startTime()),
                        p.endTime() != null ? Timestamp.from(p.endTime()) : null,
                        p.durationMs(), p.errorMessage(), p.errorStacktrace(),
                        p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
                }).toList());
    }

    @Override
    public Optional<ExecutionRecord> findById(String executionId) {
        List<ExecutionRecord> results = jdbc.query(
                "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
                EXECUTION_MAPPER, executionId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<ProcessorRecord> findProcessors(String executionId) {
        return jdbc.query(
                "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
                PROCESSOR_MAPPER, executionId);
    }

    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
            new ExecutionRecord(
                    rs.getString("execution_id"), rs.getString("route_id"),
                    rs.getString("agent_id"), rs.getString("group_name"),
                    rs.getString("status"), rs.getString("correlation_id"),
                    rs.getString("exchange_id"),
                    toInstant(rs, "start_time"), toInstant(rs, "end_time"),
                    rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
                    rs.getString("error_message"), rs.getString("error_stacktrace"),
                    rs.getString("diagram_content_hash"));

    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
            new ProcessorRecord(
                    rs.getString("execution_id"), rs.getString("processor_id"),
                    rs.getString("processor_type"), rs.getString("diagram_node_id"),
                    rs.getString("group_name"), rs.getString("route_id"),
                    rs.getInt("depth"), rs.getString("parent_processor_id"),
                    rs.getString("status"),
                    toInstant(rs, "start_time"), toInstant(rs, "end_time"),
                    rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
                    rs.getString("error_message"), rs.getString("error_stacktrace"),
                    rs.getString("input_body"), rs.getString("output_body"),
                    rs.getString("input_headers"), rs.getString("output_headers"));

    private static Instant toInstant(ResultSet rs, String column) throws SQLException {
        Timestamp ts = rs.getTimestamp(column);
        return ts != null ? ts.toInstant() : null;
    }
}
```

- [ ] Step 4: Run tests to verify they pass

Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: PASS — all 3 tests green

- [ ] Step 5: Commit

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresExecutionStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
```

### Task 11: Implement PostgresStatsStore

**Files:**

- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java`

- [ ] Step 1: Write the failing test

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.junit.jupiter.api.Assertions.*;

class PostgresStatsStoreIT extends AbstractPostgresIT {

    @Autowired StatsStore statsStore;
    @Autowired ExecutionStore executionStore;
    @Autowired JdbcTemplate jdbc;

    @Test
    void statsReturnsCountsForTimeWindow() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
        insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
        insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);

        // Force continuous aggregate refresh
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
        assertEquals(3, stats.totalCount());
        assertEquals(1, stats.failedCount());
    }

    @Test
    void timeseriesReturnsBuckets() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
        for (int i = 0; i < 10; i++) {
            insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
                    now.plusSeconds(i * 30), 100L + i);
        }

        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        StatsTimeseries ts = statsStore.timeseries(
                now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
        assertNotNull(ts);
        assertFalse(ts.buckets().isEmpty());
    }

    private void insertExecution(String id, String routeId, String groupName,
                                  String status, Instant startTime, long durationMs) {
        executionStore.upsert(new ExecutionRecord(
                id, routeId, "agent-1", groupName, status, null, null,
                startTime, startTime.plusMillis(durationMs), durationMs,
                status.equals("FAILED") ? "error" : null, null, null));
    }
}
```

- [ ] Step 2: Run test to verify it fails

Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: FAIL — StatsStore bean not found

- [ ] Step 3: Implement PostgresStatsStore

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.search.ExecutionStats;
import com.cameleer3.server.core.search.StatsTimeseries;
import com.cameleer3.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer3.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

@Repository
public class PostgresStatsStore implements StatsStore {

    private final JdbcTemplate jdbc;

    public PostgresStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of());
    }

    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
        return queryStats("stats_1m_app", from, to, List.of(
                new Filter("group_name", groupName)));
    }

    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // Note: agentIds is accepted for interface compatibility but not filterable
        // on the continuous aggregate (it groups by route_id, not agent_id).
        // All agents for the same route contribute to the same aggregate.
        return queryStats("stats_1m_route", from, to, List.of(
                new Filter("route_id", routeId)));
    }

    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryStats("stats_1m_processor", from, to, List.of(
                new Filter("route_id", routeId),
                new Filter("processor_type", processorType)));
    }

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
                new Filter("group_name", groupName)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                               String routeId, List<String> agentIds) {
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
                new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                    String routeId, String processorType) {
        // stats_1m_processor does NOT have running_count column
        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
                new Filter("route_id", routeId),
                new Filter("processor_type", processorType)), false);
    }

    private record Filter(String column, String value) {}

    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
        // running_count only exists on execution-level aggregates, not processor
        boolean hasRunning = !view.equals("stats_1m_processor");
        String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count, " +
                "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
                "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }

        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                rs.getLong("active_count")
        }, params.toArray());
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
            p99Duration = r[3]; activeCount = r[4];
        }

        // Previous period (shifted back 24h)
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        List<Object> prevParams = new ArrayList<>();
        prevParams.add(Timestamp.from(prevFrom));
        prevParams.add(Timestamp.from(prevTo));
        for (Filter f : filters) prevParams.add(f.value());
        String prevSql = sql; // same shape, different time params

        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
                rs.getLong("total_count"), rs.getLong("failed_count"),
                rs.getLong("avg_duration"), rs.getLong("p99_duration")
        }, prevParams.toArray());
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
        }

        // Today total (from midnight UTC)
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        List<Object> todayParams = new ArrayList<>();
        todayParams.add(Timestamp.from(todayStart));
        todayParams.add(Timestamp.from(Instant.now()));
        for (Filter f : filters) todayParams.add(f.value());
        String todaySql = sql;

        long totalToday = 0;
        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
                todayParams.toArray());
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);

        return new ExecutionStats(
                totalCount, failedCount, avgDuration, p99Duration, activeCount,
                totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
                                             int bucketCount, List<Filter> filters,
                                             boolean hasRunningCount) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;

        String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
                "COALESCE(SUM(total_count), 0) AS total_count, " +
                "COALESCE(SUM(failed_count), 0) AS failed_count, " +
                "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
                "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
                runningCol + " AS active_count " +
                "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(intervalSeconds);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }
        sql += " GROUP BY period ORDER BY period";

        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
                new TimeseriesBucket(
                        rs.getTimestamp("period").toInstant(),
                        rs.getLong("total_count"), rs.getLong("failed_count"),
                        rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                        rs.getLong("active_count")
                ), params.toArray());

        return new StatsTimeseries(buckets);
    }
}
```

- [ ] Step 4: Run tests to verify they pass

Run: `mvn test -pl cameleer3-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: PASS

- [ ] Step 5: Commit

```bash
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/PostgresStatsStoreIT.java
git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
```

### Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore

**Files:**

- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDiagramStore.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresUserRepository.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresOidcConfigRepository.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java`

- [ ] Step 1: Write failing test for PostgresDiagramStore

Create `PostgresDiagramStoreIT.java` with tests for:

- `store()` + `findByContentHash()` roundtrip
- Content-hash dedup (store the same hash twice, verify a single row)
- `findContentHashForRoute()` returns the latest for route+agent
- `findContentHashForRouteByAgents()` returns across an agent list
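The dedup contract the second test pins down can be sketched independently of Spring with an in-memory stand-in. The field layout of `TaggedDiagram` here is a hypothetical illustration, not the real record:

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the diagrams table, illustrating the contract the
// dedup test asserts: storing the same content_hash twice leaves one row.
// TaggedDiagram's fields are guessed for illustration only.
final class DiagramDedupSketch {

    record TaggedDiagram(String contentHash, String source) {}

    private final Map<String, TaggedDiagram> table = new HashMap<>();

    void store(TaggedDiagram d) {
        // Mirrors INSERT ... ON CONFLICT (content_hash) DO NOTHING:
        // the first write wins; identical hashes arriving later are ignored.
        table.putIfAbsent(d.contentHash(), d);
    }

    int rowCount() {
        return table.size();
    }

    public static void main(String[] args) {
        DiagramDedupSketch store = new DiagramDedupSketch();
        store.store(new TaggedDiagram("hash-1", "route: a -> b"));
        store.store(new TaggedDiagram("hash-1", "route: a -> b"));
        System.out.println(store.rowCount()); // prints 1
    }
}
```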

- [ ] Step 2: Implement PostgresDiagramStore

Straightforward CRUD with `ON CONFLICT (content_hash) DO NOTHING`. Port the SHA-256 hashing logic from `ClickHouseDiagramRepository`. Use `JdbcTemplate` queries.
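A minimal sketch of the hashing side, assuming the content hash is a lowercase hex SHA-256 of the diagram source (the exact input fields come from the existing `ClickHouseDiagramRepository` and are not shown in this plan):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical shape of the helper to be ported; the real input may combine
// more fields than just the diagram source.
final class DiagramHashing {

    static String contentHash(String diagramSource) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(diagramSource.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash); // 64 lowercase hex chars
        } catch (NoSuchAlgorithmException e) {
            // Every conforming JVM ships SHA-256, so this is effectively unreachable.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        String h1 = contentHash("route: a -> b");
        String h2 = contentHash("route: a -> b");
        // Equal content yields an equal hash, which is what makes
        // ON CONFLICT (content_hash) DO NOTHING an effective dedup key.
        System.out.println(h1.equals(h2) + " " + h1.length()); // prints: true 64
    }
}
```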

- [ ] Step 3: Run diagram tests, verify they pass

- [ ] Step 4: Implement PostgresUserRepository

Implements the existing `UserRepository` interface (in `core.security`, unchanged).

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.security.UserInfo;
import com.cameleer3.server.core.security.UserRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Array;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresUserRepository implements UserRepository {

    private final JdbcTemplate jdbc;

    public PostgresUserRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Optional<UserInfo> findById(String userId) {
        var results = jdbc.query(
                "SELECT * FROM users WHERE user_id = ?",
                (rs, rowNum) -> mapUser(rs), userId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<UserInfo> findAll() {
        return jdbc.query("SELECT * FROM users ORDER BY user_id",
                (rs, rowNum) -> mapUser(rs));
    }

    @Override
    public void upsert(UserInfo user) {
        jdbc.update("""
                INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, now(), now())
                ON CONFLICT (user_id) DO UPDATE SET
                    provider = EXCLUDED.provider, email = EXCLUDED.email,
                    display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
                    updated_at = now()
                """,
                user.userId(), user.provider(), user.email(), user.displayName(),
                user.roles().toArray(new String[0]));
    }

    @Override
    public void updateRoles(String userId, List<String> roles) {
        jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
                roles.toArray(new String[0]), userId);
    }

    @Override
    public void delete(String userId) {
        jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
    }

    private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
        Array rolesArray = rs.getArray("roles");
        String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
        return new UserInfo(
                rs.getString("user_id"), rs.getString("provider"),
                rs.getString("email"), rs.getString("display_name"),
                List.of(roles));
    }
}
- [ ] Step 5: Implement PostgresOidcConfigRepository

Implements the existing `OidcConfigRepository` interface in `core.security`.

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.security.OidcConfig;
import com.cameleer3.server.core.security.OidcConfigRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Array;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresOidcConfigRepository implements OidcConfigRepository {

    private final JdbcTemplate jdbc;

    public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public Optional<OidcConfig> find() {
        var results = jdbc.query(
                "SELECT * FROM oidc_config WHERE config_id = 'default'",
                (rs, rowNum) -> {
                    Array arr = rs.getArray("default_roles");
                    String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
                    return new OidcConfig(
                            rs.getBoolean("enabled"), rs.getString("issuer_uri"),
                            rs.getString("client_id"), rs.getString("client_secret"),
                            rs.getString("roles_claim"), List.of(roles),
                            rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
                });
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public void save(OidcConfig config) {
        jdbc.update("""
                INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
                    roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
                VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
                ON CONFLICT (config_id) DO UPDATE SET
                    enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
                    client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
                    roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
                    auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
                    updated_at = now()
                """,
                config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
                config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
                config.autoSignup(), config.displayNameClaim());
    }

    @Override
    public void delete() {
        jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
    }
}
```
- [ ] Step 6: Implement PostgresMetricsStore

```java
package com.cameleer3.server.app.storage;

import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.util.List;

@Repository
public class PostgresMetricsStore implements MetricsStore {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final JdbcTemplate jdbc;

    public PostgresMetricsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void insertBatch(List<MetricsSnapshot> snapshots) {
        jdbc.batchUpdate("""
                INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
                    collected_at, server_received_at)
                VALUES (?, ?, ?, ?::jsonb, ?, now())
                """,
                snapshots.stream().map(s -> new Object[]{
                        s.agentId(), s.metricName(), s.metricValue(),
                        tagsToJson(s.tags()),
                        Timestamp.from(s.collectedAt())
                }).toList());
    }

    private String tagsToJson(java.util.Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) return null;
        try { return MAPPER.writeValueAsString(tags); }
        catch (JsonProcessingException e) { return null; }
    }
}
```
- [ ] Step 7: Run all store tests, verify they pass

Run: `mvn test -pl cameleer3-server-app -Dtest="Postgres*IT" -q`

- [ ] Step 8: Commit

```shell
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/Postgres*.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/Postgres*.java
git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
```

## Chunk 4: OpenSearch Integration

### Task 13: Implement OpenSearchIndex

**Files:**

- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java`

- [ ] Step 1: Write failing test

```java
package com.cameleer3.server.app.search;

import com.cameleer3.server.app.AbstractPostgresIT;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.junit.jupiter.api.Test;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;

import java.time.Instant;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

// Extends AbstractPostgresIT for the PostgreSQL datasource needed by the Spring context
class OpenSearchIndexIT extends AbstractPostgresIT {

    @Container
    static final OpensearchContainer<?> opensearch =
            new OpensearchContainer<>("opensearchproject/opensearch:2.19.0")
                    .withSecurityEnabled(false);

    @DynamicPropertySource
    static void configureOpenSearch(DynamicPropertyRegistry registry) {
        registry.add("opensearch.url", opensearch::getHttpHostAddress);
    }

    @Autowired
    SearchIndex searchIndex;

    @Test
    void indexAndSearchByText() throws Exception {
        Instant now = Instant.now();
        ExecutionDocument doc = new ExecutionDocument(
                "search-1", "route-a", "agent-1", "app-1",
                "FAILED", "corr-1", "exch-1",
                now, now.plusMillis(100), 100L,
                "OrderNotFoundException: order-12345 not found", null,
                List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
                        null, null, "request body with customer-99", null, null, null)));

        searchIndex.index(doc);
        Thread.sleep(1500); // Allow OpenSearch refresh

        SearchRequest request = new SearchRequest(
                null, now.minusSeconds(60), now.plusSeconds(60),
                null, null, null,
                "OrderNotFoundException", null, null, null,
                null, null, null, null, null,
                0, 50, "startTime", "desc");

        SearchResult<ExecutionSummary> result = searchIndex.search(request);
        assertTrue(result.total() > 0);
        assertEquals("search-1", result.items().get(0).executionId());
    }

    @Test
    void wildcardSearchFindsSubstring() throws Exception {
        Instant now = Instant.now();
        ExecutionDocument doc = new ExecutionDocument(
                "wild-1", "route-b", "agent-1", "app-1",
                "COMPLETED", null, null,
                now, now.plusMillis(50), 50L, null, null,
                List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
                        null, null, "UniquePayloadIdentifier12345", null, null, null)));

        searchIndex.index(doc);
        Thread.sleep(1500);

        SearchRequest request = new SearchRequest(
                null, now.minusSeconds(60), now.plusSeconds(60),
                null, null, null,
                "PayloadIdentifier", null, null, null,
                null, null, null, null, null,
                0, 50, "startTime", "desc");

        SearchResult<ExecutionSummary> result = searchIndex.search(request);
        assertTrue(result.total() > 0);
    }
}
```
- [ ] Step 2: Create OpenSearchConfig

```java
package com.cameleer3.server.app.config;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URISyntaxException;

@Configuration
public class OpenSearchConfig {

    @Value("${opensearch.url:http://localhost:9200}")
    private String opensearchUrl;

    @Bean
    public OpenSearchClient openSearchClient() throws URISyntaxException {
        // HttpHost.create(String) throws the checked URISyntaxException in HttpCore 5
        HttpHost host = HttpHost.create(opensearchUrl);
        var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
        return new OpenSearchClient(transport);
    }
}
```
- [ ] Step 3: Implement OpenSearchIndex

```java
package com.cameleer3.server.app.search;

import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.analysis.TokenChar;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

@Repository
public class OpenSearchIndex implements SearchIndex {

    private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
    private static final String INDEX_PREFIX = "executions-";
    private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
            .withZone(ZoneOffset.UTC);

    // API sort columns are camelCase (e.g. "startTime"); index fields are snake_case.
    private static final Map<String, String> SORT_FIELDS = Map.of(
            "startTime", "start_time",
            "endTime", "end_time",
            "durationMs", "duration_ms");

    private final OpenSearchClient client;

    public OpenSearchIndex(OpenSearchClient client) {
        this.client = client;
    }

    @PostConstruct
    void ensureIndexTemplate() {
        // Full template with ngram analyzer for infix wildcard search.
        // The template JSON matches the spec's OpenSearch index template definition.
        try {
            boolean exists = client.indices().existsIndexTemplate(
                    ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
            if (!exists) {
                client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
                        .name("executions-template")
                        .indexPatterns(List.of("executions-*"))
                        .template(t -> t
                                .settings(s -> s
                                        .numberOfShards("3")
                                        .numberOfReplicas("1")
                                        .analysis(a -> a
                                                .analyzer("ngram_analyzer", an -> an
                                                        .custom(c -> c
                                                                .tokenizer("ngram_tokenizer")
                                                                .filter("lowercase")))
                                                .tokenizer("ngram_tokenizer", tk -> tk
                                                        .definition(d -> d
                                                                .ngram(ng -> ng
                                                                        .minGram(3)
                                                                        .maxGram(4)
                                                                        .tokenChars(TokenChar.Letter,
                                                                                TokenChar.Digit,
                                                                                TokenChar.Punctuation,
                                                                                TokenChar.Symbol)))))))));
                log.info("OpenSearch index template created with ngram analyzer");
            }
        } catch (IOException e) {
            log.error("Failed to create index template", e);
        }
    }

    @Override
    public void index(ExecutionDocument doc) {
        String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
        try {
            client.index(IndexRequest.of(b -> b
                    .index(indexName)
                    .id(doc.executionId())
                    .document(toMap(doc))));
        } catch (IOException e) {
            log.error("Failed to index execution {}", doc.executionId(), e);
        }
    }

    @Override
    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        try {
            var searchReq = buildSearchRequest(request, request.limit());
            var response = client.search(searchReq, Map.class);

            List<ExecutionSummary> items = response.hits().hits().stream()
                    .map(this::hitToSummary)
                    .collect(Collectors.toList());

            long total = response.hits().total() != null ? response.hits().total().value() : 0;
            return new SearchResult<>(items, total);
        } catch (IOException e) {
            log.error("Search failed", e);
            return new SearchResult<>(List.of(), 0);
        }
    }

    @Override
    public long count(SearchRequest request) {
        try {
            var countReq = CountRequest.of(b -> b
                    .index(INDEX_PREFIX + "*")
                    .query(buildQuery(request)));
            return client.count(countReq).count();
        } catch (IOException e) {
            log.error("Count failed", e);
            return 0;
        }
    }

    @Override
    public void delete(String executionId) {
        try {
            client.deleteByQuery(DeleteByQueryRequest.of(b -> b
                    .index(List.of(INDEX_PREFIX + "*"))
                    .query(Query.of(q -> q.term(t -> t
                            .field("execution_id").value(FieldValue.of(executionId)))))));
        } catch (IOException e) {
            log.error("Failed to delete execution {}", executionId, e);
        }
    }

    private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
            SearchRequest request, int size) {
        return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
            b.index(INDEX_PREFIX + "*")
             .query(buildQuery(request))
             .size(size)
             .from(request.offset())
             .sort(s -> s.field(f -> f
                     .field(SORT_FIELDS.getOrDefault(request.sortColumn(), request.sortColumn()))
                     .order("asc".equalsIgnoreCase(request.sortDir())
                             ? SortOrder.Asc : SortOrder.Desc)));
            return b;
        });
    }

    private Query buildQuery(SearchRequest request) {
        List<Query> must = new ArrayList<>();
        List<Query> filter = new ArrayList<>();

        // Time range
        if (request.timeFrom() != null || request.timeTo() != null) {
            filter.add(Query.of(q -> q.range(r -> {
                r.field("start_time");
                if (request.timeFrom() != null)
                    r.gte(JsonData.of(request.timeFrom().toString()));
                if (request.timeTo() != null)
                    r.lte(JsonData.of(request.timeTo().toString()));
                return r;
            })));
        }

        // Keyword filters
        if (request.status() != null)
            filter.add(termQuery("status", request.status()));
        if (request.routeId() != null)
            filter.add(termQuery("route_id", request.routeId()));
        if (request.agentId() != null)
            filter.add(termQuery("agent_id", request.agentId()));
        if (request.correlationId() != null)
            filter.add(termQuery("correlation_id", request.correlationId()));

        // Full-text search across all fields + nested processor fields
        if (request.text() != null && !request.text().isBlank()) {
            String text = request.text();
            List<Query> textQueries = new ArrayList<>();

            // Search top-level text fields
            textQueries.add(Query.of(q -> q.multiMatch(m -> m
                    .query(text)
                    .fields("error_message", "error_stacktrace",
                            "error_message.ngram", "error_stacktrace.ngram"))));

            // Search nested processor fields
            textQueries.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(text)
                            .fields("processors.input_body", "processors.output_body",
                                    "processors.input_headers", "processors.output_headers",
                                    "processors.error_message", "processors.error_stacktrace",
                                    "processors.input_body.ngram", "processors.output_body.ngram",
                                    "processors.input_headers.ngram", "processors.output_headers.ngram",
                                    "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));

            // Also try keyword fields for exact matches
            textQueries.add(Query.of(q -> q.multiMatch(m -> m
                    .query(text)
                    .fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));

            must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
        }

        // Scoped text searches
        if (request.textInBody() != null && !request.textInBody().isBlank()) {
            must.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(request.textInBody())
                            .fields("processors.input_body", "processors.output_body",
                                    "processors.input_body.ngram", "processors.output_body.ngram"))))));
        }
        if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
            must.add(Query.of(q -> q.nested(n -> n
                    .path("processors")
                    .query(nq -> nq.multiMatch(m -> m
                            .query(request.textInHeaders())
                            .fields("processors.input_headers", "processors.output_headers",
                                    "processors.input_headers.ngram", "processors.output_headers.ngram"))))));
        }
        if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
            String errText = request.textInErrors();
            must.add(Query.of(q -> q.bool(b -> b.should(
                    Query.of(sq -> sq.multiMatch(m -> m
                            .query(errText)
                            .fields("error_message", "error_stacktrace",
                                    "error_message.ngram", "error_stacktrace.ngram"))),
                    Query.of(sq -> sq.nested(n -> n
                            .path("processors")
                            .query(nq -> nq.multiMatch(m -> m
                                    .query(errText)
                                    .fields("processors.error_message", "processors.error_stacktrace",
                                            "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
            ).minimumShouldMatch("1"))));
        }

        // Duration range
        if (request.durationMin() != null || request.durationMax() != null) {
            filter.add(Query.of(q -> q.range(r -> {
                r.field("duration_ms");
                if (request.durationMin() != null)
                    r.gte(JsonData.of(request.durationMin()));
                if (request.durationMax() != null)
                    r.lte(JsonData.of(request.durationMax()));
                return r;
            })));
        }

        return Query.of(q -> q.bool(b -> {
            if (!must.isEmpty()) b.must(must);
            if (!filter.isEmpty()) b.filter(filter);
            if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
            return b;
        }));
    }

    private Query termQuery(String field, String value) {
        // opensearch-java term values must be wrapped in FieldValue
        return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
    }

    private Map<String, Object> toMap(ExecutionDocument doc) {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("execution_id", doc.executionId());
        map.put("route_id", doc.routeId());
        map.put("agent_id", doc.agentId());
        map.put("group_name", doc.groupName());
        map.put("status", doc.status());
        map.put("correlation_id", doc.correlationId());
        map.put("exchange_id", doc.exchangeId());
        map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
        map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
        map.put("duration_ms", doc.durationMs());
        map.put("error_message", doc.errorMessage());
        map.put("error_stacktrace", doc.errorStacktrace());
        if (doc.processors() != null) {
            map.put("processors", doc.processors().stream().map(p -> {
                Map<String, Object> pm = new LinkedHashMap<>();
                pm.put("processor_id", p.processorId());
                pm.put("processor_type", p.processorType());
                pm.put("status", p.status());
                pm.put("error_message", p.errorMessage());
                pm.put("error_stacktrace", p.errorStacktrace());
                pm.put("input_body", p.inputBody());
                pm.put("output_body", p.outputBody());
                pm.put("input_headers", p.inputHeaders());
                pm.put("output_headers", p.outputHeaders());
                return pm;
            }).toList());
        }
        return map;
    }

    @SuppressWarnings("unchecked")
    private ExecutionSummary hitToSummary(Hit<Map> hit) {
        Map<String, Object> src = hit.source();
        if (src == null) return null;
        return new ExecutionSummary(
                (String) src.get("execution_id"),
                (String) src.get("route_id"),
                (String) src.get("agent_id"),
                (String) src.get("status"),
                src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
                src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
                src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
                (String) src.get("correlation_id"),
                (String) src.get("error_message"));
    }
}
```
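To see why the template's `min_gram: 3` / `max_gram: 4` settings enable infix matching: indexing emits every 3- and 4-character window of each token, and a query against the `.ngram` subfields is tokenized the same way, so any query substring of length ≥ 3 overlaps the grams of a longer indexed token. This standalone sketch of the gram generation is illustrative only, not the OpenSearch tokenizer implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class NgramDemo {

    // Emit all substrings of length minGram..maxGram, lowercased --
    // roughly what the ngram tokenizer + lowercase filter produce.
    static List<String> ngrams(String token, int minGram, int maxGram) {
        String t = token.toLowerCase();
        List<String> grams = new ArrayList<>();
        for (int len = minGram; len <= maxGram; len++) {
            for (int i = 0; i + len <= t.length(); i++) {
                grams.add(t.substring(i, i + len));
            }
        }
        return grams;
    }

    public static void main(String[] args) {
        // Values from the wildcardSearchFindsSubstring test above
        List<String> indexed = ngrams("UniquePayloadIdentifier12345", 3, 4);
        List<String> query = ngrams("PayloadIdentifier", 3, 4);
        // Every gram of the query substring appears among the indexed grams,
        // which is what makes the infix match possible.
        if (!indexed.containsAll(query)) throw new AssertionError();
        System.out.println("query grams matched: " + query.size());
    }
}
```

The trade-off is index size: every stored text field is expanded into all of its 3- and 4-grams.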
- [ ] Step 4: Run tests to verify they pass

Run: `mvn test -pl cameleer3-server-app -Dtest=OpenSearchIndexIT -q`
Expected: PASS

- [ ] Step 5: Commit

```shell
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java
git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java
git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search"
```

### Task 14: Implement SearchIndexer (debounced event-driven indexer)

**Files:**

- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java`

- [ ] Step 1: Implement SearchIndexer

```java
package com.cameleer3.server.core.indexing;

import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class SearchIndexer {

    private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);

    private final ExecutionStore executionStore;
    private final SearchIndex searchIndex;
    private final long debounceMs;
    private final int queueCapacity;

    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });

    public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                         long debounceMs, int queueCapacity) {
        this.executionStore = executionStore;
        this.searchIndex = searchIndex;
        this.debounceMs = debounceMs;
        this.queueCapacity = queueCapacity;
    }

    public void onExecutionUpdated(ExecutionUpdatedEvent event) {
        if (pending.size() >= queueCapacity) {
            log.warn("Search indexer queue full, dropping event for {}", event.executionId());
            return;
        }

        // Replace any pending task for this execution; cancel the one it displaced.
        ScheduledFuture<?> existing = pending.put(event.executionId(),
                scheduler.schedule(() -> indexExecution(event.executionId()),
                        debounceMs, TimeUnit.MILLISECONDS));
        if (existing != null) {
            existing.cancel(false);
        }
    }

    private void indexExecution(String executionId) {
        pending.remove(executionId);
        try {
            ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
            if (exec == null) return;

            List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
            List<ProcessorDoc> processorDocs = processors.stream()
                    .map(p -> new ProcessorDoc(
                            p.processorId(), p.processorType(), p.status(),
                            p.errorMessage(), p.errorStacktrace(),
                            p.inputBody(), p.outputBody(),
                            p.inputHeaders(), p.outputHeaders()))
                    .toList();

            searchIndex.index(new ExecutionDocument(
                    exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
                    exec.status(), exec.correlationId(), exec.exchangeId(),
                    exec.startTime(), exec.endTime(), exec.durationMs(),
                    exec.errorMessage(), exec.errorStacktrace(), processorDocs));
        } catch (Exception e) {
            log.error("Failed to index execution {}", executionId, e);
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```
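The debounce pattern above (schedule on first event, cancel-and-reschedule on repeats) can be exercised in isolation. This standalone sketch uses hypothetical names, not the SearchIndexer itself, and shows a burst of updates for one execution collapsing into a single index call:

```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class DebounceDemo {

    static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    static final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    static final AtomicInteger indexCalls = new AtomicInteger();

    // Same shape as SearchIndexer.onExecutionUpdated: put the new task,
    // then cancel whichever pending task it displaced.
    static void onUpdated(String id, long debounceMs) {
        ScheduledFuture<?> previous = pending.put(id,
                scheduler.schedule(() -> {
                    pending.remove(id);
                    indexCalls.incrementAndGet(); // stands in for searchIndex.index(...)
                }, debounceMs, TimeUnit.MILLISECONDS));
        if (previous != null) previous.cancel(false);
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {   // burst: 10 updates, 10 ms apart
            onUpdated("exec-1", 100);    // 100 ms debounce window
            Thread.sleep(10);
        }
        Thread.sleep(300);               // wait past the final debounce window
        scheduler.shutdown();
        // Only the last scheduled task survives the cancellations.
        if (indexCalls.get() != 1) throw new AssertionError("got " + indexCalls.get());
        System.out.println("index calls: " + indexCalls.get());
    }
}
```

Each event arrives well inside the debounce window, so every pending task except the last is cancelled before it fires; with real event gaps longer than `debounceMs`, each update would be indexed separately.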
- [ ] Step 2: Commit

```shell
git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java
git commit -m "feat: implement debounced SearchIndexer for async OpenSearch indexing"
```

## Chunk 5: Wiring, Cleanup, and Integration

### Task 15: Create bean configuration and wire everything

**Files:**

- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java`
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java`
- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java`

- [ ] Step 1: Create StorageBeanConfig

Wire DetailService, SearchIndexer, and IngestionService with the new store beans:

```java
package com.cameleer3.server.app.config;

import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.*;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StorageBeanConfig {

    @Bean
    public DetailService detailService(ExecutionStore executionStore) {
        return new DetailService(executionStore);
    }

    @Bean(destroyMethod = "shutdown")
    public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                                       @Value("${opensearch.debounce-ms:2000}") long debounceMs,
                                       @Value("${opensearch.queue-size:10000}") int queueSize) {
        return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
    }

    @Bean
    public IngestionService ingestionService(ExecutionStore executionStore,
                                             DiagramStore diagramStore,
                                             WriteBuffer<MetricsSnapshot> metricsBuffer,
                                             SearchIndexer searchIndexer,
                                             @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
        return new IngestionService(executionStore, diagramStore, metricsBuffer,
                searchIndexer::onExecutionUpdated, bodySizeLimit);
    }
}
```
  • Step 2: Update SearchBeanConfig

Wire SearchService with SearchIndex + StatsStore:

@Bean
public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
    return new SearchService(searchIndex, statsStore);
}
  • Step 3: Update IngestionBeanConfig

Remove execution and diagram write buffers. Keep only metrics write buffer:

@Bean
public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
    return new WriteBuffer<>(config.getBufferCapacity());
}
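The WriteBuffer implementation is not shown in this chunk; assuming it wraps a bounded queue, the contract the scheduler relies on (non-blocking accept, batched drain) can be sketched as:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the WriteBuffer contract: bounded, thread-safe,
// producers never block (items are rejected when full), and the flush
// scheduler drains up to batchSize elements per tick.
class WriteBufferSketch<T> {
    private final BlockingQueue<T> queue;

    WriteBufferSketch(int capacity) { this.queue = new ArrayBlockingQueue<>(capacity); }

    /** Returns false (rather than blocking) when the buffer is full. */
    boolean accept(T item) { return queue.offer(item); }

    /** Removes and returns up to max buffered elements in FIFO order. */
    List<T> drain(int max) {
        List<T> batch = new ArrayList<>(max);
        queue.drainTo(batch, max);
        return batch;
    }

    int size() { return queue.size(); }

    public static void main(String[] args) {
        WriteBufferSketch<String> buf = new WriteBufferSketch<>(2);
        buf.accept("m1");
        buf.accept("m2");
        System.out.println("third accepted: " + buf.accept("m3")); // full at capacity 2
        System.out.println("drained: " + buf.drain(10));
        System.out.println("size after drain: " + buf.size());
    }
}
```

The real WriteBuffer may differ (for example, blocking or dropping oldest on overflow); the point is that MetricsFlushScheduler only needs drain(batchSize) and size().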
  • Step 4: Create MetricsFlushScheduler

Create cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java:

package com.cameleer3.server.app.ingestion;

import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.MetricsStore;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MetricsFlushScheduler implements SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);

    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final MetricsStore metricsStore;
    private final int batchSize;
    private volatile boolean running = false;

    public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
                                  MetricsStore metricsStore,
                                  IngestionConfig config) {
        this.metricsBuffer = metricsBuffer;
        this.metricsStore = metricsStore;
        this.batchSize = config.getBatchSize();
    }

    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
    public void flush() {
        try {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (!batch.isEmpty()) {
                metricsStore.insertBatch(batch);
                log.debug("Flushed {} metrics to PostgreSQL", batch.size());
            }
        } catch (Exception e) {
            log.error("Failed to flush metrics", e);
        }
    }

    @Override public void start() { running = true; }
    @Override public void stop() {
        // Drain remaining on shutdown
        while (metricsBuffer.size() > 0) {
            List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
            if (batch.isEmpty()) break;
            try { metricsStore.insertBatch(batch); }
            catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
        }
        running = false;
    }
    @Override public boolean isRunning() { return running; }
    @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
}
  • Step 5: Create RetentionScheduler

Create cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java:

package com.cameleer3.server.app.retention;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RetentionScheduler {

    private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);

    private final JdbcTemplate jdbc;
    private final int retentionDays;

    public RetentionScheduler(JdbcTemplate jdbc,
                               @Value("${cameleer.retention-days:30}") int retentionDays) {
        this.jdbc = jdbc;
        this.retentionDays = retentionDays;
    }

    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
    public void dropExpiredChunks() {
        String interval = retentionDays + " days"; // built from an int config value, not user input
        try {
            // Raw data
            jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
            jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");

            // Continuous aggregates (keep 3x longer)
            String caggInterval = (retentionDays * 3) + " days";
            jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
            jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");

            log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
                    retentionDays, retentionDays * 3);
        } catch (Exception e) {
            log.error("Retention job failed", e);
        }
    }
    // Note: daily OpenSearch index deletion should be handled via an ISM
    // (Index State Management) policy configured at deployment time, not in
    // application code. (ISM is OpenSearch's counterpart of Elasticsearch ILM.)
}
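For reference, OpenSearch index retention is configured through the Index State Management (ISM) plugin, e.g. via PUT _plugins/_ism/policies/executions-retention. A sketch of such a policy follows; the executions-* index pattern and the 30d age are assumptions chosen to mirror the retention-days default:

```json
{
  "policy": {
    "description": "Delete daily execution indices after 30 days",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [ { "delete": {} } ],
        "transitions": []
      }
    ],
    "ism_template": [
      { "index_patterns": ["executions-*"], "priority": 100 }
    ]
  }
}
```

Keeping this in deployment config (rather than application code) means index cleanup keeps working even when the server is down.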
  • Step 6: Commit
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/SearchBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"

Task 16: Delete ClickHouse code and old interfaces

Files:

  • Delete all files listed in "Files to delete" section above

  • Step 1: Delete ClickHouse implementations

rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouse*.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchEngine.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java
  • Step 2: Delete old core interfaces replaced by new ones

UserRepository and OidcConfigRepository in core.security are kept; the new Postgres-backed classes implement them. Only the interfaces superseded by the new storage interfaces are deleted.

rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/ExecutionRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/DiagramRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsRepository.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchEngine.java
rm cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/RawExecutionRow.java
  • Step 3: Delete ClickHouse SQL migrations
rm -r cameleer3-server-app/src/main/resources/clickhouse/
  • Step 4: Delete old test base class
rm cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractClickHouseIT.java
  • Step 5: Fix compilation errors in specific files

Each file and what to change:

  • DiagramRenderController.java — change DiagramRepository to DiagramStore

  • DiagramController.java — change DiagramRepository to DiagramStore

  • SearchController.java — already uses SearchService, verify no direct SearchEngine refs

  • DetailController.java — already uses DetailService, verify no direct ExecutionRepository refs

  • TreeReconstructionTest.java — rewrite to test DetailService.buildTree() with ProcessorRecord list input

  • ExecutionController.java — update to call IngestionService.ingestExecution() (synchronous, catches exceptions for 503)

  • DiagramController.java — update to call IngestionService.ingestDiagram() (synchronous)

  • MetricsController.java — update to call IngestionService.acceptMetrics() (still buffered)

  • SearchBeanConfig.java — wire SearchService(SearchIndex, StatsStore) instead of SearchService(SearchEngine)

  • IngestionBeanConfig.java — remove execution/diagram buffer beans, add bodySizeLimit to IngestionService constructor

  • All ITs extending AbstractClickHouseIT — change to extend AbstractPostgresIT

  • Step 6: Verify compilation

Run: mvn compile -q
Expected: BUILD SUCCESS

  • Step 7: Commit
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"

Task 17: Update existing integration tests

Files:

  • Modify: all IT files under cameleer3-server-app/src/test/

  • Step 1: Update all ITs to extend AbstractPostgresIT

Every IT that currently extends AbstractClickHouseIT must extend AbstractPostgresIT instead. Also add an OpenSearch Testcontainer wherever search coverage is needed.

  • Step 2: Update ExecutionControllerIT

Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success, 503 on DB write failure.

  • Step 3: Update SearchControllerIT

Search now hits OpenSearch. Add OpenSearch Testcontainer. Allow time for async indexing (use Awaitility to poll search results rather than Thread.sleep).
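The Awaitility-style polling suggested above can also be sketched in plain Java, to show why it beats a fixed Thread.sleep: the condition is re-evaluated at short intervals until it holds or a deadline passes, so the test finishes as soon as indexing catches up. The helper name is illustrative:

```java
import java.util.function.BooleanSupplier;

// Poll a condition until it becomes true or the timeout elapses.
// Returns the final truth value instead of throwing, for brevity.
class PollUntil {
    static boolean pollUntil(BooleanSupplier condition, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(intervalMs);
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~150 ms, simulating async index catch-up.
        boolean ok = pollUntil(() -> System.currentTimeMillis() - start > 150, 2000, 25);
        System.out.println("condition met: " + ok);
    }
}
```

In the real IT, the condition would be "the search endpoint returns the expected hit", and a timeout failure should fail the test.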

  • Step 4: Update DetailControllerIT

Detail now reads from PostgresExecutionStore directly. Simpler setup — just insert execution + processors.

  • Step 5: Run full test suite

Run: mvn verify -q
Expected: BUILD SUCCESS, all tests pass

  • Step 6: Commit
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"

Task 18: Update Dockerfile and K8s manifests

Files:

  • Modify: Dockerfile

  • Modify: deploy/*.yaml (K8s manifests)

  • Step 1: Update Dockerfile

No JDBC driver changes needed in Dockerfile (drivers are in the fat JAR). Just verify the REGISTRY_TOKEN build arg still works for cameleer3-common resolution.

  • Step 2: Update K8s manifests

  • Replace ClickHouse StatefulSet with PostgreSQL/TimescaleDB StatefulSet

  • Add OpenSearch StatefulSet (single-node for dev, clustered for prod)

  • Update server Deployment env vars: SPRING_DATASOURCE_URL, OPENSEARCH_URL

  • Update secrets: replace clickhouse-credentials with postgres-credentials

  • Update health check probes: PostgreSQL uses standard port 5432, OpenSearch uses 9200
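As a sketch, the server Deployment's env block might look like the following. SPRING_DATASOURCE_URL, OPENSEARCH_URL, and the postgres-credentials secret name come from this plan; the service hostnames and database name are assumptions:

```yaml
env:
  - name: SPRING_DATASOURCE_URL
    value: jdbc:postgresql://postgres:5432/cameleer
  - name: OPENSEARCH_URL
    value: http://opensearch:9200
  - name: SPRING_DATASOURCE_USERNAME
    valueFrom:
      secretKeyRef: { name: postgres-credentials, key: username }
  - name: SPRING_DATASOURCE_PASSWORD
    valueFrom:
      secretKeyRef: { name: postgres-credentials, key: password }
```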

  • Step 3: Commit

git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"

Task 19: Update CI workflow

Files:

  • Modify: .gitea/workflows/ci.yml

  • Step 1: Update CI workflow

Changes needed:

  • Docker build: no ClickHouse references in the image (it's a fat JAR, driver is bundled)

  • Deploy step: update K8s secret names from clickhouse-credentials to postgres-credentials

  • Deploy step: add OpenSearch deployment manifests

  • Verify REGISTRY_TOKEN build arg still works for cameleer3-common

  • Integration tests still skipped in CI (-DskipITs) — Testcontainers needs Docker-in-Docker

  • Step 2: Commit

git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"

Task 20: Update OpenAPI spec

  • Step 1: Regenerate openapi.json

Run the server locally (or via test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend.

Run: mvn spring-boot:run (briefly, to generate spec), then fetch /api/v1/api-docs.

  • Step 2: Commit
git add cameleer3-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"

Task 21: Final verification

  • Step 1: Full build

Run: mvn clean verify -q
Expected: BUILD SUCCESS

  • Step 2: Manual smoke test

Start server against local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):

  1. POST an execution → verify 202
  2. GET /api/v1/search?text=... → verify search works
  3. GET /api/v1/stats → verify stats return
  4. GET /api/v1/detail/{id} → verify detail with processor tree
  • Step 3: Commit any remaining fixes
git add -A
git commit -m "fix: final adjustments from smoke testing"