# Storage Layer Refactor Implementation Plan

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch as the storage and search backends.

**Architecture:** PostgreSQL/TimescaleDB is the source of truth for all data and analytics (continuous aggregates). OpenSearch is an async search index for full-text/wildcard queries. Core module defines interfaces; app module provides implementations.

**Tech Stack:** Java 17, Spring Boot 3.4.3, PostgreSQL 16 + TimescaleDB, OpenSearch 2.x, Flyway, Testcontainers, OpenSearch Java Client

**Spec:** `docs/superpowers/specs/2026-03-16-storage-layer-design.md`

---

## File Structure

### New files

**Core module** (`cameleer-server-core/src/main/java/com/cameleer/server/core/`):

- `storage/ExecutionStore.java` — new interface replacing ExecutionRepository
- `storage/StatsStore.java` — new interface for stats from continuous aggregates
- `storage/SearchIndex.java` — new interface for OpenSearch operations
- `storage/DiagramStore.java` — new interface replacing DiagramRepository
- `storage/MetricsStore.java` — new interface replacing MetricsRepository
- `storage/model/ExecutionDocument.java` — document model for OpenSearch indexing
- `search/StatsRequest.java` — request DTO for stats queries (level, scope, time range)
- `search/TimeSeriesRequest.java` — request DTO for time-series queries (bucket size)
- `indexing/SearchIndexer.java` — debounced event listener for OpenSearch indexing
- `indexing/ExecutionUpdatedEvent.java` — event published after execution write

**App module** (`cameleer-server-app/src/main/java/com/cameleer/server/app/`):

- `storage/PostgresExecutionStore.java` — ExecutionStore impl with upsert
- `storage/PostgresStatsStore.java` — StatsStore impl querying continuous aggregates
- `storage/PostgresDiagramStore.java` — DiagramStore impl
- `storage/PostgresUserRepository.java` — UserRepository impl (keeps existing core interface)
- `storage/PostgresOidcConfigRepository.java` — OidcConfigRepository impl (keeps existing core interface)
- `storage/PostgresMetricsStore.java` — MetricsStore impl
- `search/OpenSearchIndex.java` — SearchIndex impl
- `config/OpenSearchConfig.java` — OpenSearch client bean
- `config/StorageBeanConfig.java` — wires all store beans
- `ingestion/MetricsFlushScheduler.java` — scheduled metrics buffer flush (replaces ClickHouseFlushScheduler, metrics only)
- `retention/RetentionScheduler.java` — scheduled job for drop_chunks and OpenSearch index deletion

**Flyway migrations** (`cameleer-server-app/src/main/resources/db/migration/`):

- `V1__extensions.sql` — CREATE EXTENSION timescaledb, timescaledb_toolkit
- `V2__executions.sql` — executions hypertable
- `V3__processor_executions.sql` — processor_executions hypertable
- `V4__agent_metrics.sql` — agent_metrics hypertable
- `V5__route_diagrams.sql` — route_diagrams table
- `V6__users.sql` — users table
- `V7__oidc_config.sql` — oidc_config table
- `V8__continuous_aggregates.sql` — all 4 continuous aggregates + refresh policies

Note: Retention is NOT in a Flyway migration (Flyway migrations are immutable once applied). No V9 file. Retention is handled by `RetentionScheduler` at runtime with configurable intervals.
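Runtime retention boils down to periodically issuing TimescaleDB `drop_chunks` calls and deleting expired OpenSearch indices. A minimal sketch of what `RetentionScheduler` might execute per run — the daily `executions-YYYY-MM-DD` index naming is an assumption, not something this plan fixes:

```java
import java.time.LocalDate;
import java.util.List;

// Hypothetical sketch of the statements RetentionScheduler could issue.
// Hypertable names match the migrations below; the OpenSearch index naming
// scheme is an illustrative assumption.
public class RetentionSketch {

    static final List<String> HYPERTABLES =
            List.of("executions", "processor_executions", "agent_metrics");

    // One drop_chunks statement per hypertable, parameterized by the window.
    static List<String> dropChunkStatements(int retentionDays) {
        return HYPERTABLES.stream()
                .map(t -> "SELECT drop_chunks('%s', INTERVAL '%d days');".formatted(t, retentionDays))
                .toList();
    }

    // Name of the daily OpenSearch index that just fell out of the window.
    static String expiredIndexName(LocalDate today, int retentionDays) {
        return "executions-" + today.minusDays(retentionDays + 1);
    }

    public static void main(String[] args) {
        dropChunkStatements(30).forEach(System.out::println);
        System.out.println(expiredIndexName(LocalDate.of(2026, 3, 16), 30));
    }
}
```

Because `drop_chunks` removes whole chunks, deletion cost is independent of row count — one reason the plan uses 1-day chunk intervals.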
**Test files** (`cameleer-server-app/src/test/java/com/cameleer/server/app/`):

- `AbstractPostgresIT.java` — replaces AbstractClickHouseIT (TimescaleDB Testcontainer)
- `storage/PostgresExecutionStoreIT.java` — upsert, dedup, chunked arrival tests
- `storage/PostgresStatsStoreIT.java` — continuous aggregate query tests
- `storage/PostgresDiagramStoreIT.java` — content-hash dedup tests
- `storage/PostgresUserRepositoryIT.java` — CRUD tests
- `search/OpenSearchIndexIT.java` — index, search, wildcard tests

### Files to modify

- `pom.xml` (root) — no changes needed
- `cameleer-server-app/pom.xml` — swap clickhouse-jdbc for postgresql + opensearch-java + flyway
- `cameleer-server-core/.../core/search/SearchService.java` — split: search delegates to SearchIndex, stats/timeseries to StatsStore
- `cameleer-server-core/.../core/detail/DetailService.java` — use ExecutionStore instead of ExecutionRepository
- `cameleer-server-core/.../core/ingestion/IngestionService.java` — synchronous execution/diagram writes, keep buffer for metrics
- `cameleer-server-app/.../app/config/SearchBeanConfig.java` — wire StatsStore into SearchService
- `cameleer-server-app/.../app/config/IngestionBeanConfig.java` — update bean wiring
- `cameleer-server-app/src/main/resources/application.yml` — PostgreSQL + OpenSearch config
- `cameleer-server-app/src/test/resources/application-test.yml` — test config

### Files to delete

- `cameleer-server-app/.../app/storage/ClickHouseExecutionRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseDiagramRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseMetricsRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseUserRepository.java`
- `cameleer-server-app/.../app/storage/ClickHouseOidcConfigRepository.java`
- `cameleer-server-app/.../app/search/ClickHouseSearchEngine.java`
- `cameleer-server-app/.../app/ingestion/ClickHouseFlushScheduler.java`
- `cameleer-server-app/.../app/config/ClickHouseConfig.java`
- `cameleer-server-core/.../core/storage/ExecutionRepository.java`
- `cameleer-server-core/.../core/storage/DiagramRepository.java`
- `cameleer-server-core/.../core/storage/MetricsRepository.java`
- `cameleer-server-core/.../core/search/SearchEngine.java`
- `cameleer-server-core/.../core/detail/RawExecutionRow.java` — replaced by the normalized model
- `cameleer-server-app/src/main/resources/clickhouse/*.sql` (all 8 files)
- `cameleer-server-app/src/test/.../app/AbstractClickHouseIT.java`

Note: `UserRepository` and `OidcConfigRepository` interfaces in `core.security` are **kept** — the new Postgres implementations implement these existing interfaces. No rename is needed since their contracts are unchanged.

---
## Chunk 1: Dependencies, Flyway Migrations, and Test Infrastructure

### Task 1: Update Maven dependencies

**Files:**

- Modify: `cameleer-server-app/pom.xml`

- [ ] **Step 1: Replace ClickHouse JDBC with PostgreSQL driver + Flyway + OpenSearch client**

In `cameleer-server-app/pom.xml`, replace the ClickHouse dependency and add new ones:

Remove:

```xml
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.9.7</version>
  <classifier>all</classifier>
</dependency>
```

Add:

```xml
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
</dependency>
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-core</artifactId>
</dependency>
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
  <groupId>org.opensearch.client</groupId>
  <artifactId>opensearch-java</artifactId>
  <version>2.19.0</version>
</dependency>
<dependency>
  <groupId>org.opensearch.client</groupId>
  <artifactId>opensearch-rest-client</artifactId>
  <version>2.19.0</version>
</dependency>
```

Replace the ClickHouse Testcontainer:

```xml
<!-- Remove testcontainers-clickhouse -->
<!-- Add: -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.opensearch</groupId>
  <artifactId>opensearch-testcontainers</artifactId>
  <version>2.1.1</version>
  <scope>test</scope>
</dependency>
```

Note: `postgresql` driver and `flyway-core` versions are managed by the Spring Boot parent POM. The Testcontainers BOM version is `${testcontainers.version}` (2.0.3) from the root POM.

- [ ] **Step 2: Commit** (compilation will fail until ClickHouse code is deleted in Task 16 — this is expected)

```bash
git add cameleer-server-app/pom.xml
git commit -m "chore: swap ClickHouse deps for PostgreSQL, Flyway, OpenSearch"
```
### Task 2: Write Flyway migrations

**Files:**

- Create: `cameleer-server-app/src/main/resources/db/migration/V1__extensions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V2__executions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V3__processor_executions.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V4__agent_metrics.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V5__route_diagrams.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V6__users.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V7__oidc_config.sql`
- Create: `cameleer-server-app/src/main/resources/db/migration/V8__continuous_aggregates.sql`

(There is no V9 — per the note in File Structure, retention is handled by `RetentionScheduler` at runtime, not by a migration.)
- [ ] **Step 1: Create V1__extensions.sql**

```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
```

- [ ] **Step 2: Create V2__executions.sql**

```sql
CREATE TABLE executions (
    execution_id TEXT NOT NULL,
    route_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    group_name TEXT NOT NULL,
    status TEXT NOT NULL,
    correlation_id TEXT,
    exchange_id TEXT,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    duration_ms BIGINT,
    error_message TEXT,
    error_stacktrace TEXT,
    diagram_content_hash TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (execution_id, start_time)
);

SELECT create_hypertable('executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_executions_agent_time ON executions (agent_id, start_time DESC);
CREATE INDEX idx_executions_route_time ON executions (route_id, start_time DESC);
CREATE INDEX idx_executions_group_time ON executions (group_name, start_time DESC);
CREATE INDEX idx_executions_correlation ON executions (correlation_id);
```
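The composite primary key `(execution_id, start_time)` is what makes the idempotent upsert in `PostgresExecutionStore` possible. The exact statement is an implementation detail left to the store impl; a plausible sketch of its shape:

```java
// Hypothetical sketch of the upsert PostgresExecutionStore might issue.
// Column list matches V2__executions.sql; the ON CONFLICT update set is an
// assumption (terminal fields overwrite earlier partial writes).
public class UpsertSqlSketch {

    static final String UPSERT_EXECUTION = """
            INSERT INTO executions (execution_id, route_id, agent_id, group_name, status,
                                    correlation_id, exchange_id, start_time, end_time,
                                    duration_ms, error_message, error_stacktrace,
                                    diagram_content_hash)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (execution_id, start_time) DO UPDATE SET
                status = EXCLUDED.status,
                end_time = EXCLUDED.end_time,
                duration_ms = EXCLUDED.duration_ms,
                error_message = EXCLUDED.error_message,
                error_stacktrace = EXCLUDED.error_stacktrace,
                updated_at = now()
            """;

    public static void main(String[] args) {
        System.out.println(UPSERT_EXECUTION);
    }
}
```

Re-sending the same execution (e.g. chunked arrival of start then completion events) then updates the existing row instead of duplicating it.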
- [ ] **Step 3: Create V3__processor_executions.sql**

```sql
CREATE TABLE processor_executions (
    id BIGSERIAL,
    execution_id TEXT NOT NULL,
    processor_id TEXT NOT NULL,
    processor_type TEXT NOT NULL,
    diagram_node_id TEXT,
    group_name TEXT NOT NULL,
    route_id TEXT NOT NULL,
    depth INT NOT NULL,
    parent_processor_id TEXT,
    status TEXT NOT NULL,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    duration_ms BIGINT,
    error_message TEXT,
    error_stacktrace TEXT,
    input_body TEXT,
    output_body TEXT,
    input_headers JSONB,
    output_headers JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (execution_id, processor_id, start_time)
);

SELECT create_hypertable('processor_executions', 'start_time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_proc_exec_execution ON processor_executions (execution_id);
CREATE INDEX idx_proc_exec_type_time ON processor_executions (processor_type, start_time DESC);
```

- [ ] **Step 4: Create V4__agent_metrics.sql**

```sql
CREATE TABLE agent_metrics (
    agent_id TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    metric_value DOUBLE PRECISION NOT NULL,
    tags JSONB,
    collected_at TIMESTAMPTZ NOT NULL,
    server_received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

SELECT create_hypertable('agent_metrics', 'collected_at', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_metrics_agent_name ON agent_metrics (agent_id, metric_name, collected_at DESC);
```

- [ ] **Step 5: Create V5__route_diagrams.sql**

```sql
CREATE TABLE route_diagrams (
    content_hash TEXT PRIMARY KEY,
    route_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    definition TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_diagrams_route_agent ON route_diagrams (route_id, agent_id);
```
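Using `content_hash` as the primary key gives content-addressed dedup: identical diagram definitions collapse to one row. The plan doesn't pin the hash function; a sketch assuming SHA-256 over the definition text:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Sketch of content-hash computation for route_diagrams dedup.
// SHA-256 is an assumption; the plan only fixes content_hash as the key.
public class DiagramHashSketch {

    static String contentHash(String definition) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(definition.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash); // 64 hex chars
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Identical definitions hash to the same key, so re-registering an
        // unchanged route can be an ON CONFLICT DO NOTHING insert.
        System.out.println(contentHash("from(\"timer:t\").to(\"log:x\")"));
    }
}
```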
- [ ] **Step 6: Create V6__users.sql**

```sql
CREATE TABLE users (
    user_id TEXT PRIMARY KEY,
    provider TEXT NOT NULL,
    email TEXT,
    display_name TEXT,
    roles TEXT[] NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

- [ ] **Step 7: Create V7__oidc_config.sql**

```sql
CREATE TABLE oidc_config (
    config_id TEXT PRIMARY KEY DEFAULT 'default',
    enabled BOOLEAN NOT NULL DEFAULT false,
    issuer_uri TEXT,
    client_id TEXT,
    client_secret TEXT,
    roles_claim TEXT,
    default_roles TEXT[] NOT NULL DEFAULT '{}',
    auto_signup BOOLEAN DEFAULT false,
    display_name_claim TEXT,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

- [ ] **Step 8: Create V8__continuous_aggregates.sql**

```sql
-- Global stats
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket;

SELECT add_continuous_aggregate_policy('stats_1m_all',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-application stats
CREATE MATERIALIZED VIEW stats_1m_app
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name;

SELECT add_continuous_aggregate_policy('stats_1m_app',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-route stats
CREATE MATERIALIZED VIEW stats_1m_route
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'RUNNING') AS running_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM executions
WHERE status IS NOT NULL
GROUP BY bucket, group_name, route_id;

SELECT add_continuous_aggregate_policy('stats_1m_route',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Per-processor stats (uses denormalized group_name/route_id on processor_executions)
CREATE MATERIALIZED VIEW stats_1m_processor
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', start_time) AS bucket,
    group_name,
    route_id,
    processor_type,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count,
    SUM(duration_ms) AS duration_sum,
    MAX(duration_ms) AS duration_max,
    approx_percentile(0.99, percentile_agg(duration_ms::DOUBLE PRECISION)) AS p99_duration
FROM processor_executions
GROUP BY bucket, group_name, route_id, processor_type;

SELECT add_continuous_aggregate_policy('stats_1m_processor',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');
```

- [ ] **Step 9: Commit**

```bash
git add cameleer-server-app/src/main/resources/db/migration/
git commit -m "feat: add Flyway migrations for PostgreSQL/TimescaleDB schema"
```
### Task 3: Create test base class with TimescaleDB Testcontainer

**Files:**

- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java`

- [ ] **Step 1: Write AbstractPostgresIT**

Note: `PostgreSQLContainer` rejects non-`postgres` image names unless they are declared compatible, so the TimescaleDB image must go through `DockerImageName.asCompatibleSubstituteFor`.

```java
package com.cameleer.server.app;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
public abstract class AbstractPostgresIT {

    @Container
    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("timescale/timescaledb:latest-pg16")
                            .asCompatibleSubstituteFor("postgres"))
                    .withDatabaseName("cameleer")
                    .withUsername("cameleer")
                    .withPassword("test");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
        registry.add("spring.flyway.enabled", () -> "true");
    }
}
```

- [ ] **Step 2: Write a smoke test to verify migrations run**

Create `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java`:

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.junit.jupiter.api.Assertions.*;

class FlywayMigrationIT extends AbstractPostgresIT {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Test
    void allMigrationsApplySuccessfully() {
        // Verify core tables exist
        Integer execCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM executions", Integer.class);
        assertEquals(0, execCount);

        Integer procCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processor_executions", Integer.class);
        assertEquals(0, procCount);

        Integer userCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM users", Integer.class);
        assertEquals(0, userCount);

        // Verify continuous aggregates exist
        Integer caggCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM timescaledb_information.continuous_aggregates",
                Integer.class);
        assertEquals(4, caggCount);
    }
}
```

- [ ] **Step 3: Verify test passes** (this test will not compile until Task 16 deletes the ClickHouse code; run it after Task 16 is complete. It is listed here for logical grouping.)

Run: `mvn test -pl cameleer-server-app -Dtest=FlywayMigrationIT -q`

Expected: PASS — all migrations apply, tables and continuous aggregates exist

- [ ] **Step 4: Commit**

```bash
git add cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractPostgresIT.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/FlywayMigrationIT.java
git commit -m "test: add TimescaleDB test base class and Flyway migration smoke test"
```
### Task 4: Update application.yml for PostgreSQL + OpenSearch

**Files:**

- Modify: `cameleer-server-app/src/main/resources/application.yml`
- Modify: `cameleer-server-app/src/test/resources/application-test.yml`

- [ ] **Step 1: Update application.yml datasource section**

Replace:

```yaml
spring:
  datasource:
    url: jdbc:ch://localhost:8123/cameleer
    username: cameleer
    password: cameleer_dev
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
```

With:

```yaml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/cameleer
    username: cameleer
    password: ${CAMELEER_DB_PASSWORD:cameleer_dev}
    driver-class-name: org.postgresql.Driver
  flyway:
    enabled: true
    locations: classpath:db/migration
```

Add an OpenSearch config section:

```yaml
opensearch:
  url: ${OPENSEARCH_URL:http://localhost:9200}
  queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
  debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}
```
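The `debounce-ms` setting controls how `SearchIndexer` coalesces bursts of `ExecutionUpdatedEvent`s for the same execution into one index write. A minimal, clock-driven sketch of that coalescing behavior — names and the periodic-drain design are illustrative assumptions, not the indexer's actual implementation:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of debounce coalescing: repeated updates to one execution collapse
// into a single index write once the execution has been quiet for debounceMs.
public class DebounceSketch {

    private final Map<String, Instant> lastTouched = new ConcurrentHashMap<>();
    private final long debounceMs;

    DebounceSketch(long debounceMs) { this.debounceMs = debounceMs; }

    void onExecutionUpdated(String executionId, Instant now) {
        lastTouched.put(executionId, now); // later events overwrite earlier ones
    }

    // Called by a periodic flush: drain executions quiet for >= debounceMs.
    List<String> drainQuiet(Instant now) {
        List<String> ready = new ArrayList<>();
        lastTouched.forEach((id, touched) -> {
            if (now.toEpochMilli() - touched.toEpochMilli() >= debounceMs) {
                ready.add(id);
            }
        });
        ready.forEach(lastTouched::remove);
        return ready;
    }

    public static void main(String[] args) {
        DebounceSketch sketch = new DebounceSketch(2000);
        sketch.onExecutionUpdated("exec-1", Instant.ofEpochMilli(0));
        sketch.onExecutionUpdated("exec-1", Instant.ofEpochMilli(1000)); // coalesced
        System.out.println(sketch.drainQuiet(Instant.ofEpochMilli(3000))); // [exec-1]
    }
}
```

Coalescing on the latest touch time means a long-running execution that keeps emitting events is indexed only after it quiets down, which bounds OpenSearch write amplification.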
Add body-size-limit and retention settings:

```yaml
cameleer:
  body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
  retention-days: ${CAMELEER_RETENTION_DAYS:30}
```
Remove the `clickhouse:` section.

- [ ] **Step 2: Update application-test.yml**

```yaml
spring:
  flyway:
    enabled: true

opensearch:
  url: http://localhost:9200
```

- [ ] **Step 3: Commit**

```bash
git add cameleer-server-app/src/main/resources/application.yml
git add cameleer-server-app/src/test/resources/application-test.yml
git commit -m "config: switch datasource to PostgreSQL, add OpenSearch and Flyway config"
```

---
## Chunk 2: Core Module Interfaces and Models

### Task 5: Create new storage interfaces in core module

**Files:**

- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/StatsStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/SearchIndex.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/DiagramStore.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsStore.java`

- [ ] **Step 1: Create ExecutionStore interface**

```java
package com.cameleer.server.core.storage;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

public interface ExecutionStore {

    void upsert(ExecutionRecord execution);

    void upsertProcessors(String executionId, Instant startTime,
                          String groupName, String routeId,
                          List<ProcessorRecord> processors);

    Optional<ExecutionRecord> findById(String executionId);

    List<ProcessorRecord> findProcessors(String executionId);

    record ExecutionRecord(
            String executionId, String routeId, String agentId, String groupName,
            String status, String correlationId, String exchangeId,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace, String diagramContentHash
    ) {}

    record ProcessorRecord(
            String executionId, String processorId, String processorType,
            String diagramNodeId, String groupName, String routeId,
            int depth, String parentProcessorId, String status,
            Instant startTime, Instant endTime, Long durationMs,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody, String inputHeaders, String outputHeaders
    ) {}
}
```
- [ ] **Step 2: Create StatsStore interface**

Supports all 4 aggregation levels: global, per-app, per-route, per-processor.

```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;

import java.time.Instant;
import java.util.List;

public interface StatsStore {

    // Global stats (stats_1m_all)
    ExecutionStats stats(Instant from, Instant to);

    // Per-app stats (stats_1m_app)
    ExecutionStats statsForApp(Instant from, Instant to, String groupName);

    // Per-route stats (stats_1m_route), optionally scoped to specific agents
    ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds);

    // Per-processor stats (stats_1m_processor)
    ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType);

    // Global timeseries
    StatsTimeseries timeseries(Instant from, Instant to, int bucketCount);

    // Per-app timeseries
    StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName);

    // Per-route timeseries, optionally scoped to specific agents
    StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                       String routeId, List<String> agentIds);

    // Per-processor timeseries
    StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                           String routeId, String processorType);
}
```
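Since the continuous aggregates are fixed at 1-minute resolution, the `bucketCount` parameter has to be translated into a whole-minute `time_bucket` width at query time. A sketch of that translation — the round-up-to-minutes policy is an assumption the Postgres impl would have to pin down:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: map (from, to, bucketCount) to the smallest whole-minute bucket
// width that yields at most bucketCount buckets. Rounding is an assumption.
public class BucketSizeSketch {

    static Duration bucketSize(Instant from, Instant to, int bucketCount) {
        long totalSeconds = Duration.between(from, to).getSeconds();
        long rawSeconds = (totalSeconds + bucketCount - 1) / bucketCount; // ceil div
        long minutes = Math.max(1, (rawSeconds + 59) / 60);               // >= 1 minute
        return Duration.ofMinutes(minutes);
    }

    public static void main(String[] args) {
        Instant from = Instant.parse("2026-03-16T00:00:00Z");
        Instant to = Instant.parse("2026-03-16T06:00:00Z");
        System.out.println(bucketSize(from, to, 60)); // 6h / 60 buckets
    }
}
```

The resulting width would feed `time_bucket(make_interval(mins => ?), bucket)` re-aggregation over the 1-minute views.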
- [ ] **Step 3: Create SearchIndex interface**

```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.search.ExecutionSummary;
import com.cameleer.server.core.search.SearchRequest;
import com.cameleer.server.core.search.SearchResult;
import com.cameleer.server.core.storage.model.ExecutionDocument;

public interface SearchIndex {

    SearchResult<ExecutionSummary> search(SearchRequest request);

    long count(SearchRequest request);

    void index(ExecutionDocument document);

    void delete(String executionId);
}
```

- [ ] **Step 4: Create DiagramStore interface**

```java
package com.cameleer.server.core.storage;

import com.cameleer.common.graph.RouteGraph;
import com.cameleer.server.core.ingestion.TaggedDiagram;

import java.util.List;
import java.util.Optional;

public interface DiagramStore {

    void store(TaggedDiagram diagram);

    Optional<RouteGraph> findByContentHash(String contentHash);

    Optional<String> findContentHashForRoute(String routeId, String agentId);

    Optional<String> findContentHashForRouteByAgents(String routeId, List<String> agentIds);
}
```

- [ ] **Step 5: Create MetricsStore interface**

```java
package com.cameleer.server.core.storage;

import com.cameleer.server.core.storage.model.MetricsSnapshot;

import java.util.List;

public interface MetricsStore {

    void insertBatch(List<MetricsSnapshot> snapshots);
}
```
- [ ] **Step 6: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/
git commit -m "feat: add new storage interfaces for PostgreSQL/OpenSearch backends"
```

### Task 6: Create ExecutionDocument model and indexing event

**Files:**

- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java`
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/ExecutionUpdatedEvent.java`

- [ ] **Step 1: Create ExecutionDocument**

```java
package com.cameleer.server.core.storage.model;

import java.time.Instant;
import java.util.List;

public record ExecutionDocument(
        String executionId, String routeId, String agentId, String groupName,
        String status, String correlationId, String exchangeId,
        Instant startTime, Instant endTime, Long durationMs,
        String errorMessage, String errorStacktrace,
        List<ProcessorDoc> processors
) {
    public record ProcessorDoc(
            String processorId, String processorType, String status,
            String errorMessage, String errorStacktrace,
            String inputBody, String outputBody,
            String inputHeaders, String outputHeaders
    ) {}
}
```

- [ ] **Step 2: Create ExecutionUpdatedEvent**

```java
package com.cameleer.server.core.indexing;

import java.time.Instant;

public record ExecutionUpdatedEvent(String executionId, Instant startTime) {}
```

- [ ] **Step 3: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/storage/model/ExecutionDocument.java
git add cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/
git commit -m "feat: add ExecutionDocument model and ExecutionUpdatedEvent"
```
### Task 7: Update SearchService to use StatsStore for stats/timeseries

**Files:**

- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java`

- [ ] **Step 1: Refactor SearchService to accept SearchIndex + StatsStore**

Replace the single `SearchEngine` dependency with two dependencies:

```java
package com.cameleer.server.core.search;

import com.cameleer.server.core.storage.SearchIndex;
import com.cameleer.server.core.storage.StatsStore;

import java.time.Instant;
import java.util.List;

public class SearchService {

    private final SearchIndex searchIndex;
    private final StatsStore statsStore;

    public SearchService(SearchIndex searchIndex, StatsStore statsStore) {
        this.searchIndex = searchIndex;
        this.statsStore = statsStore;
    }

    public SearchResult<ExecutionSummary> search(SearchRequest request) {
        return searchIndex.search(request);
    }

    public long count(SearchRequest request) {
        return searchIndex.count(request);
    }

    public ExecutionStats stats(Instant from, Instant to) {
        return statsStore.stats(from, to);
    }

    public ExecutionStats stats(Instant from, Instant to, String routeId, List<String> agentIds) {
        return statsStore.statsForRoute(from, to, routeId, agentIds);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return statsStore.timeseries(from, to, bucketCount);
    }

    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount,
                                      String routeId, List<String> agentIds) {
        return statsStore.timeseriesForRoute(from, to, bucketCount, routeId, agentIds);
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchService.java
git commit -m "refactor: SearchService uses SearchIndex + StatsStore instead of SearchEngine"
```
### Task 8: Update DetailService to use ExecutionStore

**Files:**
- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java`

- [ ] **Step 1: Rewrite DetailService to use ExecutionStore**

The tree reconstruction from parallel arrays is no longer needed. Processors are now individual records with `parentProcessorId` for tree structure.

```java
package com.cameleer.server.core.detail;

import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;

import java.util.*;

public class DetailService {

    private final ExecutionStore executionStore;

    public DetailService(ExecutionStore executionStore) {
        this.executionStore = executionStore;
    }

    public Optional<ExecutionDetail> getDetail(String executionId) {
        return executionStore.findById(executionId)
            .map(exec -> {
                List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
                List<ProcessorNode> roots = buildTree(processors);
                return new ExecutionDetail(
                    exec.executionId(), exec.routeId(), exec.agentId(),
                    exec.status(), exec.startTime(), exec.endTime(),
                    exec.durationMs() != null ? exec.durationMs() : 0L,
                    exec.correlationId(), exec.exchangeId(),
                    exec.errorMessage(), exec.errorStacktrace(),
                    exec.diagramContentHash(), roots
                );
            });
    }

    List<ProcessorNode> buildTree(List<ProcessorRecord> processors) {
        if (processors.isEmpty()) return List.of();

        Map<String, ProcessorNode> nodeMap = new LinkedHashMap<>();
        for (ProcessorRecord p : processors) {
            nodeMap.put(p.processorId(), new ProcessorNode(
                p.processorId(), p.processorType(), p.status(),
                p.startTime(), p.endTime(),
                p.durationMs() != null ? p.durationMs() : 0L,
                p.diagramNodeId(), p.errorMessage(), p.errorStacktrace()
            ));
        }

        List<ProcessorNode> roots = new ArrayList<>();
        for (ProcessorRecord p : processors) {
            ProcessorNode node = nodeMap.get(p.processorId());
            if (p.parentProcessorId() == null) {
                roots.add(node);
            } else {
                ProcessorNode parent = nodeMap.get(p.parentProcessorId());
                if (parent != null) {
                    parent.addChild(node);
                } else {
                    roots.add(node); // orphan safety: keep nodes whose parent is missing
                }
            }
        }
        return roots;
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/detail/DetailService.java
git commit -m "refactor: DetailService uses ExecutionStore, tree built from parentProcessorId"
```

### Task 9: Update IngestionService for synchronous writes

**Files:**
- Modify: `cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java`

- [ ] **Step 1: Rewrite IngestionService**

Executions and diagrams become synchronous writes. Metrics keep the write buffer. Add event publishing for OpenSearch indexing.

```java
package com.cameleer.server.core.ingestion;

import com.cameleer.common.model.ProcessorExecution;
import com.cameleer.common.model.RouteExecution;
import com.cameleer.server.core.indexing.ExecutionUpdatedEvent;
import com.cameleer.server.core.storage.DiagramStore;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import com.cameleer.server.core.storage.model.MetricsSnapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IngestionService {

    private final ExecutionStore executionStore;
    private final DiagramStore diagramStore;
    private final WriteBuffer<MetricsSnapshot> metricsBuffer;
    private final Consumer<ExecutionUpdatedEvent> eventPublisher;
    private final int bodySizeLimit;

    public IngestionService(ExecutionStore executionStore,
                            DiagramStore diagramStore,
                            WriteBuffer<MetricsSnapshot> metricsBuffer,
                            Consumer<ExecutionUpdatedEvent> eventPublisher,
                            int bodySizeLimit) {
        this.executionStore = executionStore;
        this.diagramStore = diagramStore;
        this.metricsBuffer = metricsBuffer;
        this.eventPublisher = eventPublisher;
        this.bodySizeLimit = bodySizeLimit;
    }

    public void ingestExecution(String agentId, String groupName, RouteExecution execution) {
        ExecutionRecord record = toExecutionRecord(agentId, groupName, execution);
        executionStore.upsert(record);

        if (execution.getProcessors() != null && !execution.getProcessors().isEmpty()) {
            List<ProcessorRecord> processors = flattenProcessors(
                execution.getProcessors(), record.executionId(),
                record.startTime(), groupName, execution.getRouteId(),
                null, 0);
            executionStore.upsertProcessors(
                record.executionId(), record.startTime(),
                groupName, execution.getRouteId(), processors);
        }

        eventPublisher.accept(new ExecutionUpdatedEvent(
            record.executionId(), record.startTime()));
    }

    public void ingestDiagram(TaggedDiagram diagram) {
        diagramStore.store(diagram);
    }

    public boolean acceptMetrics(List<MetricsSnapshot> metrics) {
        return metricsBuffer.offerBatch(metrics);
    }

    public int getMetricsBufferDepth() {
        return metricsBuffer.size();
    }

    public WriteBuffer<MetricsSnapshot> getMetricsBuffer() {
        return metricsBuffer;
    }

    private ExecutionRecord toExecutionRecord(String agentId, String groupName,
                                              RouteExecution exec) {
        return new ExecutionRecord(
            exec.getExecutionId(), exec.getRouteId(), agentId, groupName,
            exec.getStatus() != null ? exec.getStatus().name() : "RUNNING",
            exec.getCorrelationId(), exec.getExchangeId(),
            exec.getStartTime(), exec.getEndTime(),
            exec.getDurationMs(),
            exec.getErrorMessage(), exec.getErrorStacktrace(),
            null // diagramContentHash set separately
        );
    }

    private List<ProcessorRecord> flattenProcessors(
            List<ProcessorExecution> processors, String executionId,
            java.time.Instant execStartTime, String groupName, String routeId,
            String parentProcessorId, int depth) {
        List<ProcessorRecord> flat = new ArrayList<>();
        for (ProcessorExecution p : processors) {
            flat.add(new ProcessorRecord(
                executionId, p.getProcessorId(), p.getProcessorType(),
                p.getDiagramNodeId(), groupName, routeId,
                depth, parentProcessorId,
                p.getStatus() != null ? p.getStatus().name() : "RUNNING",
                p.getStartTime() != null ? p.getStartTime() : execStartTime,
                p.getEndTime(),
                p.getDurationMs(),
                p.getErrorMessage(), p.getErrorStacktrace(),
                truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
                p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
                p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
            ));
            if (p.getChildren() != null) {
                flat.addAll(flattenProcessors(
                    p.getChildren(), executionId, execStartTime,
                    groupName, routeId, p.getProcessorId(), depth + 1));
            }
        }
        return flat;
    }

    private String truncateBody(String body) {
        if (body == null) return null;
        if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
        return body;
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java
git commit -m "refactor: IngestionService uses synchronous ExecutionStore writes with event publishing"
```
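
The `eventPublisher` hook above feeds the debounced `SearchIndexer` listener that pushes updates to OpenSearch asynchronously. As a rough standalone sketch — the class name, window, and flush signature here are assumptions for illustration, not the project's actual `SearchIndexer` — a debounce coalesces a burst of execution updates into a single bulk-index call:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Sketch: collect execution ids for a short window, then flush one batch.
class DebouncedIndexer {
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final Consumer<Set<String>> flusher; // e.g. a bulk index call

    DebouncedIndexer(long delayMs, Consumer<Set<String>> flusher) {
        this.delayMs = delayMs;
        this.flusher = flusher;
    }

    void onExecutionUpdated(String executionId) {
        pending.add(executionId);
        // Only the first event in a quiet period schedules a flush;
        // later events within the window just join the pending set.
        if (scheduled.compareAndSet(false, true)) {
            scheduler.schedule(this::flush, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    private void flush() {
        scheduled.set(false);
        Set<String> batch = Set.copyOf(pending);
        pending.removeAll(batch);
        if (!batch.isEmpty()) flusher.accept(batch);
    }

    void shutdown() { scheduler.shutdown(); }
}

public class Demo {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        List<Set<String>> batches = new CopyOnWriteArrayList<>();
        DebouncedIndexer indexer = new DebouncedIndexer(100, batch -> {
            batches.add(batch);
            done.countDown();
        });
        indexer.onExecutionUpdated("exec-1");
        indexer.onExecutionUpdated("exec-2");
        indexer.onExecutionUpdated("exec-1"); // duplicate id is coalesced
        done.await(2, TimeUnit.SECONDS);
        indexer.shutdown();
        System.out.println("batches=" + batches.size()); // one flush for the burst
        System.out.println("ids=" + batches.get(0).size()); // two distinct ids
    }
}
```

This is why the upsert path can publish one event per ingest call without hammering OpenSearch: repeated updates to the same execution inside the window collapse into one index operation.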

---

## Chunk 3: PostgreSQL Store Implementations

### Task 10: Implement PostgresExecutionStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java`

- [ ] **Step 1: Write the failing test**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.*;

class PostgresExecutionStoreIT extends AbstractPostgresIT {

    @Autowired
    ExecutionStore executionStore;

    @Test
    void upsertAndFindById() {
        Instant now = Instant.now();
        ExecutionRecord record = new ExecutionRecord(
            "exec-1", "route-a", "agent-1", "app-1",
            "COMPLETED", "corr-1", "exchange-1",
            now, now.plusMillis(100), 100L,
            null, null, null);

        executionStore.upsert(record);
        Optional<ExecutionRecord> found = executionStore.findById("exec-1");

        assertTrue(found.isPresent());
        assertEquals("exec-1", found.get().executionId());
        assertEquals("COMPLETED", found.get().status());
    }

    @Test
    void upsertDeduplicatesByExecutionId() {
        Instant now = Instant.now();
        ExecutionRecord first = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1",
            "RUNNING", null, null, now, null, null, null, null, null);
        ExecutionRecord second = new ExecutionRecord(
            "exec-dup", "route-a", "agent-1", "app-1",
            "COMPLETED", null, null, now, now.plusMillis(200), 200L, null, null, null);

        executionStore.upsert(first);
        executionStore.upsert(second);

        Optional<ExecutionRecord> found = executionStore.findById("exec-dup");
        assertTrue(found.isPresent());
        assertEquals("COMPLETED", found.get().status());
        assertEquals(200L, found.get().durationMs());
    }

    @Test
    void upsertProcessorsAndFind() {
        Instant now = Instant.now();
        ExecutionRecord exec = new ExecutionRecord(
            "exec-proc", "route-a", "agent-1", "app-1",
            "COMPLETED", null, null, now, now.plusMillis(50), 50L, null, null, null);
        executionStore.upsert(exec);

        List<ProcessorRecord> processors = List.of(
            new ProcessorRecord("exec-proc", "proc-1", "log", null,
                "app-1", "route-a", 0, null, "COMPLETED",
                now, now.plusMillis(10), 10L, null, null,
                "input body", "output body", null, null),
            new ProcessorRecord("exec-proc", "proc-2", "to", null,
                "app-1", "route-a", 1, "proc-1", "COMPLETED",
                now.plusMillis(10), now.plusMillis(30), 20L, null, null,
                null, null, null, null)
        );
        executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

        List<ProcessorRecord> found = executionStore.findProcessors("exec-proc");
        assertEquals(2, found.size());
        assertEquals("proc-1", found.get(0).processorId());
        assertEquals("proc-2", found.get(1).processorId());
        assertEquals("proc-1", found.get(1).parentProcessorId());
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: FAIL — `ExecutionStore` bean not found

- [ ] **Step 3: Implement PostgresExecutionStore**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.storage.ExecutionStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Repository
public class PostgresExecutionStore implements ExecutionStore {

    private final JdbcTemplate jdbc;

    public PostgresExecutionStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void upsert(ExecutionRecord execution) {
        jdbc.update("""
            INSERT INTO executions (execution_id, route_id, agent_id, group_name,
                status, correlation_id, exchange_id, start_time, end_time,
                duration_ms, error_message, error_stacktrace, diagram_content_hash,
                created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
            ON CONFLICT (execution_id, start_time) DO UPDATE SET
                status = CASE
                    WHEN EXCLUDED.status IN ('COMPLETED', 'FAILED')
                         AND executions.status = 'RUNNING'
                    THEN EXCLUDED.status
                    ELSE executions.status  -- never downgrade a terminal status back to RUNNING
                END,
                end_time = COALESCE(EXCLUDED.end_time, executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, executions.error_stacktrace),
                diagram_content_hash = COALESCE(EXCLUDED.diagram_content_hash, executions.diagram_content_hash),
                updated_at = now()
            """,
            execution.executionId(), execution.routeId(), execution.agentId(),
            execution.groupName(), execution.status(), execution.correlationId(),
            execution.exchangeId(),
            Timestamp.from(execution.startTime()),
            execution.endTime() != null ? Timestamp.from(execution.endTime()) : null,
            execution.durationMs(), execution.errorMessage(),
            execution.errorStacktrace(), execution.diagramContentHash());
    }

    @Override
    public void upsertProcessors(String executionId, Instant startTime,
                                 String groupName, String routeId,
                                 List<ProcessorRecord> processors) {
        jdbc.batchUpdate("""
            INSERT INTO processor_executions (execution_id, processor_id, processor_type,
                diagram_node_id, group_name, route_id, depth, parent_processor_id,
                status, start_time, end_time, duration_ms, error_message, error_stacktrace,
                input_body, output_body, input_headers, output_headers)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
            ON CONFLICT (execution_id, processor_id, start_time) DO UPDATE SET
                status = EXCLUDED.status,
                end_time = COALESCE(EXCLUDED.end_time, processor_executions.end_time),
                duration_ms = COALESCE(EXCLUDED.duration_ms, processor_executions.duration_ms),
                error_message = COALESCE(EXCLUDED.error_message, processor_executions.error_message),
                error_stacktrace = COALESCE(EXCLUDED.error_stacktrace, processor_executions.error_stacktrace),
                input_body = COALESCE(EXCLUDED.input_body, processor_executions.input_body),
                output_body = COALESCE(EXCLUDED.output_body, processor_executions.output_body),
                input_headers = COALESCE(EXCLUDED.input_headers, processor_executions.input_headers),
                output_headers = COALESCE(EXCLUDED.output_headers, processor_executions.output_headers)
            """,
            processors.stream().map(p -> new Object[]{
                p.executionId(), p.processorId(), p.processorType(),
                p.diagramNodeId(), p.groupName(), p.routeId(),
                p.depth(), p.parentProcessorId(), p.status(),
                Timestamp.from(p.startTime()),
                p.endTime() != null ? Timestamp.from(p.endTime()) : null,
                p.durationMs(), p.errorMessage(), p.errorStacktrace(),
                p.inputBody(), p.outputBody(), p.inputHeaders(), p.outputHeaders()
            }).toList());
    }

    @Override
    public Optional<ExecutionRecord> findById(String executionId) {
        List<ExecutionRecord> results = jdbc.query(
            "SELECT * FROM executions WHERE execution_id = ? ORDER BY start_time DESC LIMIT 1",
            EXECUTION_MAPPER, executionId);
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    @Override
    public List<ProcessorRecord> findProcessors(String executionId) {
        return jdbc.query(
            "SELECT * FROM processor_executions WHERE execution_id = ? ORDER BY depth, start_time",
            PROCESSOR_MAPPER, executionId);
    }

    private static final RowMapper<ExecutionRecord> EXECUTION_MAPPER = (rs, rowNum) ->
        new ExecutionRecord(
            rs.getString("execution_id"), rs.getString("route_id"),
            rs.getString("agent_id"), rs.getString("group_name"),
            rs.getString("status"), rs.getString("correlation_id"),
            rs.getString("exchange_id"),
            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            rs.getString("error_message"), rs.getString("error_stacktrace"),
            rs.getString("diagram_content_hash"));

    private static final RowMapper<ProcessorRecord> PROCESSOR_MAPPER = (rs, rowNum) ->
        new ProcessorRecord(
            rs.getString("execution_id"), rs.getString("processor_id"),
            rs.getString("processor_type"), rs.getString("diagram_node_id"),
            rs.getString("group_name"), rs.getString("route_id"),
            rs.getInt("depth"), rs.getString("parent_processor_id"),
            rs.getString("status"),
            toInstant(rs, "start_time"), toInstant(rs, "end_time"),
            rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null,
            rs.getString("error_message"), rs.getString("error_stacktrace"),
            rs.getString("input_body"), rs.getString("output_body"),
            rs.getString("input_headers"), rs.getString("output_headers"));

    private static Instant toInstant(ResultSet rs, String column) throws SQLException {
        Timestamp ts = rs.getTimestamp(column);
        return ts != null ? ts.toInstant() : null;
    }
}
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresExecutionStoreIT -q`
Expected: PASS — all 3 tests green

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresExecutionStore.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresExecutionStoreIT.java
git commit -m "feat: implement PostgresExecutionStore with upsert and dedup"
```

### Task 11: Implement PostgresStatsStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresStatsStoreIT.java`

- [ ] **Step 1: Write the failing test**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.app.AbstractPostgresIT;
import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.storage.ExecutionStore;
import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
import com.cameleer.server.core.storage.StatsStore;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.junit.jupiter.api.Assertions.*;

class PostgresStatsStoreIT extends AbstractPostgresIT {

    @Autowired StatsStore statsStore;
    @Autowired ExecutionStore executionStore;
    @Autowired JdbcTemplate jdbc;

    @Test
    void statsReturnsCountsForTimeWindow() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
        insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
        insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);

        // Force continuous aggregate refresh
        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
        assertEquals(3, stats.totalCount());
        assertEquals(1, stats.failedCount());
    }

    @Test
    void timeseriesReturnsBuckets() {
        Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
        for (int i = 0; i < 10; i++) {
            insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
                now.plusSeconds(i * 30), 100L + i);
        }

        jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', null, null)");

        // Instant has no minusMinutes/plusMinutes; use ChronoUnit arithmetic
        StatsTimeseries ts = statsStore.timeseries(
            now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
        assertNotNull(ts);
        assertFalse(ts.buckets().isEmpty());
    }

    private void insertExecution(String id, String routeId, String groupName,
                                 String status, Instant startTime, long durationMs) {
        executionStore.upsert(new ExecutionRecord(
            id, routeId, "agent-1", groupName, status, null, null,
            startTime, startTime.plusMillis(durationMs), durationMs,
            status.equals("FAILED") ? "error" : null, null, null));
    }
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: FAIL — `StatsStore` bean not found
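
The tests refresh `stats_1m_all` by hand before querying. For orientation only — this is a hedged sketch, not the Flyway migration, and `max()` stands in for a real p99 (which would need the timescaledb_toolkit percentile aggregates) — one plausible shape for that continuous aggregate, with column names matching what the store queries expect:

```sql
-- Sketch only: a plausible definition for stats_1m_all. The real DDL lives
-- in the Flyway migrations; p99_duration is a placeholder here.
CREATE MATERIALIZED VIEW stats_1m_all
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', start_time)                  AS bucket,
       count(*)                                             AS total_count,
       sum(CASE WHEN status = 'FAILED'  THEN 1 ELSE 0 END)  AS failed_count,
       sum(CASE WHEN status = 'RUNNING' THEN 1 ELSE 0 END)  AS running_count,
       sum(duration_ms)                                     AS duration_sum,
       max(duration_ms)                                     AS p99_duration
FROM executions
GROUP BY bucket;
```

Storing `duration_sum` rather than an average is deliberate: sums re-aggregate correctly when the store rolls 1-minute buckets up into larger ones.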

- [ ] **Step 3: Implement PostgresStatsStore**

```java
package com.cameleer.server.app.storage;

import com.cameleer.server.core.search.ExecutionStats;
import com.cameleer.server.core.search.StatsTimeseries;
import com.cameleer.server.core.search.StatsTimeseries.TimeseriesBucket;
import com.cameleer.server.core.storage.StatsStore;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

@Repository
public class PostgresStatsStore implements StatsStore {

    private final JdbcTemplate jdbc;

    public PostgresStatsStore(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExecutionStats stats(Instant from, Instant to) {
        return queryStats("stats_1m_all", from, to, List.of());
    }

    @Override
    public ExecutionStats statsForApp(Instant from, Instant to, String groupName) {
        return queryStats("stats_1m_app", from, to, List.of(
            new Filter("group_name", groupName)));
    }

    @Override
    public ExecutionStats statsForRoute(Instant from, Instant to, String routeId, List<String> agentIds) {
        // Note: agentIds is accepted for interface compatibility but not filterable
        // on the continuous aggregate (it groups by route_id, not agent_id).
        // All agents for the same route contribute to the same aggregate.
        return queryStats("stats_1m_route", from, to, List.of(
            new Filter("route_id", routeId)));
    }

    @Override
    public ExecutionStats statsForProcessor(Instant from, Instant to, String routeId, String processorType) {
        return queryStats("stats_1m_processor", from, to, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)));
    }

    @Override
    public StatsTimeseries timeseries(Instant from, Instant to, int bucketCount) {
        return queryTimeseries("stats_1m_all", from, to, bucketCount, List.of(), true);
    }

    @Override
    public StatsTimeseries timeseriesForApp(Instant from, Instant to, int bucketCount, String groupName) {
        return queryTimeseries("stats_1m_app", from, to, bucketCount, List.of(
            new Filter("group_name", groupName)), true);
    }

    @Override
    public StatsTimeseries timeseriesForRoute(Instant from, Instant to, int bucketCount,
                                              String routeId, List<String> agentIds) {
        return queryTimeseries("stats_1m_route", from, to, bucketCount, List.of(
            new Filter("route_id", routeId)), true);
    }

    @Override
    public StatsTimeseries timeseriesForProcessor(Instant from, Instant to, int bucketCount,
                                                  String routeId, String processorType) {
        // stats_1m_processor does NOT have running_count column
        return queryTimeseries("stats_1m_processor", from, to, bucketCount, List.of(
            new Filter("route_id", routeId),
            new Filter("processor_type", processorType)), false);
    }

    private record Filter(String column, String value) {}

    private ExecutionStats queryStats(String view, Instant from, Instant to, List<Filter> filters) {
        // running_count only exists on execution-level aggregates, not processor
        boolean hasRunning = !view.equals("stats_1m_processor");
        String runningCol = hasRunning ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT COALESCE(SUM(total_count), 0) AS total_count, " +
            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
            runningCol + " AS active_count " +
            "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }

        long totalCount = 0, failedCount = 0, avgDuration = 0, p99Duration = 0, activeCount = 0;
        var currentResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
            rs.getLong("total_count"), rs.getLong("failed_count"),
            rs.getLong("avg_duration"), rs.getLong("p99_duration"),
            rs.getLong("active_count")
        }, params.toArray());
        if (!currentResult.isEmpty()) {
            long[] r = currentResult.get(0);
            totalCount = r[0]; failedCount = r[1]; avgDuration = r[2];
            p99Duration = r[3]; activeCount = r[4];
        }

        // Previous period (shifted back 24h)
        Instant prevFrom = from.minus(Duration.ofHours(24));
        Instant prevTo = to.minus(Duration.ofHours(24));
        List<Object> prevParams = new ArrayList<>();
        prevParams.add(Timestamp.from(prevFrom));
        prevParams.add(Timestamp.from(prevTo));
        for (Filter f : filters) prevParams.add(f.value());
        String prevSql = sql; // same shape, different time params

        long prevTotal = 0, prevFailed = 0, prevAvg = 0, prevP99 = 0;
        var prevResult = jdbc.query(prevSql, (rs, rowNum) -> new long[]{
            rs.getLong("total_count"), rs.getLong("failed_count"),
            rs.getLong("avg_duration"), rs.getLong("p99_duration")
        }, prevParams.toArray());
        if (!prevResult.isEmpty()) {
            long[] r = prevResult.get(0);
            prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
        }

        // Today total (from midnight UTC)
        Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
        List<Object> todayParams = new ArrayList<>();
        todayParams.add(Timestamp.from(todayStart));
        todayParams.add(Timestamp.from(Instant.now()));
        for (Filter f : filters) todayParams.add(f.value());
        String todaySql = sql;

        long totalToday = 0;
        var todayResult = jdbc.query(todaySql, (rs, rowNum) -> rs.getLong("total_count"),
            todayParams.toArray());
        if (!todayResult.isEmpty()) totalToday = todayResult.get(0);

        return new ExecutionStats(
            totalCount, failedCount, avgDuration, p99Duration, activeCount,
            totalToday, prevTotal, prevFailed, prevAvg, prevP99);
    }

    private StatsTimeseries queryTimeseries(String view, Instant from, Instant to,
                                            int bucketCount, List<Filter> filters,
                                            boolean hasRunningCount) {
        long intervalSeconds = Duration.between(from, to).toSeconds() / Math.max(bucketCount, 1);
        if (intervalSeconds < 60) intervalSeconds = 60;

        String runningCol = hasRunningCount ? "COALESCE(SUM(running_count), 0)" : "0";

        String sql = "SELECT time_bucket(? * INTERVAL '1 second', bucket) AS period, " +
            "COALESCE(SUM(total_count), 0) AS total_count, " +
            "COALESCE(SUM(failed_count), 0) AS failed_count, " +
            "CASE WHEN SUM(total_count) > 0 THEN SUM(duration_sum) / SUM(total_count) ELSE 0 END AS avg_duration, " +
            "COALESCE(MAX(p99_duration), 0) AS p99_duration, " +
            runningCol + " AS active_count " +
            "FROM " + view + " WHERE bucket >= ? AND bucket < ?";

        List<Object> params = new ArrayList<>();
        params.add(intervalSeconds);
        params.add(Timestamp.from(from));
        params.add(Timestamp.from(to));
        for (Filter f : filters) {
            sql += " AND " + f.column() + " = ?";
            params.add(f.value());
        }
        sql += " GROUP BY period ORDER BY period";

        List<TimeseriesBucket> buckets = jdbc.query(sql, (rs, rowNum) ->
            new TimeseriesBucket(
                rs.getTimestamp("period").toInstant(),
                rs.getLong("total_count"), rs.getLong("failed_count"),
                rs.getLong("avg_duration"), rs.getLong("p99_duration"),
                rs.getLong("active_count")
            ), params.toArray());

        return new StatsTimeseries(buckets);
    }
}
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `mvn test -pl cameleer-server-app -Dtest=PostgresStatsStoreIT -q`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresStatsStore.java
git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/PostgresStatsStoreIT.java
git commit -m "feat: implement PostgresStatsStore querying continuous aggregates"
```

### Task 12: Implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresDiagramStore.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresUserRepository.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresOidcConfigRepository.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/storage/PostgresMetricsStore.java`

- [ ] **Step 1: Write failing test for PostgresDiagramStore**

Create `PostgresDiagramStoreIT.java` with tests for:
- `store()` + `findByContentHash()` roundtrip
- Content-hash dedup (store the same hash twice, verify a single row)
- `findContentHashForRoute()` returns the latest for route+agent
- `findContentHashForRouteByAgents()` returns across an agent list

- [ ] **Step 2: Implement PostgresDiagramStore**

Straightforward CRUD with `ON CONFLICT (content_hash) DO NOTHING`. Port the SHA-256 hashing logic from `ClickHouseDiagramRepository`. Use `JdbcTemplate` queries.
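
The dedup depends on the content hash being a pure function of the diagram content. As a self-contained sketch of the hashing logic — hex encoding and the `contentHash` name are assumptions here; the authoritative version is whatever `ClickHouseDiagramRepository` does:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class ContentHashDemo {

    // Hex-encoded SHA-256 of the diagram content. Identical content yields an
    // identical hash, which is what makes ON CONFLICT (content_hash) DO NOTHING
    // an effective dedup on insert.
    static String contentHash(String diagramContent) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(diagramContent.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash); // HexFormat is Java 17+
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e); // never on a compliant JVM
        }
    }

    public static void main(String[] args) {
        String a = contentHash("route: a -> b");
        String b = contentHash("route: a -> b");
        String c = contentHash("route: a -> c");
        System.out.println("len=" + a.length());    // 32 bytes -> 64 hex chars
        System.out.println("same=" + a.equals(b));  // same content, same hash
        System.out.println("diff=" + a.equals(c));  // different content, different hash
    }
}
```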

- [ ] **Step 3: Run diagram tests, verify pass**

- [ ] **Step 4: Implement PostgresUserRepository**

Implements the existing `UserRepository` interface in `core.security` (unchanged).

  ```java
  package com.cameleer.server.app.storage;

  import com.cameleer.server.core.security.UserInfo;
  import com.cameleer.server.core.security.UserRepository;
  import org.springframework.jdbc.core.JdbcTemplate;
  import org.springframework.stereotype.Repository;

  import java.sql.Array;
  import java.util.List;
  import java.util.Optional;

  @Repository
  public class PostgresUserRepository implements UserRepository {

      private final JdbcTemplate jdbc;

      public PostgresUserRepository(JdbcTemplate jdbc) {
          this.jdbc = jdbc;
      }

      @Override
      public Optional<UserInfo> findById(String userId) {
          var results = jdbc.query(
              "SELECT * FROM users WHERE user_id = ?",
              (rs, rowNum) -> mapUser(rs), userId);
          return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
      }

      @Override
      public List<UserInfo> findAll() {
          return jdbc.query("SELECT * FROM users ORDER BY user_id",
              (rs, rowNum) -> mapUser(rs));
      }

      @Override
      public void upsert(UserInfo user) {
          jdbc.update("""
              INSERT INTO users (user_id, provider, email, display_name, roles, created_at, updated_at)
              VALUES (?, ?, ?, ?, ?, now(), now())
              ON CONFLICT (user_id) DO UPDATE SET
                  provider = EXCLUDED.provider, email = EXCLUDED.email,
                  display_name = EXCLUDED.display_name, roles = EXCLUDED.roles,
                  updated_at = now()
              """,
              user.userId(), user.provider(), user.email(), user.displayName(),
              user.roles().toArray(new String[0]));
      }

      @Override
      public void updateRoles(String userId, List<String> roles) {
          jdbc.update("UPDATE users SET roles = ?, updated_at = now() WHERE user_id = ?",
              roles.toArray(new String[0]), userId);
      }

      @Override
      public void delete(String userId) {
          jdbc.update("DELETE FROM users WHERE user_id = ?", userId);
      }

      private UserInfo mapUser(java.sql.ResultSet rs) throws java.sql.SQLException {
          Array rolesArray = rs.getArray("roles");
          String[] roles = rolesArray != null ? (String[]) rolesArray.getArray() : new String[0];
          return new UserInfo(
              rs.getString("user_id"), rs.getString("provider"),
              rs.getString("email"), rs.getString("display_name"),
              List.of(roles));
      }
  }
  ```
- [ ] **Step 5: Implement PostgresOidcConfigRepository**

  Implements the `OidcConfigRepository` interface (existing interface in `core.security`).
  ```java
  package com.cameleer.server.app.storage;

  import com.cameleer.server.core.security.OidcConfig;
  import com.cameleer.server.core.security.OidcConfigRepository;
  import org.springframework.jdbc.core.JdbcTemplate;
  import org.springframework.stereotype.Repository;

  import java.sql.Array;
  import java.util.List;
  import java.util.Optional;

  @Repository
  public class PostgresOidcConfigRepository implements OidcConfigRepository {

      private final JdbcTemplate jdbc;

      public PostgresOidcConfigRepository(JdbcTemplate jdbc) {
          this.jdbc = jdbc;
      }

      @Override
      public Optional<OidcConfig> find() {
          var results = jdbc.query(
              "SELECT * FROM oidc_config WHERE config_id = 'default'",
              (rs, rowNum) -> {
                  Array arr = rs.getArray("default_roles");
                  String[] roles = arr != null ? (String[]) arr.getArray() : new String[0];
                  return new OidcConfig(
                      rs.getBoolean("enabled"), rs.getString("issuer_uri"),
                      rs.getString("client_id"), rs.getString("client_secret"),
                      rs.getString("roles_claim"), List.of(roles),
                      rs.getBoolean("auto_signup"), rs.getString("display_name_claim"));
              });
          return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
      }

      @Override
      public void save(OidcConfig config) {
          jdbc.update("""
              INSERT INTO oidc_config (config_id, enabled, issuer_uri, client_id, client_secret,
                  roles_claim, default_roles, auto_signup, display_name_claim, updated_at)
              VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, now())
              ON CONFLICT (config_id) DO UPDATE SET
                  enabled = EXCLUDED.enabled, issuer_uri = EXCLUDED.issuer_uri,
                  client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret,
                  roles_claim = EXCLUDED.roles_claim, default_roles = EXCLUDED.default_roles,
                  auto_signup = EXCLUDED.auto_signup, display_name_claim = EXCLUDED.display_name_claim,
                  updated_at = now()
              """,
              config.enabled(), config.issuerUri(), config.clientId(), config.clientSecret(),
              config.rolesClaim(), config.defaultRoles().toArray(new String[0]),
              config.autoSignup(), config.displayNameClaim());
      }

      @Override
      public void delete() {
          jdbc.update("DELETE FROM oidc_config WHERE config_id = 'default'");
      }
  }
  ```
- [ ] **Step 6: Implement PostgresMetricsStore**
  ```java
  package com.cameleer.server.app.storage;

  import com.cameleer.server.core.storage.MetricsStore;
  import com.cameleer.server.core.storage.model.MetricsSnapshot;
  import com.fasterxml.jackson.core.JsonProcessingException;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.springframework.jdbc.core.JdbcTemplate;
  import org.springframework.stereotype.Repository;

  import java.sql.Timestamp;
  import java.util.List;

  @Repository
  public class PostgresMetricsStore implements MetricsStore {

      private static final ObjectMapper MAPPER = new ObjectMapper();
      private final JdbcTemplate jdbc;

      public PostgresMetricsStore(JdbcTemplate jdbc) {
          this.jdbc = jdbc;
      }

      @Override
      public void insertBatch(List<MetricsSnapshot> snapshots) {
          jdbc.batchUpdate("""
              INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags,
                  collected_at, server_received_at)
              VALUES (?, ?, ?, ?::jsonb, ?, now())
              """,
              snapshots.stream().map(s -> new Object[]{
                  s.agentId(), s.metricName(), s.metricValue(),
                  tagsToJson(s.tags()),
                  Timestamp.from(s.collectedAt())
              }).toList());
      }

      private String tagsToJson(java.util.Map<String, String> tags) {
          if (tags == null || tags.isEmpty()) return null;
          try { return MAPPER.writeValueAsString(tags); }
          catch (JsonProcessingException e) { return null; }
      }
  }
  ```
- [ ] **Step 7: Run all store tests, verify pass**

  Run: `mvn test -pl cameleer-server-app -Dtest="Postgres*IT" -q`
- [ ] **Step 8: Commit**

  ```bash
  git add cameleer-server-app/src/main/java/com/cameleer/server/app/storage/Postgres*.java
  git add cameleer-server-app/src/test/java/com/cameleer/server/app/storage/Postgres*.java
  git commit -m "feat: implement PostgresDiagramStore, PostgresUserRepository, PostgresOidcConfigRepository, PostgresMetricsStore"
  ```

---

## Chunk 4: OpenSearch Integration

### Task 13: Implement OpenSearchIndex

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/OpenSearchConfig.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java`
- Create: `cameleer-server-app/src/test/java/com/cameleer/server/app/search/OpenSearchIndexIT.java`
- [ ] **Step 1: Write failing test**

  ```java
  package com.cameleer.server.app.search;

  import com.cameleer.server.app.AbstractPostgresIT;
  import com.cameleer.server.core.search.ExecutionSummary;
  import com.cameleer.server.core.search.SearchRequest;
  import com.cameleer.server.core.search.SearchResult;
  import com.cameleer.server.core.storage.SearchIndex;
  import com.cameleer.server.core.storage.model.ExecutionDocument;
  import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc;
  import org.junit.jupiter.api.Test;
  import org.opensearch.testcontainers.OpensearchContainer;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.test.context.DynamicPropertyRegistry;
  import org.springframework.test.context.DynamicPropertySource;
  import org.testcontainers.junit.jupiter.Container;

  import java.time.Instant;
  import java.util.List;

  import static org.junit.jupiter.api.Assertions.*;

  // Extends AbstractPostgresIT for the PostgreSQL datasource needed by the Spring context
  class OpenSearchIndexIT extends AbstractPostgresIT {

      @Container
      static final OpensearchContainer<?> opensearch =
          new OpensearchContainer<>("opensearchproject/opensearch:2.19.0")
              .withSecurityEnabled(false);

      @DynamicPropertySource
      static void configureOpenSearch(DynamicPropertyRegistry registry) {
          registry.add("opensearch.url", opensearch::getHttpHostAddress);
      }

      @Autowired
      SearchIndex searchIndex;

      @Test
      void indexAndSearchByText() throws Exception {
          Instant now = Instant.now();
          ExecutionDocument doc = new ExecutionDocument(
              "search-1", "route-a", "agent-1", "app-1",
              "FAILED", "corr-1", "exch-1",
              now, now.plusMillis(100), 100L,
              "OrderNotFoundException: order-12345 not found", null,
              List.of(new ProcessorDoc("proc-1", "log", "COMPLETED",
                  null, null, "request body with customer-99", null, null, null)));

          searchIndex.index(doc);
          Thread.sleep(1500); // Allow OpenSearch refresh

          SearchRequest request = new SearchRequest(
              null, now.minusSeconds(60), now.plusSeconds(60),
              null, null, null,
              "OrderNotFoundException", null, null, null,
              null, null, null, null, null,
              0, 50, "startTime", "desc");

          SearchResult<ExecutionSummary> result = searchIndex.search(request);
          assertTrue(result.total() > 0);
          assertEquals("search-1", result.items().get(0).executionId());
      }

      @Test
      void wildcardSearchFindsSubstring() throws Exception {
          Instant now = Instant.now();
          ExecutionDocument doc = new ExecutionDocument(
              "wild-1", "route-b", "agent-1", "app-1",
              "COMPLETED", null, null,
              now, now.plusMillis(50), 50L, null, null,
              List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED",
                  null, null, "UniquePayloadIdentifier12345", null, null, null)));

          searchIndex.index(doc);
          Thread.sleep(1500);

          SearchRequest request = new SearchRequest(
              null, now.minusSeconds(60), now.plusSeconds(60),
              null, null, null,
              "PayloadIdentifier", null, null, null,
              null, null, null, null, null,
              0, 50, "startTime", "desc");

          SearchResult<ExecutionSummary> result = searchIndex.search(request);
          assertTrue(result.total() > 0);
      }
  }
  ```
- [ ] **Step 2: Create OpenSearchConfig**

  ```java
  package com.cameleer.server.app.config;

  import org.apache.hc.core5.http.HttpHost;
  import org.opensearch.client.opensearch.OpenSearchClient;
  import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  import java.net.URISyntaxException;

  @Configuration
  public class OpenSearchConfig {

      @Value("${opensearch.url:http://localhost:9200}")
      private String opensearchUrl;

      @Bean
      public OpenSearchClient openSearchClient() throws URISyntaxException {
          // HttpHost.create in httpcore5 declares the checked URISyntaxException
          HttpHost host = HttpHost.create(opensearchUrl);
          var transport = ApacheHttpClient5TransportBuilder.builder(host).build();
          return new OpenSearchClient(transport);
      }
  }
  ```
- [ ] **Step 3: Implement OpenSearchIndex**

  ```java
  package com.cameleer.server.app.search;

  import com.cameleer.server.core.search.ExecutionSummary;
  import com.cameleer.server.core.search.SearchRequest;
  import com.cameleer.server.core.search.SearchResult;
  import com.cameleer.server.core.storage.SearchIndex;
  import com.cameleer.server.core.storage.model.ExecutionDocument;
  import jakarta.annotation.PostConstruct;
  import org.opensearch.client.json.JsonData;
  import org.opensearch.client.opensearch.OpenSearchClient;
  import org.opensearch.client.opensearch._types.FieldValue;
  import org.opensearch.client.opensearch._types.SortOrder;
  import org.opensearch.client.opensearch._types.analysis.TokenChar;
  import org.opensearch.client.opensearch._types.query_dsl.*;
  import org.opensearch.client.opensearch.core.*;
  import org.opensearch.client.opensearch.core.search.Hit;
  import org.opensearch.client.opensearch.indices.*;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.stereotype.Repository;

  import java.io.IOException;
  import java.time.Instant;
  import java.time.ZoneOffset;
  import java.time.format.DateTimeFormatter;
  import java.util.*;
  import java.util.stream.Collectors;

  @Repository
  public class OpenSearchIndex implements SearchIndex {

      private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
      private static final String INDEX_PREFIX = "executions-";
      private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
          .withZone(ZoneOffset.UTC);

      private final OpenSearchClient client;

      public OpenSearchIndex(OpenSearchClient client) {
          this.client = client;
      }

      @PostConstruct
      void ensureIndexTemplate() {
          // Full template with ngram analyzer for infix wildcard search.
          // The template matches the spec's OpenSearch index template definition.
          try {
              boolean exists = client.indices().existsIndexTemplate(
                  ExistsIndexTemplateRequest.of(b -> b.name("executions-template"))).value();
              if (!exists) {
                  client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
                      .name("executions-template")
                      .indexPatterns(List.of("executions-*"))
                      .template(t -> t
                          .settings(s -> s
                              .numberOfShards("3")
                              .numberOfReplicas("1")
                              .analysis(a -> a
                                  .analyzer("ngram_analyzer", an -> an
                                      .custom(c -> c
                                          .tokenizer("ngram_tokenizer")
                                          .filter("lowercase")))
                                  .tokenizer("ngram_tokenizer", tk -> tk
                                      .definition(d -> d
                                          .ngram(ng -> ng
                                              .minGram(3)
                                              .maxGram(4)
                                              .tokenChars(TokenChar.Letter,
                                                  TokenChar.Digit,
                                                  TokenChar.Punctuation,
                                                  TokenChar.Symbol)))))))));
                  log.info("OpenSearch index template created with ngram analyzer");
              }
          } catch (IOException e) {
              log.error("Failed to create index template", e);
          }
      }

      @Override
      public void index(ExecutionDocument doc) {
          String indexName = INDEX_PREFIX + DAY_FMT.format(doc.startTime());
          try {
              client.index(IndexRequest.of(b -> b
                  .index(indexName)
                  .id(doc.executionId())
                  .document(toMap(doc))));
          } catch (IOException e) {
              log.error("Failed to index execution {}", doc.executionId(), e);
          }
      }

      @Override
      public SearchResult<ExecutionSummary> search(SearchRequest request) {
          try {
              var searchReq = buildSearchRequest(request, request.limit());
              var response = client.search(searchReq, Map.class);

              List<ExecutionSummary> items = response.hits().hits().stream()
                  .map(this::hitToSummary)
                  .collect(Collectors.toList());

              long total = response.hits().total() != null ? response.hits().total().value() : 0;
              return new SearchResult<>(items, total);
          } catch (IOException e) {
              log.error("Search failed", e);
              return new SearchResult<>(List.of(), 0);
          }
      }

      @Override
      public long count(SearchRequest request) {
          try {
              var countReq = CountRequest.of(b -> b
                  .index(INDEX_PREFIX + "*")
                  .query(buildQuery(request)));
              return client.count(countReq).count();
          } catch (IOException e) {
              log.error("Count failed", e);
              return 0;
          }
      }

      @Override
      public void delete(String executionId) {
          try {
              client.deleteByQuery(DeleteByQueryRequest.of(b -> b
                  .index(List.of(INDEX_PREFIX + "*"))
                  .query(Query.of(q -> q.term(t -> t
                      .field("execution_id").value(FieldValue.of(executionId)))))));
          } catch (IOException e) {
              log.error("Failed to delete execution {}", executionId, e);
          }
      }

      private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
              SearchRequest request, int size) {
          return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
              b.index(INDEX_PREFIX + "*")
                  .query(buildQuery(request))
                  .size(size)
                  .from(request.offset())
                  .sort(s -> s.field(f -> f
                      .field(request.sortColumn())
                      .order("asc".equalsIgnoreCase(request.sortDir())
                          ? SortOrder.Asc : SortOrder.Desc)));
              return b;
          });
      }

      private Query buildQuery(SearchRequest request) {
          List<Query> must = new ArrayList<>();
          List<Query> filter = new ArrayList<>();

          // Time range
          if (request.timeFrom() != null || request.timeTo() != null) {
              filter.add(Query.of(q -> q.range(r -> {
                  r.field("start_time");
                  if (request.timeFrom() != null)
                      r.gte(JsonData.of(request.timeFrom().toString()));
                  if (request.timeTo() != null)
                      r.lte(JsonData.of(request.timeTo().toString()));
                  return r;
              })));
          }

          // Keyword filters
          if (request.status() != null)
              filter.add(termQuery("status", request.status()));
          if (request.routeId() != null)
              filter.add(termQuery("route_id", request.routeId()));
          if (request.agentId() != null)
              filter.add(termQuery("agent_id", request.agentId()));
          if (request.correlationId() != null)
              filter.add(termQuery("correlation_id", request.correlationId()));

          // Full-text search across all fields + nested processor fields
          if (request.text() != null && !request.text().isBlank()) {
              String text = request.text();
              List<Query> textQueries = new ArrayList<>();

              // Search top-level text fields
              textQueries.add(Query.of(q -> q.multiMatch(m -> m
                  .query(text)
                  .fields("error_message", "error_stacktrace",
                      "error_message.ngram", "error_stacktrace.ngram"))));

              // Search nested processor fields
              textQueries.add(Query.of(q -> q.nested(n -> n
                  .path("processors")
                  .query(nq -> nq.multiMatch(m -> m
                      .query(text)
                      .fields("processors.input_body", "processors.output_body",
                          "processors.input_headers", "processors.output_headers",
                          "processors.error_message", "processors.error_stacktrace",
                          "processors.input_body.ngram", "processors.output_body.ngram",
                          "processors.input_headers.ngram", "processors.output_headers.ngram",
                          "processors.error_message.ngram", "processors.error_stacktrace.ngram"))))));

              // Also try keyword fields for exact matches
              textQueries.add(Query.of(q -> q.multiMatch(m -> m
                  .query(text)
                  .fields("execution_id", "route_id", "agent_id", "correlation_id", "exchange_id"))));

              must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
          }

          // Scoped text searches
          if (request.textInBody() != null && !request.textInBody().isBlank()) {
              must.add(Query.of(q -> q.nested(n -> n
                  .path("processors")
                  .query(nq -> nq.multiMatch(m -> m
                      .query(request.textInBody())
                      .fields("processors.input_body", "processors.output_body",
                          "processors.input_body.ngram", "processors.output_body.ngram"))))));
          }
          if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
              must.add(Query.of(q -> q.nested(n -> n
                  .path("processors")
                  .query(nq -> nq.multiMatch(m -> m
                      .query(request.textInHeaders())
                      .fields("processors.input_headers", "processors.output_headers",
                          "processors.input_headers.ngram", "processors.output_headers.ngram"))))));
          }
          if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
              String errText = request.textInErrors();
              must.add(Query.of(q -> q.bool(b -> b.should(
                  Query.of(sq -> sq.multiMatch(m -> m
                      .query(errText)
                      .fields("error_message", "error_stacktrace",
                          "error_message.ngram", "error_stacktrace.ngram"))),
                  Query.of(sq -> sq.nested(n -> n
                      .path("processors")
                      .query(nq -> nq.multiMatch(m -> m
                          .query(errText)
                          .fields("processors.error_message", "processors.error_stacktrace",
                              "processors.error_message.ngram", "processors.error_stacktrace.ngram")))))
              ).minimumShouldMatch("1"))));
          }

          // Duration range
          if (request.durationMin() != null || request.durationMax() != null) {
              filter.add(Query.of(q -> q.range(r -> {
                  r.field("duration_ms");
                  if (request.durationMin() != null)
                      r.gte(JsonData.of(request.durationMin()));
                  if (request.durationMax() != null)
                      r.lte(JsonData.of(request.durationMax()));
                  return r;
              })));
          }

          return Query.of(q -> q.bool(b -> {
              if (!must.isEmpty()) b.must(must);
              if (!filter.isEmpty()) b.filter(filter);
              if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
              return b;
          }));
      }

      private Query termQuery(String field, String value) {
          return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
      }

      private Map<String, Object> toMap(ExecutionDocument doc) {
          Map<String, Object> map = new LinkedHashMap<>();
          map.put("execution_id", doc.executionId());
          map.put("route_id", doc.routeId());
          map.put("agent_id", doc.agentId());
          map.put("group_name", doc.groupName());
          map.put("status", doc.status());
          map.put("correlation_id", doc.correlationId());
          map.put("exchange_id", doc.exchangeId());
          map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
          map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
          map.put("duration_ms", doc.durationMs());
          map.put("error_message", doc.errorMessage());
          map.put("error_stacktrace", doc.errorStacktrace());
          if (doc.processors() != null) {
              map.put("processors", doc.processors().stream().map(p -> {
                  Map<String, Object> pm = new LinkedHashMap<>();
                  pm.put("processor_id", p.processorId());
                  pm.put("processor_type", p.processorType());
                  pm.put("status", p.status());
                  pm.put("error_message", p.errorMessage());
                  pm.put("error_stacktrace", p.errorStacktrace());
                  pm.put("input_body", p.inputBody());
                  pm.put("output_body", p.outputBody());
                  pm.put("input_headers", p.inputHeaders());
                  pm.put("output_headers", p.outputHeaders());
                  return pm;
              }).toList());
          }
          return map;
      }

      @SuppressWarnings("unchecked")
      private ExecutionSummary hitToSummary(Hit<Map> hit) {
          Map<String, Object> src = hit.source();
          if (src == null) return null;
          return new ExecutionSummary(
              (String) src.get("execution_id"),
              (String) src.get("route_id"),
              (String) src.get("agent_id"),
              (String) src.get("status"),
              src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
              src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
              src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
              (String) src.get("correlation_id"),
              (String) src.get("error_message"));
      }
  }
  ```
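  Why the 3–4 gram analyzer makes infix queries like `PayloadIdentifier` match `UniquePayloadIdentifier12345`: both the indexed value and the query string are decomposed into overlapping lowercase grams, so any shared substring of length ≥ 3 yields common terms. A standalone sketch of that decomposition (plain Java, not OpenSearch code; the gram logic is a simplification of the real tokenizer):

  ```java
  import java.util.LinkedHashSet;
  import java.util.Set;

  // Simplified model of the ngram_tokenizer above: break a string into
  // overlapping lowercase n-grams and check for shared terms.
  class NgramDemo {

      static Set<String> grams(String s, int n) {
          Set<String> out = new LinkedHashSet<>();
          String lower = s.toLowerCase();
          for (int i = 0; i + n <= lower.length(); i++) {
              out.add(lower.substring(i, i + n));
          }
          return out;
      }

      public static void main(String[] args) {
          Set<String> indexed = grams("UniquePayloadIdentifier12345", 3);
          Set<String> query = grams("PayloadIdentifier", 3);
          query.retainAll(indexed); // keep only grams both sides produced
          System.out.println(!query.isEmpty()); // shared grams -> the query can match
      }
  }
  ```

  This is also why `minimumShouldMatch` and scoring matter in the real query: short queries share grams with many documents, so relevance ranking, not mere term overlap, decides the result order.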

- [ ] **Step 4: Run tests to verify they pass**

  Run: `mvn test -pl cameleer-server-app -Dtest=OpenSearchIndexIT -q`
  Expected: PASS

- [ ] **Step 5: Commit**

  ```bash
  git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/OpenSearchConfig.java
  git add cameleer-server-app/src/main/java/com/cameleer/server/app/search/OpenSearchIndex.java
  git add cameleer-server-app/src/test/java/com/cameleer/server/app/search/OpenSearchIndexIT.java
  git commit -m "feat: implement OpenSearchIndex with full-text and wildcard search"
  ```

### Task 14: Implement SearchIndexer (debounced event-driven indexer)

**Files:**
- Create: `cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/SearchIndexer.java`
- [ ] **Step 1: Implement SearchIndexer**

  ```java
  package com.cameleer.server.core.indexing;

  import com.cameleer.server.core.storage.ExecutionStore;
  import com.cameleer.server.core.storage.ExecutionStore.ExecutionRecord;
  import com.cameleer.server.core.storage.ExecutionStore.ProcessorRecord;
  import com.cameleer.server.core.storage.SearchIndex;
  import com.cameleer.server.core.storage.model.ExecutionDocument;
  import com.cameleer.server.core.storage.model.ExecutionDocument.ProcessorDoc;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.*;

  public class SearchIndexer {

      private static final Logger log = LoggerFactory.getLogger(SearchIndexer.class);

      private final ExecutionStore executionStore;
      private final SearchIndex searchIndex;
      private final long debounceMs;
      private final int queueCapacity;

      private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
      private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
          r -> { Thread t = new Thread(r, "search-indexer"); t.setDaemon(true); return t; });

      public SearchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
                           long debounceMs, int queueCapacity) {
          this.executionStore = executionStore;
          this.searchIndex = searchIndex;
          this.debounceMs = debounceMs;
          this.queueCapacity = queueCapacity;
      }

      public void onExecutionUpdated(ExecutionUpdatedEvent event) {
          if (pending.size() >= queueCapacity) {
              log.warn("Search indexer queue full, dropping event for {}", event.executionId());
              return;
          }

          ScheduledFuture<?> existing = pending.put(event.executionId(),
              scheduler.schedule(() -> indexExecution(event.executionId()),
                  debounceMs, TimeUnit.MILLISECONDS));
          if (existing != null) {
              existing.cancel(false);
          }
      }

      private void indexExecution(String executionId) {
          pending.remove(executionId);
          try {
              ExecutionRecord exec = executionStore.findById(executionId).orElse(null);
              if (exec == null) return;

              List<ProcessorRecord> processors = executionStore.findProcessors(executionId);
              List<ProcessorDoc> processorDocs = processors.stream()
                  .map(p -> new ProcessorDoc(
                      p.processorId(), p.processorType(), p.status(),
                      p.errorMessage(), p.errorStacktrace(),
                      p.inputBody(), p.outputBody(),
                      p.inputHeaders(), p.outputHeaders()))
                  .toList();

              searchIndex.index(new ExecutionDocument(
                  exec.executionId(), exec.routeId(), exec.agentId(), exec.groupName(),
                  exec.status(), exec.correlationId(), exec.exchangeId(),
                  exec.startTime(), exec.endTime(), exec.durationMs(),
                  exec.errorMessage(), exec.errorStacktrace(), processorDocs));
          } catch (Exception e) {
              log.error("Failed to index execution {}", executionId, e);
          }
      }

      public void shutdown() {
          scheduler.shutdown();
      }
  }
  ```
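  The coalescing behaviour can be exercised in isolation: each new event for the same key cancels the previously scheduled task, so a burst of updates results in a single index call. A minimal standalone sketch of the same cancel-and-reschedule pattern (names are illustrative; timings are generous to keep it deterministic):

  ```java
  import java.util.Map;
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicInteger;

  // Standalone model of SearchIndexer's debounce: re-scheduling for the same
  // key cancels the pending task, so rapid updates coalesce into one flush.
  class DebounceDemo {

      static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      static final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
      static final AtomicInteger indexCalls = new AtomicInteger();

      static void onEvent(String id, long debounceMs) {
          ScheduledFuture<?> existing = pending.put(id,
              scheduler.schedule(() -> { pending.remove(id); indexCalls.incrementAndGet(); },
                  debounceMs, TimeUnit.MILLISECONDS));
          if (existing != null) existing.cancel(false); // supersede the earlier schedule
      }

      public static void main(String[] args) throws Exception {
          for (int i = 0; i < 5; i++) onEvent("exec-1", 100); // burst of 5 updates
          Thread.sleep(400);
          System.out.println(indexCalls.get()); // coalesced into a single index call
          scheduler.shutdown();
      }
  }
  ```

  The trade-off is freshness: a document becomes searchable only `debounceMs` after its last update, which is acceptable here because OpenSearch is an async index, not the source of truth.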

- [ ] **Step 2: Commit**

  ```bash
  git add cameleer-server-core/src/main/java/com/cameleer/server/core/indexing/SearchIndexer.java
  git commit -m "feat: implement debounced SearchIndexer for async OpenSearch indexing"
  ```

---

## Chunk 5: Wiring, Cleanup, and Integration

### Task 15: Create bean configuration and wire everything

**Files:**
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java`
- Create: `cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/SearchBeanConfig.java`
- Modify: `cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java`
- [ ] **Step 1: Create StorageBeanConfig**

  Wire `DetailService`, `SearchIndexer`, `IngestionService` with the new store beans:

  ```java
  package com.cameleer.server.app.config;

  import com.cameleer.server.core.detail.DetailService;
  import com.cameleer.server.core.indexing.SearchIndexer;
  import com.cameleer.server.core.ingestion.IngestionService;
  import com.cameleer.server.core.ingestion.WriteBuffer;
  import com.cameleer.server.core.storage.*;
  import com.cameleer.server.core.storage.model.MetricsSnapshot;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  @Configuration
  public class StorageBeanConfig {

      @Bean
      public DetailService detailService(ExecutionStore executionStore) {
          return new DetailService(executionStore);
      }

      @Bean(destroyMethod = "shutdown")
      public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
              @Value("${opensearch.debounce-ms:2000}") long debounceMs,
              @Value("${opensearch.queue-size:10000}") int queueSize) {
          return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
      }

      @Bean
      public IngestionService ingestionService(ExecutionStore executionStore,
              DiagramStore diagramStore,
              WriteBuffer<MetricsSnapshot> metricsBuffer,
              SearchIndexer searchIndexer,
              @Value("${cameleer.body-size-limit:16384}") int bodySizeLimit) {
          return new IngestionService(executionStore, diagramStore, metricsBuffer,
              searchIndexer::onExecutionUpdated, bodySizeLimit);
      }
  }
  ```
- [ ] **Step 2: Update SearchBeanConfig**

  Wire `SearchService` with `SearchIndex` + `StatsStore`:

  ```java
  @Bean
  public SearchService searchService(SearchIndex searchIndex, StatsStore statsStore) {
      return new SearchService(searchIndex, statsStore);
  }
  ```
- [ ] **Step 3: Update IngestionBeanConfig**

  Remove the execution and diagram write buffers. Keep only the metrics write buffer:

  ```java
  @Bean
  public WriteBuffer<MetricsSnapshot> metricsBuffer(IngestionConfig config) {
      return new WriteBuffer<>(config.getBufferCapacity());
  }
  ```
- [ ] **Step 4: Create MetricsFlushScheduler**

  Create `cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java`:

  ```java
  package com.cameleer.server.app.ingestion;

  import com.cameleer.server.app.config.IngestionConfig;
  import com.cameleer.server.core.ingestion.WriteBuffer;
  import com.cameleer.server.core.storage.MetricsStore;
  import com.cameleer.server.core.storage.model.MetricsSnapshot;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.context.SmartLifecycle;
  import org.springframework.scheduling.annotation.Scheduled;
  import org.springframework.stereotype.Component;

  import java.util.List;

  @Component
  public class MetricsFlushScheduler implements SmartLifecycle {

      private static final Logger log = LoggerFactory.getLogger(MetricsFlushScheduler.class);

      private final WriteBuffer<MetricsSnapshot> metricsBuffer;
      private final MetricsStore metricsStore;
      private final int batchSize;
      private volatile boolean running = false;

      public MetricsFlushScheduler(WriteBuffer<MetricsSnapshot> metricsBuffer,
                                   MetricsStore metricsStore,
                                   IngestionConfig config) {
          this.metricsBuffer = metricsBuffer;
          this.metricsStore = metricsStore;
          this.batchSize = config.getBatchSize();
      }

      @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
      public void flush() {
          try {
              List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
              if (!batch.isEmpty()) {
                  metricsStore.insertBatch(batch);
                  log.debug("Flushed {} metrics to PostgreSQL", batch.size());
              }
          } catch (Exception e) {
              log.error("Failed to flush metrics", e);
          }
      }

      @Override public void start() { running = true; }

      @Override public void stop() {
          // Drain remaining on shutdown
          while (metricsBuffer.size() > 0) {
              List<MetricsSnapshot> batch = metricsBuffer.drain(batchSize);
              if (batch.isEmpty()) break;
              try { metricsStore.insertBatch(batch); }
              catch (Exception e) { log.error("Failed to flush metrics during shutdown", e); break; }
          }
          running = false;
      }

      @Override public boolean isRunning() { return running; }

      @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
  }
  ```
- [ ] **Step 5: Create RetentionScheduler**
|
|
|
|
Create `cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java`:
|
|
|
|
```java
|
|
package com.cameleer.server.app.retention;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
@Component
|
|
public class RetentionScheduler {
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(RetentionScheduler.class);
|
|
|
|
private final JdbcTemplate jdbc;
|
|
private final int retentionDays;
|
|
|
|
public RetentionScheduler(JdbcTemplate jdbc,
|
|
@Value("${cameleer.retention-days:30}") int retentionDays) {
|
|
this.jdbc = jdbc;
|
|
this.retentionDays = retentionDays;
|
|
}
|
|
|
|
@Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM UTC
|
|
public void dropExpiredChunks() {
|
|
String interval = retentionDays + " days";
|
|
try {
|
|
// Raw data
|
|
jdbc.execute("SELECT drop_chunks('executions', INTERVAL '" + interval + "')");
|
|
jdbc.execute("SELECT drop_chunks('processor_executions', INTERVAL '" + interval + "')");
|
|
jdbc.execute("SELECT drop_chunks('agent_metrics', INTERVAL '" + interval + "')");
|
|
|
|
// Continuous aggregates (keep 3x longer)
|
|
String caggInterval = (retentionDays * 3) + " days";
|
|
jdbc.execute("SELECT drop_chunks('stats_1m_all', INTERVAL '" + caggInterval + "')");
|
|
jdbc.execute("SELECT drop_chunks('stats_1m_app', INTERVAL '" + caggInterval + "')");
|
|
jdbc.execute("SELECT drop_chunks('stats_1m_route', INTERVAL '" + caggInterval + "')");
|
|
jdbc.execute("SELECT drop_chunks('stats_1m_processor', INTERVAL '" + caggInterval + "')");
|
|
|
|
log.info("Retention: dropped chunks older than {} days (aggregates: {} days)",
|
|
retentionDays, retentionDays * 3);
|
|
} catch (Exception e) {
|
|
log.error("Retention job failed", e);
|
|
}
|
|
}
|
|
// Note: OpenSearch daily index deletion should be handled via ILM policy
|
|
// configured at deployment time, not in application code.
|
|
}
|
|
```
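
Note that OpenSearch's native mechanism for this is ISM (Index State Management) rather than Elasticsearch-style ILM. A sketch of a policy that deletes daily indices past the retention window follows; the `executions-*` pattern and the `30d` age are assumptions and should mirror `cameleer.retention-days`:

```json
{
  "policy": {
    "description": "Delete daily execution indices after the retention window (illustrative)",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      { "name": "delete", "actions": [ { "delete": {} } ], "transitions": [] }
    ],
    "ism_template": [
      { "index_patterns": ["executions-*"], "priority": 100 }
    ]
  }
}
```

This would be registered once at deployment time via `PUT _plugins/_ism/policies/<policy-id>`, after which new indices matching the template pattern pick it up automatically.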

- [ ] **Step 6: Commit**

```bash
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/StorageBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/SearchBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/config/IngestionBeanConfig.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/MetricsFlushScheduler.java
git add cameleer-server-app/src/main/java/com/cameleer/server/app/retention/RetentionScheduler.java
git commit -m "feat: wire new storage beans, add MetricsFlushScheduler and RetentionScheduler"
```

### Task 16: Delete ClickHouse code and old interfaces

**Files:**
- Delete all files listed in "Files to delete" section above

- [ ] **Step 1: Delete ClickHouse implementations**

```bash
rm cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouse*.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/search/ClickHouseSearchEngine.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ClickHouseFlushScheduler.java
rm cameleer-server-app/src/main/java/com/cameleer/server/app/config/ClickHouseConfig.java
```

- [ ] **Step 2: Delete old core interfaces replaced by new ones**

`UserRepository` and `OidcConfigRepository` in `core.security` are **kept** — the new Postgres implementations implement them. Only interfaces replaced by the new storage interfaces are deleted.

```bash
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/ExecutionRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/DiagramRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/storage/MetricsRepository.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchEngine.java
rm cameleer-server-core/src/main/java/com/cameleer/server/core/detail/RawExecutionRow.java
```

- [ ] **Step 3: Delete ClickHouse SQL migrations**

```bash
rm -r cameleer-server-app/src/main/resources/clickhouse/
```

- [ ] **Step 4: Delete old test base class**

```bash
rm cameleer-server-app/src/test/java/com/cameleer/server/app/AbstractClickHouseIT.java
```

- [ ] **Step 5: Fix compilation errors in specific files**

Each file and what to change:
- `DiagramRenderController.java` — change `DiagramRepository` to `DiagramStore`
- `DiagramController.java` — change `DiagramRepository` to `DiagramStore` and update to call `IngestionService.ingestDiagram()` (synchronous)
- `SearchController.java` — already uses `SearchService`; verify no direct `SearchEngine` refs
- `DetailController.java` — already uses `DetailService`; verify no direct `ExecutionRepository` refs
- `TreeReconstructionTest.java` — rewrite to test `DetailService.buildTree()` with a `ProcessorRecord` list input
- `ExecutionController.java` — update to call `IngestionService.ingestExecution()` (synchronous; catches exceptions to return 503)
- `MetricsController.java` — update to call `IngestionService.acceptMetrics()` (still buffered)
- `SearchBeanConfig.java` — wire `SearchService(SearchIndex, StatsStore)` instead of `SearchService(SearchEngine)`
- `IngestionBeanConfig.java` — remove the execution/diagram buffer beans; add `bodySizeLimit` to the `IngestionService` constructor
- All ITs extending `AbstractClickHouseIT` — change to extend `AbstractPostgresIT`

- [ ] **Step 6: Verify compilation**

Run: `mvn compile -q`
Expected: BUILD SUCCESS

- [ ] **Step 7: Commit**

```bash
git add -A
git commit -m "refactor: remove all ClickHouse code, old interfaces, and SQL migrations"
```

### Task 17: Update existing integration tests

**Files:**
- Modify: all IT files under `cameleer-server-app/src/test/`

- [ ] **Step 1: Update all ITs to extend AbstractPostgresIT**

Every IT that currently extends `AbstractClickHouseIT` must extend `AbstractPostgresIT` instead. Add an OpenSearch Testcontainer wherever search behavior is under test.

- [ ] **Step 2: Update ExecutionControllerIT**

Adapt to synchronous writes (no buffer flush delay). The controller now returns 202 on success and 503 on a DB write failure.

- [ ] **Step 3: Update SearchControllerIT**

Search now hits OpenSearch. Add an OpenSearch Testcontainer. Allow time for async indexing: use Awaitility to poll search results rather than `Thread.sleep`.
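
Awaitility is the right tool here; as a self-contained illustration of the poll-until-true pattern it encapsulates, the sketch below uses only the JDK (class and method names are mine, not part of the codebase):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public class PollUntil {

    // Poll `condition` every pollMillis until it holds or `timeout` elapses.
    public static boolean pollUntil(BooleanSupplier condition, Duration timeout, long pollMillis) {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in for "document becomes searchable after async indexing"
        boolean found = pollUntil(() -> System.currentTimeMillis() - start > 200,
                Duration.ofSeconds(5), 50);
        System.out.println(found ? "indexed" : "timed out");
    }
}
```

In the actual IT, prefer Awaitility's `await().atMost(...).untilAsserted(...)`, which adds configurable poll intervals and descriptive failure messages on timeout.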

- [ ] **Step 4: Update DetailControllerIT**

Detail now reads from `PostgresExecutionStore` directly, which simplifies setup: just insert an execution plus its processors.

- [ ] **Step 5: Run full test suite**

Run: `mvn verify -q`
Expected: BUILD SUCCESS, all tests pass

- [ ] **Step 6: Commit**

```bash
git add -A
git commit -m "test: migrate all integration tests from ClickHouse to PostgreSQL + OpenSearch"
```

### Task 18: Update Dockerfile and K8s manifests

**Files:**
- Modify: `Dockerfile`
- Modify: `deploy/*.yaml` (K8s manifests)

- [ ] **Step 1: Update Dockerfile**

No JDBC driver changes are needed in the Dockerfile (drivers ship inside the fat JAR). Just verify the `REGISTRY_TOKEN` build arg still works for `cameleer-common` resolution.

- [ ] **Step 2: Update K8s manifests**

- Replace the ClickHouse StatefulSet with a PostgreSQL/TimescaleDB StatefulSet
- Add an OpenSearch StatefulSet (single-node for dev, clustered for prod)
- Update server Deployment env vars: `SPRING_DATASOURCE_URL`, `OPENSEARCH_URL`
- Update secrets: replace `clickhouse-credentials` with `postgres-credentials`
- Update health check probes: PostgreSQL listens on the standard port 5432, OpenSearch on 9200
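
The env-var wiring from the list above might look like this in the server Deployment. Service names, the database name, and the secret keys are assumptions, not taken from the repo:

```yaml
# deploy/ excerpt — illustrative only
env:
  - name: SPRING_DATASOURCE_URL
    value: jdbc:postgresql://timescaledb:5432/cameleer   # assumed service + db name
  - name: SPRING_DATASOURCE_USERNAME
    valueFrom:
      secretKeyRef:
        name: postgres-credentials
        key: username          # assumed key
  - name: SPRING_DATASOURCE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: postgres-credentials
        key: password          # assumed key
  - name: OPENSEARCH_URL
    value: http://opensearch:9200   # assumed service name
```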

- [ ] **Step 3: Commit**

```bash
git add Dockerfile deploy/
git commit -m "deploy: replace ClickHouse with PostgreSQL/TimescaleDB + OpenSearch in K8s manifests"
```

### Task 19: Update CI workflow

**Files:**
- Modify: `.gitea/workflows/ci.yml`

- [ ] **Step 1: Update CI workflow**

Changes needed:
- Docker build: no ClickHouse references in the image (it's a fat JAR; the PostgreSQL driver is bundled)
- Deploy step: update the K8s secret name from `clickhouse-credentials` to `postgres-credentials`
- Deploy step: add OpenSearch deployment manifests
- Verify the `REGISTRY_TOKEN` build arg still works for `cameleer-common`
- Integration tests stay skipped in CI (`-DskipITs`) — Testcontainers would need Docker-in-Docker
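
The secret rename in the deploy step could look roughly like this. The step name and the `PG_USER`/`PG_PASSWORD` variables are hypothetical; adapt to however the workflow currently injects credentials:

```yaml
# .gitea/workflows/ci.yml — illustrative excerpt
- name: Create storage secrets
  run: |
    kubectl create secret generic postgres-credentials \
      --from-literal=username="${PG_USER}" \
      --from-literal=password="${PG_PASSWORD}" \
      --dry-run=client -o yaml | kubectl apply -f -
```

The `--dry-run=client | kubectl apply` pattern makes the step idempotent, so re-runs update the secret instead of failing on "already exists".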

- [ ] **Step 2: Commit**

```bash
git add .gitea/workflows/ci.yml
git commit -m "ci: update workflow for PostgreSQL + OpenSearch deployment"
```

### Task 20: Update OpenAPI spec

- [ ] **Step 1: Regenerate openapi.json**

Run the server locally (or via a test) and export the OpenAPI spec. The API surface hasn't changed — only the storage backend has.

Run: `mvn spring-boot:run` (briefly, to generate the spec), then fetch `/api/v1/api-docs`.

- [ ] **Step 2: Commit**

```bash
git add cameleer-server-app/src/main/resources/static/openapi.json
git commit -m "docs: regenerate openapi.json after storage layer refactor"
```

### Task 21: Final verification

- [ ] **Step 1: Full build**

Run: `mvn clean verify -q`
Expected: BUILD SUCCESS

- [ ] **Step 2: Manual smoke test**

Start the server against local PostgreSQL/TimescaleDB + OpenSearch (Docker Compose or individual containers):
1. POST an execution → verify 202
2. GET `/api/v1/search?text=...` → verify search works
3. GET `/api/v1/stats` → verify stats return
4. GET `/api/v1/detail/{id}` → verify detail with processor tree

- [ ] **Step 3: Commit any remaining fixes**

```bash
git add -A
git commit -m "fix: final adjustments from smoke testing"
```