# Append-Only Execution Data Protocol

A reference document for redesigning the Cameleer agent's data reporting to be append-only, eliminating the need for upserts in the storage layer.

## Problem

The current protocol sends execution data in two phases:

1. **RUNNING phase**: Agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info.
2. **COMPLETED/FAILED phase**: Agent sends an enriched record when execution finishes (duration, output body, headers, errors, processor tree).

The server uses `INSERT ... ON CONFLICT DO UPDATE` with `COALESCE(...)` in the `SET` clause to merge these into a single row. This works in PostgreSQL but creates problems for append-only stores such as ClickHouse, Kafka topics, or any event-sourced architecture.

### Why This Matters

- **ClickHouse**: No native upsert. You must use ReplacingMergeTree (eventual consistency, `FINAL` overhead) or application-side buffering.
- **Event streaming**: Kafka/Pulsar topics are append-only. A two-phase lifecycle requires a stateful stream processor to merge the phases.
- **Data lakes**: Parquet files are immutable. Updates require a read-modify-write of entire files.
- **Materialized views**: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both the RUNNING and COMPLETED inserts for the same execution.

## Proposed Protocol Change

### Option A: Single-Phase Reporting (Recommended)

The agent buffers the execution locally and sends a **single, complete record** only when the execution reaches a terminal state (COMPLETED or FAILED).

```
Current:  Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert)
Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append)
```

**What changes in the agent:**

- `RouteExecutionTracker` holds in-flight executions in a local `ConcurrentHashMap`
- On route start: create a tracker entry with start_time, route_id, etc.
- On route complete: enrich the tracker entry with duration, bodies, errors, and the processor tree
- On report: send the complete record in one HTTP POST
- On timeout (configurable, e.g., 5 minutes): flush the entry as RUNNING so stuck routes stay visible

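The steps above can be sketched as follows. This is a minimal illustration of agent-side single-phase buffering, not the actual Cameleer implementation; the `ExecutionRecord` fields and method names are assumptions, and the real tracker would also carry headers, processor trees, and the timeout flush.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of Option A: buffer in-flight executions locally,
// report a single complete record only at a terminal state.
public class RouteExecutionTracker {

    // Mutable in-flight record, enriched as the execution progresses.
    public static class ExecutionRecord {
        public final String executionId;
        public final String routeId;
        public final Instant startTime;
        public String status = "RUNNING";
        public Long durationMs;
        public String outputBody;
        public String errorMessage;

        ExecutionRecord(String executionId, String routeId, Instant startTime) {
            this.executionId = executionId;
            this.routeId = routeId;
            this.startTime = startTime;
        }
    }

    private final Map<String, ExecutionRecord> inFlight = new ConcurrentHashMap<>();

    // On route start: create the tracker entry, but send nothing yet.
    public void onStart(String executionId, String routeId) {
        inFlight.put(executionId, new ExecutionRecord(executionId, routeId, Instant.now()));
    }

    // On route completion: enrich and remove the entry. The returned record
    // is what would be POSTed to the server in one shot.
    public ExecutionRecord onComplete(String executionId, String outputBody, String errorMessage) {
        ExecutionRecord rec = inFlight.remove(executionId);
        if (rec == null) {
            return null; // unknown execution, e.g. already timeout-flushed
        }
        rec.status = (errorMessage == null) ? "COMPLETED" : "FAILED";
        rec.durationMs = Instant.now().toEpochMilli() - rec.startTime.toEpochMilli();
        rec.outputBody = outputBody;
        rec.errorMessage = errorMessage;
        return rec;
    }

    // In-flight count comes from the agent, not from stored RUNNING rows.
    public int inFlightCount() {
        return inFlight.size();
    }
}
```

Because the map is removed from atomically on completion, each execution yields exactly one report, which is what makes the server side a pure append.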
**What changes in the server:**

- Storage becomes pure append: `INSERT INTO executions VALUES (...)` with no upsert and no COALESCE
- No `SearchIndexer` / `ExecutionAccumulator` needed; the server just writes what it receives
- Materialized views count correctly (one insert = one execution)
- Works with any append-only store (ClickHouse, Kafka, S3/Parquet)

**Trade-offs:**

- RUNNING executions are not visible on the server until they complete (or are timeout-flushed)
- "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows
- If the agent crashes, in-flight executions are lost (same as current behavior: RUNNING rows become orphans anyway)

### Option B: Event Log with Reconstruction

Send both phases as separate **events** (not records), and let the server reconstruct the current state.

```
Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...}
Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]}
```

**Server-side:**

- Store raw events in an append-only log table
- Reconstruct the current state via `SELECT argMax(field, event_time) FROM events WHERE execution_id = ? GROUP BY execution_id`
- Or: use a materialized view with `AggregatingMergeTree` + `argMaxState` to maintain a "latest state" table

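The argMax reconstruction semantics can be illustrated in plain Java: process events in event-time order and let later values win per field. The event shape and class names here are illustrative assumptions, not a real Cameleer type.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of Option B reconstruction: fold an append-only event log into a
// "latest state" map per execution, mirroring what
// SELECT argMax(field, event_time) ... GROUP BY execution_id computes.
public class EventReconstruction {

    public record ExecutionEvent(String executionId, long eventTime, Map<String, Object> fields) {}

    public static Map<String, Map<String, Object>> latestState(List<ExecutionEvent> events) {
        Map<String, Map<String, Object>> state = new HashMap<>();
        events.stream()
              // Apply events in event-time order so the latest value wins...
              .sorted(Comparator.comparingLong(ExecutionEvent::eventTime))
              // ...while fields absent from later events are retained.
              .forEach(e -> state.computeIfAbsent(e.executionId(), k -> new HashMap<>())
                                 .putAll(e.fields()));
        return state;
    }
}
```

Note that a STARTED-only execution naturally reconstructs to a RUNNING state, which is how Option B keeps in-flight executions visible without upserts.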
**Trade-offs:**

- More complex server-side reconstruction
- Higher storage cost (two rows per execution instead of one)
- More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED)
- A natural fit for event-sourcing architectures

### Option C: Hybrid (Current Cameleer-Server Approach)

Keep the two-phase protocol but handle merging at the server application layer. This is what cameleer-server implements today with the `ExecutionAccumulator`:

- RUNNING POST -> hold in a `ConcurrentHashMap` (no DB write)
- COMPLETED POST -> merge with the RUNNING entry in memory -> single INSERT to the DB
- Timeout sweep -> flush stale RUNNING entries for visibility

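A minimal sketch of that server-side merge path, assuming hypothetical record shapes; the actual `ExecutionAccumulator` in cameleer-server carries more fields and a timeout sweep, which are omitted here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of Option C: buffer the RUNNING phase in memory,
// merge on COMPLETED, and emit exactly one row for the DB INSERT.
public class AccumulatorSketch {

    public record PartialRecord(String executionId, String routeId, long startTime) {}
    public record CompleteRecord(String executionId, String routeId, long startTime,
                                 long durationMs, String status, String outputBody) {}

    private final Map<String, PartialRecord> running = new ConcurrentHashMap<>();

    // RUNNING POST: hold in memory, no DB write.
    public void onRunning(PartialRecord partial) {
        running.put(partial.executionId(), partial);
    }

    // COMPLETED POST: merge with the buffered RUNNING entry and return the
    // single row the server should INSERT.
    public CompleteRecord onCompleted(String executionId, long durationMs,
                                      String status, String outputBody) {
        PartialRecord partial = running.remove(executionId);
        String routeId = (partial != null) ? partial.routeId() : null;
        long startTime = (partial != null) ? partial.startTime() : 0L;
        return new CompleteRecord(executionId, routeId, startTime, durationMs, status, outputBody);
    }

    // The crash window mentioned below: everything in this map is lost on restart.
    public int activeCount() {
        return running.size();
    }
}
```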
**Trade-offs:**

- No agent changes required
- The server must be stateful (in-memory accumulator)
- Crash window: active executions are lost if the server restarts
- Adds complexity to the server that wouldn't exist with Option A

## Recommendation

**Option A (single-phase reporting)** is the strongest choice for a new protocol version:

1. **Simplest server implementation**: Pure append; no state, no merging
2. **Works everywhere**: ClickHouse, Kafka, S3, any append-only store
3. **Correct by construction**: MVs, aggregations, and stream processing all see one event per execution
4. **The agent is the natural place to buffer**: The agent already tracks in-flight executions for instrumentation; it just needs to hold the report until completion
5. **Minimal data-loss risk**: An agent crash loses in-flight data regardless of protocol; this change doesn't make it worse

### Migration Strategy

1. Add a `protocol_version` field to agent registration
2. v1 agents: the server uses the `ExecutionAccumulator` (current behavior)
3. v2 agents: the server does a pure append (no accumulator needed for v2 data)
4. Both can coexist; the server checks the protocol version per agent

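The per-agent dispatch could be as small as the following; the enum and method names are assumptions for illustration, not the real server API.

```java
// Hypothetical sketch of the migration dispatch: route each agent's reports
// by the protocol_version it registered with.
public class ProtocolDispatch {

    public enum Path { ACCUMULATOR, PURE_APPEND }

    // v1 agents keep the ExecutionAccumulator merge path;
    // v2 (single-phase) agents are appended directly.
    public static Path routeFor(int protocolVersion) {
        return (protocolVersion >= 2) ? Path.PURE_APPEND : Path.ACCUMULATOR;
    }
}
```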
### Fields for Single-Phase Record

The complete record sent by a v2 agent:

```json
{
  "executionId": "uuid",
  "routeId": "myRoute",
  "agentId": "agent-1",
  "applicationName": "my-app",
  "correlationId": "corr-123",
  "exchangeId": "exchange-456",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00.000Z",
  "endTime": "2026-03-31T10:00:00.250Z",
  "durationMs": 250,
  "errorMessage": null,
  "errorStackTrace": null,
  "errorType": null,
  "errorCategory": null,
  "rootCauseType": null,
  "rootCauseMessage": null,
  "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}},
  "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}},
  "attributes": {"key": "value"},
  "traceId": "otel-trace-id",
  "spanId": "otel-span-id",
  "replayExchangeId": null,
  "processors": [
    {
      "processorId": "proc-1",
      "processorType": "to",
      "status": "COMPLETED",
      "startTime": "...",
      "endTime": "...",
      "durationMs": 120,
      "inputBody": "...",
      "outputBody": "...",
      "children": []
    }
  ]
}
```

All fields populated. No second POST needed. The server does a single INSERT.