# Architecture Patterns
**Domain:** Transaction monitoring / observability server for Apache Camel route executions

**Researched:** 2026-03-11

**Confidence:** MEDIUM (based on established observability architecture patterns; no live web verification available)
## Recommended Architecture
### High-Level Overview
The system follows a **write-heavy, read-occasional** observability pattern with three distinct data paths: ingestion (agents to storage), query (UI to storage), and push (server to agents over SSE):
```
Agents (50+)                        Users / UI
    |                                   |
    v                                   v
[Ingestion Pipeline]               [Query Engine]
    |                                   |
    v                                   |
[Write Buffer / Batcher]                |
    |                                   |
    v                                   v
[ClickHouse]  <----- reads -------------+
[Text Index]  <----- full-text ---------+
                                        ^
                                        |
                                 [Diagram Store] (versioned)

[SSE Channel Manager] --push--> Agents
```
### Component Boundaries
| Component | Module | Responsibility | Communicates With |
|-----------|--------|---------------|-------------------|
| **Ingestion Controller** | app | HTTP POST endpoint, request validation, deserialization | Write Buffer |
| **Write Buffer** | core | In-memory batching, backpressure signaling | ClickHouse Writer, Text Indexer |
| **ClickHouse Writer** | app | Batch INSERT into ClickHouse, retry logic | ClickHouse |
| **Text Indexer** | core (interface) + app (impl) | Extract searchable text, write to text index | Text index (ClickHouse or external) |
| **Transaction Service** | core | Domain logic: transactions, activities, correlations | Storage interfaces |
| **Query Engine** | core | Combines structured + full-text queries, pagination | ClickHouse, Text index |
| **Agent Registry** | core | Track agent instances, lifecycle (LIVE/STALE/DEAD), heartbeat | SSE Channel Manager |
| **SSE Channel Manager** | core (interface) + app (impl) | Manage SSE connections, push config/commands | Agent Registry |
| **Diagram Service** | core | Version diagrams, link to transactions, trigger rendering | Diagram Store |
| **Diagram Renderer** | core | Server-side rendering of route definitions to visual output | Diagram Service |
| **Auth Service** | core | JWT validation with RBAC (AGENT/VIEWER/OPERATOR/ADMIN), Ed25519 signing, bootstrap token flow, OIDC token exchange | All controllers |
| **User Repository** | core (interface) + app (ClickHouse) | Persist users from local login and OIDC, role management | Auth controllers, admin API |
| **REST Controllers** | app | HTTP endpoints for transactions, agents, diagrams, config | All core services |
| **SSE Controller** | app | SSE endpoint, connection lifecycle | SSE Channel Manager |
| **Config Controller** | app | Config CRUD, push triggers | SSE Channel Manager, Config store |
### Data Flow
#### 1. Transaction Ingestion (Hot Path)
```
Agent POST /api/v1/ingest
  |
  v
[IngestController] -- validates JWT, deserializes using cameleer-common models
  |
  v
[IngestionService.accept(batch)] -- accepts TransactionData/ActivityData
  |
  v
[WriteBuffer] -- in-memory queue (bounded, per-partition)
  |              signals backpressure via HTTP 429 when full
  |
  +---(flush trigger: size threshold OR time interval)---+
  |                                                      |
  v                                                      v
[ClickHouseWriter.insertBatch()]                       [TextIndexer.indexBatch()]
  |                                                      |
  v                                                      v
ClickHouse (MergeTree tables)                          ClickHouse full-text index
                                                       (or separate text index)
```
#### 2. Transaction Query (Read Path)
```
UI GET /api/v1/transactions?state=ERROR&from=...&to=...&q=free+text
  |
  v
[TransactionController] -- validates, builds query criteria
  |
  v
[QueryEngine.search(criteria)] -- combines structured filters + full-text
  |
  +--- structured filters --> ClickHouse WHERE clauses
  +--- full-text query -----> text index lookup (returns transaction IDs)
  +--- merge results -------> intersect, sort, paginate
  |
  v
[Page<TransactionSummary>] -- paginated response with cursor
```
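The merge step in the read path can be sketched in plain Java. This is a minimal sketch, assuming the text index has already returned a set of matching transaction IDs and the structured query has returned candidate summaries; `TxSummary` and `QueryMerge` are hypothetical names, and a real QueryEngine would push as much of the filtering into ClickHouse SQL as it can.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Hypothetical minimal summary type for illustration.
record TxSummary(String id, Instant timestamp, String state) {}

final class QueryMerge {
    private QueryMerge() {}

    // Intersect: keep structured matches whose ID the text index also returned.
    // Sort: newest first, ties broken by ID (matches the keyset cursor ordering).
    // Paginate: return at most pageSize rows.
    static List<TxSummary> merge(List<TxSummary> structured, Set<String> textMatchIds, int pageSize) {
        return structured.stream()
                .filter(s -> textMatchIds.contains(s.id()))
                .sorted(Comparator.comparing(TxSummary::timestamp)
                        .thenComparing(TxSummary::id)
                        .reversed())
                .limit(pageSize)
                .toList();
    }
}
```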
#### 3. Agent SSE Communication
```
Agent GET /api/v1/agents/{id}/events (SSE)
  |
  v
[SseController] -- authenticates, registers SseEmitter
  |
  v
[SseChannelManager.register(agentId, emitter)]
  |
  v
[AgentRegistry.markLive(agentId)]

--- Later, when config changes ---

[ConfigController.update(config)]
  |
  v
[SseChannelManager.broadcast(configEvent)]
  |
  v
Each registered SseEmitter sends event to connected agent
```
#### 4. Diagram Versioning
```
Agent POST /api/v1/diagrams (on startup or route change)
  |
  v
[DiagramController] -- receives route definition (XML/YAML/JSON from cameleer-common)
  |
  v
[DiagramService.storeVersion(definition)]
  |
  +--- compute content hash
  +--- if hash differs from latest: store new version with timestamp
  +--- if identical: skip (idempotent)
  |
  v
[DiagramStore] -- versioned storage (content-addressable)

--- On transaction query ---

[TransactionService] -- looks up diagram version active at transaction timestamp
  |
  v
[DiagramService.getVersionAt(routeId, instant)]
  |
  v
[DiagramRenderer.render(definition)] -- produces SVG/PNG for display
```
## Patterns to Follow
### Pattern 1: Bounded Write Buffer with Backpressure
**What:** In-memory queue between the ingestion endpoint and storage writes, with a bounded size. When it is full, return HTTP 429 so agents back off and retry.

**When:** Always -- this is the critical buffer between high-throughput ingestion and batch-oriented database writes.

**Why:** ClickHouse performs best with large batch inserts (thousands of rows). One insert per HTTP request would severely degrade write performance. The buffer decouples the ingestion rate from the write rate.

**Example:**
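```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Plain Java (core module): no Spring annotations; the app module schedules flush().
public class WriteBuffer<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final Duration maxFlushInterval;
    private final Consumer<List<T>> flushAction;

    public WriteBuffer(int capacity, int batchSize, Duration maxFlushInterval,
                       Consumer<List<T>> flushAction) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.maxFlushInterval = maxFlushInterval;
        this.flushAction = flushAction;
    }
    public boolean offer(T item) {
        // Non-blocking: returns false when the queue is full -> caller returns 429
        return queue.offer(item);
    }
    // Interval the app-side scheduler should use (e.g. from ingestion.flush-interval-ms)
    public Duration maxFlushInterval() {
        return maxFlushInterval;
    }
    // Drains the queue in batches of at most batchSize, so a backlog larger than
    // one batch is written in a single scheduled run instead of lagging behind.
    public void flush() {
        List<T> batch = new ArrayList<>(batchSize);
        while (queue.drainTo(batch, batchSize) > 0) {
            flushAction.accept(batch);
            batch = new ArrayList<>(batchSize);
        }
    }
}
```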
**Implementation detail:** Use `ArrayBlockingQueue` with a capacity that matches your memory budget. At ~2KB per transaction record and 10,000 capacity, that is ~20MB -- well within bounds.
### Pattern 2: Repository Abstraction over ClickHouse
**What:** Define storage interfaces in the core module and implement them with ClickHouse JDBC in the app module. Core never imports the ClickHouse driver directly.

**When:** Always -- this is the key module boundary principle.

**Why:** Keeps core testable without a database. Allows swapping storage in tests (in-memory) and theoretically in production. More importantly, it enforces that domain logic does not leak storage concerns.

**Example:**
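```java
// In core module. Page, PageRequest, Transaction, TransactionSummary and TransactionQuery
// are core-defined types here (not Spring Data's), so core stays free of Spring.
import java.util.List;
import java.util.Optional;

public interface TransactionRepository {
    void insertBatch(List<Transaction> transactions);
    Page<TransactionSummary> search(TransactionQuery query, PageRequest page);
    Optional<Transaction> findById(String transactionId);
}

// In app module
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

@Repository
public class ClickHouseTransactionRepository implements TransactionRepository {
    private final JdbcTemplate jdbc;

    public ClickHouseTransactionRepository(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    // insertBatch / search / findById: ClickHouse-specific SQL, batch inserts, etc. (omitted)
}
```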
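To illustrate the testing benefit, here is a minimal in-memory test double. It assumes simplified, hypothetical stand-ins (`Transaction` as a three-field record and a `SimpleTransactionRepository` interface without `search`) so the example compiles on its own; the real contract is the `TransactionRepository` interface above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for the core types (search() omitted).
record Transaction(String transactionId, String routeId, String state) {}

interface SimpleTransactionRepository {
    void insertBatch(List<Transaction> transactions);
    Optional<Transaction> findById(String transactionId);
}

// Test double for core unit tests: no database, no Spring context.
final class InMemoryTransactionRepository implements SimpleTransactionRepository {
    private final Map<String, Transaction> byId = new ConcurrentHashMap<>();

    @Override
    public void insertBatch(List<Transaction> transactions) {
        for (Transaction t : transactions) {
            byId.put(t.transactionId(), t); // last write wins for duplicate IDs
        }
    }

    @Override
    public Optional<Transaction> findById(String transactionId) {
        return Optional.ofNullable(byId.get(transactionId));
    }
}
```

Because core services depend only on the interface, a test can construct a service with this double and assert on domain behavior without starting ClickHouse.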
### Pattern 3: SseEmitter Registry with Heartbeat
**What:** Maintain a concurrent map of agent ID to SseEmitter. Send periodic heartbeat events. Remove on timeout, error, or completion.

**When:** For all SSE connections.

**Why:** SSE connections are long-lived. Without heartbeat, you cannot distinguish between a healthy idle connection and a silently dropped one. The registry is the source of truth for which agents are reachable.

**Example:**
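```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

// Spring-based implementation (app module)
public class SseChannelManager {
    private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    public SseEmitter register(String agentId) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // no framework timeout; heartbeat detects dead links
        emitter.onCompletion(() -> remove(agentId, emitter));
        emitter.onTimeout(() -> remove(agentId, emitter));
        emitter.onError(e -> remove(agentId, emitter));
        SseEmitter previous = emitters.put(agentId, emitter);
        if (previous != null) {
            previous.complete(); // a reconnect replaces the old emitter
        }
        return emitter;
    }

    @Scheduled(fixedDelay = 15_000)
    void heartbeat() {
        emitters.forEach((id, emitter) -> {
            try {
                emitter.send(SseEmitter.event().name("heartbeat").data(""));
            } catch (IOException | IllegalStateException e) {
                remove(id, emitter);
            }
        });
    }

    // Returns false if the agent is not connected or the write failed.
    public boolean send(String agentId, String eventName, Object data) {
        SseEmitter emitter = emitters.get(agentId);
        if (emitter == null) {
            return false;
        }
        try {
            emitter.send(SseEmitter.event().name(eventName).data(data));
            return true;
        } catch (IOException | IllegalStateException e) {
            remove(agentId, emitter);
            return false;
        }
    }

    // Conditional remove: a late callback from an old emitter must not evict
    // the emitter registered by a newer reconnect.
    private void remove(String agentId, SseEmitter emitter) {
        emitters.remove(agentId, emitter);
    }
}
```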
### Pattern 4: Content-Addressable Diagram Versioning
**What:** Hash diagram definitions. Store each unique definition once. Link transactions to the definition hash + a version timestamp.

**When:** For diagram storage.

**Why:** Many transactions reference the same diagram version. Content-addressing deduplicates storage. A separate version table maps (routeId, timestamp) to content hash, enabling "what diagram was active at time T?" queries.

**Schema sketch:**
```sql
-- Diagram definitions (content-addressable)
CREATE TABLE diagram_definitions (
    content_hash String,            -- SHA-256 of definition
    route_id     String,
    definition   String,            -- raw XML/YAML/JSON
    rendered_svg Nullable(String),  -- pre-rendered SVG, filled async (e.g. ALTER TABLE ... UPDATE)
    created_at   DateTime64(3)
) ENGINE = MergeTree()
ORDER BY (content_hash);
-- MergeTree does not deduplicate rows, so DiagramService should skip the
-- INSERT when the content hash is already stored.

-- Version history (which definition was active when)
CREATE TABLE diagram_versions (
    route_id     String,
    active_from  DateTime64(3),
    content_hash String
) ENGINE = MergeTree()
ORDER BY (route_id, active_from);
```
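The same idea expressed in Java, as an in-memory sketch: `sha256Hex` computes the content hash, `storeVersion` records a new version only when the definition differs from the route's latest one (the idempotent skip from the diagram versioning flow), and `definitionAt` answers "what was active at time T?" with a floor lookup, the in-memory equivalent of picking the latest `active_from <= T`. `DiagramVersionIndex` and its methods are hypothetical names, not part of the codebase.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of the two tables: definitions keyed by content hash,
// and a per-route version history keyed by active_from.
final class DiagramVersionIndex {
    private final Map<String, String> definitionsByHash = new HashMap<>();
    private final Map<String, NavigableMap<Instant, String>> versionsByRoute = new HashMap<>();

    // SHA-256 of the UTF-8 bytes, as 64 lowercase hex characters.
    static String sha256Hex(String definition) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(md.digest(definition.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }

    // Returns true if a new version was added, false for an idempotent re-registration.
    synchronized boolean storeVersion(String routeId, String definition, Instant activeFrom) {
        String hash = sha256Hex(definition);
        NavigableMap<Instant, String> history =
                versionsByRoute.computeIfAbsent(routeId, r -> new TreeMap<>());
        Map.Entry<Instant, String> latest = history.lastEntry();
        if (latest != null && latest.getValue().equals(hash)) {
            return false;
        }
        definitionsByHash.putIfAbsent(hash, definition); // each unique definition stored once
        history.put(activeFrom, hash);
        return true;
    }

    // Latest version with active_from <= at, i.e. the definition active at that instant.
    synchronized Optional<String> definitionAt(String routeId, Instant at) {
        NavigableMap<Instant, String> history = versionsByRoute.get(routeId);
        if (history == null) {
            return Optional.empty();
        }
        Map.Entry<Instant, String> entry = history.floorEntry(at);
        return entry == null ? Optional.empty() : Optional.of(definitionsByHash.get(entry.getValue()));
    }
}
```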
### Pattern 5: Cursor-Based Pagination for Time-Series Data
**What:** Use cursor-based pagination (keyset pagination) instead of OFFSET/LIMIT for transaction listing.

**When:** For all list/search endpoints returning time-ordered transaction data.

**Why:** OFFSET-based pagination degrades as the offset grows, because ClickHouse must read and discard every skipped row. Keyset pagination filters on the sort key instead; for the newest-first ordering used here the condition is `(timestamp, id) < (last_seen_timestamp, last_seen_id)`, so the cost of fetching a page stays roughly the same no matter how deep the client paginates.

**Example:**
```java
public record PageCursor(Instant timestamp, String id) {}
// Query: WHERE (timestamp, id) < (:cursorTs, :cursorId) ORDER BY timestamp DESC, id DESC LIMIT :size
```
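One way to expose the cursor to API clients is as an opaque token. The sketch below extends the record with hypothetical `encode`/`decode` methods (the `<epochMillis>:<id>` token format is an assumption, chosen to match `DateTime64(3)` millisecond precision) and a Java predicate mirroring the `(timestamp, id) < (:cursorTs, :cursorId)` comparison, which is handy for unit-testing the keyset logic.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

// PageCursor from above, extended with hypothetical helpers.
record PageCursor(Instant timestamp, String id) {

    // Opaque, URL-safe token "<epochMillis>:<id>" (sub-millisecond precision is dropped).
    String encode() {
        String raw = timestamp.toEpochMilli() + ":" + id;
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    static PageCursor decode(String token) {
        String raw = new String(Base64.getUrlDecoder().decode(token), StandardCharsets.UTF_8);
        int sep = raw.indexOf(':'); // first ':' ends the timestamp; the id may contain ':'
        long millis = Long.parseLong(raw.substring(0, sep));
        return new PageCursor(Instant.ofEpochMilli(millis), raw.substring(sep + 1));
    }

    // Java mirror of (rowTs, rowId) < (timestamp, id): true if the row belongs on a
    // later page of a newest-first (timestamp DESC, id DESC) listing.
    boolean isBeyond(Instant rowTs, String rowId) {
        int byTime = rowTs.compareTo(timestamp);
        return byTime < 0 || (byTime == 0 && rowId.compareTo(id) < 0);
    }
}
```

Keeping the token opaque lets the server change the cursor's internal format later without breaking clients.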
## Anti-Patterns to Avoid
### Anti-Pattern 1: Individual Row Inserts to ClickHouse
**What:** Inserting one transaction per HTTP request directly to ClickHouse.

**Why bad:** ClickHouse is designed for bulk inserts. Individual inserts create excessive parts in MergeTree tables, causing merge pressure and degraded read performance. At 50+ agents posting concurrently, this would quickly become a bottleneck.

**Instead:** Buffer in memory, flush in batches of 1,000-10,000 rows per insert.
### Anti-Pattern 2: Storing Rendered Diagrams in ClickHouse BLOBs
**What:** Putting SVG/PNG binary data directly in the main ClickHouse tables alongside transaction data.

**Why bad:** ClickHouse is columnar and optimized for analytical queries. Large binary data in columns degrades compression ratios and query performance for all queries touching that table.

**Instead:** Store rendered output in filesystem or object storage. Store only the content hash reference in ClickHouse. Or use a separate ClickHouse table with the rendered content that is rarely queried alongside transaction data.
### Anti-Pattern 3: Blocking SSE Writes on the Request Thread
**What:** Sending SSE events synchronously from the thread handling a config update request.

**Why bad:** If an agent's connection is slow or dead, the config update request blocks. With 50+ agents, this creates cascading latency.

**Instead:** Send SSE events asynchronously. Use a thread pool or virtual threads (Java 21+) to handle SSE writes. Return success to the config updater immediately, handle delivery failures in the background.
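A minimal sketch of asynchronous fan-out with virtual threads, assuming Java 21. `AgentChannel` is a hypothetical abstraction (in the app module it would wrap an `SseEmitter`) and `AsyncBroadcaster` is an illustrative name; `broadcast` returns immediately and a failed delivery drops that channel in the background.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical abstraction over one agent's SSE connection.
interface AgentChannel {
    void send(String eventName, Object data) throws Exception;
}

final class AsyncBroadcaster implements AutoCloseable {
    private final Map<String, AgentChannel> channels = new ConcurrentHashMap<>();
    // One cheap virtual thread per send (Java 21+).
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    void register(String agentId, AgentChannel channel) {
        channels.put(agentId, channel);
    }

    int channelCount() {
        return channels.size();
    }

    // Returns immediately: each write runs on its own virtual thread, so a slow or dead
    // connection delays neither the caller (e.g. the config update request) nor other agents.
    void broadcast(String eventName, Object data) {
        channels.forEach((agentId, channel) -> executor.submit(() -> {
            try {
                channel.send(eventName, data);
            } catch (Exception e) {
                channels.remove(agentId, channel); // drop the failed channel in the background
            }
        }));
    }

    @Override
    public void close() {
        executor.close(); // ExecutorService.close() (Java 19+) waits for in-flight sends
    }
}
```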
### Anti-Pattern 4: Fat Core Module with Spring Dependencies
**What:** Adding Spring annotations (`@Service`, `@Repository`, `@Autowired`) throughout the core module.

**Why bad:** Couples domain logic to Spring, makes unit testing harder, and defeats the purpose of the core/app split.

**Instead:** The core module defines plain Java interfaces and classes; the app module wires them with Spring. Core could use `@Scheduled` or similar only if Spring were already a dependency; since core is meant to stay free of Spring Boot, keep scheduling in app.
### Anti-Pattern 5: Unbounded SSE Emitter Timeouts
**What:** Setting the SseEmitter timeout to 0 or `Long.MAX_VALUE` without any heartbeat or cleanup.

**Why bad:** Dead connections accumulate and leak memory, and the agent registry keeps showing agents as LIVE after they are gone.

**Instead:** Use a heartbeat (Pattern 3), track the last successful send, and transition agents to STALE after N missed heartbeats and to DEAD after M.
## Module Boundary Design
### Core Module (`cameleer-server-core`)
The core module is the domain layer. It contains:
- **Domain models** -- Transaction, Activity, Agent, DiagramVersion, etc. (may extend or complement cameleer-common models)
- **Service interfaces and implementations** -- TransactionService, AgentRegistryService, DiagramService, QueryEngine
- **Repository interfaces** -- TransactionRepository, DiagramRepository, AgentRepository (interfaces only, no implementations)
- **Ingestion logic** -- WriteBuffer, batch assembly, backpressure signaling
- **Text indexing abstraction** -- TextIndexer interface
- **Event/notification abstractions** -- SseChannelManager interface (not the Spring SseEmitter impl)
- **Security abstractions** -- JwtValidator interface, Ed25519Signer/Verifier
- **Query model** -- TransactionQuery, PageCursor, search criteria builders

**No Spring Boot dependencies.** Jackson is acceptable (already present). JUnit for tests.
### App Module (`cameleer-server-app`)
The app module is the infrastructure/adapter layer. It contains:
- **Spring Boot application class**
- **REST controllers** -- IngestController, TransactionController, AgentController, DiagramController, ConfigController, SseController
- **Repository implementations** -- ClickHouseTransactionRepository, etc.
- **SSE implementation** -- SpringSseChannelManager using SseEmitter
- **Security filters** -- JWT filter, bootstrap token filter
- **Configuration** -- application.yml, ClickHouse connection config, scheduler config
- **Diagram rendering implementation** -- if using an external library for SVG generation
- **Static resources** -- UI assets (later phase)

**Depends on core.** Wires everything together with Spring configuration.
### Boundary Rule
```
app --> core (allowed)
core --> app (NEVER)
core --> cameleer-common (allowed)
app --> cameleer-common (transitively via core)
```
## Ingestion Pipeline Detail
### Buffering Strategy
Use a two-stage approach:
1. **Accept stage** -- IngestController deserializes, validates, places into WriteBuffer. Returns 202 Accepted (or 429 if buffer full).
2. **Flush stage** -- Scheduled task drains buffer into batches. Each batch goes to ClickHouseWriter and TextIndexer.
### Backpressure Mechanism
- WriteBuffer has a bounded capacity (configurable, default 50,000 items).
- When buffer is >80% full, respond with HTTP 429 + `Retry-After` header.
- Agents (cameleer) should implement exponential backoff on 429.
- Monitor buffer fill level as a metric.
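The 80% rule above can be captured in a small policy object. This is a sketch under assumptions: `BackpressurePolicy` and `IngestDecision` are hypothetical names, the Retry-After value is a fixed configured duration, and the controller would translate the decision into the HTTP status and `Retry-After` header.

```java
import java.time.Duration;

// Hypothetical result type: HTTP status plus Retry-After seconds (0 when not throttled).
record IngestDecision(int status, long retryAfterSeconds) {}

final class BackpressurePolicy {
    private final int capacity;
    private final double highWatermark; // e.g. 0.8 for "more than 80% full"
    private final Duration retryAfter;

    BackpressurePolicy(int capacity, double highWatermark, Duration retryAfter) {
        this.capacity = capacity;
        this.highWatermark = highWatermark;
        this.retryAfter = retryAfter;
    }

    // Checked before enqueueing: 202 Accepted at or below the watermark, otherwise
    // 429 Too Many Requests with a Retry-After hint for the agent's backoff.
    IngestDecision decide(int currentSize) {
        double fill = (double) currentSize / capacity;
        if (fill > highWatermark) {
            return new IngestDecision(429, retryAfter.toSeconds());
        }
        return new IngestDecision(202, 0);
    }
}
```

Throttling at 80% rather than 100% leaves headroom for requests already in flight when the threshold is crossed.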
### Batch Size Tuning
- Target: 5,000-10,000 rows per ClickHouse INSERT.
- Flush interval: 1-2 seconds (configurable).
- Flush triggers: whichever comes first -- batch size reached OR interval elapsed.
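The "whichever comes first" trigger can be sketched as a small policy that an app-side scheduler polls at a short interval. The polling approach and the `FlushPolicy` name are assumptions, not the project's confirmed design.

```java
import java.time.Duration;
import java.time.Instant;

// Flush when either trigger fires first: enough rows pending, or the interval elapsed.
final class FlushPolicy {
    private final int batchSize;
    private final Duration maxInterval;
    private Instant lastFlush;

    FlushPolicy(int batchSize, Duration maxInterval, Instant start) {
        this.batchSize = batchSize;
        this.maxInterval = maxInterval;
        this.lastFlush = start;
    }

    boolean shouldFlush(int pending, Instant now) {
        if (pending == 0) return false;                    // nothing to write
        if (pending >= batchSize) return true;             // size trigger
        return !now.isBefore(lastFlush.plus(maxInterval)); // time trigger
    }

    void markFlushed(Instant now) {
        lastFlush = now;
    }
}
```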
## Storage Architecture
### Write Path (ClickHouse)
ClickHouse excels at:
- Columnar compression (10:1 or better for structured transaction data)
- Time-partitioned tables with automatic TTL-based expiry (30-day retention)
- Massive batch INSERT throughput
- Analytical queries over time ranges

**Table design principles:**
- Partition by month: `PARTITION BY toYYYYMM(execution_time)`
- Order by query pattern: `ORDER BY (execution_time, transaction_id)` for time-range scans
- TTL: `TTL execution_time + INTERVAL 30 DAY`
- Use `LowCardinality(String)` for state, agent_id, route_id columns
### Full-Text Search
Two viable approaches:

**Option A: ClickHouse bloom filter skip indexes (recommended for simplicity)**
- ClickHouse supports `tokenbf_v1` and `ngrambf_v1` bloom filter data-skipping indexes
- Not as powerful as Elasticsearch/Lucene, but avoids running a separate system
- Good enough for "find transactions containing this string" queries
- Add a `search_text` column that concatenates the searchable fields

**Option B: External search index (Elasticsearch/OpenSearch)**
- More powerful: fuzzy matching, relevance scoring, complex text analysis
- Additional infrastructure to manage
- Only justified if full-text search quality is a key differentiator

**Recommendation:** Start with ClickHouse bloom filter indexes. The expected query pattern (incident-driven searches for known strings such as correlation IDs or error messages) does not require Lucene-level text analysis. If users need fuzzy or ranked search later, add an external index as a separate phase.
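To see what a `tokenbf_v1` index can and cannot match, it helps to look at how a `search_text` value splits into tokens. The sketch below builds a hypothetical `search_text` value and approximates ClickHouse's token splitting (runs of ASCII letters and digits form tokens and other ASCII characters act as separators; treating non-ASCII characters as token content is an assumption). It is an illustration, not ClickHouse's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SearchText {
    private SearchText() {}

    // Joins the non-null searchable fields into one hypothetical search_text value.
    static String build(String... fields) {
        return Stream.of(fields).filter(Objects::nonNull).collect(Collectors.joining(" "));
    }

    // Approximation of tokenbf_v1 splitting: runs of ASCII letters/digits form tokens,
    // other ASCII characters separate them; non-ASCII chars are kept in tokens (assumption).
    static List<String> tokens(String text) {
        List<String> out = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (char ch : text.toCharArray()) {
            boolean tokenChar = ch >= 0x80 || Character.isLetterOrDigit(ch);
            if (tokenChar) {
                current.append(ch);
            } else if (current.length() > 0) {
                out.add(current.toString());
                current.setLength(0);
            }
        }
        if (current.length() > 0) {
            out.add(current.toString());
        }
        return out;
    }
}
```

Note how a correlation ID such as `ORD-2026-0042` splits into three tokens; that is worth keeping in mind when choosing between `tokenbf_v1` (whole tokens) and `ngrambf_v1` (substrings) for identifiers that contain separators.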
### Read Path
- Structured queries go directly to ClickHouse SQL.
- Full-text queries use the bloom filter index for pre-filtering, then exact match.
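- Results are merged at the QueryEngine level. When the bloom filter index sits on the same ClickHouse table, structured and text filters can run as one SQL query; an explicit merge is needed mainly with an external index.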
- Pagination uses cursor-based approach (Pattern 5).
## SSE Connection Management at Scale
### Connection Lifecycle
```
Agent connects --> authenticate JWT --> register SseEmitter --> mark LIVE
  |
  +-- heartbeat every 15s --> success: stays LIVE
  |                       --> failure: mark STALE, remove emitter
  |
  +-- agent reconnects --> new SseEmitter replaces old one
  |
  +-- no reconnect within 5min --> mark DEAD
```
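The lifecycle above can be derived from the time of the last successful contact. A minimal sketch, assuming the agent registry records a `lastSeen` timestamp per agent; `AgentLivenessPolicy` is a hypothetical name, and the thresholds used in the test (30 s, i.e. two missed 15 s heartbeats, for STALE and 5 min for DEAD) are illustrative. This version measures both thresholds from the last contact, whereas the diagram measures the 5-minute window from the STALE transition.

```java
import java.time.Duration;
import java.time.Instant;

enum AgentState { LIVE, STALE, DEAD }

// Derives an agent's state from how long it has been silent.
final class AgentLivenessPolicy {
    private final Duration staleAfter;
    private final Duration deadAfter;

    AgentLivenessPolicy(Duration staleAfter, Duration deadAfter) {
        if (deadAfter.compareTo(staleAfter) <= 0) {
            throw new IllegalArgumentException("deadAfter must exceed staleAfter");
        }
        this.staleAfter = staleAfter;
        this.deadAfter = deadAfter;
    }

    AgentState stateAt(Instant lastSeen, Instant now) {
        Duration silence = Duration.between(lastSeen, now);
        if (silence.compareTo(deadAfter) >= 0) return AgentState.DEAD;
        if (silence.compareTo(staleAfter) >= 0) return AgentState.STALE;
        return AgentState.LIVE;
    }
}
```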
### Scaling Considerations
- 50 agents = 50 concurrent SSE connections. This is trivially handled by a single Spring Boot instance.
- At 500+ agents: consider sticky sessions behind a load balancer, or move to a pub/sub system (Redis Pub/Sub) for cross-instance coordination.
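- Spring's SseEmitter uses Servlet async support: the request thread is released once the controller returns, and an idle emitter holds an open connection and its async context rather than a thread. A thread is used only while an event is being written.
- Virtual threads (Java 21+) make those writes cheap to run concurrently, which matters most for broadcast fan-out (see Anti-Pattern 3).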
### Reconnection Protocol
- Agents should reconnect with the standard SSE `Last-Event-ID` header.
- Server tracks last event ID per agent.
- On reconnect, replay missed events (if any) from a small in-memory or persistent event log.
- For config push: since config is idempotent, replaying the latest config on reconnect is sufficient.
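A small per-agent replay log is enough for this protocol. The sketch below assumes monotonically increasing numeric event IDs (sent as the SSE `id:` field) and a bounded in-memory buffer; `ReplayLog` and `SseEvent` are hypothetical names. When the requested ID has already been evicted, `canReplayFrom` returns false and the server would fall back to sending the latest config.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

record SseEvent(long id, String name, String data) {}

// Bounded per-agent log of recent events; ids increase monotonically.
final class ReplayLog {
    private final int capacity;
    private final Deque<SseEvent> events = new ArrayDeque<>();
    private long nextId = 1;

    ReplayLog(int capacity) {
        this.capacity = capacity;
    }

    synchronized SseEvent append(String name, String data) {
        SseEvent e = new SseEvent(nextId++, name, data);
        events.addLast(e);
        if (events.size() > capacity) {
            events.removeFirst(); // drop the oldest event
        }
        return e;
    }

    // Events after the client's Last-Event-ID, oldest first.
    synchronized List<SseEvent> since(long lastEventId) {
        List<SseEvent> out = new ArrayList<>();
        for (SseEvent e : events) {
            if (e.id() > lastEventId) out.add(e);
        }
        return out;
    }

    // False if events the client missed have already been evicted (a gap exists).
    synchronized boolean canReplayFrom(long lastEventId) {
        return events.isEmpty() || events.peekFirst().id() <= lastEventId + 1;
    }
}
```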
## REST API Organization
### Controller Structure
```
/api/v1/
  ingest/                    IngestController
    POST /transactions       -- batch ingest from agents
    POST /activities         -- batch ingest activities
  transactions/              TransactionController
    GET  /                   -- search/list with filters
    GET  /{id}               -- single transaction detail
    GET  /{id}/activities    -- activities within a transaction
  agents/                    AgentController
    GET  /                   -- list all agents with status
    GET  /{id}               -- agent detail
    GET  /{id}/events        -- SSE stream (SseController)
    POST /register           -- bootstrap registration
  diagrams/                  DiagramController
    POST /                   -- store new diagram version
    GET  /{routeId}          -- latest diagram
    GET  /{routeId}/at       -- diagram at specific timestamp
    GET  /{routeId}/rendered -- rendered SVG/PNG
  config/                    ConfigController
    GET  /                   -- current config
    PUT  /                   -- update config (triggers SSE push)
    POST /commands           -- send ad-hoc command to agent(s)
```
### Response Conventions
- List endpoints return `Page<T>` with cursor-based pagination.
- All timestamps in ISO-8601 UTC.
- Error responses follow RFC 7807 Problem Details (since obsoleted by RFC 9457, which keeps the same format).
- Use `@RestControllerAdvice` for global exception handling.
## Scalability Considerations
| Concern | At 50 agents | At 500 agents | At 5,000 agents |
|---------|-------------|---------------|-----------------|
| **Ingestion throughput** | Single instance, in-memory buffer | Single instance, larger buffer | Multiple instances, partition by agent behind LB |
| **SSE connections** | Single instance, ConcurrentHashMap | Sticky sessions + Redis Pub/Sub for cross-instance events | Dedicated SSE gateway service |
| **ClickHouse writes** | Single writer thread, batch every 1-2s | Multiple writer threads, parallel batches | ClickHouse cluster with sharding |
| **Query latency** | Single ClickHouse node | Read replicas | Distributed ClickHouse cluster |
| **Diagram rendering** | Synchronous on request | Async pre-rendering on store | Worker pool with rendering queue |
## Suggested Build Order
Based on component dependencies:
```
Phase 1: Foundation
  Domain models (core)
  Repository interfaces (core)
  Basic Spring Boot wiring (app)

Phase 2: Ingestion Pipeline
  WriteBuffer (core)
  ClickHouse schema + connection (app)
  ClickHouseWriter (app)
  IngestController (app)
  --> Can receive and store transactions

Phase 3: Query Engine
  TransactionQuery model (core)
  QueryEngine (core)
  ClickHouse query implementation (app)
  TransactionController (app)
  --> Can search stored transactions

Phase 4: Agent Registry + SSE
  AgentRegistryService (core)
  SseChannelManager interface (core) + impl (app)
  AgentController + SseController (app)
  --> Agents can register and receive push events

Phase 5: Diagram Service
  DiagramService (core)
  DiagramRepository interface (core) + impl (app)
  DiagramRenderer (core/app)
  DiagramController (app)
  --> Versioned diagrams linked to transactions

Phase 6: Security
  JWT validation (core interface, app impl)
  Ed25519 config signing (core)
  Bootstrap token flow (app)
  Security filters (app)
  --> All endpoints secured

Phase 7: Full-Text Search
  TextIndexer (core interface, app impl)
  ClickHouse bloom filter index setup
  QueryEngine full-text integration
  --> Combined structured + text search

Phase 8: UI
  Static resources (app)
  Frontend consuming REST API
```
**Ordering rationale:**
- Storage before query (you need data to query)
- Ingestion before agents (agents need an endpoint to POST to)
- Query before full-text (structured search first, text search layers on top)
- Security can be added at any point but is cleanest as a cross-cutting concern after core flows work
- Diagrams are semi-independent but reference transactions, so after query
- UI is last because API-first means the API must be stable
## Sources
- ClickHouse documentation on MergeTree engines, TTL, bloom filter indexes (official docs, verified against training data)
- Spring Boot SseEmitter documentation (Spring Framework reference)
- Observability system architecture patterns from Jaeger, Zipkin, and SigNoz architectures (well-established open-source projects)
- Content-addressable storage patterns from Git internals and Docker image layers
- Cursor-based pagination patterns from Slack API and Stripe API design guides
- Confidence: MEDIUM -- based on established patterns in training data, not live-verified against current documentation