From 574f82b731825e939cc6b6adc774db3f0882bb95 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sat, 4 Apr 2026 15:45:49 +0200 Subject: [PATCH] docs: add historical implementation plans Co-Authored-By: Claude Opus 4.6 (1M context) --- ...-31-clickhouse-phase2-executions-search.md | 1828 +++++++++++++++++ ...03-31-clickhouse-phase3-stats-analytics.md | 431 ++++ ...3-31-clickhouse-phase4-remaining-tables.md | 244 +++ .../2026-04-02-admin-context-separation.md | 285 +++ ...2026-04-02-composable-sidebar-migration.md | 572 ++++++ 5 files changed, 3360 insertions(+) create mode 100644 docs/superpowers/plans/2026-03-31-clickhouse-phase2-executions-search.md create mode 100644 docs/superpowers/plans/2026-03-31-clickhouse-phase3-stats-analytics.md create mode 100644 docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md create mode 100644 docs/superpowers/plans/2026-04-02-admin-context-separation.md create mode 100644 docs/superpowers/plans/2026-04-02-composable-sidebar-migration.md diff --git a/docs/superpowers/plans/2026-03-31-clickhouse-phase2-executions-search.md b/docs/superpowers/plans/2026-03-31-clickhouse-phase2-executions-search.md new file mode 100644 index 00000000..904b8d10 --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-clickhouse-phase2-executions-search.md @@ -0,0 +1,1828 @@ +# ClickHouse Phase 2: Executions + Search — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ingest chunked execution data into ClickHouse and provide a ClickHouse-backed search implementation behind a feature flag. Replace the old `RouteExecution` tree ingestion with `ExecutionChunk` + `FlatProcessorRecord` flat ingestion. 
+ +**Architecture:** Agents send `ExecutionChunk` documents containing flat `FlatProcessorRecord` entries with `seq`/`parentSeq`/`iteration` fields. A new `ChunkIngestionController` accepts chunks at `POST /api/v1/data/chunks`. A `ChunkAccumulator` buffers exchange envelope data in-memory, inserts processor records immediately via WriteBuffer, and writes the execution row when the final chunk arrives. A `ClickHouseSearchIndex` implements the `SearchIndex` interface using SQL with ngram bloom filter acceleration. Feature flags control which search backend is active. The old `RouteExecution` ingestion path is removed (no backward compatibility needed). + +**Tech Stack:** ClickHouse 24.12, clickhouse-jdbc 0.9.7 (all classifier), Spring JdbcTemplate, Testcontainers + +**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` + +**Note on cameleer3-common:** The agent team is refactoring `cameleer3-common` to add `ExecutionChunk` and `FlatProcessorRecord`. Until that library is published, this plan defines server-side DTOs in `cameleer3-server-core` that mirror the common models. When the common lib is ready, swap the server DTOs for the shared classes (import change only). 
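To make the chunk transport concrete, here is an illustrative payload for `POST /api/v1/data/chunks` (field names follow the `ExecutionChunk` / `FlatProcessorRecord` models defined in Task 1; all values are invented for illustration):

```json
{
  "exchangeId": "ex-1",
  "applicationName": "order-service",
  "agentId": "pod-1",
  "routeId": "order-route",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00Z",
  "endTime": "2026-03-31T10:00:01Z",
  "durationMs": 1000,
  "chunkSeq": 0,
  "final": true,
  "processors": [
    {"seq": 1, "processorId": "split1", "processorType": "split", "iterationSize": 2,
     "status": "COMPLETED", "startTime": "2026-03-31T10:00:00Z", "durationMs": 100},
    {"seq": 2, "parentSeq": 1, "parentProcessorId": "split1", "processorId": "log1",
     "processorType": "log", "iteration": 0, "status": "COMPLETED",
     "startTime": "2026-03-31T10:00:00Z", "durationMs": 5}
  ]
}
```

Note the wire name `final` for the `isFinal` flag (mapped via `@JsonProperty("final")` in Task 1), and that `seq` / `parentSeq` / `iteration` carry enough information for the server to rebuild the processor tree without any nesting in the payload.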
+ +--- + +## File Structure + +| File | Responsibility | +|------|----------------| +| `cameleer3-server-app/.../resources/clickhouse/V2__executions.sql` | DDL for `executions` table (ReplacingMergeTree) | +| `cameleer3-server-app/.../resources/clickhouse/V3__processor_executions.sql` | DDL for `processor_executions` table (MergeTree) — uses seq/parentSeq/iteration | +| `cameleer3-server-core/.../model/ExecutionChunk.java` | Server-side DTO mirroring agent's ExecutionChunk (temporary until common lib ready) | +| `cameleer3-server-core/.../model/FlatProcessorRecord.java` | Server-side DTO mirroring agent's FlatProcessorRecord (temporary until common lib ready) | +| `cameleer3-server-core/.../ingestion/ChunkAccumulator.java` | Accumulates exchange envelope across chunks, pushes processor records + final execution row to WriteBuffers | +| `cameleer3-server-core/.../ingestion/MergedExecution.java` | Record holding merged execution envelope + version + tenant | +| `cameleer3-server-app/.../storage/ClickHouseExecutionStore.java` | Batch INSERT for executions + processor_executions to ClickHouse | +| `cameleer3-server-app/.../search/ClickHouseSearchIndex.java` | SearchIndex impl using SQL with ngram indexes | +| `cameleer3-server-app/.../ingestion/ExecutionFlushScheduler.java` | Drains execution + processor WriteBuffers → ClickHouseExecutionStore | +| `cameleer3-server-app/.../controller/ChunkIngestionController.java` | REST endpoint `POST /api/v1/data/chunks` accepting ExecutionChunk | +| `cameleer3-server-app/.../config/StorageBeanConfig.java` | Modified: add chunk accumulator + CH search beans | +| `cameleer3-server-app/.../config/IngestionBeanConfig.java` | Modified: add execution + processor WriteBuffer beans | +| `cameleer3-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.search` flag | +| `cameleer3-server-app/...test.../storage/ClickHouseExecutionStoreIT.java` | Integration test for CH execution writes | +| 
`cameleer3-server-app/...test.../search/ClickHouseSearchIndexIT.java` | Integration test for CH search | +| `cameleer3-server-core/...test.../ingestion/ChunkAccumulatorTest.java` | Unit test for accumulator logic | + +--- + +### Task 1: Server-Side DTOs (ExecutionChunk + FlatProcessorRecord) + +**Files:** +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java` + +These mirror the agent's models exactly. When `cameleer3-common` is published with these classes, delete these files and update imports. + +- [ ] **Step 1: Create FlatProcessorRecord** + +```java +// cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java +package com.cameleer3.server.core.storage.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; + +import java.time.Instant; +import java.util.Map; + +/** + * Flat processor execution record with seq/parentSeq for tree reconstruction. + * Mirrors cameleer3-common FlatProcessorRecord — replace with common lib import when available. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public record FlatProcessorRecord( + int seq, + Integer parentSeq, + String parentProcessorId, + String processorId, + String processorType, + Integer iteration, + Integer iterationSize, + String status, + Instant startTime, + long durationMs, + String resolvedEndpointUri, + String inputBody, + String outputBody, + Map inputHeaders, + Map outputHeaders, + String errorMessage, + String errorStackTrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + Map attributes, + String circuitBreakerState, + Boolean fallbackTriggered, + Boolean filterMatched, + Boolean duplicateMessage +) {} +``` + +- [ ] **Step 2: Create ExecutionChunk** + +```java +// cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java +package com.cameleer3.server.core.storage.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * Chunk document: exchange envelope + list of FlatProcessorRecords. + * Mirrors cameleer3-common ExecutionChunk — replace with common lib import when available. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public record ExecutionChunk( + String exchangeId, + String applicationName, + String agentId, + String routeId, + String correlationId, + String status, + Instant startTime, + Instant endTime, + Long durationMs, + String engineLevel, + String errorMessage, + String errorStackTrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + Map attributes, + String traceId, + String spanId, + String originalExchangeId, + String replayExchangeId, + int chunkSeq, + @JsonProperty("final") boolean isFinal, + List processors +) {} +``` + +- [ ] **Step 3: Write deserialization test** + +```java +// cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java +package com.cameleer3.server.core.storage.model; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExecutionChunkDeserializationTest { + + private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); + + @Test + void roundTrip_fullChunk() throws Exception { + ExecutionChunk chunk = new ExecutionChunk( + "ex-1", "order-service", "pod-1", "order-route", + "corr-1", "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + "REGULAR", + null, null, null, null, null, null, + Map.of("orderId", "ORD-1"), + "trace-1", "span-1", null, null, + 2, true, + List.of(new FlatProcessorRecord( + 1, null, null, "log1", "log", + null, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00.100Z"), 5L, + null, "body", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + + String json = 
MAPPER.writeValueAsString(chunk); + ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); + + assertThat(deserialized.exchangeId()).isEqualTo("ex-1"); + assertThat(deserialized.isFinal()).isTrue(); + assertThat(deserialized.chunkSeq()).isEqualTo(2); + assertThat(deserialized.processors()).hasSize(1); + assertThat(deserialized.processors().get(0).seq()).isEqualTo(1); + assertThat(deserialized.processors().get(0).processorId()).isEqualTo("log1"); + assertThat(deserialized.attributes()).containsEntry("orderId", "ORD-1"); + } + + @Test + void roundTrip_runningChunkWithIterations() throws Exception { + ExecutionChunk chunk = new ExecutionChunk( + "ex-2", "app", "agent-1", "route-1", + "ex-2", "RUNNING", + Instant.parse("2026-03-31T10:00:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord( + 1, null, null, "split1", "split", + null, 3, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 100L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord( + 2, 1, "split1", "log1", "log", + 0, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 5L, + null, "item-0", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord( + 3, 1, "split1", "log1", "log", + 1, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 5L, + null, "item-1", null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + + String json = MAPPER.writeValueAsString(chunk); + ExecutionChunk deserialized = MAPPER.readValue(json, ExecutionChunk.class); + + assertThat(deserialized.isFinal()).isFalse(); + assertThat(deserialized.processors()).hasSize(3); + + FlatProcessorRecord split = deserialized.processors().get(0); + assertThat(split.iterationSize()).isEqualTo(3); + assertThat(split.parentSeq()).isNull(); + + 
FlatProcessorRecord child0 = deserialized.processors().get(1); + assertThat(child0.parentSeq()).isEqualTo(1); + assertThat(child0.parentProcessorId()).isEqualTo("split1"); + assertThat(child0.iteration()).isEqualTo(0); + } + + @Test + void deserialize_unknownFieldsIgnored() throws Exception { + String json = """ + {"exchangeId":"ex-1","routeId":"r1","status":"COMPLETED", + "startTime":"2026-03-31T10:00:00Z","chunkSeq":0,"final":true, + "futureField":"ignored","processors":[]} + """; + ExecutionChunk chunk = MAPPER.readValue(json, ExecutionChunk.class); + assertThat(chunk.exchangeId()).isEqualTo("ex-1"); + assertThat(chunk.isFinal()).isTrue(); + } +} +``` + +- [ ] **Step 4: Run tests** + +```bash +mvn test -pl cameleer3-server-core -Dtest=ExecutionChunkDeserializationTest +``` + +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/FlatProcessorRecord.java \ + cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionChunk.java \ + cameleer3-server-core/src/test/java/com/cameleer3/server/core/storage/model/ExecutionChunkDeserializationTest.java +git commit -m "feat: add server-side ExecutionChunk and FlatProcessorRecord DTOs" +``` + +--- + +### Task 2: DDL Scripts for executions and processor_executions + +**Files:** +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql` +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql` + +- [ ] **Step 1: Create executions DDL** + +The executions table stores one row per exchange (written when the final chunk arrives). Uses `ReplacingMergeTree(_version)` for rare late corrections. 
+ +```sql +-- V2__executions.sql +CREATE TABLE IF NOT EXISTS executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + start_time DateTime64(3), + _version UInt64 DEFAULT 1, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + status LowCardinality(String), + correlation_id String DEFAULT '', + exchange_id String DEFAULT '', + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + diagram_content_hash String DEFAULT '', + engine_level LowCardinality(String) DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + trace_id String DEFAULT '', + span_id String DEFAULT '', + has_trace_data Bool DEFAULT false, + is_replay Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, + ' ', output_headers, ' ', root_cause_message), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_status status TYPE set(10) GRANULARITY 1, + INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(_version) +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) +TTL 
toDateTime(start_time) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +Note: Removed `processors_json` — with flat records in `processor_executions`, the nested JSON column is no longer needed. + +- [ ] **Step 2: Create processor_executions DDL** + +Uses `seq`/`parentSeq`/`iteration` instead of `depth`/`loopIndex`/`splitIndex`/`multicastIndex`. ORDER BY ends with `seq` (unique per execution) instead of `processor_id` (can repeat across iterations). + +```sql +-- V3__processor_executions.sql +CREATE TABLE IF NOT EXISTS processor_executions ( + tenant_id LowCardinality(String) DEFAULT 'default', + execution_id String, + seq UInt32, + parent_seq Nullable(UInt32), + parent_processor_id String DEFAULT '', + processor_id String, + processor_type LowCardinality(String), + start_time DateTime64(3), + route_id LowCardinality(String), + application_name LowCardinality(String), + iteration Nullable(Int32), + iteration_size Nullable(Int32), + status LowCardinality(String), + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + resolved_endpoint_uri String DEFAULT '', + circuit_breaker_state LowCardinality(String) DEFAULT '', + fallback_triggered Bool DEFAULT false, + filter_matched Bool DEFAULT false, + duplicate_message Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_exec_id execution_id TYPE 
bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, seq) +TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +- [ ] **Step 3: Verify DDL loads** + +```bash +mvn clean compile -pl cameleer3-server-app +``` + +`ClickHouseSchemaInitializer` scans `classpath:clickhouse/*.sql` automatically. + +- [ ] **Step 4: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/clickhouse/V2__executions.sql \ + cameleer3-server-app/src/main/resources/clickhouse/V3__processor_executions.sql +git commit -m "feat(clickhouse): add executions and processor_executions DDL for chunked transport" +``` + +--- + +### Task 3: MergedExecution + ClickHouseExecutionStore + +**Files:** +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java` +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java` + +The store handles batch INSERT for both `executions` (from MergedExecution) and `processor_executions` (from FlatProcessorRecord). It does NOT implement the `ExecutionStore` interface — it has its own batch API consumed by the flush scheduler. + +- [ ] **Step 1: Create MergedExecution record** + +```java +// cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java +package com.cameleer3.server.core.ingestion; + +import java.time.Instant; +import java.util.Map; + +/** + * A merged execution envelope ready for ClickHouse insertion. + * Produced by {@link ChunkAccumulator} after receiving the final chunk. 
+ */ +public record MergedExecution( + String tenantId, + long version, + String executionId, + String routeId, + String agentId, + String applicationName, + String status, + String correlationId, + String exchangeId, + Instant startTime, + Instant endTime, + Long durationMs, + String errorMessage, + String errorStacktrace, + String errorType, + String errorCategory, + String rootCauseType, + String rootCauseMessage, + String diagramContentHash, + String engineLevel, + String inputBody, + String outputBody, + String inputHeaders, + String outputHeaders, + String attributes, + String traceId, + String spanId, + boolean hasTraceData, + boolean isReplay +) {} +``` + +- [ ] **Step 2: Write the failing integration test** + +```java +// cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.ingestion.MergedExecution; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseExecutionStoreIT { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseExecutionStore store; + + @BeforeEach + void setUp() throws IOException { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + 
ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + jdbc = new JdbcTemplate(ds); + + String execDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); + String procDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); + jdbc.execute(execDdl); + jdbc.execute(procDdl); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + store = new ClickHouseExecutionStore(jdbc); + } + + @Test + void insertExecutionBatch_writesToClickHouse() { + MergedExecution exec = new MergedExecution( + "default", 1, + "exec-1", "route-timer", "agent-a", "my-app", + "COMPLETED", "corr-1", "exchange-1", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:01Z"), 1000L, + null, null, null, null, null, null, + "hash-abc", "DEEP", + "{\"key\":\"val\"}", "{\"out\":\"data\"}", "{\"h\":\"1\"}", "{\"h\":\"2\"}", + "{\"attr\":\"val\"}", + "trace-1", "span-1", true, false); + + store.insertExecutionBatch(List.of(exec)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM executions WHERE execution_id = 'exec-1'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertProcessorBatch_writesToClickHouse() { + FlatProcessorRecord proc = new FlatProcessorRecord( + 1, null, null, "proc-1", "to", + null, null, "COMPLETED", + Instant.parse("2026-03-31T10:00:00Z"), 500L, + "http://localhost:8080/api", + "input body", "output body", + Map.of("Content-Type", "application/json"), null, + null, null, null, null, null, null, + null, null, null, null, null); + + store.insertProcessorBatch("default", "exec-1", "route-timer", "my-app", + Instant.parse("2026-03-31T10:00:00Z"), List.of(proc)); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM processor_executions WHERE execution_id = 'exec-1'", + Integer.class); + 
assertThat(count).isEqualTo(1);
+
+        // Verify seq and parent_seq are stored
+        Integer seq = jdbc.queryForObject(
+                "SELECT seq FROM processor_executions WHERE execution_id = 'exec-1'",
+                Integer.class);
+        assertThat(seq).isEqualTo(1);
+    }
+
+    @Test
+    void insertProcessorBatch_withIterations() {
+        List<FlatProcessorRecord> procs = List.of(
+                new FlatProcessorRecord(1, null, null, "split1", "split",
+                        null, 3, "COMPLETED",
+                        Instant.parse("2026-03-31T10:00:00Z"), 100L,
+                        null, null, null, null, null,
+                        null, null, null, null, null, null,
+                        null, null, null, null, null),
+                new FlatProcessorRecord(2, 1, "split1", "log1", "log",
+                        0, null, "COMPLETED",
+                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
+                        null, "item-0", null, null, null,
+                        null, null, null, null, null, null,
+                        null, null, null, null, null),
+                new FlatProcessorRecord(3, 1, "split1", "log1", "log",
+                        1, null, "COMPLETED",
+                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
+                        null, "item-1", null, null, null,
+                        null, null, null, null, null, null,
+                        null, null, null, null, null),
+                new FlatProcessorRecord(4, 1, "split1", "log1", "log",
+                        2, null, "COMPLETED",
+                        Instant.parse("2026-03-31T10:00:00Z"), 5L,
+                        null, "item-2", null, null, null,
+                        null, null, null, null, null, null,
+                        null, null, null, null, null));
+
+        store.insertProcessorBatch("default", "exec-split", "route-1", "my-app",
+                Instant.parse("2026-03-31T10:00:00Z"), procs);
+
+        Integer count = jdbc.queryForObject(
+                "SELECT count() FROM processor_executions WHERE execution_id = 'exec-split'",
+                Integer.class);
+        assertThat(count).isEqualTo(4);
+
+        // Verify iteration data
+        Integer iterSize = jdbc.queryForObject(
+                "SELECT iteration_size FROM processor_executions WHERE execution_id = 'exec-split' AND seq = 1",
+                Integer.class);
+        assertThat(iterSize).isEqualTo(3);
+    }
+
+    @Test
+    void insertExecutionBatch_emptyList_doesNothing() {
+        store.insertExecutionBatch(List.of());
+        Integer count = jdbc.queryForObject("SELECT count() FROM executions", Integer.class);
+
assertThat(count).isEqualTo(0); + } + + @Test + void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { + Instant startTime = Instant.parse("2026-03-31T10:00:00Z"); + MergedExecution v1 = new MergedExecution( + "default", 1, + "exec-dup", "route-1", "agent-a", "my-app", + "RUNNING", null, "exchange-1", + startTime, null, null, + null, null, null, null, null, null, + null, null, + null, null, null, null, null, + null, null, false, false); + MergedExecution v2 = new MergedExecution( + "default", 2, + "exec-dup", "route-1", "agent-a", "my-app", + "COMPLETED", "corr-1", "exchange-1", + startTime, Instant.parse("2026-03-31T10:00:01Z"), 1000L, + null, null, null, null, null, null, + null, null, + null, null, null, null, null, + null, null, false, false); + + store.insertExecutionBatch(List.of(v1, v2)); + jdbc.execute("OPTIMIZE TABLE executions FINAL"); + + String status = jdbc.queryForObject( + "SELECT status FROM executions WHERE execution_id = 'exec-dup'", + String.class); + assertThat(status).isEqualTo("COMPLETED"); + } +} +``` + +- [ ] **Step 3: Run test to verify it fails** + +```bash +mvn test -pl cameleer3-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false +``` + +Expected: compilation error — `ClickHouseExecutionStore` does not exist. 
+- [ ] **Step 4: Implement ClickHouseExecutionStore**
+
+```java
+// cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.ingestion.MergedExecution;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+public class ClickHouseExecutionStore {
+
+    private static final ObjectMapper JSON = new ObjectMapper();
+    private final JdbcTemplate jdbc;
+
+    public ClickHouseExecutionStore(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    public void insertExecutionBatch(List<MergedExecution> executions) {
+        if (executions.isEmpty()) return;
+
+        jdbc.batchUpdate("""
+                INSERT INTO executions (
+                    tenant_id, execution_id, start_time, _version,
+                    route_id, agent_id, application_name, status,
+                    correlation_id, exchange_id, end_time, duration_ms,
+                    error_message, error_stacktrace, error_type, error_category,
+                    root_cause_type, root_cause_message,
+                    diagram_content_hash, engine_level,
+                    input_body, output_body, input_headers, output_headers,
+                    attributes, trace_id, span_id,
+                    has_trace_data, is_replay
+                ) VALUES (
+                    ?, ?, ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?,
+                    ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?, ?,
+                    ?, ?
+                )
+                """,
+                executions.stream().map(e -> new Object[]{
+                        e.tenantId(),
+                        e.executionId(),
+                        Timestamp.from(e.startTime()),
+                        e.version(),
+                        orEmpty(e.routeId()),
+                        orEmpty(e.agentId()),
+                        orEmpty(e.applicationName()),
+                        orEmpty(e.status()),
+                        orEmpty(e.correlationId()),
+                        orEmpty(e.exchangeId()),
+                        e.endTime() != null ? Timestamp.from(e.endTime()) : null,
+                        e.durationMs(),
+                        orEmpty(e.errorMessage()),
+                        orEmpty(e.errorStacktrace()),
+                        orEmpty(e.errorType()),
+                        orEmpty(e.errorCategory()),
+                        orEmpty(e.rootCauseType()),
+                        orEmpty(e.rootCauseMessage()),
+                        orEmpty(e.diagramContentHash()),
+                        orEmpty(e.engineLevel()),
+                        orEmpty(e.inputBody()),
+                        orEmpty(e.outputBody()),
+                        orEmpty(e.inputHeaders()),
+                        orEmpty(e.outputHeaders()),
+                        orEmpty(e.attributes()),
+                        orEmpty(e.traceId()),
+                        orEmpty(e.spanId()),
+                        e.hasTraceData(),
+                        e.isReplay()
+                }).toList());
+    }
+
+    public void insertProcessorBatch(String tenantId, String executionId,
+                                    String routeId, String applicationName,
+                                    Instant execStartTime,
+                                    List<FlatProcessorRecord> processors) {
+        if (processors.isEmpty()) return;
+
+        jdbc.batchUpdate("""
+                INSERT INTO processor_executions (
+                    tenant_id, execution_id, seq, parent_seq, parent_processor_id,
+                    processor_id, processor_type, start_time,
+                    route_id, application_name,
+                    iteration, iteration_size, status,
+                    end_time, duration_ms,
+                    error_message, error_stacktrace, error_type, error_category,
+                    root_cause_type, root_cause_message,
+                    input_body, output_body, input_headers, output_headers,
+                    attributes, resolved_endpoint_uri,
+                    circuit_breaker_state, fallback_triggered,
+                    filter_matched, duplicate_message
+                ) VALUES (
+                    ?, ?, ?, ?, ?,
+                    ?, ?, ?,
+                    ?, ?,
+                    ?, ?, ?,
+                    ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?,
+                    ?, ?, ?, ?,
+                    ?, ?,
+                    ?, ?,
+                    ?, ?
+                )
+                """,
+                processors.stream().map(p -> new Object[]{
+                        tenantId,
+                        executionId,
+                        p.seq(),
+                        p.parentSeq(),
+                        orEmpty(p.parentProcessorId()),
+                        p.processorId(),
+                        p.processorType(),
+                        p.startTime() != null ? Timestamp.from(p.startTime()) : Timestamp.from(execStartTime),
+                        routeId,
+                        applicationName,
+                        p.iteration(),
+                        p.iterationSize(),
+                        orEmpty(p.status()),
+                        p.startTime() != null && p.durationMs() > 0
+                                ? Timestamp.from(p.startTime().plusMillis(p.durationMs())) : null,
+                        p.durationMs(),
+                        orEmpty(p.errorMessage()),
+                        orEmpty(p.errorStackTrace()),
+                        orEmpty(p.errorType()),
+                        orEmpty(p.errorCategory()),
+                        orEmpty(p.rootCauseType()),
+                        orEmpty(p.rootCauseMessage()),
+                        orEmpty(p.inputBody()),
+                        orEmpty(p.outputBody()),
+                        headersToString(p.inputHeaders()),
+                        headersToString(p.outputHeaders()),
+                        mapToString(p.attributes()),
+                        orEmpty(p.resolvedEndpointUri()),
+                        orEmpty(p.circuitBreakerState()),
+                        p.fallbackTriggered() != null ? p.fallbackTriggered() : false,
+                        p.filterMatched() != null ? p.filterMatched() : false,
+                        p.duplicateMessage() != null ? p.duplicateMessage() : false
+                }).toList());
+    }
+
+    private static String orEmpty(String value) {
+        return value != null ? value : "";
+    }
+
+    private static String headersToString(Map<String, String> headers) {
+        if (headers == null || headers.isEmpty()) return "";
+        try {
+            return JSON.writeValueAsString(headers);
+        } catch (JsonProcessingException e) {
+            return "";
+        }
+    }
+
+    private static String mapToString(Map<String, String> map) {
+        if (map == null || map.isEmpty()) return "";
+        try {
+            return JSON.writeValueAsString(map);
+        } catch (JsonProcessingException e) {
+            return "";
+        }
+    }
+}
+```
+
+- [ ] **Step 5: Run test to verify it passes**
+
+```bash
+mvn test -pl cameleer3-server-app -Dtest=ClickHouseExecutionStoreIT -Dfailsafe.provider=surefire
+```
+
+Expected: all 5 tests PASS.
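One caveat worth recording here (this is general `ReplacingMergeTree` behavior, not specific to this codebase): deduplication by `_version` happens only at background merge time, so any read path added in later phases must deduplicate explicitly, e.g.:

```sql
-- Either force merge-time semantics at read time ...
SELECT status FROM executions FINAL WHERE execution_id = 'exec-dup';

-- ... or deduplicate via argMax, avoiding FINAL's overhead on large scans.
SELECT argMax(status, _version) AS status
FROM executions
WHERE execution_id = 'exec-dup'
GROUP BY execution_id;
```

This is also why the `keepsLatestVersion` test above runs `OPTIMIZE TABLE executions FINAL` before asserting.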
+ +- [ ] **Step 6: Commit** + +```bash +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java \ + cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java \ + cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +git commit -m "feat(clickhouse): add ClickHouseExecutionStore with batch insert for chunked format" +``` + +--- + +### Task 4: ChunkAccumulator + +**Files:** +- Create: `cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java` +- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java` + +The ChunkAccumulator receives `ExecutionChunk` documents. For each chunk: +- Processor records are pushed to a sink immediately (they're append-only) +- Exchange envelope data is buffered/merged in a ConcurrentHashMap +- When `isFinal=true`, the merged envelope is pushed to an execution sink + +A scheduled sweep flushes stale exchanges (no final chunk received within 5 minutes). 
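Before the test, here is a minimal sketch of the flow described above, using simplified stand-in types and hypothetical names (`AccumulatorSketch`, `Chunk`, `Merged` are not the real classes; the actual accumulator, with full envelope merging and the stale sweep, is built in the steps below):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Sketch: buffer envelope data per exchange, emit a merged row on the final chunk.
class AccumulatorSketch {
    // Hypothetical simplified stand-ins for ExecutionChunk / MergedExecution.
    record Chunk(String exchangeId, String status, boolean isFinal) {}
    record Merged(String exchangeId, String status, long version) {}

    private final Map<String, Chunk> pending = new ConcurrentHashMap<>();
    private final AtomicLong versions = new AtomicLong();
    private final Consumer<Merged> executionSink;

    AccumulatorSketch(Consumer<Merged> executionSink) {
        this.executionSink = executionSink;
    }

    void onChunk(Chunk chunk) {
        // Merge envelope data per exchangeId (simplified here to "latest chunk wins").
        pending.merge(chunk.exchangeId(), chunk, (older, newer) -> newer);
        if (chunk.isFinal()) {
            // Final chunk: emit the merged envelope to the sink and drop the buffer entry.
            Chunk merged = pending.remove(chunk.exchangeId());
            executionSink.accept(new Merged(merged.exchangeId(), merged.status(),
                    versions.incrementAndGet()));
        }
    }
}
```

Processor records are omitted from the sketch because they need no buffering at all: the real accumulator forwards them to the processor sink as soon as each chunk arrives.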
+
+- [ ] **Step 1: Write the failing unit test**
+
+```java
+// cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
+package com.cameleer3.server.core.ingestion;
+
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ChunkAccumulatorTest {
+
+    private List<MergedExecution> executionSink;
+    private List<ChunkAccumulator.ProcessorBatch> processorSink;
+    private ChunkAccumulator accumulator;
+
+    @BeforeEach
+    void setUp() {
+        executionSink = new CopyOnWriteArrayList<>();
+        processorSink = new CopyOnWriteArrayList<>();
+        accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMinutes(5));
+    }
+
+    @Test
+    void singleFinalChunk_producesExecutionAndProcessors() {
+        ExecutionChunk chunk = new ExecutionChunk(
+                "ex-1", "my-app", "agent-a", "route-1",
+                "corr-1", "COMPLETED",
+                Instant.parse("2026-03-31T10:00:00Z"),
+                Instant.parse("2026-03-31T10:00:01Z"), 1000L,
+                "DEEP",
+                null, null, null, null, null, null,
+                Map.of("env", "prod"),
+                "trace-1", "span-1", null, null,
+                0, true,
+                List.of(new FlatProcessorRecord(
+                        1, null, null, "log1", "log",
+                        null, null, "COMPLETED",
+                        Instant.parse("2026-03-31T10:00:00.100Z"), 5L,
+                        null, "body", null, null, null,
+                        null, null, null, null, null, null,
+                        null, null, null, null, null)));
+
+        accumulator.onChunk(chunk);
+
+        assertThat(executionSink).hasSize(1);
+        MergedExecution merged = executionSink.get(0);
+        assertThat(merged.executionId()).isEqualTo("ex-1");
+        assertThat(merged.status()).isEqualTo("COMPLETED");
+        assertThat(merged.durationMs()).isEqualTo(1000L);
+        assertThat(merged.version()).isEqualTo(1);
+
+        
assertThat(processorSink).hasSize(1); + assertThat(processorSink.get(0).processors()).hasSize(1); + assertThat(processorSink.get(0).executionId()).isEqualTo("ex-1"); + } + + @Test + void multipleChunks_mergesEnvelope_insertsProcessorsImmediately() { + Instant start = Instant.parse("2026-03-31T10:00:00Z"); + + // Chunk 0: RUNNING with 2 processors + ExecutionChunk chunk0 = new ExecutionChunk( + "ex-multi", "my-app", "agent-a", "route-1", + "corr-1", "RUNNING", + start, null, null, "DEEP", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord(1, null, null, "log1", "log", + null, null, "COMPLETED", start, 5L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(2, null, null, "to1", "to", + null, null, "COMPLETED", start.plusMillis(5), 10L, + "http://svc/api", null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + accumulator.onChunk(chunk0); + + // Processors inserted immediately + assertThat(processorSink).hasSize(1); + assertThat(processorSink.get(0).processors()).hasSize(2); + // Execution NOT yet flushed + assertThat(executionSink).isEmpty(); + + // Chunk 1: COMPLETED (final) with 1 more processor + ExecutionChunk chunk1 = new ExecutionChunk( + "ex-multi", "my-app", "agent-a", "route-1", + "corr-1", "COMPLETED", + start, start.plusMillis(500), 500L, "DEEP", + null, null, null, null, null, null, + Map.of("result", "ok"), + null, null, null, null, + 1, true, + List.of(new FlatProcessorRecord(3, null, null, "log2", "log", + null, null, "COMPLETED", start.plusMillis(100), 2L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null))); + accumulator.onChunk(chunk1); + + // Final chunk triggers execution flush + assertThat(executionSink).hasSize(1); + MergedExecution merged = executionSink.get(0); + 
assertThat(merged.status()).isEqualTo("COMPLETED"); + assertThat(merged.durationMs()).isEqualTo(500L); + assertThat(merged.version()).isEqualTo(1); + + // Second processor batch + assertThat(processorSink).hasSize(2); + assertThat(processorSink.get(1).processors()).hasSize(1); + } + + @Test + void staleExchange_flushedBySweep() { + accumulator = new ChunkAccumulator(executionSink::add, processorSink::add, Duration.ofMillis(1)); + + ExecutionChunk chunk = new ExecutionChunk( + "ex-stale", "my-app", "agent-a", "route-1", + "ex-stale", "RUNNING", + Instant.parse("2026-03-31T09:50:00Z"), + null, null, "REGULAR", + null, null, null, null, null, null, + null, null, null, null, null, + 0, false, List.of()); + accumulator.onChunk(chunk); + + try { Thread.sleep(5); } catch (InterruptedException ignored) {} + + accumulator.sweepStale(); + + assertThat(executionSink).hasSize(1); + assertThat(executionSink.get(0).status()).isEqualTo("RUNNING"); + assertThat(executionSink.get(0).version()).isEqualTo(1); + } + + @Test + void finalChunkWithErrors_populatesErrorFields() { + ExecutionChunk chunk = new ExecutionChunk( + "ex-err", "my-app", "agent-a", "route-1", + "ex-err", "FAILED", + Instant.parse("2026-03-31T10:00:00Z"), + Instant.parse("2026-03-31T10:00:00.200Z"), 200L, + "REGULAR", + "Connection refused", "java.net.ConnectException...", + "java.net.ConnectException", "CONNECTION", + "java.net.ConnectException", "Connection refused", + null, null, null, null, null, + 0, true, List.of()); + accumulator.onChunk(chunk); + + MergedExecution merged = executionSink.get(0); + assertThat(merged.errorMessage()).isEqualTo("Connection refused"); + assertThat(merged.errorType()).isEqualTo("java.net.ConnectException"); + assertThat(merged.errorCategory()).isEqualTo("CONNECTION"); + } + + @Test + void getPendingCount_tracksBufferedExchanges() { + Instant t = Instant.parse("2026-03-31T10:00:00Z"); + accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "RUNNING", + t, null, null, 
"REGULAR", null, null, null, null, null, null, + null, null, null, null, null, 0, false, List.of())); + accumulator.onChunk(new ExecutionChunk("e2", "app", "a", "r", "e2", "RUNNING", + t, null, null, "REGULAR", null, null, null, null, null, null, + null, null, null, null, null, 0, false, List.of())); + assertThat(accumulator.getPendingCount()).isEqualTo(2); + + accumulator.onChunk(new ExecutionChunk("e1", "app", "a", "r", "e1", "COMPLETED", + t, t.plusMillis(100), 100L, "REGULAR", null, null, null, null, null, null, + null, null, null, null, null, 1, true, List.of())); + assertThat(accumulator.getPendingCount()).isEqualTo(1); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +```bash +mvn test -pl cameleer3-server-core -Dtest=ChunkAccumulatorTest -DfailIfNoTests=false +``` + +Expected: compilation error — `ChunkAccumulator` does not exist. + +- [ ] **Step 3: Implement ChunkAccumulator** + +```java +// cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +package com.cameleer3.server.core.ingestion; + +import com.cameleer3.server.core.storage.model.ExecutionChunk; +import com.cameleer3.server.core.storage.model.FlatProcessorRecord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * Accumulates ExecutionChunk documents per exchange. + *
+ * <p>
+
+ * Processor records are pushed to the processor sink immediately (append-only).
+ * Exchange envelope data is buffered and merged across chunks.
+ * When the final chunk arrives, the merged envelope is pushed to the execution sink.
+ */
+public class ChunkAccumulator {
+
+    private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
+    private static final ObjectMapper JSON = new ObjectMapper();
+    private static final String DEFAULT_TENANT = "default";
+
+    private final Consumer<MergedExecution> executionSink;
+    private final Consumer<ProcessorBatch> processorSink;
+    private final Duration staleThreshold;
+    private final Map<String, PendingExchange> pending = new ConcurrentHashMap<>();
+
+    public ChunkAccumulator(Consumer<MergedExecution> executionSink,
+                            Consumer<ProcessorBatch> processorSink,
+                            Duration staleThreshold) {
+        this.executionSink = executionSink;
+        this.processorSink = processorSink;
+        this.staleThreshold = staleThreshold;
+    }
+
+    public void onChunk(ExecutionChunk chunk) {
+        String exchangeId = chunk.exchangeId();
+
+        // Insert processor records immediately (append-only)
+        if (chunk.processors() != null && !chunk.processors().isEmpty()) {
+            processorSink.accept(new ProcessorBatch(
+                    DEFAULT_TENANT, exchangeId,
+                    coalesce(chunk.routeId(), ""),
+                    coalesce(chunk.applicationName(), ""),
+                    chunk.startTime(),
+                    chunk.processors()));
+        }
+
+        if (chunk.isFinal()) {
+            // Merge with any pending state and flush execution
+            PendingExchange pendingExchange = pending.remove(exchangeId);
+            MergedExecution merged = buildMergedExecution(chunk, pendingExchange);
+            executionSink.accept(merged);
+        } else {
+            // Buffer/update exchange envelope
+            pending.compute(exchangeId, (id, existing) -> {
+                if (existing == null) {
+                    return new PendingExchange(chunk, Instant.now());
+                }
+                return existing.mergeWith(chunk);
+            });
+        }
+    }
+
+    public void sweepStale() {
+        Instant cutoff = Instant.now().minus(staleThreshold);
+        List<String> staleIds = new ArrayList<>();
+
+        pending.forEach((id, pe) -> {
+            if (pe.receivedAt().isBefore(cutoff)) {
+                staleIds.add(id);
+            }
+        });
+
+        for (String id : staleIds) {
+            PendingExchange stale = pending.remove(id);
+            if (stale != null) {
+                log.info("Flushing stale exchange {}", id);
+                executionSink.accept(buildMergedExecution(stale.envelope(), null));
+            }
+        }
+    }
+
+    public int getPendingCount() {
+        return pending.size();
+    }
+
+    private MergedExecution buildMergedExecution(ExecutionChunk finalChunk,
+                                                 PendingExchange pendingState) {
+        ExecutionChunk base = pendingState != null ? pendingState.envelope() : null;
+
+        String attributes = serializeMap(finalChunk.attributes());
+        if ((attributes == null || attributes.isEmpty()) && base != null) {
+            attributes = serializeMap(base.attributes());
+        }
+
+        boolean hasTraceData = false;
+        boolean isReplay = finalChunk.replayExchangeId() != null;
+
+        return new MergedExecution(
+                DEFAULT_TENANT, 1,
+                finalChunk.exchangeId(),
+                coalesce(finalChunk.routeId(), base != null ? base.routeId() : null),
+                coalesce(finalChunk.agentId(), base != null ? base.agentId() : null),
+                coalesce(finalChunk.applicationName(), base != null ? base.applicationName() : null),
+                coalesce(finalChunk.status(), base != null ? base.status() : "RUNNING"),
+                coalesce(finalChunk.correlationId(), base != null ? base.correlationId() : null),
+                finalChunk.exchangeId(),
+                coalesce(finalChunk.startTime(), base != null ? base.startTime() : null),
+                coalesce(finalChunk.endTime(), base != null ? base.endTime() : null),
+                coalesce(finalChunk.durationMs(), base != null ? base.durationMs() : null),
+                coalesce(finalChunk.errorMessage(), base != null ? base.errorMessage() : null),
+                coalesce(finalChunk.errorStackTrace(), base != null ? base.errorStackTrace() : null),
+                coalesce(finalChunk.errorType(), base != null ? base.errorType() : null),
+                coalesce(finalChunk.errorCategory(), base != null ? base.errorCategory() : null),
+                coalesce(finalChunk.rootCauseType(), base != null ? base.rootCauseType() : null),
+                coalesce(finalChunk.rootCauseMessage(), base != null ? base.rootCauseMessage() : null),
+                "", // diagramContentHash — server-side lookup, not in chunk
+                coalesce(finalChunk.engineLevel(), base != null ? base.engineLevel() : null),
+                "", "", "", "", // input/output body/headers — on processor records now, not envelope
+                coalesce(attributes, ""),
+                coalesce(finalChunk.traceId(), base != null ? base.traceId() : null),
+                coalesce(finalChunk.spanId(), base != null ? base.spanId() : null),
+                hasTraceData,
+                isReplay
+        );
+    }
+
+    private static String serializeMap(Map<String, ?> map) {
+        if (map == null || map.isEmpty()) return "";
+        try {
+            return JSON.writeValueAsString(map);
+        } catch (JsonProcessingException e) {
+            return "";
+        }
+    }
+
+    private static <T> T coalesce(T a, T b) {
+        return a != null ? a : b;
+    }
+
+    /**
+     * A batch of processor records for a single exchange, ready for insertion.
+     */
+    public record ProcessorBatch(
+            String tenantId,
+            String executionId,
+            String routeId,
+            String applicationName,
+            Instant execStartTime,
+            List<FlatProcessorRecord> processors
+    ) {}
+
+    private record PendingExchange(ExecutionChunk envelope, Instant receivedAt) {
+        PendingExchange mergeWith(ExecutionChunk newer) {
+            // Keep the latest envelope data (later chunkSeq has more complete info)
+            ExecutionChunk merged = new ExecutionChunk(
+                    envelope.exchangeId(),
+                    coalesce(newer.applicationName(), envelope.applicationName()),
+                    coalesce(newer.agentId(), envelope.agentId()),
+                    coalesce(newer.routeId(), envelope.routeId()),
+                    coalesce(newer.correlationId(), envelope.correlationId()),
+                    coalesce(newer.status(), envelope.status()),
+                    coalesce(envelope.startTime(), newer.startTime()),
+                    coalesce(newer.endTime(), envelope.endTime()),
+                    coalesce(newer.durationMs(), envelope.durationMs()),
+                    coalesce(newer.engineLevel(), envelope.engineLevel()),
+                    coalesce(newer.errorMessage(), envelope.errorMessage()),
+                    coalesce(newer.errorStackTrace(), envelope.errorStackTrace()),
+                    coalesce(newer.errorType(), envelope.errorType()),
+                    coalesce(newer.errorCategory(), envelope.errorCategory()),
+                    coalesce(newer.rootCauseType(), envelope.rootCauseType()),
+                    coalesce(newer.rootCauseMessage(), envelope.rootCauseMessage()),
+                    newer.attributes() != null ? newer.attributes() : envelope.attributes(),
+                    coalesce(newer.traceId(), envelope.traceId()),
+                    coalesce(newer.spanId(), envelope.spanId()),
+                    coalesce(newer.originalExchangeId(), envelope.originalExchangeId()),
+                    coalesce(newer.replayExchangeId(), envelope.replayExchangeId()),
+                    newer.chunkSeq(),
+                    newer.isFinal(),
+                    List.of());
+            return new PendingExchange(merged, receivedAt);
+        }
+    }
+}
+```
+
+- [ ] **Step 4: Run test to verify it passes**
+
+```bash
+mvn test -pl cameleer3-server-core -Dtest=ChunkAccumulatorTest
+```
+
+Expected: all 5 tests PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java \
+  cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java
+git commit -m "feat(clickhouse): add ChunkAccumulator for chunked execution ingestion"
+```
+
+---
+
+### Task 5: ExecutionFlushScheduler + ChunkIngestionController
+
+**Files:**
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java`
+
+- [ ] **Step 1: Implement ExecutionFlushScheduler**
+
+Follows the `MetricsFlushScheduler` pattern. Drains two WriteBuffers (executions + processor batches) and calls ClickHouseExecutionStore. Also runs the stale sweep.
+
+```java
+// cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java
+package com.cameleer3.server.app.ingestion;
+
+import com.cameleer3.server.app.config.IngestionConfig;
+import com.cameleer3.server.app.storage.ClickHouseExecutionStore;
+import com.cameleer3.server.core.ingestion.ChunkAccumulator;
+import com.cameleer3.server.core.ingestion.MergedExecution;
+import com.cameleer3.server.core.ingestion.WriteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.util.List;
+
+public class ExecutionFlushScheduler implements SmartLifecycle {
+
+    private static final Logger log = LoggerFactory.getLogger(ExecutionFlushScheduler.class);
+
+    private final WriteBuffer<MergedExecution> executionBuffer;
+    private final WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer;
+    private final ClickHouseExecutionStore executionStore;
+    private final ChunkAccumulator accumulator;
+    private final int batchSize;
+    private volatile boolean running = false;
+
+    public ExecutionFlushScheduler(WriteBuffer<MergedExecution> executionBuffer,
+                                   WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBuffer,
+                                   ClickHouseExecutionStore executionStore,
+                                   ChunkAccumulator accumulator,
+                                   IngestionConfig config) {
+        this.executionBuffer = executionBuffer;
+        this.processorBuffer = processorBuffer;
+        this.executionStore = executionStore;
+        this.accumulator = accumulator;
+        this.batchSize = config.getBatchSize();
+    }
+
+    @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}")
+    public void flush() {
+        flushExecutions();
+        flushProcessors();
+    }
+
+    private void flushExecutions() {
+        try {
+            List<MergedExecution> batch = executionBuffer.drain(batchSize);
+            if (!batch.isEmpty()) {
+                executionStore.insertExecutionBatch(batch);
+                log.debug("Flushed {} executions to ClickHouse", batch.size());
+            }
+        } catch (Exception e) {
+            log.error("Failed to flush executions to ClickHouse", e);
+        }
+    }
+
+    private void flushProcessors() {
+        try {
+            List<ChunkAccumulator.ProcessorBatch> batches = processorBuffer.drain(batchSize);
+            for (ChunkAccumulator.ProcessorBatch batch : batches) {
+                if (!batch.processors().isEmpty()) {
+                    executionStore.insertProcessorBatch(
+                            batch.tenantId(), batch.executionId(),
+                            batch.routeId(), batch.applicationName(),
+                            batch.execStartTime(), batch.processors());
+                }
+            }
+            if (!batches.isEmpty()) {
+                int totalProcs = batches.stream().mapToInt(b -> b.processors().size()).sum();
+                log.debug("Flushed {} processor batches ({} records) to ClickHouse",
+                        batches.size(), totalProcs);
+            }
+        } catch (Exception e) {
+            log.error("Failed to flush processors to ClickHouse", e);
+        }
+    }
+
+    @Scheduled(fixedDelay = 60_000)
+    public void sweepStale() {
+        try {
+            accumulator.sweepStale();
+        } catch (Exception e) {
+            log.error("Failed to sweep stale exchanges", e);
+        }
+    }
+
+    @Override public void start() { running = true; }
+
+    @Override
+    public void stop() {
+        flush();
+        running = false;
+    }
+
+    @Override public boolean isRunning() { return running; }
+    @Override public int getPhase() { return Integer.MAX_VALUE - 1; }
+}
+```
+
+- [ ] 
**Step 2: Implement ChunkIngestionController**
+
+```java
+// cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java
+package com.cameleer3.server.app.controller;
+
+import com.cameleer3.server.core.ingestion.ChunkAccumulator;
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/api/v1/data")
+@Tag(name = "Ingestion", description = "Data ingestion endpoints")
+public class ChunkIngestionController {
+
+    // ObjectMapper is thread-safe; build it once instead of per request
+    private static final ObjectMapper MAPPER = new ObjectMapper()
+            .registerModule(new JavaTimeModule());
+
+    private final ChunkAccumulator accumulator;
+
+    public ChunkIngestionController(ChunkAccumulator accumulator) {
+        this.accumulator = accumulator;
+    }
+
+    @PostMapping("/chunks")
+    @Operation(summary = "Ingest execution chunk (single or array)")
+    public ResponseEntity<Void> ingestChunks(@RequestBody String body) {
+        try {
+            String trimmed = body.trim();
+            if (trimmed.startsWith("[")) {
+                List<ExecutionChunk> chunks = MAPPER.readValue(trimmed,
+                        MAPPER.getTypeFactory().constructCollectionType(List.class, ExecutionChunk.class));
+                for (ExecutionChunk chunk : chunks) {
+                    accumulator.onChunk(chunk);
+                }
+            } else {
+                ExecutionChunk chunk = MAPPER.readValue(trimmed, ExecutionChunk.class);
+                accumulator.onChunk(chunk);
+            }
+            return ResponseEntity.accepted().build();
+        } catch (Exception e) {
+            return ResponseEntity.badRequest().build();
+        }
+    }
+}
+```
+
+- [ ] **Step 3: Compile**
+
+```bash
+mvn clean compile -pl cameleer3-server-app
+```
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ExecutionFlushScheduler.java \
+  cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ChunkIngestionController.java
+git commit -m "feat(clickhouse): add ExecutionFlushScheduler and ChunkIngestionController"
+```
+
+---
+
+### Task 6: ClickHouseSearchIndex
+
+**Files:**
+- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java`
+
+Same as the original plan — implements `SearchIndex` using SQL against ClickHouse. The search query patterns are unchanged: `_search_text LIKE '%term%'` on executions, subquery join on processor_executions for body/header/error scoped searches.
+
+This task is identical to Task 5 in the original plan. Refer to that task's complete code for the `ClickHouseSearchIndex` and `ClickHouseSearchIndexIT` implementations. The only difference is that `processor_executions` now uses `seq`/`iteration` columns instead of `depth`/`loopIndex`/etc., but the search queries only use `_search_text`, `execution_id`, `input_body`, `output_body`, `input_headers`, `output_headers`, `error_message`, and `error_stacktrace` — none of which changed.
+
+- [ ] **Step 1: Write the failing integration test**
+
+Use the same test class from the original plan's Task 5, Step 1. The test seeds data via `ClickHouseExecutionStore` using the new `MergedExecution` and `FlatProcessorRecord` types. Refer to the original plan for the complete test code.
+
+- [ ] **Step 2: Run test to verify it fails**
+
+```bash
+mvn test -pl cameleer3-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire -DfailIfNoTests=false
+```
+
+- [ ] **Step 3: Implement ClickHouseSearchIndex**
+
+Use the same implementation from the original plan's Task 5, Step 3. 
The SQL queries and WHERE clause building are identical. + +- [ ] **Step 4: Run test to verify it passes** + +```bash +mvn test -pl cameleer3-server-app -Dtest=ClickHouseSearchIndexIT -Dfailsafe.provider=surefire +``` + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java \ + cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +git commit -m "feat(clickhouse): add ClickHouseSearchIndex with ngram-accelerated SQL search" +``` + +--- + +### Task 7: Feature Flag Wiring + +**Files:** +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java` +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java` +- Modify: `cameleer3-server-app/src/main/resources/application.yml` +- Modify: `deploy/base/server.yaml` + +Wire up the ChunkAccumulator, WriteBuffers, flush scheduler, and search switching. 
+
+- [ ] **Step 1: Add execution + processor WriteBuffer beans to IngestionBeanConfig**
+
+```java
+// Add to IngestionBeanConfig.java
+@Bean
+@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
+public WriteBuffer<MergedExecution> executionBuffer(IngestionConfig config) {
+    return new WriteBuffer<>(config.getBufferCapacity());
+}
+
+@Bean
+@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
+public WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer(IngestionConfig config) {
+    return new WriteBuffer<>(config.getBufferCapacity());
+}
+```
+
+- [ ] **Step 2: Add CH beans to StorageBeanConfig**
+
+```java
+// Add to StorageBeanConfig.java
+
+@Bean
+@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
+public ClickHouseExecutionStore clickHouseExecutionStore(
+        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+    return new ClickHouseExecutionStore(clickHouseJdbc);
+}
+
+@Bean
+@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
+public ChunkAccumulator chunkAccumulator(
+        WriteBuffer<MergedExecution> executionBuffer,
+        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer) {
+    return new ChunkAccumulator(
+            executionBuffer::offer,
+            processorBatchBuffer::offer,
+            java.time.Duration.ofMinutes(5));
+}
+
+@Bean
+@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
+public ExecutionFlushScheduler executionFlushScheduler(
+        WriteBuffer<MergedExecution> executionBuffer,
+        WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
+        ClickHouseExecutionStore executionStore,
+        ChunkAccumulator accumulator,
+        IngestionConfig config) {
+    return new ExecutionFlushScheduler(executionBuffer, processorBatchBuffer,
+            executionStore, accumulator, config);
+}
+
+@Bean
+@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
+public SearchIndex clickHouseSearchIndex(
+        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+    return new ClickHouseSearchIndex(clickHouseJdbc);
+}
+```
+
+- [ ] **Step 3: Add ConditionalOnProperty to 
OpenSearchIndex** + +```java +@Repository +@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true) +public class OpenSearchIndex implements SearchIndex { +``` + +- [ ] **Step 4: Update application.yml** + +```yaml +cameleer: + storage: + metrics: ${CAMELEER_STORAGE_METRICS:postgres} + search: ${CAMELEER_STORAGE_SEARCH:opensearch} +``` + +- [ ] **Step 5: Update deploy/base/server.yaml** + +Add env var: +```yaml +- name: CAMELEER_STORAGE_SEARCH + value: "opensearch" +``` + +- [ ] **Step 6: Compile and verify all tests pass** + +```bash +mvn clean verify -DskipITs +``` + +- [ ] **Step 7: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java \ + cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/IngestionBeanConfig.java \ + cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java \ + cameleer3-server-app/src/main/resources/application.yml \ + deploy/base/server.yaml +git commit -m "feat(clickhouse): wire ChunkAccumulator, flush scheduler, and search feature flag" +``` + +--- + +### Task 8: End-to-End Integration Test + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java` + +Validates the full pipeline: ChunkAccumulator → WriteBuffers → ClickHouseExecutionStore → ClickHouseSearchIndex. 
+
+- [ ] **Step 1: Write the integration test**
+
+```java
+// cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.app.search.ClickHouseSearchIndex;
+import com.cameleer3.server.core.ingestion.ChunkAccumulator;
+import com.cameleer3.server.core.ingestion.MergedExecution;
+import com.cameleer3.server.core.search.ExecutionSummary;
+import com.cameleer3.server.core.search.SearchRequest;
+import com.cameleer3.server.core.search.SearchResult;
+import com.cameleer3.server.core.storage.model.ExecutionChunk;
+import com.cameleer3.server.core.storage.model.FlatProcessorRecord;
+import com.zaxxer.hikari.HikariDataSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+class ClickHouseChunkPipelineIT {
+
+    @Container
+    static final ClickHouseContainer clickhouse =
+            new ClickHouseContainer("clickhouse/clickhouse-server:24.12");
+
+    private JdbcTemplate jdbc;
+    private ClickHouseExecutionStore executionStore;
+    private ClickHouseSearchIndex searchIndex;
+    private ChunkAccumulator accumulator;
+    private List<MergedExecution> executionBuffer;
+    private List<ChunkAccumulator.ProcessorBatch> processorBuffer;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        HikariDataSource ds = new HikariDataSource();
+        ds.setJdbcUrl(clickhouse.getJdbcUrl());
+        ds.setUsername(clickhouse.getUsername());
+        ds.setPassword(clickhouse.getPassword());
+        jdbc = new JdbcTemplate(ds);
+
+        String execDdl = 
new String(getClass().getResourceAsStream( + "/clickhouse/V2__executions.sql").readAllBytes(), StandardCharsets.UTF_8); + String procDdl = new String(getClass().getResourceAsStream( + "/clickhouse/V3__processor_executions.sql").readAllBytes(), StandardCharsets.UTF_8); + jdbc.execute(execDdl); + jdbc.execute(procDdl); + jdbc.execute("TRUNCATE TABLE executions"); + jdbc.execute("TRUNCATE TABLE processor_executions"); + + executionStore = new ClickHouseExecutionStore(jdbc); + searchIndex = new ClickHouseSearchIndex(jdbc); + + executionBuffer = new ArrayList<>(); + processorBuffer = new ArrayList<>(); + accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, Duration.ofMinutes(5)); + } + + @Test + void fullPipeline_chunkedIngestion_thenSearch() { + Instant start = Instant.parse("2026-03-31T12:00:00Z"); + + // Chunk 0: RUNNING with initial processors + accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "RUNNING", + start, null, null, "DEEP", + null, null, null, null, null, null, + Map.of("orderId", "ORD-123"), + null, null, null, null, + 0, false, + List.of( + new FlatProcessorRecord(1, null, null, "log1", "log", + null, null, "COMPLETED", start, 2L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(2, null, null, "split1", "split", + null, 3, "COMPLETED", start.plusMillis(2), 100L, + null, null, null, null, null, + null, null, null, null, null, null, + null, null, null, null, null), + new FlatProcessorRecord(3, 2, "split1", "to1", "to", + 0, null, "COMPLETED", start.plusMillis(5), 30L, + "http://inventory/api", + "order ABC-123 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + // Processors should be flushed immediately + assertThat(processorBuffer).hasSize(1); + assertThat(executionBuffer).isEmpty(); + + // Chunk 1: COMPLETED (final) + 
accumulator.onChunk(new ExecutionChunk( + "pipeline-1", "order-service", "pod-1", "order-route", + "corr-1", "COMPLETED", + start, start.plusMillis(750), 750L, "DEEP", + null, null, null, null, null, null, + null, null, null, null, null, + 1, true, + List.of( + new FlatProcessorRecord(4, 2, "split1", "to1", "to", + 1, null, "COMPLETED", start.plusMillis(40), 25L, + "http://inventory/api", + "order DEF-456 check stock", "stock available", + null, null, + null, null, null, null, null, null, + null, null, null, null, null)))); + + assertThat(executionBuffer).hasSize(1); + assertThat(processorBuffer).hasSize(2); + + // Flush to ClickHouse + executionStore.insertExecutionBatch(executionBuffer); + for (ChunkAccumulator.ProcessorBatch batch : processorBuffer) { + executionStore.insertProcessorBatch( + batch.tenantId(), batch.executionId(), + batch.routeId(), batch.applicationName(), + batch.execStartTime(), batch.processors()); + } + + // Search by order ID in attributes + SearchResult result = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + "ORD-123", null, null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(result.total()).isEqualTo(1); + assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); + assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); + assertThat(result.data().get(0).durationMs()).isEqualTo(750L); + + // Search in processor body + SearchResult bodyResult = searchIndex.search(new SearchRequest( + null, null, null, null, null, null, + null, "ABC-123", null, null, + null, null, null, null, null, + 0, 50, null, null)); + assertThat(bodyResult.total()).isEqualTo(1); + + // Verify iteration data in processor_executions + Integer iterSize = jdbc.queryForObject( + "SELECT iteration_size FROM processor_executions WHERE execution_id = 'pipeline-1' AND seq = 2", + Integer.class); + assertThat(iterSize).isEqualTo(3); + + Integer iter0 = jdbc.queryForObject( + "SELECT iteration FROM 
processor_executions WHERE execution_id = 'pipeline-1' AND seq = 3", + Integer.class); + assertThat(iter0).isEqualTo(0); + } +} +``` + +- [ ] **Step 2: Run the integration test** + +```bash +mvn test -pl cameleer3-server-app -Dtest=ClickHouseChunkPipelineIT -Dfailsafe.provider=surefire +``` + +Expected: PASS. + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java +git commit -m "test(clickhouse): add end-to-end chunk pipeline integration test" +``` + +--- + +## Verification Checklist + +After all tasks are complete, verify: + +1. **Chunk ingestion**: `POST /api/v1/data/chunks` accepts single and array ExecutionChunks +2. **Processor immediate insert**: Processor records are inserted as chunks arrive (append-only) +3. **Envelope accumulation**: Multiple non-final chunks merge envelope data correctly +4. **Final flush**: Final chunk triggers execution row write with version=1 +5. **Stale sweep**: Exchanges without final chunk for 5 minutes are flushed as RUNNING +6. **Search**: All filter types work: status, time range, duration, correlation ID, application, text, textInBody, textInHeaders, textInErrors +7. **Highlighting**: Search results include 120-char context snippets +8. **Feature flag**: `cameleer.storage.search=opensearch` uses OpenSearch, `=clickhouse` uses ClickHouse +9. **Backward compat**: With `clickhouse.enabled=false`, server starts without CH beans (PG + OpenSearch only) +10. **seq/parentSeq**: Processor records correctly store seq, parentSeq, iteration, iterationSize +11. 
**CI**: `mvn clean verify -DskipITs` passes diff --git a/docs/superpowers/plans/2026-03-31-clickhouse-phase3-stats-analytics.md b/docs/superpowers/plans/2026-03-31-clickhouse-phase3-stats-analytics.md new file mode 100644 index 00000000..e256924d --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-clickhouse-phase3-stats-analytics.md @@ -0,0 +1,431 @@ +# ClickHouse Phase 3: Stats & Analytics — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace TimescaleDB continuous aggregates with ClickHouse materialized views and implement a `ClickHouseStatsStore` that reads from them using `-Merge` aggregate functions. + +**Architecture:** 5 DDL scripts create AggregatingMergeTree target tables + materialized views that trigger on INSERT to `executions` and `processor_executions`. A `ClickHouseStatsStore` implements the existing `StatsStore` interface, translating `time_bucket()` → `toStartOfInterval()`, `SUM(total_count)` → `countMerge(total_count)`, `approx_percentile` → `quantileMerge`, etc. SLA and topErrors queries hit the raw `executions` / `processor_executions` tables with `FINAL`. Feature flag `cameleer.storage.stats=postgres|clickhouse` controls which implementation is active. 
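The `-State`/`-Merge` split is the core mechanic of this phase: materialized views store partial aggregate states, and readers finish them with the matching `-Merge` combinator. A minimal toy round-trip, where the table and column names are illustrative and not part of the plan:

```sql
CREATE TABLE demo_counts (
    bucket DateTime,
    cnt    AggregateFunction(count)
)
ENGINE = AggregatingMergeTree()
ORDER BY bucket;

-- Write side (what an MV does on INSERT): store partial states, not finished numbers
INSERT INTO demo_counts
SELECT toStartOfMinute(now()) AS bucket, countState()
FROM numbers(10)
GROUP BY bucket;

-- Read side: finish the states with the matching -Merge combinator
SELECT bucket, countMerge(cnt) AS total
FROM demo_counts
GROUP BY bucket;
```

Reading the `cnt` column without `countMerge` returns an opaque state blob, which is why every read query in this plan wraps the stats columns in the corresponding `-Merge` function.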
**Tech Stack:** ClickHouse 24.12, AggregatingMergeTree, `-State`/`-Merge` combinators, JdbcTemplate, Testcontainers

**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` (Materialized Views + Stats Query Translation sections)

---

## File Structure

| File | Responsibility |
|------|----------------|
| `cameleer3-server-app/.../resources/clickhouse/V4__stats_tables_and_mvs.sql` | DDL for all 5 stats tables + 5 materialized views |
| `cameleer3-server-app/.../storage/ClickHouseStatsStore.java` | StatsStore impl using -Merge functions on AggregatingMergeTree tables |
| `cameleer3-server-app/.../config/StorageBeanConfig.java` | Modified: add CH stats store bean with feature flag |
| `cameleer3-server-app/.../storage/PostgresStatsStore.java` | Modified: add ConditionalOnProperty |
| `cameleer3-server-app/.../resources/application.yml` | Modified: add `cameleer.storage.stats` flag |
| `deploy/base/server.yaml` | Modified: add `CAMELEER_STORAGE_STATS` env var |
| `cameleer3-server-app/...test.../storage/ClickHouseStatsStoreIT.java` | Integration test for CH stats queries |

---

## Query Translation Reference

| TimescaleDB (PostgresStatsStore) | ClickHouse (ClickHouseStatsStore) |
|----------------------------------|-----------------------------------|
| `time_bucket(N * INTERVAL '1 second', bucket)` | `toStartOfInterval(bucket, INTERVAL N SECOND)` |
| `SUM(total_count)` | `countMerge(total_count)` |
| `SUM(failed_count)` | `countIfMerge(failed_count)` |
| `SUM(running_count)` | `countIfMerge(running_count)` |
| `SUM(duration_sum)` | `sumMerge(duration_sum)` |
| `MAX(p99_duration)` | `quantileMerge(0.99)(p99_duration)` |
| `MAX(duration_max)` | `maxMerge(duration_max)` |
| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` |
| `COUNT(*) FILTER (WHERE ...)` | `countIf(...)` |
| `EXTRACT(DOW FROM bucket)` | `toDayOfWeek(bucket) % 7` (default mode returns 1=Mon..7=Sun in CH; `% 7` maps Sun to 0, giving 0=Sun..6=Sat) |
| `EXTRACT(HOUR FROM bucket)` | `toHour(bucket)` |
| `LEFT(error_message, 200)` | `substring(error_message, 1, 200)` |
| `COUNT(DISTINCT ...)` | `uniq(...)` or `COUNT(DISTINCT ...)` |

---

### Task 1: DDL for Stats Tables and Materialized Views

**Files:**
- Create: `cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql`

All 5 table+MV pairs in a single DDL file. Tables use `AggregatingMergeTree()`. MVs use `-State` combinators and trigger on INSERT to `executions` or `processor_executions`. Note that the `AggregateFunction(...)` column types must list exactly the argument types the corresponding `-State` call aggregates over: `countState()` takes no arguments, so the column type is `AggregateFunction(count)`, and `countIfState(cond)` takes a single `UInt8` condition, so the column type is `AggregateFunction(countIf, UInt8)`.

- [ ] **Step 1: Create the DDL file**

```sql
-- V4__stats_tables_and_mvs.sql
-- Materialized views replacing TimescaleDB continuous aggregates.
-- Tables use AggregatingMergeTree; MVs use -State combinators.

-- ── stats_1m_all (global) ──────────────────────────────────────────── 

CREATE TABLE IF NOT EXISTS stats_1m_all (
    tenant_id LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT
    tenant_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, bucket;

-- ── stats_1m_app (per-application) ─────────────────────────────────── 

CREATE TABLE IF NOT EXISTS stats_1m_app (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
SELECT
    tenant_id,
    application_name,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, bucket;

-- ── stats_1m_route (per-route) ─────────────────────────────────────── 

CREATE TABLE IF NOT EXISTS stats_1m_route (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    route_id LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    running_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
SELECT
    tenant_id,
    application_name,
    route_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    countIfState(status = 'RUNNING') AS running_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM executions
GROUP BY tenant_id, application_name, route_id, bucket;

-- ── stats_1m_processor (per-processor-type) ────────────────────────── 

CREATE TABLE IF NOT EXISTS stats_1m_processor (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    processor_type LowCardinality(String),
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf, UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, processor_type, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
SELECT
    tenant_id,
    application_name,
    processor_type,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, processor_type, bucket;

-- ── stats_1m_processor_detail (per-processor-id) ───────────────────── 

CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
    tenant_id LowCardinality(String),
    application_name LowCardinality(String),
    route_id LowCardinality(String),
    processor_id String,
    bucket DateTime,
    total_count AggregateFunction(count),
    failed_count AggregateFunction(countIf,
UInt8),
    duration_sum AggregateFunction(sum, Nullable(Int64)),
    duration_max AggregateFunction(max, Nullable(Int64)),
    p99_duration AggregateFunction(quantile(0.99), Nullable(Int64))
)
ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_name, route_id, processor_id, bucket)
TTL bucket + INTERVAL 365 DAY DELETE;

CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
SELECT
    tenant_id,
    application_name,
    route_id,
    processor_id,
    toStartOfMinute(start_time) AS bucket,
    countState() AS total_count,
    countIfState(status = 'FAILED') AS failed_count,
    sumState(duration_ms) AS duration_sum,
    maxState(duration_ms) AS duration_max,
    quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions
GROUP BY tenant_id, application_name, route_id, processor_id, bucket;
```

Note: The `ClickHouseSchemaInitializer` runs each `.sql` file as a single statement. ClickHouse supports multiple statements separated by `;` in a single call, BUT the JDBC driver may not. If the initializer fails, each CREATE statement may need to be in its own file. Check during testing.

**IMPORTANT**: The ClickHouseSchemaInitializer needs to handle multi-statement files. Read it first — if it uses `jdbc.execute(sql)` for each file, the semicolons between statements will cause issues. If so, split into separate files (V4a, V4b, etc.) or modify the initializer to split on `;`.

- [ ] **Step 2: Check ClickHouseSchemaInitializer handles multi-statement**

Read `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java`. If it runs each file as a single `jdbc.execute()`, modify it to split on `;` and run each statement separately. If it already handles this, proceed.
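If the initializer does need modifying, the split itself is small. A hedged sketch of the splitting logic (class and method names are illustrative; it assumes the DDL contains no `;` inside string literals or comments, which holds for the V4 file):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper for ClickHouseSchemaInitializer: split a
// multi-statement DDL file into individual statements so each one can
// be passed to jdbc.execute() on its own. A naive split on ';' is
// enough here because the V4 DDL has no semicolons inside string
// literals or comments.
public class SqlStatementSplitter {
    public static List<String> split(String sql) {
        return Arrays.stream(sql.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toList();
    }
}
```

The initializer would then loop over the result, e.g. `for (String stmt : SqlStatementSplitter.split(fileContent)) jdbc.execute(stmt);`.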
+ +- [ ] **Step 3: Verify DDL loads in Testcontainers** + +Write a quick smoke test or manually verify that all 10 objects (5 tables + 5 MVs) are created: + +```bash +mvn clean compile -pl cameleer3-server-app -f pom.xml +``` + +- [ ] **Step 4: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/clickhouse/V4__stats_tables_and_mvs.sql +# also add ClickHouseSchemaInitializer if modified +git commit -m "feat(clickhouse): add stats materialized views DDL (5 tables + 5 MVs)" +``` + +--- + +### Task 2: ClickHouseStatsStore — Aggregate Queries + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java` + +The store implements `StatsStore` using ClickHouse `-Merge` functions. It follows the same pattern as `PostgresStatsStore` but with ClickHouse SQL syntax. + +**Key implementation patterns:** + +1. **Stats queries** (queryStats): Read from `stats_1m_*` tables using `-Merge` combinators: + ```sql + SELECT + countMerge(total_count) AS total_count, + countIfMerge(failed_count) AS failed_count, + CASE WHEN countMerge(total_count) > 0 + THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration, + quantileMerge(0.99)(p99_duration) AS p99_duration, + countIfMerge(running_count) AS active_count + FROM stats_1m_all + WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? + ``` + Same pattern for prev-24h and today queries (identical to PostgresStatsStore logic). + +2. **Timeseries queries** (queryTimeseries): Group by time period: + ```sql + SELECT + toStartOfInterval(bucket, INTERVAL ? 
SECOND) AS period,
     countMerge(total_count) AS total_count,
     countIfMerge(failed_count) AS failed_count,
     CASE WHEN countMerge(total_count) > 0
          THEN sumMerge(duration_sum) / countMerge(total_count) ELSE 0 END AS avg_duration,
     quantileMerge(0.99)(p99_duration) AS p99_duration,
     countIfMerge(running_count) AS active_count
   FROM stats_1m_app
   WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ? AND application_name = ?
   GROUP BY period ORDER BY period
   ```

3. **Grouped timeseries**: Same as timeseries but with an extra GROUP BY column (application_name or route_id), returned as a `Map` keyed by that column.

4. **SLA compliance**: Hit the raw `executions FINAL` table:
   ```sql
   SELECT
     countIf(duration_ms <= ? AND status != 'RUNNING') AS compliant,
     countIf(status != 'RUNNING') AS total
   FROM executions FINAL
   WHERE tenant_id = 'default' AND start_time >= ? AND start_time < ?
     AND application_name = ?
   ```

5. **SLA counts by app/route**: Same pattern with GROUP BY.

6. **Top errors**: Hit the raw `executions FINAL` or `processor_executions` table with a CTE for counts + velocity. ClickHouse differences:
   - No `FILTER (WHERE ...)` → use `countIf(...)`
   - No `LEFT(s, n)` → use `substring(s, 1, n)`
   - CTE syntax is identical (`WITH ... AS (...)`)

7. **Active error types**: `SELECT uniq(...)` or `COUNT(DISTINCT ...)` from raw executions.

8. **Punchcard**: ClickHouse day-of-week: `toDayOfWeek(bucket)` (default mode) returns 1=Mon..7=Sun. PG `EXTRACT(DOW)` returns 0=Sun..6=Sat. Conversion: `toDayOfWeek(bucket) % 7` gives 0=Sun..6=Sat. (Mode 1, i.e. `toDayOfWeek(bucket, 1)`, is already 0-based but starts the week at Monday, so it is not the right shift.)

**Constructor**: Takes `@Qualifier("clickHouseJdbcTemplate") JdbcTemplate jdbc`.

**Test approach**: Seed data by inserting directly into `executions` and `processor_executions` tables (the MVs trigger automatically on INSERT). Then query via the StatsStore methods and verify results.

**Test data seeding**: Insert 10 executions across 2 apps, 3 routes, spanning 10 minutes. Include some FAILED, some COMPLETED, varying durations.
Then verify: +- `stats()` returns correct totals +- `statsForApp()` filters correctly +- `timeseries()` returns multiple buckets +- `slaCompliance()` returns correct percentage +- `topErrors()` returns ranked errors +- `punchcard()` returns non-empty cells + +- [ ] **Step 1: Write the failing integration test** + +Create `ClickHouseStatsStoreIT.java` with: +- Load all 4 DDL files (V2 executions, V3 processor_executions, V4 stats MVs) +- Seed 10+ executions and 20+ processor records across 2 apps, 3 routes, 10 minutes +- Test: `stats_returnsCorrectTotals`, `statsForApp_filtersCorrectly`, `timeseries_returnsBuckets`, `timeseriesGroupedByApp_returnsMap`, `slaCompliance_calculatesCorrectly`, `topErrors_returnsRankedErrors`, `activeErrorTypes_countsDistinct`, `punchcard_returnsNonEmpty`, `slaCountsByApp_returnsMap` + +- [ ] **Step 2: Run test to verify it fails** + +```bash +mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -DfailIfNoTests=false -f pom.xml +``` + +- [ ] **Step 3: Implement ClickHouseStatsStore** + +Follow the `PostgresStatsStore` structure closely. Same private `Filter` record, same `queryStats`/`queryTimeseries`/`queryGroupedTimeseries` helper methods. Replace PG-specific SQL with CH equivalents per the translation table above. 
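One of the trickier translations is the punchcard query. A sketch of the ClickHouse form, with the tenant filter and column aliases illustrative:

```sql
-- Punchcard: executions per (weekday, hour) cell, using the PG convention 0=Sun..6=Sat.
-- toDayOfWeek's default mode returns 1=Mon..7=Sun, so `% 7` maps Sunday (7) to 0.
SELECT
    toDayOfWeek(bucket) % 7 AS dow,
    toHour(bucket)          AS hour,
    countMerge(total_count) AS total_count
FROM stats_1m_all
WHERE tenant_id = 'default' AND bucket >= ? AND bucket < ?
GROUP BY dow, hour
ORDER BY dow, hour
```

The rest of the grouped queries follow the same shape: group on the extracted time component, finish the states with `-Merge`.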
+ +- [ ] **Step 4: Run test to verify it passes** + +```bash +mvn test -pl cameleer3-server-app -Dtest=ClickHouseStatsStoreIT -Dfailsafe.provider=surefire -f pom.xml +``` + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java \ + cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java +git commit -m "feat(clickhouse): add ClickHouseStatsStore with -Merge aggregate queries" +``` + +--- + +### Task 3: Feature Flag Wiring + +**Files:** +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java` +- Modify: `cameleer3-server-app/src/main/resources/application.yml` +- Modify: `deploy/base/server.yaml` + +- [ ] **Step 1: Add ConditionalOnProperty to PostgresStatsStore** + +```java +@Repository +@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "postgres", matchIfMissing = true) +public class PostgresStatsStore implements StatsStore { +``` + +- [ ] **Step 2: Add CH StatsStore bean to StorageBeanConfig** + +```java +@Bean +@ConditionalOnProperty(name = "cameleer.storage.stats", havingValue = "clickhouse") +public StatsStore clickHouseStatsStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseStatsStore(clickHouseJdbc); +} +``` + +- [ ] **Step 3: Update application.yml** + +Add under `cameleer.storage`: +```yaml + stats: ${CAMELEER_STORAGE_STATS:postgres} +``` + +- [ ] **Step 4: Update deploy/base/server.yaml** + +Add env var: +```yaml + - name: CAMELEER_STORAGE_STATS + value: "postgres" +``` + +- [ ] **Step 5: Compile and verify all tests pass** + +```bash +mvn clean verify -DskipITs -f pom.xml +``` + +- [ ] **Step 6: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java \ + 
cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresStatsStore.java \ + cameleer3-server-app/src/main/resources/application.yml \ + deploy/base/server.yaml +git commit -m "feat(clickhouse): wire ClickHouseStatsStore with cameleer.storage.stats feature flag" +``` + +--- + +## Verification Checklist + +After all tasks are complete, verify: + +1. **MVs trigger**: Insert a row into `executions`, verify `stats_1m_all` has a row +2. **Aggregate correctness**: Insert known data, verify countMerge/sumMerge/quantileMerge produce correct values +3. **Timeseries bucketing**: Verify `toStartOfInterval` groups correctly across time ranges +4. **SLA compliance**: Verify percentage calculation against raw data +5. **Top errors**: Verify ranking and velocity trend detection +6. **Punchcard**: Verify weekday/hour mapping (0=Sun..6=Sat convention) +7. **Feature flag**: `cameleer.storage.stats=postgres` uses PG, `=clickhouse` uses CH +8. **Backward compat**: With default config, everything uses PG +9. **CI**: `mvn clean verify -DskipITs` passes diff --git a/docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md b/docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md new file mode 100644 index 00000000..5c55fdf9 --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-clickhouse-phase4-remaining-tables.md @@ -0,0 +1,244 @@ +# ClickHouse Phase 4: Remaining Tables — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Migrate route diagrams, agent events, and application logs from PostgreSQL/OpenSearch to ClickHouse. + +**Architecture:** Three new ClickHouse stores implement existing interfaces. `ClickHouseDiagramStore` uses ReplacingMergeTree for content-hash dedup. 
`ClickHouseAgentEventRepository` uses MergeTree for append-only events. `ClickHouseLogStore` replaces `OpenSearchLogIndex` with SQL + ngram indexes. Feature flags control each store independently. + +**Tech Stack:** ClickHouse 24.12, JdbcTemplate, Testcontainers + +**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` + +--- + +## File Structure + +| File | Responsibility | +|------|----------------| +| `cameleer3-server-app/.../resources/clickhouse/V5__route_diagrams.sql` | DDL for `route_diagrams` (ReplacingMergeTree) | +| `cameleer3-server-app/.../resources/clickhouse/V6__agent_events.sql` | DDL for `agent_events` (MergeTree) | +| `cameleer3-server-app/.../resources/clickhouse/V7__logs.sql` | DDL for `logs` (MergeTree with ngram indexes) | +| `cameleer3-server-app/.../storage/ClickHouseDiagramStore.java` | DiagramStore impl for ClickHouse | +| `cameleer3-server-app/.../storage/ClickHouseAgentEventRepository.java` | AgentEventRepository impl for ClickHouse | +| `cameleer3-server-app/.../search/ClickHouseLogStore.java` | Replaces OpenSearchLogIndex | +| `cameleer3-server-app/.../config/StorageBeanConfig.java` | Modified: add CH beans with feature flags | +| `cameleer3-server-app/.../storage/PostgresDiagramStore.java` | Modified: add ConditionalOnProperty | +| `cameleer3-server-app/.../storage/PostgresAgentEventRepository.java` | Modified: add ConditionalOnProperty | +| `cameleer3-server-app/.../search/OpenSearchLogIndex.java` | Modified: add ConditionalOnProperty | +| `cameleer3-server-app/.../resources/application.yml` | Modified: add feature flags | +| `deploy/base/server.yaml` | Modified: add env vars | + +--- + +### Task 1: DDL Scripts + +**Files:** +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V5__route_diagrams.sql` +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V6__agent_events.sql` +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V7__logs.sql` + +- [ ] **Step 1: Create 
route_diagrams DDL** + +```sql +CREATE TABLE IF NOT EXISTS route_diagrams ( + tenant_id LowCardinality(String) DEFAULT 'default', + content_hash String, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + definition String, + created_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = ReplacingMergeTree(created_at) +ORDER BY (tenant_id, content_hash) +SETTINGS index_granularity = 8192 +``` + +- [ ] **Step 2: Create agent_events DDL** + +```sql +CREATE TABLE IF NOT EXISTS agent_events ( + tenant_id LowCardinality(String) DEFAULT 'default', + timestamp DateTime64(3) DEFAULT now64(3), + agent_id LowCardinality(String), + app_id LowCardinality(String), + event_type LowCardinality(String), + detail String DEFAULT '' +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, app_id, agent_id, timestamp) +TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE +``` + +- [ ] **Step 3: Create logs DDL** + +```sql +CREATE TABLE IF NOT EXISTS logs ( + tenant_id LowCardinality(String) DEFAULT 'default', + timestamp DateTime64(3), + application LowCardinality(String), + agent_id LowCardinality(String), + level LowCardinality(String), + logger_name LowCardinality(String) DEFAULT '', + message String, + thread_name LowCardinality(String) DEFAULT '', + stack_trace String DEFAULT '', + exchange_id String DEFAULT '', + mdc Map(String, String) DEFAULT map(), + + INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_level level TYPE set(10) GRANULARITY 1 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, application, timestamp) +TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192 +``` + +- [ ] **Step 4: Compile and commit** + +```bash +mvn clean compile -pl cameleer3-server-app -f pom.xml +git commit -m "feat(clickhouse): 
add DDL for route_diagrams, agent_events, and logs tables"
```

---

### Task 2: ClickHouseDiagramStore

**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java`

Implements `DiagramStore` interface (5 methods). Read `PostgresDiagramStore.java` first and translate.

**Key differences from PG:**
- `INSERT INTO ... ON CONFLICT DO NOTHING` → just `INSERT INTO` (ReplacingMergeTree deduplicates by content_hash)
- `?::jsonb` → plain `?` (CH stores definition as String, not JSONB)
- `ORDER BY created_at DESC LIMIT 1` → unchanged, but add `FINAL` for ReplacingMergeTree reads
- `findProcessorRouteMapping`: PG uses `jsonb_array_elements()` — CH has no direct equivalent, and expressing this with its `JSONExtract*` functions would be awkward. Instead, store the definition as a string and parse in Java, OR query `route_diagrams FINAL` and deserialize definitions application-side. **Recommended:** Fetch all definitions for the application, deserialize in Java, extract processor→route mappings. This is a small result set (one row per route).
- SHA-256 content hash computation stays in Java (same as PG store)
- Add `WHERE tenant_id = 'default'` to all queries

**Tests:**
- `store_insertsNewDiagram`
- `store_duplicateHashIgnored` (ReplacingMergeTree dedup after OPTIMIZE FINAL)
- `findByContentHash_returnsGraph`
- `findContentHashForRoute_returnsMostRecent`
- `findProcessorRouteMapping_extractsMapping`

---

### Task 3: ClickHouseAgentEventRepository

**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java`

Implements `AgentEventRepository` interface (2 methods: `insert` + `query`).
**Key differences from PG:**
- No `BIGSERIAL id` — CH doesn't have auto-increment. `AgentEventRecord` has `long id` but set it to 0 for CH rows.
- `INSERT INTO agent_events (tenant_id, agent_id, app_id, event_type, detail) VALUES (?, ?, ?, ?, ?)`
- `query` builds a dynamic WHERE (same pattern as PG) with `ORDER BY timestamp DESC LIMIT ?`
- Add `WHERE tenant_id = 'default'`

**Tests:**
- `insert_writesEvent`
- `query_byAppId_filtersCorrectly`
- `query_byTimeRange_filtersCorrectly`
- `query_respectsLimit`

---

### Task 4: ClickHouseLogStore

**Files:**
- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java`
- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java`

Replaces `OpenSearchLogIndex`. Must have the same public API:
- `search(application, agentId, level, query, exchangeId, from, to, limit)` → returns `List<LogEntryResponse>`
- `indexBatch(agentId, application, List<...> entries)` → batch INSERT into `logs`

**Key implementation:**

`indexBatch`: Batch INSERT with `Map(String, String)` for the MDC column. Extract `camel.exchangeId` from MDC into the top-level `exchange_id` column.

```sql
INSERT INTO logs (tenant_id, timestamp, application, agent_id, level, logger_name,
                  message, thread_name, stack_trace, exchange_id, mdc)
VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
```

MDC Map type: pass as `java.util.HashMap` — ClickHouse JDBC 0.9.7 supports the native Map type (same as ClickHouseMetricsStore uses for tags).

`search`: Build the WHERE clause from params. Use `LIKE '%query%'` for message text search (ngram-accelerated). Return `LogEntryResponse` records.

```sql
SELECT timestamp, level, logger_name, message, thread_name, stack_trace
FROM logs
WHERE tenant_id = 'default' AND application = ?
  [AND agent_id = ?]
  [AND level = ?]
  [AND (exchange_id = ? OR mdc['camel.exchangeId'] = ?)]
  [AND message LIKE ?]
  [AND timestamp >= ?]
  [AND timestamp <= ?]
ORDER BY timestamp DESC
LIMIT ?
```

**Tests:**
- `indexBatch_writesLogs`
- `search_byApplication_returnsLogs`
- `search_byLevel_filtersCorrectly`
- `search_byQuery_usesLikeSearch`
- `search_byExchangeId_matchesTopLevelAndMdc`
- `search_byTimeRange_filtersCorrectly`
- `indexBatch_storesMdc`

---

### Task 5: Feature Flag Wiring

**Files to modify:**
- `PostgresDiagramStore.java` — add `@ConditionalOnProperty(name = "cameleer.storage.diagrams", havingValue = "postgres")`
- `PostgresAgentEventRepository.java` — add `@ConditionalOnProperty(name = "cameleer.storage.events", havingValue = "postgres")`
- `OpenSearchLogIndex.java` — add `@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")`
- `StorageBeanConfig.java` — add CH diagram, event, and log store beans (all default to clickhouse with `matchIfMissing = true`)
- `application.yml` — add `diagrams`, `events`, `logs` flags under `cameleer.storage`
- `deploy/base/server.yaml` — add env vars

**Feature flags (the new Phase 4 flags default to clickhouse; `metrics` and `search` keep their existing defaults):**
```yaml
cameleer:
  storage:
    metrics: ${CAMELEER_STORAGE_METRICS:postgres}
    search: ${CAMELEER_STORAGE_SEARCH:opensearch}
    stats: ${CAMELEER_STORAGE_STATS:clickhouse}
    diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
    events: ${CAMELEER_STORAGE_EVENTS:clickhouse}
    logs: ${CAMELEER_STORAGE_LOGS:clickhouse}
```

**Important for LogStore wiring:** The `OpenSearchLogIndex` is a `@Repository` used directly by controllers (not via an interface). The `ClickHouseLogStore` must be injectable in the same way.
Options:
- Extract a `LogIndex` interface with `search()` + `indexBatch()` methods, used by both controllers
- Or make `ClickHouseLogStore` extend/implement the same type

**Recommended:** Create a `LogIndex` interface in the core module with the two methods, have both `OpenSearchLogIndex` and `ClickHouseLogStore` implement it, and update `LogIngestionController` + `LogQueryController` to inject `LogIndex` instead of `OpenSearchLogIndex`.

---

## Verification Checklist

1. **Diagrams**: Store + retrieve RouteGraph via ClickHouse, verify content-hash dedup
2. **Events**: Insert + query events with time range and app/agent filters
3. **Logs**: Batch insert + search with all filter types (level, query, exchangeId, time range)
4. **Feature flags**: Each store independently switchable between PG/OS and CH
5. **Backward compat**: Default config uses ClickHouse for all Phase 4 stores
6. **CI**: `mvn clean verify -DskipITs` passes

diff --git a/docs/superpowers/plans/2026-04-02-admin-context-separation.md b/docs/superpowers/plans/2026-04-02-admin-context-separation.md
new file mode 100644
index 00000000..66389247
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-admin-context-separation.md
@@ -0,0 +1,285 @@

# Admin Page Context Separation Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Hide the operational sidebar and TopBar on admin pages, replacing them with a minimal admin header bar containing only a back button and logout.

**Architecture:** Conditionally skip `<Sidebar>`, `<TopBar>`, and `<CommandPalette>` in `LayoutShell` when `isAdminPage` is true. Add a self-contained admin header bar inside `AdminLayout` with back navigation and user/logout. No design system changes needed — `AppShell` already handles `null` sidebar gracefully.

**Tech Stack:** React 19, react-router, lucide-react icons, CSS Modules, `@cameleer/design-system` CSS variables

---

## File Map

| File | Action | Responsibility |
|------|--------|----------------|
| `ui/src/components/LayoutShell.tsx` | Modify | Conditionally skip Sidebar, TopBar, CommandPalette on admin pages |
| `ui/src/pages/Admin/AdminLayout.tsx` | Modify | Add slim admin header bar with back button and user/logout |
| `ui/src/pages/Admin/AdminLayout.module.css` | Create | Styles for admin header bar |

---

### Task 1: Modify LayoutShell to Skip Sidebar/TopBar on Admin Pages

**Files:**
- Modify: `ui/src/components/LayoutShell.tsx`

- [ ] **Step 1: Update `AppShell` sidebar prop to be conditional**

In `LayoutShell.tsx`, find the `return` statement in `LayoutContent` (around line 296). Change the `<AppShell>` to conditionally pass `null` as sidebar, and conditionally render TopBar and CommandPalette:

```tsx
  return (
    <AppShell
      sidebar={
        !isAdminPage ? (
          <Sidebar /* existing sidebar props unchanged */ />
        ) : null
      }
    >
      {!isAdminPage && (
        <TopBar /* existing TopBar props unchanged */ />
      )}
      {!isAdminPage && (
        <CommandPalette
          open={paletteOpen}
          query={paletteQuery}
          onClose={() => setPaletteOpen(false)}
          onOpen={() => setPaletteOpen(true)}
          onSelect={handlePaletteSelect}
          onSubmit={handlePaletteSubmit}
          onQueryChange={setPaletteQuery}
          data={searchData}
        />
      )}

      {!isAdminPage && (
        <ContentTabs /* existing props unchanged */ />
      )}

      <main style={{ padding: isAdminPage ? 0 : undefined /* keep existing operational padding */ }}>
        <Outlet />
      </main>
    </AppShell>
  );
```

Note: The existing `isAdminPage` guard on `ContentTabs` (line 317) and the padding ternary on `<main>` (line 321) should be updated. Admin padding moves into `AdminLayout` itself, so set `padding: 0` for admin in the main tag.

- [ ] **Step 2: Verify it compiles**

Run: `cd ui && npx tsc --noEmit`
Expected: No type errors.

- [ ] **Step 3: Commit**

```bash
git add ui/src/components/LayoutShell.tsx
git commit -m "feat(#112): hide sidebar, topbar, cmd palette on admin pages"
```

---

### Task 2: Create Admin Header Bar Styles

**Files:**
- Create: `ui/src/pages/Admin/AdminLayout.module.css`

- [ ] **Step 1: Create the CSS module**

```css
.header {
  display: flex;
  align-items: center;
  height: 48px;
  padding: 0 16px;
  background: var(--bg-surface);
  border-bottom: 1px solid var(--border-subtle);
}

.backBtn {
  display: flex;
  align-items: center;
  gap: 6px;
  background: none;
  border: none;
  color: var(--text-secondary);
  font-size: 13px;
  font-family: var(--font-body);
  cursor: pointer;
  padding: 6px 10px;
  border-radius: var(--radius-sm);
}

.backBtn:hover {
  background: var(--bg-hover);
  color: var(--text-primary);
}

.title {
  flex: 1;
  text-align: center;
  font-size: 12px;
  font-weight: 600;
  letter-spacing: 0.08em;
  color: var(--text-muted);
  text-transform: uppercase;
}

.userSection {
  display: flex;
  align-items: center;
  gap: 12px;
}

.username {
  font-size: 13px;
  color: var(--text-secondary);
  font-family: var(--font-body);
}

.logoutBtn {
  background: none;
  border: 1px solid var(--border-subtle);
  border-radius: var(--radius-sm);
  padding: 4px 12px;
  font-size: 12px;
  color: var(--text-secondary);
  cursor: pointer;
  font-family: var(--font-body);
}

.logoutBtn:hover {
  background: var(--bg-hover);
  color: var(--text-primary);
}

.content {
  padding: 20px 24px 40px;
}
```

- [ ] **Step 2: Commit**

```bash
git add ui/src/pages/Admin/AdminLayout.module.css
git commit -m "feat(#112): add admin header bar styles"
```

---

### Task 3: Add Admin Header Bar to AdminLayout

**Files:**
- Modify: `ui/src/pages/Admin/AdminLayout.tsx`

- [ ] **Step 1: Rewrite AdminLayout with the header bar**

Replace the full contents of `AdminLayout.tsx`:

```tsx
import { Outlet, useNavigate, useLocation } from 'react-router';
import { Tabs } from '@cameleer/design-system';
import { ArrowLeft, LogOut } from 'lucide-react';
import { useAuthStore } from '../../auth/auth-store';
import styles from './AdminLayout.module.css';

const ADMIN_TABS = [
  { label: 'User Management', value: '/admin/rbac' },
  { label: 'Audit Log', value: '/admin/audit' },
  { label: 'OIDC', value: '/admin/oidc' },
  { label: 'App Config', value: '/admin/appconfig' },
  { label: 'Database', value: '/admin/database' },
  { label: 'ClickHouse', value: '/admin/clickhouse' },
];

export default function AdminLayout() {
  const navigate = useNavigate();
  const location = useLocation();
  const { username, logout } = useAuthStore();

  const handleBack = () => navigate('/exchanges');
  const handleLogout = () => {
    logout();
    navigate('/login');
  };

  return (
    <div>
      <header className={styles.header}>
        <button className={styles.backBtn} onClick={handleBack}>
          <ArrowLeft size={15} />
          Back
        </button>
        <span className={styles.title}>Admin</span>
        <div className={styles.userSection}>
          <span className={styles.username}>{username}</span>
          <button className={styles.logoutBtn} onClick={handleLogout} title="Log out">
            <LogOut size={14} />
          </button>
        </div>
      </header>
      <Tabs
        tabs={ADMIN_TABS}
        value={location.pathname}
        onChange={(path) => navigate(path)}
      />
      <div className={styles.content}>
        <Outlet />
      </div>
    </div>
  );
}
```

- [ ] **Step 2: Verify it compiles**

Run: `cd ui && npx tsc --noEmit`
Expected: No type errors.

- [ ] **Step 3: Commit**

```bash
git add ui/src/pages/Admin/AdminLayout.tsx
git commit -m "feat(#112): add admin header bar with back button and logout"
```

---

### Task 4: Visual Verification

- [ ] **Step 1: Start the dev server and verify**

Run: `cd ui && npm run dev`

Open `http://localhost:5173/admin/rbac` and verify:
1. No sidebar visible — content spans full viewport width
2. No TopBar (no breadcrumb, no search trigger, no status filters, no time range)
3. Admin header bar visible at top with: `[<- Back]` left, `ADMIN` center, `[username] [logout]` right
4. Admin tabs (User Management, Audit Log, etc.) below the header bar
5. Admin content renders correctly below tabs

- [ ] **Step 2: Verify operational pages are unaffected**

Navigate to `http://localhost:5173/exchanges` and verify:
1. Sidebar renders normally with app/agent/route trees
2. TopBar renders with breadcrumb, search, filters, time range
3. ContentTabs show (Exchanges, Dashboard, Runtime, Logs)
4. Command palette works (Ctrl+K / Cmd+K)

- [ ] **Step 3: Verify back button**

From any admin page, click "Back" — should navigate to `/exchanges`.

- [ ] **Step 4: Verify logout**

Click logout icon in admin header — should navigate to `/login`.

- [ ] **Step 5: Final commit if any fixes were needed**

```bash
git add -A
git commit -m "fix(#112): admin layout adjustments from visual review"
```

diff --git a/docs/superpowers/plans/2026-04-02-composable-sidebar-migration.md b/docs/superpowers/plans/2026-04-02-composable-sidebar-migration.md
new file mode 100644
index 00000000..0cc9f1ec
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-composable-sidebar-migration.md
@@ -0,0 +1,572 @@

# Composable Sidebar Migration Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Migrate the server UI from the old monolithic `<Sidebar>` to the new composable compound Sidebar API from `@cameleer/design-system` v0.1.23, adding admin accordion behavior and icon-rail collapse.

**Architecture:** Extract tree-building logic into a local `sidebar-utils.ts`, rewrite the sidebar composition in `LayoutShell.tsx` using `Sidebar.Header/Section/Footer` compound components, add admin accordion behavior via route-based section state management, and simplify `AdminLayout.tsx` by removing its tab navigation (the sidebar handles it now).

**Tech Stack:** React 19, `@cameleer/design-system` v0.1.23, react-router, lucide-react, CSS Modules

---

## File Map

| File | Action | Responsibility |
|------|--------|----------------|
| `ui/src/components/sidebar-utils.ts` | Create | Tree-building functions, SidebarApp type, formatCount, admin node builder |
| `ui/src/components/LayoutShell.tsx` | Modify | Rewrite sidebar composition with compound API, add accordion + collapse |
| `ui/src/pages/Admin/AdminLayout.tsx` | Modify | Remove tab navigation (sidebar handles it), keep content wrapper |

---

### Task 1: Create sidebar-utils.ts with Tree-Building Functions

**Files:**
- Create: `ui/src/components/sidebar-utils.ts`

- [ ] **Step 1: Create the utility file**

This file contains the `SidebarApp` type (moved from DS), tree-building functions, and the admin node builder. These were previously inside the DS's monolithic Sidebar component.

```typescript
import type { ReactNode } from 'react';
import type { SidebarTreeNode } from '@cameleer/design-system';

// ── Domain types (moved from DS) ──────────────────────────────────────────

export interface SidebarRoute {
  id: string;
  name: string;
  exchangeCount: number;
}

export interface SidebarAgent {
  id: string;
  name: string;
  status: 'live' | 'stale' | 'dead';
  tps: number;
}

export interface SidebarApp {
  id: string;
  name: string;
  health: 'live' | 'stale' | 'dead';
  exchangeCount: number;
  routes: SidebarRoute[];
  agents: SidebarAgent[];
}

// ── Helpers ───────────────────────────────────────────────────────────────

export function formatCount(n: number): string {
  if (n >= 1000) return `${(n / 1000).toFixed(1)}k`;
  return String(n);
}

// ── Tree node builders ────────────────────────────────────────────────────

export function buildAppTreeNodes(
  apps: SidebarApp[],
  statusDot: (health: string) => ReactNode,
  chevron: ReactNode,
): SidebarTreeNode[] {
  return apps.map((app) => ({
    id: app.id,
    label: app.name,
    icon: statusDot(app.health),
    badge: formatCount(app.exchangeCount),
    path: `/apps/${app.id}`,
    starrable: true,
    starKey: `app:${app.id}`,
    children: app.routes.map((route) => ({
      id: `${app.id}/${route.id}`,
      label: route.name,
      icon: chevron,
      badge: formatCount(route.exchangeCount),
      path: `/apps/${app.id}/${route.id}`,
    })),
  }));
}

export function buildAgentTreeNodes(
  apps: SidebarApp[],
  statusDot: (health: string) => ReactNode,
): SidebarTreeNode[] {
  return apps
    .filter((app) => app.agents.length > 0)
    .map((app) => {
      const liveCount = app.agents.filter((a) => a.status === 'live').length;
      return {
        id: `agents:${app.id}`,
        label: app.name,
        icon: statusDot(app.health),
        badge: `${liveCount}/${app.agents.length} live`,
        path: `/agents/${app.id}`,
        starrable: true,
        starKey: `agent:${app.id}`,
        children: app.agents.map((agent) => ({
          id: `agents:${app.id}/${agent.id}`,
          label: agent.name,
          icon: statusDot(agent.status),
          badge: `${agent.tps.toFixed(1)}/s`,
          path: `/agents/${app.id}/${agent.id}`,
        })),
      };
    });
}

export function buildRouteTreeNodes(
  apps: SidebarApp[],
  statusDot: (health: string) => ReactNode,
  chevron: ReactNode,
): SidebarTreeNode[] {
  return apps
    .filter((app) => app.routes.length > 0)
    .map((app) => ({
      id: `routes:${app.id}`,
      label: app.name,
      icon: statusDot(app.health),
      badge: `${app.routes.length} route${app.routes.length !== 1 ? 's' : ''}`,
      path: `/routes/${app.id}`,
      starrable: true,
      starKey: `routestat:${app.id}`,
      children: app.routes.map((route) => ({
        id: `routes:${app.id}/${route.id}`,
        label: route.name,
        icon: chevron,
        badge: formatCount(route.exchangeCount),
        path: `/routes/${app.id}/${route.id}`,
      })),
    }));
}

export function buildAdminTreeNodes(): SidebarTreeNode[] {
  return [
    { id: 'admin:rbac', label: 'User Management', path: '/admin/rbac' },
    { id: 'admin:audit', label: 'Audit Log', path: '/admin/audit' },
    { id: 'admin:oidc', label: 'OIDC', path: '/admin/oidc' },
    { id: 'admin:appconfig', label: 'App Config', path: '/admin/appconfig' },
    { id: 'admin:database', label: 'Database', path: '/admin/database' },
    { id: 'admin:clickhouse', label: 'ClickHouse', path: '/admin/clickhouse' },
  ];
}

// ── localStorage-backed section collapse ──────────────────────────────────

export function readCollapsed(key: string, defaultValue: boolean): boolean {
  try {
    const raw = localStorage.getItem(key);
    if (raw !== null) return raw === 'true';
  } catch { /* ignore */ }
  return defaultValue;
}

export function writeCollapsed(key: string, value: boolean): void {
  try {
    localStorage.setItem(key, String(value));
  } catch { /* ignore */ }
}
```

- [ ] **Step 2: Verify it compiles**

Run: `cd C:/Users/Hendrik/Documents/projects/cameleer3-server/ui && npx tsc --project tsconfig.app.json --noEmit`
Expected: No errors.

- [ ] **Step 3: Commit**

```bash
git add ui/src/components/sidebar-utils.ts
git commit -m "feat(#112): extract sidebar tree builders and types from DS"
```

---

### Task 2: Rewrite LayoutShell with Composable Sidebar

**Files:**
- Modify: `ui/src/components/LayoutShell.tsx`

This is the main migration task. The `LayoutContent` function gets a significant rewrite of its sidebar composition while preserving all existing TopBar, CommandPalette, ContentTabs, breadcrumb, and scope logic.
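Before starting the rewrite, the Task 1 builders can be smoke-tested in isolation. A standalone sketch (types re-declared locally and the icon/star parameters dropped so it runs outside the repo; `buildAppNodes` and `TreeNode` are simplified stand-ins for `buildAppTreeNodes` and `SidebarTreeNode`, not the real signatures):

```typescript
// Minimal local stand-in for SidebarTreeNode (icons/starring omitted).
type TreeNode = {
  id: string;
  label: string;
  badge?: string;
  path?: string;
  children?: TreeNode[];
};

// Same logic as formatCount in sidebar-utils.ts.
function formatCount(n: number): string {
  if (n >= 1000) return `${(n / 1000).toFixed(1)}k`;
  return String(n);
}

// Mirrors buildAppTreeNodes, minus the statusDot/chevron params.
function buildAppNodes(
  apps: { id: string; name: string; exchangeCount: number; routes: { id: string; name: string; exchangeCount: number }[] }[],
): TreeNode[] {
  return apps.map((app) => ({
    id: app.id,
    label: app.name,
    badge: formatCount(app.exchangeCount),
    path: `/apps/${app.id}`,
    children: app.routes.map((route) => ({
      id: `${app.id}/${route.id}`,   // child ids are namespaced under the app
      label: route.name,
      badge: formatCount(route.exchangeCount),
      path: `/apps/${app.id}/${route.id}`,
    })),
  }));
}

const nodes = buildAppNodes([
  { id: 'billing', name: 'billing', exchangeCount: 1234, routes: [{ id: 'ingest', name: 'ingest', exchangeCount: 42 }] },
]);
// nodes[0].badge is '1.2k'; nodes[0].children[0].path is '/apps/billing/ingest'
```

This also pins down the badge formatting: counts below 1000 pass through verbatim, larger counts are rendered with one decimal and a `k` suffix.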

- [ ] **Step 1: Update imports**

Replace the old DS imports and add new ones. In `LayoutShell.tsx`, change the first 10 lines to:

```typescript
import { Outlet, useNavigate, useLocation } from 'react-router';
import { AppShell, Sidebar, SidebarTree, useStarred, TopBar, CommandPalette, CommandPaletteProvider, GlobalFilterProvider, ToastProvider, BreadcrumbProvider, useCommandPalette, useGlobalFilters, StatusDot } from '@cameleer/design-system';
import type { SidebarTreeNode, SearchResult } from '@cameleer/design-system';
import { Box, Cpu, GitBranch, Settings, FileText, ChevronRight } from 'lucide-react';
import { useRouteCatalog } from '../api/queries/catalog';
import { useAgents } from '../api/queries/agents';
import { useSearchExecutions, useAttributeKeys } from '../api/queries/executions';
import { useAuthStore } from '../auth/auth-store';
import { useState, useMemo, useCallback, useEffect, useRef } from 'react';
import { ContentTabs } from './ContentTabs';
import { useScope } from '../hooks/useScope';
import { buildAppTreeNodes, buildAgentTreeNodes, buildRouteTreeNodes, buildAdminTreeNodes, readCollapsed, writeCollapsed } from './sidebar-utils';
import type { SidebarApp } from './sidebar-utils';
```

- [ ] **Step 2: Remove the old `sidebarApps` builder and `healthToColor` function**

Delete the `healthToColor` function (lines 12-19) and the `sidebarApps` useMemo block (lines 128-154). These are replaced by tree-building functions that produce `SidebarTreeNode[]` directly.

- [ ] **Step 3: Add sidebar state management inside LayoutContent**

Add these state and memo declarations inside `LayoutContent`, after the existing hooks (after `const { scope, setTab } = useScope();` around line 112):

```typescript
  // ── Sidebar state ──────────────────────────────────────────────────────
  const [sidebarCollapsed, setSidebarCollapsed] = useState(false);
  const [filterQuery, setFilterQuery] = useState('');
  const { starredIds, isStarred, toggleStar } = useStarred();

  // Section collapse states — persisted to localStorage
  const [appsOpen, setAppsOpen] = useState(() => !readCollapsed('cameleer:sidebar:apps-collapsed', false));
  const [agentsOpen, setAgentsOpen] = useState(() => !readCollapsed('cameleer:sidebar:agents-collapsed', false));
  const [routesOpen, setRoutesOpen] = useState(() => !readCollapsed('cameleer:sidebar:routes-collapsed', true));
  const [adminOpen, setAdminOpen] = useState(false);

  const toggleApps = useCallback(() => {
    setAppsOpen((v) => { writeCollapsed('cameleer:sidebar:apps-collapsed', v); return !v; });
  }, []);
  const toggleAgents = useCallback(() => {
    setAgentsOpen((v) => { writeCollapsed('cameleer:sidebar:agents-collapsed', v); return !v; });
  }, []);
  const toggleRoutes = useCallback(() => {
    setRoutesOpen((v) => { writeCollapsed('cameleer:sidebar:routes-collapsed', v); return !v; });
  }, []);
  const toggleAdmin = useCallback(() => setAdminOpen((v) => !v), []);

  // Accordion: entering admin expands admin, collapses operational sections
  const prevOpsState = useRef<{ apps: boolean; agents: boolean; routes: boolean } | null>(null);

  useEffect(() => {
    if (isAdminPage) {
      // Save current operational state, then collapse all, expand admin
      prevOpsState.current = { apps: appsOpen, agents: agentsOpen, routes: routesOpen };
      setAppsOpen(false);
      setAgentsOpen(false);
      setRoutesOpen(false);
      setAdminOpen(true);
    } else if (prevOpsState.current) {
      // Restore operational state, collapse admin
      setAppsOpen(prevOpsState.current.apps);
      setAgentsOpen(prevOpsState.current.agents);
      setRoutesOpen(prevOpsState.current.routes);
      setAdminOpen(false);
      prevOpsState.current = null;
    }
  }, [isAdminPage]); // eslint-disable-line react-hooks/exhaustive-deps

  // Build tree nodes from catalog data
  const statusDot = useCallback((health: string) => <StatusDot status={health as 'live' | 'stale' | 'dead'} />, []);
  const chevronIcon = useMemo(() => <ChevronRight size={14} />, []);

  const appNodes = useMemo(
    () => buildAppTreeNodes(catalog ? [...catalog].sort((a: any, b: any) => a.appId.localeCompare(b.appId)).map((app: any) => ({
      id: app.appId,
      name: app.appId,
      health: app.health as 'live' | 'stale' | 'dead',
      exchangeCount: app.exchangeCount,
      routes: [...(app.routes || [])].sort((a: any, b: any) => a.routeId.localeCompare(b.routeId)).map((r: any) => ({
        id: r.routeId, name: r.routeId, exchangeCount: r.exchangeCount,
      })),
      agents: [...(app.agents || [])].sort((a: any, b: any) => a.name.localeCompare(b.name)).map((a: any) => ({
        id: a.id, name: a.name, status: a.status as 'live' | 'stale' | 'dead', tps: a.tps ?? 0,
      })),
    })) : [], statusDot, chevronIcon),
    [catalog, statusDot, chevronIcon],
  );

  const agentNodes = useMemo(
    () => buildAgentTreeNodes(catalog ? [...catalog].sort((a: any, b: any) => a.appId.localeCompare(b.appId)).map((app: any) => ({
      id: app.appId,
      name: app.appId,
      health: app.health as 'live' | 'stale' | 'dead',
      exchangeCount: app.exchangeCount,
      routes: [],
      agents: [...(app.agents || [])].sort((a: any, b: any) => a.name.localeCompare(b.name)).map((a: any) => ({
        id: a.id, name: a.name, status: a.status as 'live' | 'stale' | 'dead', tps: a.tps ?? 0,
      })),
    })) : [], statusDot),
    [catalog, statusDot],
  );

  const routeNodes = useMemo(
    () => buildRouteTreeNodes(catalog ? [...catalog].sort((a: any, b: any) => a.appId.localeCompare(b.appId)).map((app: any) => ({
      id: app.appId,
      name: app.appId,
      health: app.health as 'live' | 'stale' | 'dead',
      exchangeCount: app.exchangeCount,
      routes: [...(app.routes || [])].sort((a: any, b: any) => a.routeId.localeCompare(b.routeId)).map((r: any) => ({
        id: r.routeId, name: r.routeId, exchangeCount: r.exchangeCount,
      })),
      agents: [],
    })) : [], statusDot, chevronIcon),
    [catalog, statusDot, chevronIcon],
  );

  const adminNodes = useMemo(() => buildAdminTreeNodes(), []);

  // Sidebar reveal from Cmd-K navigation
  const sidebarRevealPath = (location.state as { sidebarReveal?: string } | null)?.sidebarReveal ?? null;

  useEffect(() => {
    if (!sidebarRevealPath) return;
    if (sidebarRevealPath.startsWith('/apps') && !appsOpen) setAppsOpen(true);
    if (sidebarRevealPath.startsWith('/agents') && !agentsOpen) setAgentsOpen(true);
    if (sidebarRevealPath.startsWith('/routes') && !routesOpen) setRoutesOpen(true);
    if (sidebarRevealPath.startsWith('/admin') && !adminOpen) setAdminOpen(true);
  }, [sidebarRevealPath]); // eslint-disable-line react-hooks/exhaustive-deps

  const effectiveSelectedPath = sidebarRevealPath ?? location.pathname;
```

- [ ] **Step 4: Replace the return block's sidebar composition**

Replace the `return` statement in `LayoutContent` (starting from `return (` to the closing `);`). The key change is replacing the monolithic `<Sidebar>` with the compound Sidebar (exact `Sidebar.*` prop names should be checked against the DS type declarations; the shape below follows v0.1.23's compound API):

```typescript
  return (
    <AppShell
      sidebar={
        <Sidebar
          collapsed={sidebarCollapsed}
          onToggleCollapse={() => setSidebarCollapsed((v) => !v)}
          searchValue={filterQuery}
          onSearchChange={setFilterQuery}
        >
          <Sidebar.Header
            logo={<Box size={16} /> /* stand-in; reuse the existing logo element */}
            title="cameleer"
            version="v3.2.1"
            onClick={() => handleSidebarNavigate('/apps')}
          />

          {isAdminPage && (
            <Sidebar.Section
              id="admin"
              label="Admin"
              icon={<Settings size={15} />}
              open={adminOpen}
              onToggle={toggleAdmin}
              active={location.pathname.startsWith('/admin')}
            >
              <SidebarTree nodes={adminNodes} selectedPath={effectiveSelectedPath} onNavigate={handleSidebarNavigate} />
            </Sidebar.Section>
          )}

          <Sidebar.Section
            id="apps"
            label="Applications"
            icon={<Box size={15} />}
            open={appsOpen}
            onToggle={() => { toggleApps(); if (isAdminPage) handleSidebarNavigate('/apps'); }}
            active={effectiveSelectedPath.startsWith('/apps') || effectiveSelectedPath.startsWith('/exchanges') || effectiveSelectedPath.startsWith('/dashboard')}
          >
            <SidebarTree nodes={appNodes} filter={filterQuery} selectedPath={effectiveSelectedPath} onNavigate={handleSidebarNavigate} starredIds={starredIds} onToggleStar={toggleStar} />
          </Sidebar.Section>

          <Sidebar.Section
            id="agents"
            label="Agents"
            icon={<Cpu size={15} />}
            open={agentsOpen}
            onToggle={() => { toggleAgents(); if (isAdminPage) handleSidebarNavigate('/agents'); }}
            active={effectiveSelectedPath.startsWith('/agents') || effectiveSelectedPath.startsWith('/runtime')}
          >
            <SidebarTree nodes={agentNodes} filter={filterQuery} selectedPath={effectiveSelectedPath} onNavigate={handleSidebarNavigate} starredIds={starredIds} onToggleStar={toggleStar} />
          </Sidebar.Section>

          <Sidebar.Section
            id="routes"
            label="Routes"
            icon={<GitBranch size={15} />}
            open={routesOpen}
            onToggle={() => { toggleRoutes(); if (isAdminPage) handleSidebarNavigate('/routes'); }}
            active={effectiveSelectedPath.startsWith('/routes')}
          >
            <SidebarTree nodes={routeNodes} filter={filterQuery} selectedPath={effectiveSelectedPath} onNavigate={handleSidebarNavigate} starredIds={starredIds} onToggleStar={toggleStar} />
          </Sidebar.Section>

          {!isAdminPage && (
            <Sidebar.Section
              id="admin"
              label="Admin"
              icon={<Settings size={15} />}
              open={adminOpen}
              onToggle={() => { toggleAdmin(); handleSidebarNavigate('/admin'); }}
              active={false}
            >
              <SidebarTree nodes={adminNodes} selectedPath={effectiveSelectedPath} onNavigate={handleSidebarNavigate} />
            </Sidebar.Section>
          )}

          <Sidebar.Footer>
            <Sidebar.FooterLink
              icon={<FileText size={15} />}
              label="API Docs"
              onClick={() => handleSidebarNavigate('/api-docs')}
              active={location.pathname === '/api-docs'}
            />
          </Sidebar.Footer>
        </Sidebar>
      }
    >
      <TopBar /* existing TopBar props unchanged */ />
      <CommandPalette
        open={paletteOpen}
        query={paletteQuery}
        onClose={() => setPaletteOpen(false)}
        onOpen={() => setPaletteOpen(true)}
        onSelect={handlePaletteSelect}
        onSubmit={handlePaletteSubmit}
        onQueryChange={setPaletteQuery}
        data={searchData}
      />

      {!isAdminPage && (
        <ContentTabs /* existing props unchanged */ />
      )}

      <main style={{ padding: isAdminPage ? 0 : undefined }}>
        <Outlet />
      </main>
    </AppShell>
  );
```

Note: The Admin section renders in two positions — at the top when `isAdminPage` (accordion mode), at the bottom when not (collapsed section). Only one renders at a time via the conditional.

- [ ] **Step 5: Verify it compiles**

Run: `cd C:/Users/Hendrik/Documents/projects/cameleer3-server/ui && npx tsc --project tsconfig.app.json --noEmit`
Expected: No errors. If `StatusDot` is not exported from the DS, check the exact export name with `grep -r "StatusDot" ui/node_modules/@cameleer/design-system/dist/index.es.d.ts`.

- [ ] **Step 6: Commit**

```bash
git add ui/src/components/LayoutShell.tsx
git commit -m "feat(#112): migrate to composable sidebar with accordion and collapse"
```

---

### Task 3: Simplify AdminLayout

**Files:**
- Modify: `ui/src/pages/Admin/AdminLayout.tsx`

The sidebar now handles admin sub-page navigation, so `AdminLayout` no longer needs its own `<Tabs>`.

- [ ] **Step 1: Rewrite AdminLayout**

Replace the full contents of `AdminLayout.tsx`:

```typescript
import { Outlet } from 'react-router';

export default function AdminLayout() {
  // Padding matches the old AdminLayout.module.css .content rule.
  return (
    <div style={{ padding: '20px 24px 40px' }}>
      <Outlet />
    </div>
  );
}
```

- [ ] **Step 2: Verify it compiles**

Run: `cd C:/Users/Hendrik/Documents/projects/cameleer3-server/ui && npx tsc --project tsconfig.app.json --noEmit`
Expected: No errors.

- [ ] **Step 3: Commit**

```bash
git add ui/src/pages/Admin/AdminLayout.tsx
git commit -m "feat(#112): remove admin tabs, sidebar handles navigation"
```

---

### Task 4: Visual Verification

- [ ] **Step 1: Verify operational mode**

Open `http://localhost:5173/exchanges` and verify:
1. Sidebar shows all 4 sections: Applications, Agents, Routes, Admin
2. Applications and Agents expanded by default, Routes and Admin collapsed
3. Sidebar search filters tree items
4. Clicking an app navigates to the exchanges page for that app
5. TopBar, ContentTabs, CommandPalette all work normally
6. Star/unstar items work

- [ ] **Step 2: Verify sidebar collapse**

Click the `<<` toggle in sidebar header:
1. Sidebar collapses to ~48px icon rail
2. Section icons visible (Box, Cpu, GitBranch, Settings)
3. Footer link icon visible (FileText for API Docs)
4. Click any section icon — sidebar expands and that section opens

- [ ] **Step 3: Verify admin accordion**

Navigate to `/admin/rbac` (click Admin section in sidebar or navigate directly):
1. Admin section appears at top of sidebar, expanded, showing 6 sub-pages
2. Applications, Agents, Routes sections are collapsed to single-line headers
3. Admin sub-page items show active highlighting for current page
4. No admin tabs visible in content area (just content with padding)
5. Clicking between admin sub-pages (e.g., Audit Log, OIDC) works via sidebar

- [ ] **Step 4: Verify leaving admin**

From an admin page, click "Applications" section header:
1. Navigates to `/exchanges` (or last operational tab)
2. Admin section collapses
3. Operational sections restore their previous open/closed states

- [ ] **Step 5: Final commit if any fixes were needed**

```bash
git add -A
git commit -m "fix(#112): sidebar migration adjustments from visual review"
```
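The save/restore behavior verified in Step 4 follows the accordion effect from Task 2 Step 3. A pure-function model of that rule (names and types here are hypothetical — the real code lives in React state and a `useRef`; this only demonstrates the transition being tested):

```typescript
// Pure model of the admin accordion: entering admin snapshots the operational
// section states, collapses them, and expands admin; leaving admin restores
// the snapshot and collapses admin.
type OpsState = { apps: boolean; agents: boolean; routes: boolean };
type SidebarState = { ops: OpsState; adminOpen: boolean; savedOps: OpsState | null };

function enterAdmin(s: SidebarState): SidebarState {
  return {
    savedOps: s.ops,                                    // remember operational sections
    ops: { apps: false, agents: false, routes: false }, // collapse them all
    adminOpen: true,                                    // expand admin
  };
}

function leaveAdmin(s: SidebarState): SidebarState {
  if (!s.savedOps) return { ...s, adminOpen: false };   // nothing saved; just close admin
  return { ops: s.savedOps, adminOpen: false, savedOps: null }; // restore prior state
}

const before: SidebarState = {
  ops: { apps: true, agents: true, routes: false },
  adminOpen: false,
  savedOps: null,
};
const after = leaveAdmin(enterAdmin(before));
// after.ops equals before.ops: the round trip is lossless
```

A `leaveAdmin(enterAdmin(s))` round trip should return the sidebar to exactly the section states it had before entering admin, which is what Step 4 checks visually.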