# Moat-Strengthening Features — Design Specification

**Date:** 2026-03-29

**Status:** Draft — Awaiting Review

**Author:** Boardroom simulation (Strategist, Skeptic, Architect, Growth Hacker)

**Gitea Issues:** cameleer/cameleer #57-#72 (label: MOAT)

## Executive Summary

Three features designed to convert Cameleer's technical moat (ByteBuddy agent) into a workflow moat (debugger + lineage) and ultimately a network moat (cross-service correlation) before the vibe-coding window closes.

| Feature | Ship Target | Moat Type | Agent Changes | Server Changes |
|---------|-------------|-----------|---------------|----------------|
| Live Route Debugger | Weeks 8-14 | Workflow | Heavy (DebugSessionManager, breakpoints) | Heavy (WebSocket, session mgmt) |
| Payload Flow Lineage | Weeks 3-6 | Technical | Light (one capture mode check) | Medium (DiffEngine) |
| Cross-Service Correlation | Weeks 1-9 | Network effect | Light (header propagation) | Medium (trace assembly, topology) |

### Build Order

```
Week 1-3:  Foundation + Topology Graph (from existing data, zero agent changes)
Week 3-6:  Payload Flow Lineage (agent + server + UI)
Week 5-9:  Distributed Trace Correlation (agent header + server joins + UI)
Week 8-14: Live Route Debugger (agent + server + UI)
```

### Gitea Issue Map

**Epics:**

- #57 — Live Route Debugger
- #58 — Payload Flow Lineage
- #59 — Cross-Service Trace Correlation + Topology Map

**Debugger sub-issues:**

- #60 — Protocol: Debug session command types (`cameleer-common`)
- #61 — Agent: DebugSessionManager + breakpoint InterceptStrategy integration
- #62 — Agent: ExchangeStateSerializer + synthetic direct route wrapper
- #63 — Server: DebugSessionService + WebSocket + REST API
- #70 — UI: Debug session frontend components

**Lineage sub-issues:**

- #64 — Protocol: Lineage command types (`cameleer-common`)
- #65 — Agent: LineageManager + capture mode integration
- #66 — Server: LineageService + DiffEngine + REST API
- #71 — UI: Lineage timeline + diff viewer components

**Correlation sub-issues:**

- #67 — Agent: Enhanced trace context header propagation
- #68 — Server: CorrelationService — distributed trace assembly
- #69 — Server: DependencyGraphService + service topology materialized view
- #72 — UI: Distributed trace view + service topology graph

---

## 1. Live Route Debugger

### 1.1 Concept

Extend the existing `replay` command with a debug-session wrapper. Users provide an exchange (from a prior failed execution or manually constructed) and replay it through a route with breakpoints. Only the replayed exchange's thread blocks at breakpoints — production traffic flows normally.

**User Story:** A developer sees a failed exchange. They click "Debug This Exchange." Cameleer pre-fills the body/headers. They set breakpoints, click "Start Debug Session." The exchange replays through the route, pausing at each breakpoint. They inspect state, modify the body, step forward. Total: 3 minutes. Without Cameleer: 45 minutes.

### 1.2 Architecture

```
Browser (SaaS UI)
      |
      v
  WebSocket  <-----------------------------------------------+
      |                                                       |
      v                                                       |
  cameleer-server                                             |
      |  POST   /api/v1/debug/sessions                        |
      |  POST   /api/v1/debug/sessions/{id}/step              |
      |  POST   /api/v1/debug/sessions/{id}/resume            |
      |  DELETE /api/v1/debug/sessions/{id}                   |
      |                                                       |
      v                                                       |
  SSE Command Channel --> cameleer agent                      |
      |                       |                               |
      |  "start-debug"        v                               |
      |  command         DebugSessionManager                  |
      |                       |                               |
      |                  Replay exchange via                  |
      |                  ProducerTemplate                     |
      |                       |                               |
      |                  InterceptStrategy checks             |
      |                  breakpoints before each              |
      |                  processor                            |
      |                       |                               |
      |                  On breakpoint hit:                   |
      |                    > LockSupport.park()               |
      |                    > Serialize exchange state         |
      |                    > POST state to server ------------+
      |                       |           (server pushes to
      |                  Wait for resume/step/skip   browser via WS)
      |                  command via SSE
      |                       |
      |                  On resume: LockSupport.unpark()
      |                  Continue to next processor
```

### 1.3 Protocol Additions (cameleer-common)

#### New SSE Commands

| Command | Direction | Purpose |
|---------|-----------|---------|
| `START_DEBUG` | Server -> Agent | Create session, spawn thread, replay exchange with breakpoints |
| `DEBUG_RESUME` | Server -> Agent | Unpark thread, continue to next breakpoint |
| `DEBUG_STEP` | Server -> Agent | Unpark thread, break at next processor (STEP_OVER/STEP_INTO) |
| `DEBUG_SKIP` | Server -> Agent | Skip current processor, continue |
| `DEBUG_MODIFY` | Server -> Agent | Apply body/header changes at current breakpoint before resuming |
| `DEBUG_ABORT` | Server -> Agent | Abort session, release thread |

#### StartDebugPayload

```json
{
  "sessionId": "dbg-a1b2c3",
  "routeId": "route-orders",
  "exchange": {
    "body": "{\"orderId\": 42, \"amount\": 150.00}",
    "headers": { "Content-Type": "application/json" }
  },
  "breakpoints": [
    { "processorId": "choice1", "condition": null },
    { "processorId": "to5", "condition": "${body.amount} > 100" }
  ],
  "mode": "STEP_OVER",
  "timeoutSeconds": 300,
  "originalExchangeId": "ID-failed-789",
  "replayToken": "...",
  "nonce": "..."
}
```

#### BreakpointHitReport (Agent -> Server)

```json
{
  "sessionId": "dbg-a1b2c3",
  "processorId": "to5",
  "processorType": "TO",
  "endpointUri": "http://payment-service/charge",
  "depth": 2,
  "stepIndex": 4,
  "exchangeState": {
    "body": "{\"orderId\": 42, \"amount\": 150.00, \"validated\": true}",
    "headers": { "..." },
    "properties": { "CamelSplitIndex": 0 },
    "exception": null,
    "bodyType": "java.util.LinkedHashMap"
  },
  "executionTree": ["...partial tree up to this point..."],
  "parentProcessorId": "split1",
  "routeId": "route-orders",
  "timestamp": "2026-03-29T14:22:05.123Z"
}
```

### 1.4 Agent Implementation (cameleer-agent)

#### DebugSessionManager

- Location: `com.cameleer.agent.debug.DebugSessionManager`
- Stores active sessions: `ConcurrentHashMap<sessionId, DebugSession>`
- Enforces max concurrent sessions (default 3, configurable via `cameleer.debug.maxSessions`)
- Allocates **dedicated Thread** per session (NOT from Camel thread pool)
- Timeout watchdog: `ScheduledExecutorService` auto-aborts expired sessions
- Handles all `DEBUG_*` commands via `DefaultCommandHandler` delegation

#### DebugSession

- Stores breakpoint definitions, current step mode, parked thread reference
- `shouldBreak(processorId, Exchange)`: evaluates processor match + Simple condition + step mode
- `reportBreakpointHit()`: serializes state, POSTs to server, calls `LockSupport.park()`
- `applyModifications(Exchange)`: sets body/headers from `DEBUG_MODIFY` command

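The park/unpark handshake between the SSE command handler and the parked exchange thread is the heart of `reportBreakpointHit()`. A minimal sketch follows; the names (`BreakpointGate`, `awaitCommand`, `deliver`) are illustrative, not the real API, and the loop guards against `LockSupport.park()` returning spuriously:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Sketch of the breakpoint park/unpark handshake (hypothetical names).
class BreakpointGate {
    enum Action { RESUME, STEP, SKIP, ABORT }

    private final AtomicReference<Action> pending = new AtomicReference<>();
    private volatile Thread parkedThread;

    /** Called on the debug exchange's thread after reporting the breakpoint hit. */
    Action awaitCommand(long timeoutNanos) {
        parkedThread = Thread.currentThread();
        long deadline = System.nanoTime() + timeoutNanos;
        Action action;
        // park() may wake without an unpark, so re-check the pending command.
        while ((action = pending.getAndSet(null)) == null) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) return Action.ABORT; // session timeout
            LockSupport.parkNanos(this, remaining);
        }
        return action;
    }

    /** Called from the SSE command handler thread when the server sends a command. */
    void deliver(Action action) {
        pending.set(action);
        Thread t = parkedThread;
        if (t != null) LockSupport.unpark(t);
    }
}
```

Delivering the command before unparking means the handshake is race-free even if the command arrives before the exchange thread parks.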
#### InterceptStrategy Integration

In `CameleerInterceptStrategy.DelegateAsyncProcessor.process()`:

```java
DebugSession session = debugSessionManager.getSession(exchange);
if (session != null && session.shouldBreak(processorId, exchange)) {
    ExchangeState state = ExchangeStateSerializer.capture(exchange);
    List<ProcessorExecution> tree = executionCollector.getPartialTree(exchange);
    session.reportBreakpointHit(processorId, state, tree);
    // Thread parked until server sends resume/step/skip/abort

    if (session.isAborted()) throw new DebugSessionAbortedException();
    if (session.shouldSkip()) { callback.done(true); return true; }
    if (session.hasModifications()) session.applyModifications(exchange);
}
```

**Zero production overhead:** Debug exchanges carry `CameleerDebugSessionId` exchange property. `getSession()` checks this property — single null-check. Production exchanges have no property, check returns null, no further work.
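That fast path can be sketched with a plain map standing in for the exchange property lookup (the real code reads `Exchange.getProperty`; everything here is an illustrative stand-in):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the hot-path guard: production exchanges never carry the
// CameleerDebugSessionId property, so the lookup is a single map miss.
// The session registry and Object session type are stand-ins.
class DebugSessionLookup {
    static final String DEBUG_PROP = "CameleerDebugSessionId";

    private final Map<String, Object> sessions = new ConcurrentHashMap<>();

    void register(String sessionId, Object session) { sessions.put(sessionId, session); }

    /** exchangeProperties stands in for Exchange.getProperty(). */
    Object getSession(Map<String, Object> exchangeProperties) {
        Object sessionId = exchangeProperties.get(DEBUG_PROP); // null for production traffic
        if (sessionId == null) return null;                    // fast path: no further work
        return sessions.get(sessionId);
    }
}
```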

#### ExchangeStateSerializer

- TypeConverter chain: String -> byte[] as Base64 -> class name fallback
- Stream bodies: wrap in `CachedOutputStream` (same pattern as Camel's stream caching)
- Sensitive header redaction (reuses `PayloadCapture` redaction logic)
- Size limit: `cameleer.debug.maxBodySize` (default 64KB)

#### Synthetic Direct Route Wrapper

For non-direct routes (timer, jms, http, file):

1. Extract route's processor chain from `CamelContext`
2. Create temporary `direct:__debug_{routeId}` route with same processors (shared by reference)
3. Debug exchange enters via `ProducerTemplate.send()`
4. Remove temporary route on session completion

### 1.5 Server Implementation (cameleer-server)

#### REST Endpoints

| Method | Path | Role | Purpose |
|--------|------|------|---------|
| POST | `/api/v1/debug/sessions` | OPERATOR | Create debug session |
| GET | `/api/v1/debug/sessions/{id}` | VIEWER | Get session state |
| POST | `/api/v1/debug/sessions/{id}/step` | OPERATOR | Step over/into |
| POST | `/api/v1/debug/sessions/{id}/resume` | OPERATOR | Resume to next breakpoint |
| POST | `/api/v1/debug/sessions/{id}/skip` | OPERATOR | Skip current processor |
| POST | `/api/v1/debug/sessions/{id}/modify` | OPERATOR | Modify exchange at breakpoint |
| DELETE | `/api/v1/debug/sessions/{id}` | OPERATOR | Abort session |
| POST | `/api/v1/debug/sessions/{id}/breakpoint-hit` | AGENT | Agent reports breakpoint |
| GET | `/api/v1/debug/sessions/{id}/compare` | VIEWER | Compare debug vs original |

#### WebSocket Channel

```
Endpoint: WS /api/v1/debug/ws?token={jwt}

Server -> Browser events:
  { "type": "breakpoint-hit",    "sessionId": "...", "data": { ...state... } }
  { "type": "session-completed", "sessionId": "...", "execution": { ... } }
  { "type": "session-error",     "sessionId": "...", "error": "Agent disconnected" }
  { "type": "session-timeout",   "sessionId": "..." }
```

#### Data Model

```sql
CREATE TABLE debug_sessions (
  session_id        TEXT PRIMARY KEY,
  agent_id          TEXT NOT NULL,
  route_id          TEXT NOT NULL,
  original_exchange TEXT,
  status            TEXT NOT NULL DEFAULT 'PENDING',
  created_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at      TIMESTAMPTZ,
  breakpoints       JSONB,
  current_state     JSONB,
  step_count        INT DEFAULT 0,
  replay_exchange   TEXT,
  created_by        TEXT NOT NULL
);
```

#### DebugSessionService

Lifecycle: PENDING -> ACTIVE -> PAUSED -> COMPLETED/ABORTED/TIMEOUT

1. Generate sessionId + nonce + replay token
2. Send `START_DEBUG` via existing SSE channel
3. Receive breakpoint-hit POSTs, store state, push to WebSocket
4. Translate browser actions (step/resume/skip/modify) into SSE commands
5. Detect agent SSE disconnect via `SseConnectionManager` callback
6. Store completed execution in normal pipeline (tagged with `debugSessionId`)

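The lifecycle is worth encoding as an explicit transition table so illegal jumps (e.g., COMPLETED back to ACTIVE) are rejected at one choke point. A sketch; the state names come from the spec, but the exact transition set is an assumption:

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch of the DebugSessionService lifecycle as a transition table.
// PAUSED means "parked at a breakpoint"; resume moves back to ACTIVE.
enum DebugSessionStatus {
    PENDING, ACTIVE, PAUSED, COMPLETED, ABORTED, TIMEOUT;

    Set<DebugSessionStatus> allowedNext() {
        switch (this) {
            case PENDING: return EnumSet.of(ACTIVE, ABORTED, TIMEOUT);
            case ACTIVE:  return EnumSet.of(PAUSED, COMPLETED, ABORTED, TIMEOUT);
            case PAUSED:  return EnumSet.of(ACTIVE, COMPLETED, ABORTED, TIMEOUT);
            default:      return EnumSet.noneOf(DebugSessionStatus.class); // terminal states
        }
    }

    boolean canTransitionTo(DebugSessionStatus next) {
        return allowedNext().contains(next);
    }
}
```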
### 1.6 SaaS Layer (cameleer-saas)

- Tenant isolation: debug sessions scoped to tenant's agents
- Concurrent session limits per tier (free: 1, pro: 5, enterprise: unlimited)
- Usage metering: session creation counted as billable event

### 1.7 UI Components

- **DebugLauncher.tsx** — "Debug This Exchange" button on failed execution detail, pre-fills exchange data
- **DebugSession.tsx** — Main view: route diagram with status coloring (green/yellow/gray), exchange state panel, step controls (F10/F11/F5/F6 keyboard shortcuts)
- **DebugCompare.tsx** — Side-by-side: original execution vs debug replay with diff highlighting
- **BreakpointEditor.tsx** — Click processor nodes to toggle breakpoints, conditional expression input

### 1.8 Safety Mechanisms

| Concern | Mitigation |
|---------|------------|
| Thread leak | Session timeout auto-aborts (default 5 min) |
| Memory leak | Exchange state captured on-demand, not buffered |
| Agent restart | Server detects SSE disconnect, notifies browser |
| High-throughput route | Only debug exchange hits breakpoints (property check) |
| Concurrent sessions | Hard limit (default 3), FAILURE ack if exceeded |
| Non-direct routes | Synthetic `direct:__debug_*` wrapper with same processor chain |

---

## 2. Payload Flow Lineage

### 2.1 Concept

Capture the full transformation history of a message flowing through a route. At each processor, snapshot body before and after. Server computes structural diffs. UI renders a visual "data flow" timeline showing exactly where and how data transforms.

**User Story:** A developer has an exchange where `customerName` is null. They click "Trace Payload Flow." Vertical timeline: at each processor, before/after body with structural diff. Processor 7 (`enrich1`) returned a response missing the `name` field. Root cause in 30 seconds.

### 2.2 Architecture

```
cameleer agent
    |
    |  On lineage-enabled exchange:
    |    Before processor: capture INPUT
    |    After processor:  capture OUTPUT
    |    Attach to ProcessorExecution as inputBody/outputBody
    |
    v
POST /api/v1/data/executions  (processors carry full snapshots)
    |
    v
cameleer-server
    |
    |  LineageService:
    |    > Flatten processor tree to ordered list
    |    > Compute diffs between processor[n].output and processor[n+1].input
    |    > Classify transformation type
    |    > Generate human-readable summary
    |
    v
GET /api/v1/executions/{id}/lineage
    |
    v
Browser: LineageTimeline + DiffViewer
```

### 2.3 Protocol Additions (cameleer-common)

#### New SSE Commands

| Command | Direction | Purpose |
|---------|-----------|---------|
| `ENABLE_LINEAGE` | Server -> Agent | Activate targeted payload capture |
| `DISABLE_LINEAGE` | Server -> Agent | Deactivate lineage capture |

#### EnableLineagePayload

```json
{
  "lineageId": "lin-x1y2z3",
  "scope": {
    "type": "ROUTE",
    "routeId": "route-orders"
  },
  "predicate": "${header.orderId} == 'ORD-500'",
  "predicateLanguage": "simple",
  "maxCaptures": 10,
  "duration": "PT10M",
  "captureHeaders": true,
  "captureProperties": false
}
```

#### Scope Types

| Scope | Meaning |
|-------|---------|
| `ROUTE` | All exchanges on a specific route |
| `CORRELATION` | All exchanges with a specific correlationId |
| `EXPRESSION` | Any exchange matching a Simple/JsonPath predicate |
| `NEXT_N` | Next N exchanges on the route (countdown) |

### 2.4 Agent Implementation (cameleer-agent)

#### LineageManager

- Location: `com.cameleer.agent.lineage.LineageManager`
- Stores active configs: `ConcurrentHashMap<lineageId, LineageConfig>`
- Tracks capture count per lineageId: auto-disables at `maxCaptures`
- Duration timeout via `ScheduledExecutorService`: auto-disables after expiry
- `shouldCaptureLineage(Exchange)`: evaluates scope + predicate, sets `CameleerLineageActive` property
- `isLineageActive(Exchange)`: single null-check on exchange property (HOT PATH, O(1))

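The `maxCaptures` countdown can be sketched as an atomic budget per lineageId. Names here are illustrative, not the real LineageManager fields:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-lineage capture budget: each matching exchange decrements
// a counter, and the config auto-disables once the budget is spent.
class LineageCaptureBudget {
    private final Map<String, AtomicInteger> remaining = new ConcurrentHashMap<>();

    void enable(String lineageId, int maxCaptures) {
        remaining.put(lineageId, new AtomicInteger(maxCaptures));
    }

    /** Returns true if this exchange may still be captured; auto-disables at zero. */
    boolean tryCapture(String lineageId) {
        AtomicInteger counter = remaining.get(lineageId);
        if (counter == null) return false;            // already disabled or unknown
        int left = counter.decrementAndGet();
        if (left < 0) return false;                   // concurrent callers raced past the budget
        if (left == 0) remaining.remove(lineageId);   // last capture: auto-disable
        return true;
    }

    boolean isActive(String lineageId) { return remaining.containsKey(lineageId); }
}
```

The decrement-then-check shape keeps the hot path lock-free even when many exchanges match concurrently.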
#### Integration Points (Minimal Agent Changes)

**1. CameleerEventNotifier.onExchangeCreated():**

```java
// Sets CameleerLineageActive property if matching
lineageManager.shouldCaptureLineage(exchange);
```

**2. ExecutionCollector.resolveProcessorCaptureMode():**

```java
if (lineageManager.isLineageActive(exchange)) {
    return PayloadCaptureMode.BOTH;
}
```

**3. PayloadCapture body size:**

```java
int maxSize = lineageManager.isLineageActive(exchange)
        ? config.getLineageMaxBodySize()   // 64KB
        : config.getMaxBodySize();         // 4KB
```

**Production overhead when lineage is disabled: effectively zero.** The `isLineageActive()` check is a single null-check on an exchange property that doesn't exist on non-lineage exchanges.

#### Configuration

```properties
cameleer.lineage.maxBodySize=65536   # 64KB for lineage captures (vs 4KB normal)
cameleer.lineage.enabled=true        # master switch
```

### 2.5 Server Implementation (cameleer-server)

#### LineageService

- `getLineage(executionId)`: fetch execution, flatten tree to ordered processor list, compute diffs
- `enableLineage(request)`: send `ENABLE_LINEAGE` to target agents
- `disableLineage(lineageId)`: send `DISABLE_LINEAGE`
- `getActiveLineages()`: list active configs across all agents

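The flatten-and-diff step pairs each processor's input against its own output, and can additionally compare `processor[n].output` with `processor[n+1].input` to spot mutation happening between steps. A sketch with simplified stand-in shapes for the real DTOs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Sketch of LineageService snapshot pairing over a flattened processor list.
// Step and PairResult are illustrative stand-ins, not the real entities.
class LineagePairing {
    record Step(String processorId, String inputBody, String outputBody) {}
    record PairResult(String processorId, boolean bodyChanged, boolean gapBeforeNext) {}

    static List<PairResult> pair(List<Step> ordered) {
        List<PairResult> results = new ArrayList<>();
        for (int i = 0; i < ordered.size(); i++) {
            Step current = ordered.get(i);
            // Did this processor itself transform the body?
            boolean changed = !Objects.equals(current.inputBody(), current.outputBody());
            // Did something mutate the body between this processor and the next?
            boolean gap = i + 1 < ordered.size()
                    && !Objects.equals(current.outputBody(), ordered.get(i + 1).inputBody());
            results.add(new PairResult(current.processorId(), changed, gap));
        }
        return results;
    }
}
```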
#### DiffEngine

Format-aware diff computation:

| Format | Detection | Library | Output |
|--------|-----------|---------|--------|
| JSON | Jackson parse success | zjsonpatch (RFC 6902) or custom tree walk | FIELD_ADDED, FIELD_REMOVED, FIELD_MODIFIED with JSON path |
| XML | DOM parse success | xmlunit-core | ELEMENT_ADDED, ELEMENT_REMOVED, ATTRIBUTE_CHANGED |
| Text | Fallback | java-diff-utils (Myers) | LINE_ADDED, LINE_REMOVED, LINE_CHANGED |
| Binary | Type detection | N/A | Size comparison only |

#### Transformation Classification

```
UNCHANGED      — No diff
MUTATION       — Existing fields modified, same format
ENRICHMENT     — Fields only added (e.g., enrich processor)
REDUCTION      — Fields only removed
FORMAT_CHANGED — Content type changed (XML -> JSON)
TYPE_CHANGED   — Java type changed but content equivalent
MIXED          — Combination of additions, removals, modifications
```

#### Summary Generation

Auto-generated human-readable summaries:

- `"XML -> JSON conversion"` (FORMAT_CHANGED)
- `"Added customer object from external API"` (ENRICHMENT + field names)
- `"Modified amount field: 150.00 -> 135.00"` (MUTATION + values)

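The classification and summary rules can be sketched over diff counts. This is a simplification of the real change list, and TYPE_CHANGED is omitted because it needs Java type information that counts alone don't carry:

```java
import java.util.Locale;

// Sketch of the transformation classifier over added/removed/modified counts.
class TransformationClassifier {
    enum Type { UNCHANGED, MUTATION, ENRICHMENT, REDUCTION, FORMAT_CHANGED, MIXED }

    static Type classify(boolean formatChanged, int added, int removed, int modified) {
        if (formatChanged) return Type.FORMAT_CHANGED;
        if (added == 0 && removed == 0 && modified == 0) return Type.UNCHANGED;
        if (added > 0 && removed == 0 && modified == 0) return Type.ENRICHMENT;
        if (removed > 0 && added == 0 && modified == 0) return Type.REDUCTION;
        if (modified > 0 && added == 0 && removed == 0) return Type.MUTATION;
        return Type.MIXED;
    }

    static String summarize(Type type, int added, int removed, int modified) {
        switch (type) {
            case FORMAT_CHANGED: return "Format conversion";
            case ENRICHMENT:     return String.format(Locale.ROOT, "Added %d field(s)", added);
            case REDUCTION:      return String.format(Locale.ROOT, "Removed %d field(s)", removed);
            case MUTATION:       return String.format(Locale.ROOT, "Modified %d field(s)", modified);
            case UNCHANGED:      return "No change";
            default:             return String.format(Locale.ROOT,
                    "%d added, %d removed, %d modified", added, removed, modified);
        }
    }
}
```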
#### Lineage Response Schema

```json
{
  "executionId": "exec-123",
  "routeId": "route-orders",
  "processors": [
    {
      "processorId": "unmarshal1",
      "processorType": "UNMARSHAL",
      "input": {
        "body": "<order><id>42</id></order>",
        "bodyType": "java.lang.String",
        "contentType": "application/xml"
      },
      "output": {
        "body": "{\"id\": 42}",
        "bodyType": "java.util.LinkedHashMap",
        "contentType": "application/json"
      },
      "diff": {
        "transformationType": "FORMAT_CHANGED",
        "summary": "XML -> JSON conversion",
        "bodyChanged": true,
        "headersChanged": true,
        "changes": [
          { "type": "FORMAT_CHANGED", "from": "XML", "to": "JSON" }
        ]
      },
      "durationMs": 12,
      "status": "COMPLETED"
    }
  ]
}
```

#### REST Endpoints

| Method | Path | Role | Purpose |
|--------|------|------|---------|
| GET | `/api/v1/executions/{id}/lineage` | VIEWER | Full lineage with diffs |
| POST | `/api/v1/lineage/enable` | OPERATOR | Enable lineage on agents |
| DELETE | `/api/v1/lineage/{lineageId}` | OPERATOR | Disable lineage |
| GET | `/api/v1/lineage/active` | VIEWER | List active lineage configs |

### 2.6 SaaS Layer (cameleer-saas)

- Lineage captures counted as premium events (higher billing weight)
- Active lineage config limits per tier
- Post-hoc lineage from COMPLETE engine level available on all tiers (resource-intensive fallback)
- Targeted lineage-on-demand is a paid-tier feature (upgrade driver)

### 2.7 UI Components

- **LineageTimeline.tsx** — Vertical processor list, color-coded by transformation type (green/yellow/blue/red/purple), expandable diffs, auto-generated summaries
- **LineageDiffViewer.tsx** — Side-by-side or unified diff, format-aware (JSON tree-diff, XML element-diff, text line-diff, binary hex)
- **LineageEnableDialog.tsx** — "Trace Payload Flow" button, scope/predicate builder, max captures slider
- **LineageSummaryStrip.tsx** — Compact horizontal strip on execution detail page, transformation icons per processor

---

## 3. Cross-Service Trace Correlation + Topology Map

### 3.1 Concept

Stitch executions across services into unified distributed traces. Build a service dependency topology graph automatically from observed traffic. Design the protocol for future cross-tenant federation.

**User Story:** Platform team with 8 Camel microservices. Order stuck in "processing." Engineer searches by orderId, sees distributed trace: horizontal timeline across all services, each expandable to route detail. Service C (pricing) timed out. Root cause across 4 boundaries in 60 seconds.

### 3.2 Phase 1: Intra-Tenant Trace Correlation

#### Enhanced Trace Context Header

```
Current (exists):
  X-Cameleer-CorrelationId: corr-abc-123

New (added):
  X-Cameleer-TraceContext: {
    "traceId": "trc-xyz",
    "parentSpanId": "span-001",
    "hopIndex": 2,
    "sourceApp": "order-service",
    "sourceRoute": "route-validate"
  }
```

#### Transport-Specific Propagation

| Transport | Detection | Mechanism |
|-----------|-----------|-----------|
| HTTP/REST | URI prefix `http:`, `https:`, `rest:` | HTTP header `X-Cameleer-TraceContext` |
| JMS | URI prefix `jms:`, `activemq:`, `amqp:` | JMS property `CameleerTraceContext` |
| Kafka | URI prefix `kafka:` | Kafka header `cameleer-trace-context` |
| Direct/SEDA | URI prefix `direct:`, `seda:`, `vm:` | Exchange property (in-process) |
| File/FTP | URI prefix `file:`, `ftp:` | Not propagated (async) |

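The dispatch in this table reduces to a scheme check on the endpoint URI. A sketch; the enum and method name are illustrative:

```java
import java.util.Locale;

// Sketch of choosing the propagation mechanism from the endpoint URI scheme.
class TraceContextTransport {
    enum Propagation { HTTP_HEADER, JMS_PROPERTY, KAFKA_HEADER, EXCHANGE_PROPERTY, NONE }

    static Propagation forEndpoint(String endpointUri) {
        if (endpointUri == null) return Propagation.NONE;
        String uri = endpointUri.toLowerCase(Locale.ROOT);
        if (uri.startsWith("http:") || uri.startsWith("https:") || uri.startsWith("rest:"))
            return Propagation.HTTP_HEADER;      // X-Cameleer-TraceContext
        if (uri.startsWith("jms:") || uri.startsWith("activemq:") || uri.startsWith("amqp:"))
            return Propagation.JMS_PROPERTY;     // CameleerTraceContext
        if (uri.startsWith("kafka:"))
            return Propagation.KAFKA_HEADER;     // cameleer-trace-context
        if (uri.startsWith("direct:") || uri.startsWith("seda:") || uri.startsWith("vm:"))
            return Propagation.EXCHANGE_PROPERTY;
        return Propagation.NONE;                 // file:, ftp:, unknown schemes
    }
}
```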
### 3.3 Agent Implementation (cameleer-agent)

#### Outgoing Propagation (InterceptStrategy)

Before delegating to TO/ENRICH/WIRE_TAP processors:

```java
if (isOutgoingEndpoint(processorType, endpointUri)) {
    TraceContext ctx = new TraceContext(
        executionCollector.getTraceId(exchange),
        currentProcessorExecution.getId(),
        executionCollector.getHopIndex(exchange) + 1,
        config.getApplicationName(),
        exchange.getFromRouteId()
    );
    injectTraceContext(exchange, endpointUri, ctx);
}
```

#### Incoming Extraction (CameleerEventNotifier)

In `onExchangeCreated()`:

```java
String traceCtxJson = extractTraceContext(exchange);
if (traceCtxJson != null) {
    TraceContext ctx = objectMapper.readValue(traceCtxJson, TraceContext.class);
    exchange.setProperty("CameleerParentSpanId", ctx.parentSpanId);
    exchange.setProperty("CameleerSourceApp", ctx.sourceApp);
    exchange.setProperty("CameleerSourceRoute", ctx.sourceRoute);
    exchange.setProperty("CameleerHopIndex", ctx.hopIndex);
}
```

#### New RouteExecution Fields

```java
execution.setParentSpanId(...);  // processor execution ID from calling service
execution.setSourceApp(...);     // application name of caller
execution.setSourceRoute(...);   // routeId of caller
execution.setHopIndex(...);      // depth in distributed trace
```

#### Safety

- Header size always <256 bytes
- Parse failure: log warning, continue without context (no exchange failure)
- Only inject on outgoing processors, never on FROM consumers

### 3.4 Server Implementation: Trace Assembly (cameleer-server)

#### CorrelationService

```
buildDistributedTrace(correlationId):
  1. SELECT * FROM executions WHERE correlation_id = ? ORDER BY start_time
  2. Index by executionId for O(1) lookup
  3. Build tree: roots = executions where parentSpanId IS NULL
     For each with parentSpanId: find parent, attach as child hop
  4. Compute gaps: child.startTime - parent.processor.startTime = network latency
     If gap < 0: flag clock skew warning
  5. Aggregate: totalDuration, serviceCount, hopCount, status
```

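Steps 2-4 can be sketched as follows. The shapes are simplified stand-ins for the real entities, and `parentSpanId` here points directly at the parent executionId for brevity (the real design points at a processor execution ID inside the parent):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of trace assembly: index executions, attach children via parentSpanId,
// and compute the network-latency gap (a negative gap signals clock skew).
class TraceAssembler {
    record Execution(String executionId, String parentSpanId, long startMs, long durationMs) {}

    static final class Hop {
        final Execution execution;
        final List<Hop> children = new ArrayList<>();
        long networkLatencyMs;
        boolean clockSkew;
        Hop(Execution e) { this.execution = e; }
    }

    static List<Hop> assemble(List<Execution> executions) {
        Map<String, Hop> byId = new HashMap<>();
        executions.forEach(e -> byId.put(e.executionId(), new Hop(e)));
        List<Hop> roots = new ArrayList<>();
        for (Hop hop : byId.values()) {
            String parentId = hop.execution.parentSpanId();
            Hop parent = parentId == null ? null : byId.get(parentId);
            if (parent == null) { roots.add(hop); continue; }  // root, or missing hop
            long gap = hop.execution.startMs() - parent.execution.startMs();
            hop.networkLatencyMs = Math.max(gap, 0);
            hop.clockSkew = gap < 0;                           // flag but still render
            parent.children.add(hop);
        }
        return roots;
    }
}
```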
#### Distributed Trace Response

```json
{
  "traceId": "trc-xyz",
  "correlationId": "corr-abc-123",
  "totalDurationMs": 1250,
  "hopCount": 4,
  "serviceCount": 3,
  "status": "FAILED",
  "entryPoint": {
    "application": "api-gateway",
    "routeId": "route-incoming-orders",
    "executionId": "exec-001",
    "durationMs": 1250,
    "children": [
      {
        "calledFrom": {
          "processorId": "to3",
          "processorType": "TO",
          "endpointUri": "http://order-service/validate"
        },
        "application": "order-service",
        "routeId": "route-validate",
        "executionId": "exec-002",
        "durationMs": 350,
        "networkLatencyMs": 12,
        "children": []
      }
    ]
  }
}
```

#### Data Model Changes

```sql
ALTER TABLE executions ADD COLUMN parent_span_id TEXT;
ALTER TABLE executions ADD COLUMN source_app TEXT;
ALTER TABLE executions ADD COLUMN source_route TEXT;
ALTER TABLE executions ADD COLUMN hop_index INT;

CREATE INDEX idx_executions_parent_span
    ON executions(parent_span_id) WHERE parent_span_id IS NOT NULL;
```

#### Edge Cases

- **Missing hops:** uninstrumented service shown as "unknown" node
- **Clock skew:** flagged as warning, still rendered
- **Fan-out:** parallel multicast creates multiple children from same processor
- **Circular calls:** detected via hopIndex (max depth 20)

### 3.5 Server Implementation: Topology Graph (cameleer-server)

#### DependencyGraphService

Builds service dependency graph from existing execution data — **zero additional agent overhead**.

Data source: `processor_executions` where `processor_type IN (TO, TO_DYNAMIC, EIP_ENRICH, EIP_POLL_ENRICH, EIP_WIRE_TAP)` and `resolved_endpoint_uri IS NOT NULL`.

#### Endpoint-to-Service Resolution

1. Direct/SEDA match: `direct:processOrder` -> route's applicationName
2. Agent registration match: URI base URL matches registered agent
3. Kubernetes hostname: extract hostname from URI -> applicationName
4. Manual mapping: admin-configured regex/glob patterns
5. Unresolved: `external:{hostname}` node

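The cascade reads naturally as first-match-wins. A sketch covering rules 1, 4, and 5 (the agent-registration and Kubernetes-hostname lookups are elided; the maps and patterns stand in for real config):

```java
import java.util.Map;
import java.util.regex.Pattern;

// Sketch of the endpoint-to-service resolution cascade, rules tried in order.
class EndpointResolver {
    private final Map<String, String> directRoutes;    // "direct:processOrder" -> app name
    private final Map<Pattern, String> manualMappings; // admin-configured regex -> app name

    EndpointResolver(Map<String, String> directRoutes, Map<Pattern, String> manualMappings) {
        this.directRoutes = directRoutes;
        this.manualMappings = manualMappings;
    }

    String resolve(String endpointUri) {
        String byRoute = directRoutes.get(endpointUri);          // 1. direct/SEDA match
        if (byRoute != null) return byRoute;
        for (Map.Entry<Pattern, String> e : manualMappings.entrySet()) // 4. manual mapping
            if (e.getKey().matcher(endpointUri).matches()) return e.getValue();
        return "external:" + hostOf(endpointUri);                // 5. unresolved
    }

    private static String hostOf(String uri) {
        int schemeEnd = uri.indexOf("://");
        if (schemeEnd < 0) return uri;
        String rest = uri.substring(schemeEnd + 3);
        int slash = rest.indexOf('/');
        return slash < 0 ? rest : rest.substring(0, slash);
    }
}
```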
#### Materialized View

```sql
CREATE MATERIALIZED VIEW service_dependencies AS
SELECT
    e.application_name       AS source_app,
    pe.resolved_endpoint_uri AS target_uri,
    COUNT(*)                 AS call_count,
    AVG(pe.duration_ms)      AS avg_latency_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY pe.duration_ms) AS p99_latency_ms,
    SUM(CASE WHEN pe.status = 'FAILED' THEN 1 ELSE 0 END)::FLOAT
        / NULLIF(COUNT(*), 0) AS error_rate,
    MAX(pe.start_time)       AS last_seen,
    MIN(pe.start_time)       AS first_seen
FROM executions e
JOIN processor_executions pe
  ON e.execution_id = pe.execution_id
 AND e.start_time = pe.start_time
WHERE pe.processor_type IN ('TO','TO_DYNAMIC','EIP_ENRICH','EIP_POLL_ENRICH','EIP_WIRE_TAP')
  AND pe.resolved_endpoint_uri IS NOT NULL
  AND e.start_time > NOW() - INTERVAL '24 hours'
GROUP BY e.application_name, pe.resolved_endpoint_uri;

-- Refresh every 5 minutes
```

#### REST Endpoints

| Method | Path | Role | Purpose |
|--------|------|------|---------|
| GET | `/api/v1/traces/{correlationId}` | VIEWER | Assembled distributed trace |
| GET | `/api/v1/traces/{correlationId}/timeline` | VIEWER | Flat timeline for Gantt |
| GET | `/api/v1/topology/dependencies` | VIEWER | Service dependency graph |
| GET | `/api/v1/topology/diff` | VIEWER | Topology changes between windows |
| GET | `/api/v1/topology/dependencies/{source}/{target}` | VIEWER | Dependency detail |

### 3.6 Phase 2: Cross-Tenant Federation (Design Only)

Reserve `sourceTenantHash` in TraceContext for future use:

```json
{
  "traceId": "trc-xyz",
  "parentSpanId": "span-001",
  "hopIndex": 2,
  "sourceApp": "order-service",
  "sourceRoute": "route-validate",
  "sourceTenantHash": null
}
```

**Consent model (v2):**

- Both tenants opt in to "Federation" in SaaS settings
- Shared: trace structure (timing, status, service names)
- NOT shared: payload content, headers, internal route details
- Either tenant can revoke at any time

### 3.7 SaaS Layer (cameleer-saas)

- All trace correlation intra-tenant in v1
- Topology graph scoped to tenant's applications
- External dependencies shown as opaque nodes
- Cross-tenant federation as enterprise-tier feature (v2)

### 3.8 UI Components

- **DistributedTraceView.tsx** — Horizontal Gantt timeline, rows=services, bars=executions, arrows=call flow, click-to-expand to route detail
- **ServiceTopologyGraph.tsx** — Force-directed graph, nodes sized by throughput, edges colored by error rate, animated traffic pulse, click drill-down
- **TopologyDiff.tsx** — "What changed?" view, new/removed dependencies highlighted, latency/error changes annotated
- **TraceSearchEnhanced.tsx** — Search by correlationId/traceId/business attributes, results show trace summaries with service count and hop count

---

## 4. Cross-Feature Integration Points

| From -> To | Integration |
|------------|-------------|
| Correlation -> Debugger | "Debug This Hop": from distributed trace, click a service hop to replay and debug |
| Correlation -> Lineage | "Trace Payload Across Services": enable lineage on a correlationId, see transforms across boundaries |
| Lineage -> Debugger | "Debug From Diff": unexpected processor output -> one-click launch debug with breakpoint on that processor |
| Debugger -> Lineage | Debug sessions auto-capture full lineage (all processors at BOTH mode) |
| Topology -> Correlation | Click dependency edge -> show recent traces between those services |
| Topology -> Lineage | "How does data transform?" -> aggregated lineage summary for a dependency edge |

---

## 5. Competitive Analysis

### What an LLM + Junior Dev Can Replicate

| Capability | Replicable? | Time | Barrier |
|------------|-------------|------|---------|
| JMX metrics dashboard | Yes | 1 weekend | None |
| Log parsing + display | Yes | 1 weekend | None |
| Basic replay (re-send exchange) | Yes | 1 week | Need agent access |
| Per-processor payload capture | No\* | 2-3 months | Requires bytecode instrumentation |
| Nested EIP execution trees | No\* | 3-6 months | Requires deep Camel internals knowledge |
| Breakpoint debugging in route | No | 6+ months | Thread management + InterceptStrategy + serialization |
| Format-aware payload diffing | Partially | 2 weeks | Diff library exists, but data pipeline doesn't |
| Distributed trace assembly | Partially | 1 month | OTel exists but lacks Camel-specific depth |
| Service topology from execution data | Partially | 2 weeks | Istio does this at network layer, not route layer |

\*Achievable with OTel Camel instrumentation (spans only, not payload content)

### Where Each Feature Creates Unreplicable Value

- **Debugger:** Requires InterceptStrategy breakpoints + thread parking + exchange serialization. The combination is unique — no other Camel tool offers browser-based route stepping.
- **Lineage:** Requires per-processor INPUT/OUTPUT capture with correct ordering. OTel spans don't carry body content. JMX doesn't capture payloads. Only bytecode instrumentation provides this data.
- **Correlation + Topology:** The trace assembly is achievable elsewhere. The differentiation is Camel-specific depth: each hop shows processor-level execution trees, not just "Service B took 350ms."

---

## 6. Implementation Sequencing

### Phase A: Foundation + Topology (Weeks 1-3)

| Work | Repo | Issue |
|------|------|-------|
| Service topology materialized view | cameleer-server | #69 |
| Topology REST API | cameleer-server | #69 |
| ServiceTopologyGraph.tsx | cameleer-server + saas | #72 |
| WebSocket infrastructure (for debugger) | cameleer-server | #63 |
| TraceContext DTO in cameleer-common | cameleer | #67 |

**Ship:** Topology graph visible from existing data. Zero agent changes. Immediate visual payoff.

### Phase B: Lineage (Weeks 3-6)

| Work | Repo | Issue |
|------|------|-------|
| Lineage protocol DTOs | cameleer-common | #64 |
| LineageManager + capture integration | cameleer-agent | #65 |
| LineageService + DiffEngine | cameleer-server | #66 |
| Lineage UI components | cameleer-server + saas | #71 |

**Ship:** Payload flow lineage independently usable.

### Phase C: Distributed Trace Correlation (Weeks 5-9, overlaps B)

| Work | Repo | Issue |
|------|------|-------|
| Trace context header propagation | cameleer-agent | #67 |
| Executions table migration (new columns) | cameleer-server | #68 |
| CorrelationService + trace assembly | cameleer-server | #68 |
| DistributedTraceView + TraceSearch UI | cameleer-server + saas | #72 |

**Ship:** Distributed traces + topology — full correlation story.

### Phase D: Live Route Debugger (Weeks 8-14)

| Work | Repo | Issue |
|------|------|-------|
| Debug protocol DTOs | cameleer-common | #60 |
| DebugSessionManager + InterceptStrategy | cameleer-agent | #61 |
| ExchangeStateSerializer + synthetic wrapper | cameleer-agent | #62 |
| DebugSessionService + WS + REST | cameleer-server | #63 |
| Debug UI components | cameleer-server + saas | #70 |

**Ship:** Full browser-based route debugger with integration to lineage and correlation.

---

## 7. Open Questions

1. **Debugger concurrency model:** Should we support debugging through parallel `Split` branches? The current design follows the main thread. Parallel branches would require multiple parked threads per session.

2. **Lineage storage costs:** Full INPUT+OUTPUT at every processor generates significant data. Should we add a separate lineage retention policy (e.g., 7 days) shorter than normal execution retention?

3. **Topology graph refresh frequency:** A 5-minute materialized view refresh is a trade-off. Real-time would require streaming aggregation (e.g., Kafka Streams). Is 5 minutes acceptable for v1?

4. **Cross-tenant federation security model:** The v2 `sourceTenantHash` design needs a full threat model. Can a malicious tenant forge trace context to see another tenant's data?

5. **OTel interop:** Should the trace context header be compatible with the W3C Trace Context format? This would enable mixed environments where some services use OTel and others use Cameleer.