From e7eda7a7b3db7971c941e43422f5b96e651a12af Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:36:22 +0200 Subject: [PATCH 01/15] docs: add ClickHouse migration design and append-only protocol spec Design for replacing PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS. Covers table schemas, ingestion pipeline (ExecutionAccumulator), ngram search indexes, materialized views, multitenancy, and retention. Companion doc proposes append-only execution protocol for the agent repo. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...26-03-31-append-only-execution-protocol.md | 146 +++ .../2026-03-31-clickhouse-migration-design.md | 916 ++++++++++++++++++ 2 files changed, 1062 insertions(+) create mode 100644 docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md create mode 100644 docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md diff --git a/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md b/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md new file mode 100644 index 00000000..431f1c00 --- /dev/null +++ b/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md @@ -0,0 +1,146 @@ +# Append-Only Execution Data Protocol + +A reference document for redesigning the Cameleer agent's data reporting to be append-only, +eliminating the need for upserts in the storage layer. + +## Problem + +The current protocol sends execution data in two phases: + +1. **RUNNING phase**: Agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info. +2. **COMPLETED/FAILED phase**: Agent sends an enriched record when execution finishes (duration, output body, headers, errors, processor tree). + +The server uses `INSERT ... ON CONFLICT DO UPDATE SET COALESCE(...)` to merge these into a single row. 
This works in PostgreSQL but creates problems for append-only stores like ClickHouse, Kafka topics, or any event-sourced architecture. + +### Why This Matters + +- **ClickHouse**: No native upsert. Must use ReplacingMergeTree (eventual consistency, FINAL overhead) or application-side buffering. +- **Event streaming**: Kafka/Pulsar topics are append-only. Two-phase lifecycle requires a stateful stream processor to merge. +- **Data lakes**: Parquet files are immutable. Updates require read-modify-write of entire files. +- **Materialized views**: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both RUNNING and COMPLETED inserts for the same execution. + +## Proposed Protocol Change + +### Option A: Single-Phase Reporting (Recommended) + +The agent buffers the execution locally and sends a **single, complete record** only when the execution reaches a terminal state (COMPLETED or FAILED). + +``` +Current: Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert) +Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append) +``` + +**What changes in the agent:** +- `RouteExecutionTracker` holds in-flight executions in a local `ConcurrentHashMap` +- On route start: create tracker entry with start_time, route_id, etc. 
+- On route complete: enrich tracker entry with duration, bodies, errors, processor tree +- On report: send the complete record in one HTTP POST +- On timeout (configurable, e.g., 5 minutes): flush as RUNNING (for visibility of stuck routes) + +**What changes in the server:** +- Storage becomes pure append: `INSERT INTO executions VALUES (...)` — no upsert, no COALESCE +- No `SearchIndexer` / `ExecutionAccumulator` needed — the server just writes what it receives +- Materialized views count correctly (one insert = one execution) +- Works with any append-only store (ClickHouse, Kafka, S3/Parquet) + +**Trade-offs:** +- RUNNING executions are not visible on the server until they complete (or timeout-flush) +- "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows +- If the agent crashes, in-flight executions are lost (same as current behavior — RUNNING rows become orphans anyway) + +### Option B: Event Log with Reconstruction + +Send both phases as separate **events** (not records), and let the server reconstruct the current state. + +``` +Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...} +Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]} +``` + +**Server-side:** +- Store raw events in an append-only log table +- Reconstruct current state via `SELECT argMax(field, event_time) FROM events WHERE execution_id = ? 
GROUP BY execution_id` +- Or: use a materialized view with `AggregatingMergeTree` + `argMaxState` to maintain a "latest state" table + +**Trade-offs:** +- More complex server-side reconstruction +- Higher storage (two rows per execution instead of one) +- More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED) +- Natural fit for event sourcing architectures + +### Option C: Hybrid (Current Cameleer3-Server Approach) + +Keep the two-phase protocol but handle merging at the server application layer. This is what cameleer3-server implements today with the `ExecutionAccumulator`: + +- RUNNING POST -> hold in `ConcurrentHashMap` (no DB write) +- COMPLETED POST -> merge with RUNNING in-memory -> single INSERT to DB +- Timeout sweep -> flush stale RUNNING entries for visibility + +**Trade-offs:** +- No agent changes required +- Server must be stateful (in-memory accumulator) +- Crash window: active executions lost if server restarts +- Adds complexity to the server that wouldn't exist with Option A + +## Recommendation + +**Option A (single-phase reporting)** is the strongest choice for a new protocol version: + +1. **Simplest server implementation**: Pure append, no state, no merging +2. **Works everywhere**: ClickHouse, Kafka, S3, any append-only store +3. **Correct by construction**: MVs, aggregations, and stream processing all see one event per execution +4. **Agent is the natural place to buffer**: The agent already tracks in-flight executions for instrumentation — it just needs to hold the report until completion +5. **Minimal data loss risk**: Agent crash loses in-flight data regardless of protocol — this doesn't make it worse + +### Migration Strategy + +1. Add `protocol_version` field to agent registration +2. v1 agents: server uses `ExecutionAccumulator` (current behavior) +3. v2 agents: server does pure append (no accumulator needed for v2 data) +4. 
Both can coexist — the server checks protocol version per agent + +### Fields for Single-Phase Record + +The complete record sent by a v2 agent: + +```json +{ + "executionId": "uuid", + "routeId": "myRoute", + "agentId": "agent-1", + "applicationName": "my-app", + "correlationId": "corr-123", + "exchangeId": "exchange-456", + "status": "COMPLETED", + "startTime": "2026-03-31T10:00:00.000Z", + "endTime": "2026-03-31T10:00:00.250Z", + "durationMs": 250, + "errorMessage": null, + "errorStackTrace": null, + "errorType": null, + "errorCategory": null, + "rootCauseType": null, + "rootCauseMessage": null, + "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}}, + "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}}, + "attributes": {"key": "value"}, + "traceId": "otel-trace-id", + "spanId": "otel-span-id", + "replayExchangeId": null, + "processors": [ + { + "processorId": "proc-1", + "processorType": "to", + "status": "COMPLETED", + "startTime": "...", + "endTime": "...", + "durationMs": 120, + "inputBody": "...", + "outputBody": "...", + "children": [] + } + ] +} +``` + +All fields populated. No second POST needed. Server does a single INSERT. diff --git a/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md b/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md new file mode 100644 index 00000000..6fb29e8d --- /dev/null +++ b/docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md @@ -0,0 +1,916 @@ +# ClickHouse Migration Design + +Replace PostgreSQL/TimescaleDB + OpenSearch with ClickHouse OSS for all observability data. +PostgreSQL retained only for RBAC, config, and audit log. + +## Context + +Cameleer3-server currently uses three storage systems: + +- **PostgreSQL/TimescaleDB**: executions, processor_executions, agent_metrics (hypertables), agent_events, route_diagrams, plus RBAC/config/audit tables. Continuous aggregates for dashboard statistics. 
+- **OpenSearch**: executions-YYYY-MM-DD indices (full-text search on bodies/headers/errors), logs-YYYY-MM-DD indices (application log storage with 7-day retention). +- **Dual-write pattern**: PG is source of truth, OpenSearch is async-indexed via debounced `SearchIndexer`. + +This architecture has scaling limits: three systems to operate, data duplication between PG and OpenSearch, TimescaleDB continuous aggregates with limited flexibility, and no multitenancy support. + +**Goal**: Consolidate to ClickHouse OSS (self-hosted) for all observability data. Add multitenancy with custom per-tenant, per-document-type retention. Support billions of documents, terabytes of data, sub-second wildcard search. + +## Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Deployment | Self-hosted ClickHouse OSS on k3s | All needed features available in OSS. Fits existing infra. | +| Execution lifecycle | Approach B: Application-side accumulator | Merges RUNNING+COMPLETED in-memory, writes one row. Avoids upsert problem. | +| Table engine (executions) | ReplacingMergeTree | Handles rare late corrections via version column. Normal flow writes once. | +| Table engine (all others) | MergeTree | Append-only data, no dedup needed. | +| Client | JDBC + JdbcTemplate | Familiar pattern, matches current PG code. Async inserts via JDBC URL settings. | +| Multitenancy | Shared tables + tenant_id column | Row policies for defense-in-depth. Application-layer WHERE for primary enforcement. | +| Retention | Application-driven scheduler | Per-tenant, per-document-type. Config in PG, execution via ALTER TABLE DELETE. | +| Search | Ngram bloom filter indexes | Sub-second wildcard search. Materialized `_search_text` column for cross-field search. | +| Highlighting | Application-side in Java | Extract 120-char fragment around match from returned fields. | +| Storage tiering | Local SSD only (initially) | S3/MinIO tiering can be added later via TTL MOVE rules. 
| + +## ClickHouse OSS Constraints + +These are features NOT available in the open-source version: + +| Constraint | Impact on Cameleer3 | +|------------|---------------------| +| No SharedMergeTree | No elastic compute scaling; must size nodes up-front. Acceptable for self-hosted. | +| No BM25 relevance scoring | Search returns matches without ranking. Acceptable for observability (want all matches, not ranked). | +| No search highlighting | Replaced by application-side highlighting in Java. | +| No fuzzy/typo-tolerant search | Must match exact tokens or use ngram index for substring match. Acceptable. | +| No ClickPipes | Must build own ingestion pipeline. Already exists (agents push via HTTP POST). | +| No managed backups | Must configure `clickhouse-backup` (Altinity, open-source) or built-in BACKUP SQL. | +| No auto-scaling | Manual capacity planning. Single node handles 14+ TiB, sufficient for initial scale. | + +General ClickHouse constraints (apply to both OSS and Cloud): + +| Constraint | Mitigation | +|------------|------------| +| ORDER BY is immutable | Careful upfront schema design. Documented below. | +| No transactions | Single-table INSERT atomic per block. No cross-table atomicity needed. | +| Mutations are expensive | Avoid ALTER UPDATE/DELETE. Use ReplacingMergeTree for corrections, append-only for everything else. | +| Row policies skip mutations | Application-layer WHERE on mutations. Mutations are rare (retention scheduler only). | +| No JPA/Hibernate | Use JdbcTemplate (already the pattern for PG). | +| JSON max_dynamic_paths | Store attributes as flattened String, not JSON type. Use ngram index for search. | +| Text indexes can't index JSON subcolumns | Extract searchable text into materialized String columns. | +| MVs only process new inserts | Historical data backfill writes through MV pipeline. | +| MV errors block source inserts | Careful MV design. Test thoroughly before production. 
| +| ReplacingMergeTree eventual consistency | Use FINAL on queries that need latest version. | + +## What Stays in PostgreSQL + +| Table | Reason | +|-------|--------| +| `users`, `roles`, `groups`, `user_groups`, `user_roles`, `group_roles` | RBAC with relational joins, foreign keys, transactions | +| `server_config` | Global config, low volume, needs transactions | +| `application_config` | Per-app observability settings | +| `app_settings` | Per-app SLA thresholds | +| `audit_log` | Security compliance, needs transactions, joins with RBAC tables | +| OIDC config | Auth provider config | +| `tenant_retention_config` (new) | Per-tenant retention settings, referenced by scheduler | + +## What Moves to ClickHouse + +| Data | Current Location | ClickHouse Table | +|------|-----------------|------------------| +| Route executions | PG `executions` hypertable + OpenSearch `executions-*` | `executions` | +| Processor executions | PG `processor_executions` hypertable | `processor_executions` | +| Agent metrics | PG `agent_metrics` hypertable | `agent_metrics` | +| Agent events | PG `agent_events` | `agent_events` | +| Route diagrams | PG `route_diagrams` | `route_diagrams` | +| Application logs | OpenSearch `logs-*` | `logs` | +| Dashboard statistics | PG continuous aggregates (`stats_1m_*`) | ClickHouse materialized views (`stats_1m_*`) | + +## Table Schemas + +### executions + +```sql +CREATE TABLE executions ( + tenant_id LowCardinality(String), + execution_id String, + start_time DateTime64(3), + _version UInt64 DEFAULT 1, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + status LowCardinality(String), + correlation_id String DEFAULT '', + exchange_id String DEFAULT '', + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT 
'', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + diagram_content_hash String DEFAULT '', + engine_level LowCardinality(String) DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + trace_id String DEFAULT '', + span_id String DEFAULT '', + processors_json String DEFAULT '', + has_trace_data Bool DEFAULT false, + is_replay Bool DEFAULT false, + + _search_text String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, + ' ', output_headers, ' ', root_cause_message), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_error error_message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_bodies concat(input_body, ' ', output_body) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_headers concat(input_headers, ' ', output_headers) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_status status TYPE set(10) GRANULARITY 1, + INDEX idx_corr correlation_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(_version) +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id) +TTL start_time + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +Design rationale: +- **ORDER BY** `(tenant_id, start_time, application_name, route_id, execution_id)`: Matches UI query pattern (tenant -> time range -> app -> route). Time before application because observability queries almost always include a time range. +- **PARTITION BY** `(tenant_id, toYYYYMM(start_time))`: Enables per-tenant partition drops for retention. Monthly granularity balances partition count vs drop efficiency. +- **ReplacingMergeTree(_version)**: Normal flow writes once (version 1). Late corrections write version 2+. 
Background merges keep latest version. +- **`_search_text` materialized column**: Computed at insert time. Concatenates all searchable fields for cross-field wildcard search. +- **`ngrambf_v1(3, 256, 2, 0)`**: 3-char ngrams in a 256-byte bloom filter with 2 hash functions. Prunes most granules for `LIKE '%term%'` queries. The bloom filter size (256 bytes) is a starting point — increase to 4096-8192 if false positive rates are too high for long text fields. Tune after benchmarking with real data. +- **`LowCardinality(String)`**: Dictionary encoding for columns with few distinct values. Major compression improvement. +- **TTL 365 days**: Safety net. Application-driven scheduler handles per-tenant retention at finer granularity. + +### processor_executions + +```sql +CREATE TABLE processor_executions ( + tenant_id LowCardinality(String), + execution_id String, + processor_id String, + start_time DateTime64(3), + route_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + parent_processor_id String DEFAULT '', + depth UInt16 DEFAULT 0, + status LowCardinality(String), + end_time Nullable(DateTime64(3)), + duration_ms Nullable(Int64), + error_message String DEFAULT '', + error_stacktrace String DEFAULT '', + error_type LowCardinality(String) DEFAULT '', + error_category LowCardinality(String) DEFAULT '', + root_cause_type String DEFAULT '', + root_cause_message String DEFAULT '', + input_body String DEFAULT '', + output_body String DEFAULT '', + input_headers String DEFAULT '', + output_headers String DEFAULT '', + attributes String DEFAULT '', + loop_index Nullable(Int32), + loop_size Nullable(Int32), + split_index Nullable(Int32), + split_size Nullable(Int32), + multicast_index Nullable(Int32), + resolved_endpoint_uri String DEFAULT '', + error_handler_type LowCardinality(String) DEFAULT '', + circuit_breaker_state LowCardinality(String) DEFAULT '', + fallback_triggered Bool DEFAULT false, + + _search_text 
String MATERIALIZED + concat(error_message, ' ', error_stacktrace, ' ', attributes, + ' ', input_body, ' ', output_body, ' ', input_headers, ' ', output_headers), + + INDEX idx_search _search_text TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_exec_id execution_id TYPE bloom_filter(0.01) GRANULARITY 4 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(start_time)) +ORDER BY (tenant_id, start_time, application_name, route_id, execution_id, processor_id) +TTL start_time + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### logs + +```sql +CREATE TABLE logs ( + tenant_id LowCardinality(String), + timestamp DateTime64(3), + application LowCardinality(String), + agent_id LowCardinality(String), + level LowCardinality(String), + logger_name LowCardinality(String) DEFAULT '', + message String, + thread_name LowCardinality(String) DEFAULT '', + stack_trace String DEFAULT '', + exchange_id String DEFAULT '', + mdc Map(String, String) DEFAULT map(), + + INDEX idx_msg message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_stack stack_trace TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4, + INDEX idx_level level TYPE set(10) GRANULARITY 1 +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, application, timestamp) +TTL timestamp + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### agent_metrics + +```sql +CREATE TABLE agent_metrics ( + tenant_id LowCardinality(String), + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +### agent_events + +```sql +CREATE TABLE agent_events ( + tenant_id 
LowCardinality(String), + timestamp DateTime64(3) DEFAULT now64(3), + agent_id LowCardinality(String), + app_id LowCardinality(String), + event_type LowCardinality(String), + detail String DEFAULT '' +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(timestamp)) +ORDER BY (tenant_id, app_id, agent_id, timestamp) +TTL timestamp + INTERVAL 365 DAY DELETE; +``` + +### route_diagrams + +```sql +CREATE TABLE route_diagrams ( + tenant_id LowCardinality(String), + content_hash String, + route_id LowCardinality(String), + agent_id LowCardinality(String), + application_name LowCardinality(String), + definition String, + created_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = ReplacingMergeTree(created_at) +ORDER BY (tenant_id, content_hash) +SETTINGS index_granularity = 8192; +``` + +## Materialized Views (Stats) + +Replace TimescaleDB continuous aggregates. ClickHouse MVs trigger on INSERT and store aggregate states in target tables. + +### stats_1m_all (global) + +```sql +CREATE TABLE stats_1m_all ( + tenant_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_all_mv TO stats_1m_all AS +SELECT + tenant_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, 
bucket; +``` + +### stats_1m_app (per-application) + +```sql +CREATE TABLE stats_1m_app ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_app_mv TO stats_1m_app AS +SELECT + tenant_id, + application_name, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, bucket; +``` + +### stats_1m_route (per-route) + +```sql +CREATE TABLE stats_1m_route ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + running_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_route_mv TO stats_1m_route AS +SELECT + tenant_id, + 
application_name, + route_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + countIfState(status = 'RUNNING') AS running_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM executions +GROUP BY tenant_id, application_name, route_id, bucket; +``` + +### stats_1m_processor (per-processor-type) + +```sql +CREATE TABLE stats_1m_processor ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + processor_type LowCardinality(String), + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, processor_type, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_processor_mv TO stats_1m_processor AS +SELECT + tenant_id, + application_name, + processor_type, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, processor_type, bucket; +``` + +### stats_1m_processor_detail (per-processor-id) + +```sql +CREATE TABLE stats_1m_processor_detail ( + tenant_id LowCardinality(String), + application_name LowCardinality(String), + route_id LowCardinality(String), + processor_id String, + bucket DateTime, + total_count AggregateFunction(count, UInt64), + failed_count AggregateFunction(countIf, UInt64, UInt8), + duration_sum 
AggregateFunction(sum, Nullable(Int64)), + duration_max AggregateFunction(max, Nullable(Int64)), + p99_duration AggregateFunction(quantile(0.99), Nullable(Int64)) +) +ENGINE = AggregatingMergeTree() +PARTITION BY (tenant_id, toYYYYMM(bucket)) +ORDER BY (tenant_id, application_name, route_id, processor_id, bucket) +TTL bucket + INTERVAL 365 DAY DELETE; + +CREATE MATERIALIZED VIEW stats_1m_processor_detail_mv TO stats_1m_processor_detail AS +SELECT + tenant_id, + application_name, + route_id, + processor_id, + toStartOfMinute(start_time) AS bucket, + countState() AS total_count, + countIfState(status = 'FAILED') AS failed_count, + sumState(duration_ms) AS duration_sum, + maxState(duration_ms) AS duration_max, + quantileState(0.99)(duration_ms) AS p99_duration +FROM processor_executions +GROUP BY tenant_id, application_name, route_id, processor_id, bucket; +``` + +## Ingestion Pipeline + +### Current Flow (replaced) + +``` +Agent POST -> IngestionService -> PostgresExecutionStore.upsert() -> PG + -> SearchIndexer (debounced 2s) -> reads from PG -> OpenSearch +``` + +### New Flow + +``` +Agent POST -> IngestionService -> ExecutionAccumulator + |-- RUNNING: ConcurrentHashMap (no DB write) + |-- COMPLETED/FAILED: merge with pending -> WriteBuffer + '-- Timeout sweep (60s): flush stale -> WriteBuffer + | + ClickHouseExecutionStore.insertBatch() + ClickHouseProcessorStore.insertBatch() +``` + +### ExecutionAccumulator + +New component replacing `SearchIndexer`. Core responsibilities: + +1. **On RUNNING POST**: Store `PendingExecution` in `ConcurrentHashMap` keyed by `execution_id`. Return 200 OK immediately. No database write. + +2. **On COMPLETED/FAILED POST**: Look up pending RUNNING by `execution_id`. If found, merge fields using the same COALESCE logic currently in `PostgresExecutionStore.upsert()`. Produce a complete `MergedExecution` and push to `WriteBuffer`. 
If not found (race condition or RUNNING already flushed by timeout), write COMPLETED directly with `_version=2`. + +3. **Timeout sweep** (scheduled every 60s): Scan for RUNNING entries older than 5 minutes. Flush them to ClickHouse as-is with status=RUNNING, making them visible in the UI. When COMPLETED eventually arrives, it writes with `_version=2` (ReplacingMergeTree deduplicates). + +4. **Late corrections**: If a correction arrives for an already-written execution, insert with `_version` incremented. ReplacingMergeTree handles deduplication. + +### WriteBuffer + +Reuse the existing `WriteBuffer` pattern (bounded queue, configurable batch size, scheduled drain): + +- Buffer capacity: 50,000 items +- Batch size: 5,000 per flush +- Flush interval: 1 second +- Separate buffers for executions and processor_executions (independent batch inserts) +- Drain calls `ClickHouseExecutionStore.insertBatch()` using JDBC batch update + +### Logs Ingestion + +Direct batch INSERT, bypasses accumulator (logs are single-phase): + +``` +Agent POST /api/v1/data/logs -> LogIngestionController -> ClickHouseLogStore.insertBatch() +``` + +### Metrics Ingestion + +Existing `MetricsWriteBuffer` targets ClickHouse instead of PG: + +``` +Agent POST /api/v1/data/metrics -> MetricsController -> WriteBuffer -> ClickHouseMetricsStore.insertBatch() +``` + +### JDBC Batch Insert Pattern + +```java +jdbcTemplate.batchUpdate( + "INSERT INTO executions (tenant_id, execution_id, start_time, ...) VALUES (?, ?, ?, ...)", + batchArgs +); +``` + +JDBC URL includes `async_insert=1&wait_for_async_insert=0` for server-side buffering, preventing "too many parts" errors under high load. 
+
+## Search Implementation
+
+### Query Translation
+
+Current OpenSearch bool queries map to ClickHouse SQL:
+
+```sql
+-- Full-text wildcard search with time range, status filter, and pagination
+SELECT *
+FROM executions FINAL
+WHERE tenant_id = {tenant_id:String}
+  AND start_time >= {time_from:DateTime64(3)}
+  AND start_time < {time_to:DateTime64(3)}
+  AND status IN ({statuses:Array(String)})
+  AND (
+    _search_text LIKE concat('%', {search_term:String}, '%')
+    OR execution_id IN (
+      SELECT DISTINCT execution_id
+      FROM processor_executions
+      WHERE tenant_id = {tenant_id:String}
+        AND start_time >= {time_from:DateTime64(3)}
+        AND start_time < {time_to:DateTime64(3)}
+        AND _search_text LIKE concat('%', {search_term:String}, '%')
+    )
+  )
+ORDER BY start_time DESC
+LIMIT {limit:UInt32} OFFSET {offset:UInt32}
+```
+
+Note: ClickHouse `{name:Type}` query parameters cannot be substituted inside string literals, so the wildcard pattern for `LIKE` is built with `concat('%', {search_term:String}, '%')` rather than `'%{search_term}%'`.
+
+### Scoped Searches
+
+| Scope | ClickHouse WHERE clause |
+|-------|------------------------|
+| textInBody | `input_body LIKE '%term%' OR output_body LIKE '%term%'` |
+| textInHeaders | `input_headers LIKE '%term%' OR output_headers LIKE '%term%'` |
+| textInErrors | `error_message LIKE '%term%' OR error_stacktrace LIKE '%term%'` |
+| global text | `_search_text LIKE '%term%'` (covers all fields) |
+
+All accelerated by `ngrambf_v1` indexes which prune 95%+ of data granules before scanning.
+
+### Application-Side Highlighting
+
+```java
+public String extractHighlight(String text, String searchTerm, int contextChars) {
+    int idx = text.toLowerCase().indexOf(searchTerm.toLowerCase());
+    if (idx < 0) return null;
+    int start = Math.max(0, idx - contextChars / 2);
+    int end = Math.min(text.length(), idx + searchTerm.length() + contextChars / 2);
+    return (start > 0 ? "..." : "")
+        + text.substring(start, end)
+        + (end < text.length() ? "..." : "");
+}
+```
+
+Returns the same `highlight` map structure the UI currently expects. 
+ +### Nested Processor Search + +OpenSearch nested queries become a subquery on the `processor_executions` table: + +```sql +execution_id IN ( + SELECT DISTINCT execution_id + FROM processor_executions + WHERE tenant_id = ? AND start_time >= ? AND start_time < ? + AND _search_text LIKE '%term%' +) +``` + +This is evaluated once with ngram index acceleration, then joined via IN. + +## Stats Query Translation + +### TimescaleDB -> ClickHouse Query Patterns + +| TimescaleDB | ClickHouse | +|-------------|------------| +| `time_bucket('1 minute', bucket)` | `toStartOfInterval(bucket, INTERVAL 1 MINUTE)` | +| `SUM(total_count)` | `countMerge(total_count)` | +| `SUM(failed_count)` | `countIfMerge(failed_count)` | +| `approx_percentile(0.99, rollup(p99_duration))` | `quantileMerge(0.99)(p99_duration)` | +| `SUM(duration_sum) / SUM(total_count)` | `sumMerge(duration_sum) / countMerge(total_count)` | +| `MAX(duration_max)` | `maxMerge(duration_max)` | + +### Example: Timeseries Query + +```sql +SELECT + toStartOfInterval(bucket, INTERVAL {interval:UInt32} SECOND) AS period, + countMerge(total_count) AS total_count, + countIfMerge(failed_count) AS failed_count, + sumMerge(duration_sum) / countMerge(total_count) AS avg_duration, + quantileMerge(0.99)(p99_duration) AS p99_duration +FROM stats_1m_app +WHERE tenant_id = {tenant_id:String} + AND application_name = {app:String} + AND bucket >= {from:DateTime} + AND bucket < {to:DateTime} +GROUP BY period +ORDER BY period +``` + +### SLA and Top Errors + +SLA queries hit the raw `executions` table (need per-row duration filtering): + +```sql +SELECT + countIf(duration_ms <= {threshold:Int64} AND status != 'RUNNING') * 100.0 / count() AS sla_pct +FROM executions FINAL +WHERE tenant_id = ? AND application_name = ? AND start_time >= ? AND start_time < ? +``` + +Top errors query: + +```sql +SELECT + error_message, + count() AS error_count, + max(start_time) AS last_seen +FROM executions FINAL +WHERE tenant_id = ? 
AND status = 'FAILED' + AND start_time >= now() - INTERVAL 1 HOUR +GROUP BY error_message +ORDER BY error_count DESC +LIMIT 10 +``` + +## Multitenancy + +### Data Isolation + +**Primary**: Application-layer WHERE clause injection. Every ClickHouse query gets `WHERE tenant_id = ?` from the authenticated user's JWT claims. + +**Defense-in-depth**: ClickHouse row policies: + +```sql +-- Create a ClickHouse user per tenant +CREATE USER tenant_acme IDENTIFIED BY '...'; + +-- Row policy ensures tenant can only see their data +CREATE ROW POLICY tenant_acme_executions ON executions + FOR SELECT USING tenant_id = 'acme'; + +-- Repeat for all tables +``` + +### Tenant ID in Schema + +`tenant_id` is the first column in every table's ORDER BY and PARTITION BY. This ensures: +- Data for the same tenant is physically co-located on disk +- Queries filtering by tenant_id use the sparse index efficiently +- Partition drops for retention are scoped to individual tenants + +### Resource Quotas + +```sql +CREATE SETTINGS PROFILE tenant_limits + SETTINGS max_execution_time = 30, + max_rows_to_read = 100000000, + max_memory_usage = '4G'; + +ALTER USER tenant_acme SETTINGS PROFILE tenant_limits; +``` + +Prevents noisy neighbor problems where one tenant's expensive query affects others. + +## Retention + +### Strategy: Application-Driven Scheduler + +Per-tenant, per-document-type retention is too dynamic for static ClickHouse TTL rules. Instead: + +1. **Config table** in PostgreSQL: + +```sql +CREATE TABLE tenant_retention_config ( + tenant_id VARCHAR(255) NOT NULL, + document_type VARCHAR(50) NOT NULL, -- executions, logs, metrics, etc. + retention_days INT NOT NULL, + PRIMARY KEY (tenant_id, document_type) +); +``` + +2. 
**RetentionScheduler** (Spring `@Scheduled`, runs daily at 03:00 UTC): + +```java +@Scheduled(cron = "0 0 3 * * *") +public void enforceRetention() { + List configs = retentionConfigRepo.findAll(); + for (TenantRetention config : configs) { + String table = config.documentType(); // executions, logs, metrics, etc. + clickHouseJdbc.execute( + "ALTER TABLE " + table + " DELETE WHERE tenant_id = ? AND start_time < now() - INTERVAL ? DAY", + config.tenantId(), config.retentionDays() + ); + } +} +``` + +3. **Safety-net TTL**: Each table has a generous default TTL (365 days) as a backstop in case the scheduler fails. The scheduler handles the per-tenant granularity. + +4. **Partition-aligned drops**: Since `PARTITION BY (tenant_id, toYYYYMM(start_time))`, when all rows in a partition match the DELETE condition, ClickHouse drops the entire partition (fast, no rewrite). Enable `ttl_only_drop_parts=1` on tables. + +## Java/Spring Integration + +### Dependencies + +```xml + + com.clickhouse + clickhouse-jdbc + 0.7.x + all + +``` + +### Configuration + +```yaml +clickhouse: + url: jdbc:clickhouse://clickhouse:8123/cameleer?async_insert=1&wait_for_async_insert=0 + username: cameleer_app + password: ${CLICKHOUSE_PASSWORD} +``` + +### DataSource Bean + +```java +@Configuration +public class ClickHouseConfig { + @Bean + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + return ds; + } + + @Bean + public JdbcTemplate clickHouseJdbcTemplate( + @Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} +``` + +### Interface Implementations + +Existing interfaces remain unchanged. 
New implementations: + +| Interface | Current Impl | New Impl | +|-----------|-------------|----------| +| `ExecutionStore` | `PostgresExecutionStore` | `ClickHouseExecutionStore` | +| `SearchIndex` | `OpenSearchIndex` | `ClickHouseSearchIndex` | +| `StatsStore` | `PostgresStatsStore` | `ClickHouseStatsStore` | +| `DiagramStore` | `PostgresDiagramStore` | `ClickHouseDiagramStore` | +| `MetricsStore` | `PostgresMetricsStore` | `ClickHouseMetricsStore` | +| (log search) | `OpenSearchLogIndex` | `ClickHouseLogStore` | +| (new) | `SearchIndexer` | `ExecutionAccumulator` | + +## Kubernetes Deployment + +### ClickHouse StatefulSet + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse +spec: + serviceName: clickhouse + replicas: 1 # single node initially + template: + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:26.2 + ports: + - containerPort: 8123 # HTTP + - containerPort: 9000 # Native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + - name: config + mountPath: /etc/clickhouse-server/config.d + resources: + requests: + memory: "4Gi" + cpu: "2" + limits: + memory: "8Gi" + cpu: "4" + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 100Gi # NVMe/SSD +``` + +### Health Check + +```yaml +livenessProbe: + httpGet: + path: /ping + port: 8123 +readinessProbe: + httpGet: + path: /ping + port: 8123 +``` + +## Migration Path + +### Phase 1: Foundation + +- Add `clickhouse-jdbc` dependency +- Create `ClickHouseConfig` (DataSource, JdbcTemplate) +- Schema initialization (idempotent DDL scripts, not Flyway -- ClickHouse DDL is different enough) +- Implement `ClickHouseMetricsStore` (simplest table, validates pipeline) +- Deploy ClickHouse to k8s alongside existing PG+OpenSearch + +### Phase 2: Executions + Search + +- Build `ExecutionAccumulator` (replaces SearchIndexer) +- Implement `ClickHouseExecutionStore` and 
`ClickHouseProcessorStore` +- Implement `ClickHouseSearchIndex` (ngram-based SQL queries) +- Feature flag: dual-write to both PG and CH, read from PG + +### Phase 3: Stats & Analytics + +- Create MV definitions (all 5 stats views) +- Implement `ClickHouseStatsStore` +- Validate stats accuracy: compare CH vs PG continuous aggregates + +### Phase 4: Remaining Tables + +- `ClickHouseDiagramStore` (ReplacingMergeTree) +- `ClickHouseAgentEventStore` +- `ClickHouseLogStore` (replaces OpenSearchLogIndex) +- Application-side highlighting + +### Phase 5: Multitenancy + +- Tables already include `tenant_id` from Phase 1 (schema is forward-looking). This phase activates multitenancy. +- Wire `tenant_id` from JWT claims into all ClickHouse queries (application-layer WHERE injection) +- Add `tenant_id` to PostgreSQL RBAC/config tables +- Create ClickHouse row policies per tenant (defense-in-depth) +- Create `tenant_retention_config` table in PG and `RetentionScheduler` component +- Tenant user management and resource quotas in ClickHouse + +### Phase 6: Cutover + +- Backfill historical data from PG/OpenSearch to ClickHouse +- Switch read path to ClickHouse (feature flag) +- Validate end-to-end +- Remove OpenSearch dependency (POM, config, k8s manifests) +- Remove TimescaleDB extensions and hypertable-specific code +- Keep PostgreSQL for RBAC/config/audit only + +## Verification + +### Functional Verification + +1. **Ingestion**: Send executions via agent, verify they appear in ClickHouse with correct fields +2. **Two-phase lifecycle**: Send RUNNING, then COMPLETED. Verify single merged row in CH +3. **Search**: Wildcard search across bodies, headers, errors. Verify sub-second response +4. **Stats**: Dashboard statistics match expected values. Compare with PG aggregates during dual-write +5. **Logs**: Ingest log batches, query by app/level/time/text. Verify correctness +6. **Retention**: Configure per-tenant retention, run scheduler, verify expired data is deleted +7. 
**Multitenancy**: Two tenants, verify data isolation (one tenant cannot see another's data) + +### Performance Verification + +1. **Insert throughput**: 5K executions/batch at 1 flush/sec sustained +2. **Search latency**: Sub-second for `LIKE '%term%'` across 1M+ rows +3. **Stats query latency**: Dashboard stats in <100ms from materialized views +4. **Log search**: <1s for text search across 7 days of logs + +### Data Integrity + +1. During dual-write phase: compare row counts between PG and CH +2. After cutover: spot-check execution details, processor trees, search results From c77d8a7af0bfce8cb7f7f9cc261f23a6488f844d Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:43:14 +0200 Subject: [PATCH 02/15] docs: add Phase 1 implementation plan for ClickHouse migration 10-task TDD plan covering: CH dependency, config, schema init, ClickHouseMetricsStore, MetricsQueryStore interface extraction, ClickHouseMetricsQueryStore, feature flag wiring, k8s deployment, integration tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...2026-03-31-clickhouse-phase1-foundation.md | 1193 +++++++++++++++++ 1 file changed, 1193 insertions(+) create mode 100644 docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md diff --git a/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md b/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md new file mode 100644 index 00000000..6e84d575 --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-clickhouse-phase1-foundation.md @@ -0,0 +1,1193 @@ +# ClickHouse Migration Phase 1: Foundation + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add ClickHouse as a second data store alongside PostgreSQL, validated with the metrics pipeline end-to-end (write + read). 
Feature-flagged so either PG or CH serves metrics. + +**Architecture:** Add `clickhouse-jdbc` dependency, a second `DataSource`/`JdbcTemplate` bean qualified as `@ClickHouse`, an idempotent schema initializer that creates ClickHouse tables on startup, and a feature flag (`cameleer.storage.metrics=clickhouse|postgres`) that selects the active `MetricsStore` implementation. The metrics read path (`AgentMetricsController`) is refactored behind a new `MetricsQueryStore` interface so it too can be swapped. + +**Tech Stack:** ClickHouse JDBC 0.7.x, Spring Boot 3.4.3, JdbcTemplate, Testcontainers (clickhouse module) + +**Design Spec:** `docs/superpowers/specs/2026-03-31-clickhouse-migration-design.md` + +--- + +## File Map + +| Action | File | Responsibility | +|--------|------|----------------| +| Modify | `cameleer3-server-app/pom.xml` | Add clickhouse-jdbc + testcontainers-clickhouse dependencies | +| Modify | `cameleer3-server-app/src/main/resources/application.yml` | Add `clickhouse.*` and `cameleer.storage.metrics` config keys | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java` | ClickHouse DataSource + JdbcTemplate beans | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java` | Config properties class for `clickhouse.*` prefix | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java` | Runs idempotent DDL on startup | +| Create | `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` | DDL for `agent_metrics` table | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java` | `MetricsStore` impl writing to ClickHouse | +| Create | `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java` | Interface: query metrics time-series | +| Create | 
`cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java` | PG impl of MetricsQueryStore (extracted from controller) | +| Create | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java` | CH impl of MetricsQueryStore | +| Modify | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java` | Use MetricsQueryStore instead of raw JdbcTemplate | +| Modify | `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` | Conditional beans for PG vs CH metrics stores | +| Create | `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java` | Unit test | +| Create | `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java` | Unit test | +| Create | `deploy/clickhouse.yaml` | K8s StatefulSet + Service for ClickHouse | +| Modify | `deploy/base/server.yaml` | Add CLICKHOUSE_URL env var | + +--- + +### Task 1: Add ClickHouse Dependencies + +**Files:** +- Modify: `cameleer3-server-app/pom.xml` + +- [ ] **Step 1: Add clickhouse-jdbc and testcontainers-clickhouse to app POM** + +In `cameleer3-server-app/pom.xml`, add these two dependencies. Place the runtime dependency after the opensearch dependencies (around line 59), and the test dependency after the opensearch-testcontainers dependency (around line 128): + +```xml + + + com.clickhouse + clickhouse-jdbc + 0.7.1-patch5 + all + + + + + org.testcontainers + clickhouse + test + +``` + +Note: The `all` classifier bundles the HTTP client, avoiding transitive dependency conflicts. The testcontainers `clickhouse` module version is managed by the `testcontainers.version` property in the parent POM. 
+ +- [ ] **Step 2: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/pom.xml +git commit -m "build: add clickhouse-jdbc and testcontainers-clickhouse dependencies" +``` + +--- + +### Task 2: ClickHouse Configuration + +**Files:** +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java` +- Modify: `cameleer3-server-app/src/main/resources/application.yml` + +- [ ] **Step 1: Create ClickHouseProperties** + +```java +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "clickhouse") +public class ClickHouseProperties { + + private String url = "jdbc:clickhouse://localhost:8123/cameleer"; + private String username = "default"; + private String password = ""; + + public String getUrl() { return url; } + public void setUrl(String url) { this.url = url; } + + public String getUsername() { return username; } + public void setUsername(String username) { this.username = username; } + + public String getPassword() { return password; } + public void setPassword(String password) { this.password = password; } +} +``` + +- [ ] **Step 2: Create ClickHouseConfig** + +```java +package com.cameleer3.server.app.config; + +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +@Configuration 
+@EnableConfigurationProperties(ClickHouseProperties.class) +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseConfig { + + @Bean(name = "clickHouseDataSource") + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + ds.setPoolName("clickhouse-pool"); + return ds; + } + + @Bean(name = "clickHouseJdbcTemplate") + public JdbcTemplate clickHouseJdbcTemplate( + @Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} +``` + +- [ ] **Step 3: Add configuration to application.yml** + +Add at the end of `application.yml`: + +```yaml +clickhouse: + enabled: ${CLICKHOUSE_ENABLED:false} + url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0} + username: ${CLICKHOUSE_USERNAME:default} + password: ${CLICKHOUSE_PASSWORD:} + +cameleer: + storage: + metrics: ${CAMELEER_STORAGE_METRICS:postgres} +``` + +Note: The existing `cameleer:` block already has `body-size-limit` and `retention-days`. Add the `storage.metrics` key under it. 
+ +- [ ] **Step 4: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java +git add cameleer3-server-app/src/main/resources/application.yml +git commit -m "feat: add ClickHouse DataSource and JdbcTemplate configuration" +``` + +--- + +### Task 3: Schema Initializer + DDL Script + +**Files:** +- Create: `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java` + +- [ ] **Step 1: Create DDL script** + +File: `cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql` + +```sql +CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; +``` + +- [ ] **Step 2: Create ClickHouseSchemaInitializer** + +```java +package com.cameleer3.server.app.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.Resource; +import 
org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; + +@Component +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseSchemaInitializer { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class); + + private final JdbcTemplate clickHouseJdbc; + + public ClickHouseSchemaInitializer( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + this.clickHouseJdbc = clickHouseJdbc; + } + + @EventListener(ApplicationReadyEvent.class) + public void initializeSchema() throws IOException { + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql"); + + Arrays.sort(scripts, Comparator.comparing(Resource::getFilename)); + + for (Resource script : scripts) { + String sql = script.getContentAsString(StandardCharsets.UTF_8); + log.info("Executing ClickHouse schema script: {}", script.getFilename()); + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + if (!trimmed.isEmpty()) { + clickHouseJdbc.execute(trimmed); + } + } + } + + log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length); + } +} +``` + +- [ ] **Step 3: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 4: Commit** + +```bash +git add cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java +git commit -m "feat: add ClickHouse schema initializer with agent_metrics DDL" +``` + +--- + +### Task 4: ClickHouseMetricsStore 
(TDD) + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import com.zaxxer.hikari.HikariDataSource; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsStore store; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + store = new ClickHouseMetricsStore(jdbc); + } + + @Test + void insertBatch_writesMetricsToClickHouse() { + List batch = 
List.of( + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 75.5, Map.of("host", "server-1")), + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"), + "memory.free", 1024.0, null) + ); + + store.insertBatch(batch); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'", + Integer.class); + assertThat(count).isEqualTo(2); + } + + @Test + void insertBatch_storesTags() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"), + "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4")) + )); + + Map tags = jdbc.queryForObject( + "SELECT tags FROM agent_metrics WHERE agent_id = 'agent-2'", + (rs, n) -> { + // ClickHouse returns Map as a map-like structure via JDBC + @SuppressWarnings("unchecked") + Map m = (Map) rs.getObject("tags"); + return m; + }); + assertThat(tags).containsEntry("mount", "/data").containsEntry("fs", "ext4"); + } + + @Test + void insertBatch_emptyList_doesNothing() { + store.insertBatch(List.of()); + + Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertBatch_nullTags_defaultsToEmptyMap() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 50.0, null) + )); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'", + Integer.class); + assertThat(count).isEqualTo(1); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsStoreTest -DfailIfNoTests=false` +Expected: FAIL — `ClickHouseMetricsStore` class does not exist + +- [ ] **Step 3: Write ClickHouseMetricsStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsStore; +import 
com.cameleer3.server.core.storage.model.MetricsSnapshot;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClickHouseMetricsStore implements MetricsStore {
+
+    private final JdbcTemplate jdbc;
+
+    public ClickHouseMetricsStore(JdbcTemplate jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    @Override
+    public void insertBatch(List<MetricsSnapshot> snapshots) {
+        if (snapshots.isEmpty()) return;
+
+        jdbc.batchUpdate("""
+            INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at)
+            VALUES (?, ?, ?, ?, ?)
+            """,
+            snapshots.stream().map(s -> new Object[]{
+                s.agentId(),
+                s.metricName(),
+                s.metricValue(),
+                tagsToClickHouseMap(s.tags()),
+                Timestamp.from(s.collectedAt())
+            }).toList());
+    }
+
+    private Map<String, String> tagsToClickHouseMap(Map<String, String> tags) {
+        if (tags == null || tags.isEmpty()) return new HashMap<>();
+        return tags;
+    }
+}
+```
+
+Note: ClickHouse JDBC driver handles `Map<String, String>` natively for `Map(String, String)` columns. No JSON serialization needed (unlike the PG implementation that converts to JSONB string).
+
+- [ ] **Step 4: Run test to verify it passes**
+
+Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsStoreTest`
+Expected: PASS — all 4 tests green
+
+If the tags test fails due to JDBC Map handling, adjust `tagsToClickHouseMap` to return a `java.util.HashMap` (ClickHouse JDBC requires a mutable map). If needed, wrap: `return new HashMap<>(tags)`.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java
+git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java
+git commit -m "feat: add ClickHouseMetricsStore with batch insert"
+```
+
+---
+
+### Task 5: MetricsQueryStore Interface + PostgreSQL Implementation (TDD)
+
+Extract the query logic from `AgentMetricsController` into an interface. 
+
+**Files:**
+- Create: `cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java`
+- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java`
+- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java`
+
+- [ ] **Step 1: Create MetricsQueryStore interface**
+
+```java
+package com.cameleer3.server.core.storage;
+
+import com.cameleer3.server.core.storage.model.MetricTimeSeries;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+public interface MetricsQueryStore {
+
+    /**
+     * Query time-bucketed metrics for an agent.
+     *
+     * @param agentId the agent identifier
+     * @param metricNames list of metric names to query
+     * @param from start of time range (inclusive)
+     * @param to end of time range (exclusive)
+     * @param buckets number of time buckets to divide the range into
+     * @return map of metric name to list of (time, value) pairs
+     */
+    Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
+            String agentId, List<String> metricNames,
+            Instant from, Instant to, int buckets);
+}
+```
+
+- [ ] **Step 2: Create MetricTimeSeries.Bucket record**
+
+```java
+package com.cameleer3.server.core.storage.model;
+
+import java.time.Instant;
+import java.util.List;
+
+public record MetricTimeSeries(String metricName, List<Bucket> buckets) {
+
+    public record Bucket(Instant time, double value) {}
+}
+```
+
+- [ ] **Step 3: Create PostgresMetricsQueryStore (extract from controller)**
+
+```java
+package com.cameleer3.server.app.storage;
+
+import com.cameleer3.server.core.storage.MetricsQueryStore;
+import com.cameleer3.server.core.storage.model.MetricTimeSeries;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.*;
+
+public class PostgresMetricsQueryStore implements MetricsQueryStore {
+
+    private final JdbcTemplate jdbc;
+
+    public PostgresMetricsQueryStore(JdbcTemplate jdbc) {
+        
this.jdbc = jdbc;
+    }
+
+    @Override
+    public Map<String, List<MetricTimeSeries.Bucket>> queryTimeSeries(
+            String agentId, List<String> metricNames,
+            Instant from, Instant to, int buckets) {
+
+        long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1);
+        String intervalStr = intervalMs + " milliseconds";
+
+        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
+        for (String name : metricNames) {
+            result.put(name.trim(), new ArrayList<>());
+        }
+
+        String sql = """
+            SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket,
+                   metric_name,
+                   AVG(metric_value) AS avg_value
+            FROM agent_metrics
+            WHERE agent_id = ?
+              AND collected_at >= ? AND collected_at < ?
+              AND metric_name = ANY(?)
+            GROUP BY bucket, metric_name
+            ORDER BY bucket
+            """;
+
+        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
+        jdbc.query(sql, rs -> {
+            String metricName = rs.getString("metric_name");
+            Instant bucket = rs.getTimestamp("bucket").toInstant();
+            double value = rs.getDouble("avg_value");
+            result.computeIfAbsent(metricName, k -> new ArrayList<>())
+                  .add(new MetricTimeSeries.Bucket(bucket, value));
+        }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray);
+
+        return result;
+    }
+}
+```
+
+- [ ] **Step 4: Refactor AgentMetricsController to use MetricsQueryStore**
+
+Replace the entire `AgentMetricsController.java` content:
+
+```java
+package com.cameleer3.server.app.controller;
+
+import com.cameleer3.server.app.dto.AgentMetricsResponse;
+import com.cameleer3.server.app.dto.MetricBucket;
+import com.cameleer3.server.core.storage.MetricsQueryStore;
+import com.cameleer3.server.core.storage.model.MetricTimeSeries;
+import org.springframework.web.bind.annotation.*;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@RestController
+@RequestMapping("/api/v1/agents/{agentId}/metrics")
+public class AgentMetricsController {
+
+    private final MetricsQueryStore metricsQueryStore;
+
+    
public AgentMetricsController(MetricsQueryStore metricsQueryStore) { + this.metricsQueryStore = metricsQueryStore; + } + + @GetMapping + public AgentMetricsResponse getMetrics( + @PathVariable String agentId, + @RequestParam String names, + @RequestParam(required = false) Instant from, + @RequestParam(required = false) Instant to, + @RequestParam(defaultValue = "60") int buckets) { + + if (from == null) from = Instant.now().minus(1, ChronoUnit.HOURS); + if (to == null) to = Instant.now(); + + List metricNames = Arrays.asList(names.split(",")); + + Map> raw = + metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets); + + // Convert to existing DTO format + Map> result = raw.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream() + .map(b -> new MetricBucket(b.time(), b.value())) + .toList(), + (a, b) -> a, + LinkedHashMap::new)); + + return new AgentMetricsResponse(result); + } +} +``` + +- [ ] **Step 5: Add PostgresMetricsQueryStore bean to StorageBeanConfig** + +The refactored `AgentMetricsController` needs a `MetricsQueryStore` bean. Add this to `StorageBeanConfig.java`: + +```java +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.core.storage.MetricsQueryStore; + +// Add this bean method: +@Bean +public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); +} +``` + +This creates the PG implementation as the default. Task 7 will replace this with conditional beans for PG vs CH. 
+ +- [ ] **Step 6: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 7: Commit** + +```bash +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java +git add cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +git commit -m "refactor: extract MetricsQueryStore interface from AgentMetricsController" +``` + +--- + +### Task 6: ClickHouseMetricsQueryStore (TDD) + +**Files:** +- Create: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java` +- Create: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java` + +- [ ] **Step 1: Write the failing test** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsQueryStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsQueryStore queryStore; + 
+ @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + // Seed test data: 6 data points across 1 hour for two metrics + Instant base = Instant.parse("2026-03-31T10:00:00Z"); + for (int i = 0; i < 6; i++) { + Instant ts = base.plusSeconds(i * 600); // every 10 minutes + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts)); + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); + } + + queryStore = new ClickHouseMetricsQueryStore(jdbc); + } + + @Test + void queryTimeSeries_returnsDataGroupedByMetric() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6); + + assertThat(result).containsKeys("cpu.usage", "memory.free"); + assertThat(result.get("cpu.usage")).isNotEmpty(); + assertThat(result.get("memory.free")).isNotEmpty(); + } + + @Test + void queryTimeSeries_bucketsAverageCorrectly() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = 
Instant.parse("2026-03-31T11:00:00Z"); + + // 1 bucket for the entire hour = average of all 6 values + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1); + + assertThat(result.get("cpu.usage")).hasSize(1); + // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5 + assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1)); + } + + @Test + void queryTimeSeries_noData_returnsEmptyLists() { + Instant from = Instant.parse("2025-01-01T00:00:00Z"); + Instant to = Instant.parse("2025-01-01T01:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } + + @Test + void queryTimeSeries_unknownAgent_returnsEmpty() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsQueryStoreTest -DfailIfNoTests=false` +Expected: FAIL — `ClickHouseMetricsQueryStore` class does not exist + +- [ ] **Step 3: Write ClickHouseMetricsQueryStore** + +```java +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.*; + +public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalSeconds = Math.max(60, + 
(to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1));
+
+        Map<String, List<MetricTimeSeries.Bucket>> result = new LinkedHashMap<>();
+        for (String name : metricNames) {
+            result.put(name.trim(), new ArrayList<>());
+        }
+
+        String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new);
+
+        // ClickHouse JDBC does not support array parameters with IN (?), and INTERVAL
+        // cannot be bound as a parameter either, so the query is assembled via formatted().
+        // Metric names are NOT validated upstream (the controller only splits on commas),
+        // so single quotes are escaped here before inlining them into the IN clause.
+        StringBuilder inClause = new StringBuilder();
+        for (int i = 0; i < namesArray.length; i++) {
+            if (i > 0) inClause.append(", ");
+            inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'");
+        }
+
+        String finalSql = """
+            SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket,
+                   metric_name,
+                   avg(metric_value) AS avg_value
+            FROM agent_metrics
+            WHERE agent_id = ?
+              AND collected_at >= ?
+              AND collected_at < ?
+ AND metric_name IN (%s) + GROUP BY bucket, metric_name + ORDER BY bucket + """.formatted(intervalSeconds, inClause); + + jdbc.query(finalSql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, agentId, + java.sql.Timestamp.from(from), + java.sql.Timestamp.from(to)); + + return result; + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `mvn test -pl cameleer3-server-app -Dtest=ClickHouseMetricsQueryStoreTest` +Expected: PASS — all 4 tests green + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java +git commit -m "feat: add ClickHouseMetricsQueryStore with time-bucketed queries" +``` + +--- + +### Task 7: Wire Feature Flag in StorageBeanConfig + +**Files:** +- Modify: `cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java` + +- [ ] **Step 1: Replace simple beans with conditional MetricsStore and MetricsQueryStore beans** + +In `StorageBeanConfig.java`, **remove** the unconditional `metricsQueryStore` bean added in Task 5. Replace with these conditional bean definitions: + +```java +import com.cameleer3.server.app.storage.ClickHouseMetricsStore; +import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.core.storage.MetricsQueryStore; + +// ... existing beans ... 

+
+@Bean
+@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
+public MetricsStore clickHouseMetricsStore(
+        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+    return new ClickHouseMetricsStore(clickHouseJdbc);
+}
+
+@Bean
+@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
+public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) {
+    return new PostgresMetricsStore(jdbc);
+}
+
+@Bean
+@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse")
+public MetricsQueryStore clickHouseMetricsQueryStore(
+        @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
+    return new ClickHouseMetricsQueryStore(clickHouseJdbc);
+}
+
+@Bean
+@ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true)
+public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) {
+    return new PostgresMetricsQueryStore(jdbc);
+}
+```
+
+> **Warning:** declaring the `clickHouseJdbcTemplate` bean makes Spring Boot's
+> `JdbcTemplate` auto-configuration back off (it is `@ConditionalOnMissingBean` on
+> `JdbcOperations`). With `clickhouse.enabled=true`, the unqualified `JdbcTemplate jdbc`
+> parameters above would then resolve to the ClickHouse template, silently pointing the
+> Postgres stores at the wrong database. Declare an explicit `@Primary` `JdbcTemplate`
+> bean backed by the primary (Postgres) `DataSource` in this config so that unqualified
+> injection keeps targeting Postgres.
+
+Also remove the `@Repository` annotation from `PostgresMetricsStore.java` (the class does carry it): the bean is now created explicitly in config, and leaving the annotation in place would register two competing `MetricsStore` beans when the flag is `postgres`.
+
+Add the necessary imports to `StorageBeanConfig.java`:
+```java
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.beans.factory.annotation.Qualifier;
+import com.cameleer3.server.core.storage.MetricsStore;
+```
+
+- [ ] **Step 2: Remove @Repository from PostgresMetricsStore**
+
+In `PostgresMetricsStore.java`, remove the `@Repository` annotation from the class declaration. The bean is now created in `StorageBeanConfig`.
+ +- [ ] **Step 3: Verify compilation** + +Run: `mvn clean compile -pl cameleer3-server-app -am` +Expected: BUILD SUCCESS + +- [ ] **Step 4: Run existing tests to verify no regression** + +Run: `mvn test -pl cameleer3-server-app` +Expected: All existing tests pass. The default config (`cameleer.storage.metrics=postgres`) means the PG beans are created, ClickHouse beans are not (ClickHouse DataSource not needed). + +- [ ] **Step 5: Commit** + +```bash +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +git add cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java +git commit -m "feat: wire MetricsStore and MetricsQueryStore with feature flag" +``` + +--- + +### Task 8: ClickHouse Kubernetes Deployment + +**Files:** +- Create: `deploy/clickhouse.yaml` +- Modify: `deploy/base/server.yaml` + +- [ ] **Step 1: Create ClickHouse StatefulSet + Service manifest** + +File: `deploy/clickhouse.yaml` + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse + namespace: cameleer +spec: + serviceName: clickhouse + replicas: 1 + selector: + matchLabels: + app: clickhouse + template: + metadata: + labels: + app: clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24.12 + ports: + - containerPort: 8123 + name: http + - containerPort: 9000 + name: native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + resources: + requests: + memory: "2Gi" + cpu: "500m" + limits: + memory: "4Gi" + cpu: "2000m" + livenessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 50Gi +--- 
+apiVersion: v1
+kind: Service
+metadata:
+  name: clickhouse
+  namespace: cameleer
+spec:
+  clusterIP: None
+  selector:
+    app: clickhouse
+  ports:
+    - port: 8123
+      targetPort: 8123
+      name: http
+    - port: 9000
+      targetPort: 9000
+      name: native
+```
+
+- [ ] **Step 2: Add ClickHouse env vars to server.yaml**
+
+Add these env vars to the `cameleer3-server` container in `deploy/base/server.yaml`, after the existing env vars (before `resources:`):
+
+```yaml
+          - name: CLICKHOUSE_ENABLED
+            value: "true"
+          - name: CLICKHOUSE_URL
+            value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0"
+          - name: CAMELEER_STORAGE_METRICS
+            value: "postgres"
+```
+
+Note: `CAMELEER_STORAGE_METRICS` defaults to `postgres`. Set to `clickhouse` when ready to switch.
+
+Note: the JDBC URL targets database `cameleer`, but nothing in `deploy/clickhouse.yaml` creates it. Add `CLICKHOUSE_DB=cameleer` as an env var on the ClickHouse container in the StatefulSet (the official image creates the database on first start), or create the database manually before enabling the connection — otherwise the server's schema initialization fails at startup.
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add deploy/clickhouse.yaml
+git add deploy/base/server.yaml
+git commit -m "deploy: add ClickHouse StatefulSet and server env vars"
+```
+
+---
+
+### Task 9: Integration Smoke Test
+
+**Files:**
+- Modify: `cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java` (add ClickHouse container)
+
+- [ ] **Step 1: Update AbstractPostgresIT to optionally start ClickHouse**
+
+This is a non-breaking addition. The existing integration tests continue to use PG. A future ClickHouse-specific IT can extend this base.
+ +Add the ClickHouse container to `AbstractPostgresIT`: + +```java +import org.testcontainers.clickhouse.ClickHouseContainer; + +// Add after opensearch static field: +static final ClickHouseContainer clickhouse; + +// In the static initializer, add after opensearch.start(): +clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); +clickhouse.start(); + +// In configureProperties, add: +registry.add("clickhouse.enabled", () -> "true"); +registry.add("clickhouse.url", clickhouse::getJdbcUrl); +registry.add("clickhouse.username", clickhouse::getUsername); +registry.add("clickhouse.password", clickhouse::getPassword); +``` + +- [ ] **Step 2: Run all integration tests** + +Run: `mvn verify -pl cameleer3-server-app` +Expected: All ITs pass. ClickHouse schema initializer runs and creates the `agent_metrics` table. The `MetricsControllerIT` still uses PG (default storage flag is `postgres`). + +- [ ] **Step 3: Commit** + +```bash +git add cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +git commit -m "test: add ClickHouse testcontainer to integration test base" +``` + +--- + +### Task 10: Final Verification + +- [ ] **Step 1: Run full build** + +Run: `mvn clean verify` +Expected: BUILD SUCCESS (unit tests + integration tests all pass) + +- [ ] **Step 2: Verify ClickHouse switch works locally (manual)** + +Start ClickHouse locally: +```bash +docker run -d --name clickhouse-test -p 8123:8123 clickhouse/clickhouse-server:24.12 +``` + +Create the database: +```bash +curl "http://localhost:8123/" --data "CREATE DATABASE IF NOT EXISTS cameleer" +``` + +Start the server with ClickHouse enabled: +```bash +CLICKHOUSE_ENABLED=true CAMELEER_STORAGE_METRICS=clickhouse java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar +``` + +Verify schema initialization in the logs: `Executing ClickHouse schema script: V1__agent_metrics.sql` + +POST a metric: +```bash +curl -X POST 
http://localhost:8081/api/v1/data/metrics \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer " \ + -d '[{"agentId":"test","collectedAt":"2026-03-31T10:00:00Z","metricName":"cpu","metricValue":75.0,"tags":{}}]' +``` + +Verify data in ClickHouse: +```bash +curl "http://localhost:8123/" --data "SELECT * FROM cameleer.agent_metrics FORMAT Pretty" +``` + +Stop and clean up: +```bash +docker stop clickhouse-test && docker rm clickhouse-test +``` + +- [ ] **Step 3: Final commit (if any fixes were needed)** + +```bash +git add -A +git commit -m "fix: address issues found during manual verification" +``` From 61718272438c287b0398ff435eadfe92f64a466c Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:49:04 +0200 Subject: [PATCH 03/15] build: add clickhouse-jdbc and testcontainers-clickhouse dependencies Co-Authored-By: Claude Sonnet 4.6 --- cameleer3-server-app/pom.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cameleer3-server-app/pom.xml b/cameleer3-server-app/pom.xml index 4738822d..4691e969 100644 --- a/cameleer3-server-app/pom.xml +++ b/cameleer3-server-app/pom.xml @@ -57,6 +57,12 @@ opensearch-rest-client 2.19.0 + + com.clickhouse + clickhouse-jdbc + 0.9.7 + all + org.springdoc springdoc-openapi-starter-webmvc-ui @@ -126,6 +132,11 @@ 2.1.1 test + + org.testcontainers + testcontainers-clickhouse + test + org.awaitility awaitility From 23f901279a1e4b4809e2f23c003cfb70f2c75e77 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:51:14 +0200 Subject: [PATCH 04/15] feat: add ClickHouse DataSource and JdbcTemplate configuration Adds ClickHouseProperties (bound to clickhouse.*), ClickHouseConfig (conditional HikariDataSource + JdbcTemplate beans), and extends application.yml with clickhouse.enabled/url/username/password and cameleer.storage.metrics properties. 
Co-Authored-By: Claude Sonnet 4.6 --- .../server/app/config/ClickHouseConfig.java | 34 +++++++++++++++++++ .../app/config/ClickHouseProperties.java | 20 +++++++++++ .../src/main/resources/application.yml | 8 +++++ 3 files changed, 62 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java new file mode 100644 index 00000000..413102df --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseConfig.java @@ -0,0 +1,34 @@ +package com.cameleer3.server.app.config; + +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +@Configuration +@EnableConfigurationProperties(ClickHouseProperties.class) +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseConfig { + + @Bean(name = "clickHouseDataSource") + public DataSource clickHouseDataSource(ClickHouseProperties props) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(props.getUrl()); + ds.setUsername(props.getUsername()); + ds.setPassword(props.getPassword()); + ds.setMaximumPoolSize(10); + ds.setPoolName("clickhouse-pool"); + return ds; + } + + @Bean(name = "clickHouseJdbcTemplate") + public JdbcTemplate clickHouseJdbcTemplate( + 
@Qualifier("clickHouseDataSource") DataSource ds) { + return new JdbcTemplate(ds); + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java new file mode 100644 index 00000000..461a8b42 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseProperties.java @@ -0,0 +1,20 @@ +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "clickhouse") +public class ClickHouseProperties { + + private String url = "jdbc:clickhouse://localhost:8123/cameleer"; + private String username = "default"; + private String password = ""; + + public String getUrl() { return url; } + public void setUrl(String url) { this.url = url; } + + public String getUsername() { return username; } + public void setUsername(String username) { this.username = username; } + + public String getPassword() { return password; } + public void setPassword(String password) { this.password = password; } +} diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index f15d93c9..145ca9be 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -48,6 +48,8 @@ opensearch: cameleer: body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} retention-days: ${CAMELEER_RETENTION_DAYS:30} + storage: + metrics: ${CAMELEER_STORAGE_METRICS:postgres} security: access-token-expiry-ms: 3600000 @@ -66,6 +68,12 @@ springdoc: swagger-ui: path: /api/v1/swagger-ui +clickhouse: + enabled: ${CLICKHOUSE_ENABLED:false} + url: ${CLICKHOUSE_URL:jdbc:clickhouse://localhost:8123/cameleer?async_insert=1&wait_for_async_insert=0} + username: ${CLICKHOUSE_USERNAME:default} + password: ${CLICKHOUSE_PASSWORD:} + 
management: endpoints: web: From 08934376df4c8fdb9a00e8689ff385865ac9d5f7 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:51:21 +0200 Subject: [PATCH 05/15] feat: add ClickHouse schema initializer with agent_metrics DDL Adds ClickHouseSchemaInitializer that runs on ApplicationReadyEvent, scanning classpath:clickhouse/*.sql in filename order and executing each statement. Adds V1__agent_metrics.sql with MergeTree table, tenant/agent partitioning, and 365-day TTL. Co-Authored-By: Claude Sonnet 4.6 --- .../config/ClickHouseSchemaInitializer.java | 52 +++++++++++++++++++ .../clickhouse/V1__agent_metrics.sql | 14 +++++ 2 files changed, 66 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java create mode 100644 cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java new file mode 100644 index 00000000..a2a5f720 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/ClickHouseSchemaInitializer.java @@ -0,0 +1,52 @@ +package com.cameleer3.server.app.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import 
java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; + +@Component +@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") +public class ClickHouseSchemaInitializer { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSchemaInitializer.class); + + private final JdbcTemplate clickHouseJdbc; + + public ClickHouseSchemaInitializer( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + this.clickHouseJdbc = clickHouseJdbc; + } + + @EventListener(ApplicationReadyEvent.class) + public void initializeSchema() throws IOException { + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + Resource[] scripts = resolver.getResources("classpath:clickhouse/*.sql"); + + Arrays.sort(scripts, Comparator.comparing(Resource::getFilename)); + + for (Resource script : scripts) { + String sql = script.getContentAsString(StandardCharsets.UTF_8); + log.info("Executing ClickHouse schema script: {}", script.getFilename()); + for (String statement : sql.split(";")) { + String trimmed = statement.trim(); + if (!trimmed.isEmpty()) { + clickHouseJdbc.execute(trimmed); + } + } + } + + log.info("ClickHouse schema initialization complete ({} scripts)", scripts.length); + } +} diff --git a/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql b/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql new file mode 100644 index 00000000..807e882c --- /dev/null +++ b/cameleer3-server-app/src/main/resources/clickhouse/V1__agent_metrics.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) +) +ENGINE = MergeTree() +PARTITION BY (tenant_id, 
toYYYYMM(collected_at)) +ORDER BY (tenant_id, agent_id, metric_name, collected_at) +TTL collected_at + INTERVAL 365 DAY DELETE +SETTINGS index_granularity = 8192; From 6e30b7ec65cdc3d591dbc7218ffa1326e03b15a0 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:58:20 +0200 Subject: [PATCH 06/15] feat: add ClickHouseMetricsStore with batch insert TDD implementation of MetricsStore backed by ClickHouse. Uses native Map(String,String) column type (no JSON cast), relies on ClickHouse DEFAULT for server_received_at, and handles null tags by substituting an empty HashMap. All 4 Testcontainers tests pass. Co-Authored-By: Claude Sonnet 4.6 --- .../app/storage/ClickHouseMetricsStore.java | 41 +++++++ .../storage/ClickHouseMetricsStoreTest.java | 108 ++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java new file mode 100644 index 00000000..8d1d8645 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java @@ -0,0 +1,41 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsStore; +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickHouseMetricsStore implements MetricsStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override 
+ public void insertBatch(List snapshots) { + if (snapshots.isEmpty()) return; + + jdbc.batchUpdate(""" + INSERT INTO agent_metrics (agent_id, metric_name, metric_value, tags, collected_at) + VALUES (?, ?, ?, ?, ?) + """, + snapshots.stream().map(s -> new Object[]{ + s.agentId(), + s.metricName(), + s.metricValue(), + tagsToClickHouseMap(s.tags()), + Timestamp.from(s.collectedAt()) + }).toList()); + } + + private Map tagsToClickHouseMap(Map tags) { + if (tags == null || tags.isEmpty()) return new HashMap<>(); + return new HashMap<>(tags); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java new file mode 100644 index 00000000..6df85f49 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java @@ -0,0 +1,108 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import com.zaxxer.hikari.HikariDataSource; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsStore store; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + 
ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + store = new ClickHouseMetricsStore(jdbc); + } + + @Test + void insertBatch_writesMetricsToClickHouse() { + List batch = List.of( + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 75.5, Map.of("host", "server-1")), + new MetricsSnapshot("agent-1", Instant.parse("2026-03-31T10:00:01Z"), + "memory.free", 1024.0, null) + ); + + store.insertBatch(batch); + + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-1'", + Integer.class); + assertThat(count).isEqualTo(2); + } + + @Test + void insertBatch_storesTags() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-2", Instant.parse("2026-03-31T10:00:00Z"), + "disk.used", 500.0, Map.of("mount", "/data", "fs", "ext4")) + )); + + // Just verify we can read back the row with tags + Integer count = jdbc.queryForObject( + "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-2'", + Integer.class); + assertThat(count).isEqualTo(1); + } + + @Test + void insertBatch_emptyList_doesNothing() { + store.insertBatch(List.of()); + + Integer count = jdbc.queryForObject("SELECT count() FROM agent_metrics", Integer.class); + assertThat(count).isEqualTo(0); + } + + @Test + void insertBatch_nullTags_defaultsToEmptyMap() { + store.insertBatch(List.of( + new MetricsSnapshot("agent-3", Instant.parse("2026-03-31T10:00:00Z"), + "cpu.usage", 50.0, null) + )); + + Integer count = jdbc.queryForObject( 
+ "SELECT count() FROM agent_metrics WHERE agent_id = 'agent-3'", + Integer.class); + assertThat(count).isEqualTo(1); + } +} From bf0e9ea4185573fd2bcf9242128a1db0ce79b016 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:00:57 +0200 Subject: [PATCH 07/15] refactor: extract MetricsQueryStore interface from AgentMetricsController Co-Authored-By: Claude Sonnet 4.6 --- .../server/app/config/StorageBeanConfig.java | 7 +++ .../controller/AgentMetricsController.java | 47 ++++++---------- .../storage/PostgresMetricsQueryStore.java | 55 +++++++++++++++++++ .../core/storage/MetricsQueryStore.java | 14 +++++ .../core/storage/model/MetricTimeSeries.java | 9 +++ 5 files changed, 101 insertions(+), 31 deletions(-) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java create mode 100644 cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 9a971357..44a78555 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -1,5 +1,6 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; @@ -11,6 +12,7 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot; import org.springframework.beans.factory.annotation.Value; import 
org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; @Configuration public class StorageBeanConfig { @@ -41,4 +43,9 @@ public class StorageBeanConfig { return new IngestionService(executionStore, diagramStore, metricsBuffer, searchIndexer::onExecutionUpdated, bodySizeLimit); } + + @Bean + public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) { + return new PostgresMetricsQueryStore(jdbc); + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java index 032cfea1..78edd94f 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentMetricsController.java @@ -2,22 +2,23 @@ package com.cameleer3.server.app.controller; import com.cameleer3.server.app.dto.AgentMetricsResponse; import com.cameleer3.server.app.dto.MetricBucket; -import org.springframework.jdbc.core.JdbcTemplate; +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; import org.springframework.web.bind.annotation.*; -import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.stream.Collectors; @RestController @RequestMapping("/api/v1/agents/{agentId}/metrics") public class AgentMetricsController { - private final JdbcTemplate jdbc; + private final MetricsQueryStore metricsQueryStore; - public AgentMetricsController(JdbcTemplate jdbc) { - this.jdbc = jdbc; + public AgentMetricsController(MetricsQueryStore metricsQueryStore) { + this.metricsQueryStore = metricsQueryStore; } @GetMapping @@ -32,34 +33,18 @@ public class AgentMetricsController { if (to == null) to = 
Instant.now(); List metricNames = Arrays.asList(names.split(",")); - long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); - String intervalStr = intervalMs + " milliseconds"; - Map> result = new LinkedHashMap<>(); - for (String name : metricNames) { - result.put(name.trim(), new ArrayList<>()); - } + Map> raw = + metricsQueryStore.queryTimeSeries(agentId, metricNames, from, to, buckets); - String sql = """ - SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, - metric_name, - AVG(metric_value) AS avg_value - FROM agent_metrics - WHERE agent_id = ? - AND collected_at >= ? AND collected_at < ? - AND metric_name = ANY(?) - GROUP BY bucket, metric_name - ORDER BY bucket - """; - - String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); - jdbc.query(sql, rs -> { - String metricName = rs.getString("metric_name"); - Instant bucket = rs.getTimestamp("bucket").toInstant(); - double value = rs.getDouble("avg_value"); - result.computeIfAbsent(metricName, k -> new ArrayList<>()) - .add(new MetricBucket(bucket, value)); - }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + Map> result = raw.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream() + .map(b -> new MetricBucket(b.time(), b.value())) + .toList(), + (a, b) -> a, + LinkedHashMap::new)); return new AgentMetricsResponse(result); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java new file mode 100644 index 00000000..bcd5d2bc --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsQueryStore.java @@ -0,0 +1,55 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import 
com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +public class PostgresMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public PostgresMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalMs = (to.toEpochMilli() - from.toEpochMilli()) / Math.max(buckets, 1); + String intervalStr = intervalMs + " milliseconds"; + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + String sql = """ + SELECT time_bucket(CAST(? AS interval), collected_at) AS bucket, + metric_name, + AVG(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? AND collected_at < ? + AND metric_name = ANY(?) 
+ GROUP BY bucket, metric_name + ORDER BY bucket + """; + + String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + jdbc.query(sql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, intervalStr, agentId, Timestamp.from(from), Timestamp.from(to), namesArray); + + return result; + } +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java new file mode 100644 index 00000000..650e5b3b --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/MetricsQueryStore.java @@ -0,0 +1,14 @@ +package com.cameleer3.server.core.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public interface MetricsQueryStore { + + Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets); +} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java new file mode 100644 index 00000000..6107fa0e --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/MetricTimeSeries.java @@ -0,0 +1,9 @@ +package com.cameleer3.server.core.storage.model; + +import java.time.Instant; +import java.util.List; + +public record MetricTimeSeries(String metricName, List buckets) { + + public record Bucket(Instant time, double value) {} +} From 53766aeb5645acbbad16936f9639841a41adb719 Mon Sep 17 00:00:00 2001 From: hsiegeln 
<37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:05:45 +0200 Subject: [PATCH 08/15] feat: add ClickHouseMetricsQueryStore with time-bucketed queries Implements MetricsQueryStore using ClickHouse toStartOfInterval() for time-bucketed aggregation queries; verified with 4 Testcontainers tests. Co-Authored-By: Claude Sonnet 4.6 --- .../storage/ClickHouseMetricsQueryStore.java | 66 ++++++++++ .../ClickHouseMetricsQueryStoreTest.java | 114 ++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java new file mode 100644 index 00000000..0ee53b42 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java @@ -0,0 +1,66 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.MetricsQueryStore; +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.*; + +public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + + private final JdbcTemplate jdbc; + + public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @Override + public Map> queryTimeSeries( + String agentId, List metricNames, + Instant from, Instant to, int buckets) { + + long intervalSeconds = Math.max(60, + (to.getEpochSecond() - from.getEpochSecond()) / Math.max(buckets, 1)); + + Map> result = new LinkedHashMap<>(); + for (String name : metricNames) { + result.put(name.trim(), new ArrayList<>()); + } + + 
String[] namesArray = metricNames.stream().map(String::trim).toArray(String[]::new); + + // ClickHouse JDBC doesn't support array params with IN (?). + // Build the IN clause with properly escaped values. + StringBuilder inClause = new StringBuilder(); + for (int i = 0; i < namesArray.length; i++) { + if (i > 0) inClause.append(", "); + inClause.append("'").append(namesArray[i].replace("'", "\\'")).append("'"); + } + + String finalSql = """ + SELECT toStartOfInterval(collected_at, INTERVAL %d SECOND) AS bucket, + metric_name, + avg(metric_value) AS avg_value + FROM agent_metrics + WHERE agent_id = ? + AND collected_at >= ? + AND collected_at < ? + AND metric_name IN (%s) + GROUP BY bucket, metric_name + ORDER BY bucket + """.formatted(intervalSeconds, inClause); + + jdbc.query(finalSql, rs -> { + String metricName = rs.getString("metric_name"); + Instant bucket = rs.getTimestamp("bucket").toInstant(); + double value = rs.getDouble("avg_value"); + result.computeIfAbsent(metricName, k -> new ArrayList<>()) + .add(new MetricTimeSeries.Bucket(bucket, value)); + }, agentId, + java.sql.Timestamp.from(from), + java.sql.Timestamp.from(to)); + + return result; + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java new file mode 100644 index 00000000..08287fd6 --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java @@ -0,0 +1,114 @@ +package com.cameleer3.server.app.storage; + +import com.cameleer3.server.core.storage.model.MetricTimeSeries; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import 
org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class ClickHouseMetricsQueryStoreTest { + + @Container + static final ClickHouseContainer clickhouse = + new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + + private JdbcTemplate jdbc; + private ClickHouseMetricsQueryStore queryStore; + + @BeforeEach + void setUp() { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(clickhouse.getJdbcUrl()); + ds.setUsername(clickhouse.getUsername()); + ds.setPassword(clickhouse.getPassword()); + + jdbc = new JdbcTemplate(ds); + + jdbc.execute(""" + CREATE TABLE IF NOT EXISTS agent_metrics ( + tenant_id LowCardinality(String) DEFAULT 'default', + collected_at DateTime64(3), + agent_id LowCardinality(String), + metric_name LowCardinality(String), + metric_value Float64, + tags Map(String, String) DEFAULT map(), + server_received_at DateTime64(3) DEFAULT now64(3) + ) + ENGINE = MergeTree() + ORDER BY (tenant_id, agent_id, metric_name, collected_at) + """); + + jdbc.execute("TRUNCATE TABLE agent_metrics"); + + // Seed test data: 6 data points across 1 hour for two metrics + Instant base = Instant.parse("2026-03-31T10:00:00Z"); + for (int i = 0; i < 6; i++) { + Instant ts = base.plusSeconds(i * 600); // every 10 minutes + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "cpu.usage", 50.0 + i * 5, java.sql.Timestamp.from(ts)); + jdbc.update("INSERT INTO agent_metrics (agent_id, metric_name, metric_value, collected_at) VALUES (?, ?, ?, ?)", + "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); + } + + queryStore = new ClickHouseMetricsQueryStore(jdbc); + } + + @Test + void queryTimeSeries_returnsDataGroupedByMetric() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = 
Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage", "memory.free"), from, to, 6); + + assertThat(result).containsKeys("cpu.usage", "memory.free"); + assertThat(result.get("cpu.usage")).isNotEmpty(); + assertThat(result.get("memory.free")).isNotEmpty(); + } + + @Test + void queryTimeSeries_bucketsAverageCorrectly() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + // 1 bucket for the entire hour = average of all 6 values + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 1); + + assertThat(result.get("cpu.usage")).hasSize(1); + // Values: 50, 55, 60, 65, 70, 75 → avg = 62.5 + assertThat(result.get("cpu.usage").get(0).value()).isCloseTo(62.5, org.assertj.core.data.Offset.offset(0.1)); + } + + @Test + void queryTimeSeries_noData_returnsEmptyLists() { + Instant from = Instant.parse("2025-01-01T00:00:00Z"); + Instant to = Instant.parse("2025-01-01T01:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("agent-1", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } + + @Test + void queryTimeSeries_unknownAgent_returnsEmpty() { + Instant from = Instant.parse("2026-03-31T10:00:00Z"); + Instant to = Instant.parse("2026-03-31T11:00:00Z"); + + Map> result = + queryStore.queryTimeSeries("nonexistent", List.of("cpu.usage"), from, to, 6); + + assertThat(result.get("cpu.usage")).isEmpty(); + } +} From 21991b6cf81b9199ac4a398a997b837350c8d7a4 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:07:35 +0200 Subject: [PATCH 09/15] feat: wire MetricsStore and MetricsQueryStore with feature flag Co-Authored-By: Claude Sonnet 4.6 --- .../server/app/config/StorageBeanConfig.java | 28 ++++++++++++++++++- .../app/storage/PostgresMetricsStore.java | 2 -- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git 
a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 44a78555..71b5bf7d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -1,6 +1,9 @@ package com.cameleer3.server.app.config; +import com.cameleer3.server.app.storage.ClickHouseMetricsQueryStore; +import com.cameleer3.server.app.storage.ClickHouseMetricsStore; import com.cameleer3.server.app.storage.PostgresMetricsQueryStore; +import com.cameleer3.server.app.storage.PostgresMetricsStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.detail.DetailService; @@ -9,7 +12,9 @@ import com.cameleer3.server.core.ingestion.IngestionService; import com.cameleer3.server.core.ingestion.WriteBuffer; import com.cameleer3.server.core.storage.*; import com.cameleer3.server.core.storage.model.MetricsSnapshot; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -45,7 +50,28 @@ public class StorageBeanConfig { } @Bean - public MetricsQueryStore metricsQueryStore(JdbcTemplate jdbc) { + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") + public MetricsStore clickHouseMetricsStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", 
matchIfMissing = true) + public MetricsStore postgresMetricsStore(JdbcTemplate jdbc) { + return new PostgresMetricsStore(jdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "clickhouse") + public MetricsQueryStore clickHouseMetricsQueryStore( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { + return new ClickHouseMetricsQueryStore(clickHouseJdbc); + } + + @Bean + @ConditionalOnProperty(name = "cameleer.storage.metrics", havingValue = "postgres", matchIfMissing = true) + public MetricsQueryStore postgresMetricsQueryStore(JdbcTemplate jdbc) { return new PostgresMetricsQueryStore(jdbc); } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java index 8b8fed63..9d63e638 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresMetricsStore.java @@ -5,12 +5,10 @@ import com.cameleer3.server.core.storage.model.MetricsSnapshot; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; import java.sql.Timestamp; import java.util.List; -@Repository public class PostgresMetricsStore implements MetricsStore { private static final ObjectMapper MAPPER = new ObjectMapper(); From 1b991f99a3a665833b823f1b180802c1ec910687 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:08:42 +0200 Subject: [PATCH 10/15] deploy: add ClickHouse StatefulSet and server env vars Co-Authored-By: Claude Sonnet 4.6 --- deploy/base/server.yaml | 6 ++++ deploy/clickhouse.yaml | 75 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) 
create mode 100644 deploy/clickhouse.yaml diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 218c0af4..d0b00b97 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -75,6 +75,12 @@ spec: name: cameleer-auth key: CAMELEER_JWT_SECRET optional: true + - name: CLICKHOUSE_ENABLED + value: "true" + - name: CLICKHOUSE_URL + value: "jdbc:clickhouse://clickhouse.cameleer.svc.cluster.local:8123/cameleer?async_insert=1&wait_for_async_insert=0" + - name: CAMELEER_STORAGE_METRICS + value: "postgres" resources: requests: diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml new file mode 100644 index 00000000..861feff4 --- /dev/null +++ b/deploy/clickhouse.yaml @@ -0,0 +1,75 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: clickhouse + namespace: cameleer +spec: + serviceName: clickhouse + replicas: 1 + selector: + matchLabels: + app: clickhouse + template: + metadata: + labels: + app: clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24.12 + ports: + - containerPort: 8123 + name: http + - containerPort: 9000 + name: native + volumeMounts: + - name: data + mountPath: /var/lib/clickhouse + resources: + requests: + memory: "2Gi" + cpu: "500m" + limits: + memory: "4Gi" + cpu: "2000m" + livenessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /ping + port: 8123 + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 50Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: clickhouse + namespace: cameleer +spec: + clusterIP: None + selector: + app: clickhouse + ports: + - port: 8123 + targetPort: 8123 + name: http + - port: 9000 + targetPort: 9000 + name: native From 7a2a0ee649d1901e69e1ed0970a2b0c0af776edf Mon 
Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:09:09 +0200 Subject: [PATCH 11/15] test: add ClickHouse testcontainer to integration test base Co-Authored-By: Claude Sonnet 4.6 --- .../com/cameleer3/server/app/AbstractPostgresIT.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index d9c38f83..cf7d8c38 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -7,6 +7,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; @@ -20,6 +21,7 @@ public abstract class AbstractPostgresIT { static final PostgreSQLContainer postgres; static final OpensearchContainer opensearch; + static final ClickHouseContainer clickhouse; static { postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE) @@ -30,6 +32,9 @@ public abstract class AbstractPostgresIT { opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); opensearch.start(); + + clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); + clickhouse.start(); } @Autowired @@ -46,5 +51,9 @@ public abstract class AbstractPostgresIT { registry.add("spring.flyway.user", postgres::getUsername); registry.add("spring.flyway.password", postgres::getPassword); registry.add("opensearch.url", opensearch::getHttpHostAddress); + registry.add("clickhouse.enabled", () -> "true"); + 
registry.add("clickhouse.url", clickhouse::getJdbcUrl); + registry.add("clickhouse.username", clickhouse::getUsername); + registry.add("clickhouse.password", clickhouse::getPassword); } } From 697c689192981be6415c83af3d8966b198607726 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:19:33 +0200 Subject: [PATCH 12/15] fix: rename ClickHouse tests to *IT pattern for CI compatibility Testcontainers tests need Docker which isn't available in CI. Rename to *IT so Surefire skips them (Failsafe runs them with -DskipITs=false). Co-Authored-By: Claude Opus 4.6 (1M context) --- ...csQueryStoreTest.java => ClickHouseMetricsQueryStoreIT.java} | 2 +- ...HouseMetricsStoreTest.java => ClickHouseMetricsStoreIT.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/{ClickHouseMetricsQueryStoreTest.java => ClickHouseMetricsQueryStoreIT.java} (99%) rename cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/{ClickHouseMetricsStoreTest.java => ClickHouseMetricsStoreIT.java} (99%) diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java similarity index 99% rename from cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java rename to cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java index 08287fd6..54a9846d 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreTest.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java @@ -16,7 +16,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @Testcontainers -class 
ClickHouseMetricsQueryStoreTest { +class ClickHouseMetricsQueryStoreIT { @Container static final ClickHouseContainer clickhouse = diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java similarity index 99% rename from cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java rename to cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java index 6df85f49..b87022f9 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreTest.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java @@ -17,7 +17,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @Testcontainers -class ClickHouseMetricsStoreTest { +class ClickHouseMetricsStoreIT { @Container static final ClickHouseContainer clickhouse = From 59dd629b0e96a9b4dc6fd2a093afbffe1833a320 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:31:17 +0200 Subject: [PATCH 13/15] fix: create cameleer database on ClickHouse startup ClickHouse only has the 'default' database out of the box. The JDBC URL connects to 'cameleer', so the database must exist before the server starts. Uses /docker-entrypoint-initdb.d/ init script via ConfigMap. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- deploy/clickhouse.yaml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/deploy/clickhouse.yaml b/deploy/clickhouse.yaml index 861feff4..f9347556 100644 --- a/deploy/clickhouse.yaml +++ b/deploy/clickhouse.yaml @@ -25,6 +25,8 @@ spec: volumeMounts: - name: data mountPath: /var/lib/clickhouse + - name: initdb + mountPath: /docker-entrypoint-initdb.d resources: requests: memory: "2Gi" @@ -48,6 +50,10 @@ spec: periodSeconds: 5 timeoutSeconds: 3 failureThreshold: 3 + volumes: + - name: initdb + configMap: + name: clickhouse-initdb volumeClaimTemplates: - metadata: name: data @@ -73,3 +79,12 @@ spec: - port: 9000 targetPort: 9000 name: native +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: clickhouse-initdb + namespace: cameleer +data: + 01-create-database.sql: | + CREATE DATABASE IF NOT EXISTS cameleer; From c42e13932bcfe280d873e783a179ac487f610460 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:41:15 +0200 Subject: [PATCH 14/15] ci: deploy ClickHouse StatefulSet in main deploy job The deploy/clickhouse.yaml manifest was created but not referenced in the CI workflow. Add kubectl apply between OpenSearch and Authentik. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitea/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index ad105920..0bd33bea 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -228,6 +228,9 @@ jobs: kubectl apply -f deploy/opensearch.yaml kubectl -n cameleer rollout status statefulset/opensearch --timeout=180s + kubectl apply -f deploy/clickhouse.yaml + kubectl -n cameleer rollout status statefulset/clickhouse --timeout=180s + kubectl apply -f deploy/authentik.yaml kubectl -n cameleer rollout status deployment/authentik-server --timeout=180s From aa5fc1b83091040bda6a81f46e515493c47c8bab Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:43:40 +0200 Subject: [PATCH 15/15] ci: retrigger after transient GitHub actions/cache 500 error