# Append-Only Execution Data Protocol

A reference document for redesigning the Cameleer agent's data reporting to be append-only, eliminating the need for upserts in the storage layer.

## Problem

The current protocol sends execution data in two phases:

1. **RUNNING phase**: The agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info.
2. **COMPLETED/FAILED phase**: The agent sends an enriched record when the execution finishes (duration, output body, headers, errors, processor tree).

The server uses `INSERT ... ON CONFLICT DO UPDATE SET field = COALESCE(excluded.field, field)` to merge these into a single row. This works in PostgreSQL but creates problems for append-only stores such as ClickHouse, Kafka topics, or any event-sourced architecture.

### Why This Matters

- **ClickHouse**: No native upsert. Must use ReplacingMergeTree (eventual consistency, FINAL overhead) or application-side buffering.
- **Event streaming**: Kafka/Pulsar topics are append-only. A two-phase lifecycle requires a stateful stream processor to merge.
- **Data lakes**: Parquet files are immutable. Updates require a read-modify-write of entire files.
- **Materialized views**: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both the RUNNING and the COMPLETED insert for the same execution.

## Proposed Protocol Change

### Option A: Single-Phase Reporting (Recommended)

The agent buffers the execution locally and sends a **single, complete record** only when the execution reaches a terminal state (COMPLETED or FAILED).

```
Current:  Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert)
Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append)
```

**What changes in the agent:**

- `RouteExecutionTracker` holds in-flight executions in a local `ConcurrentHashMap`
- On route start: create a tracker entry with start_time, route_id, etc.
- On route complete: enrich the tracker entry with duration, bodies, errors, processor tree
- On report: send the complete record in one HTTP POST
- On timeout (configurable, e.g., 5 minutes): flush as RUNNING (for visibility of stuck routes)

**What changes in the server:**

- Storage becomes pure append: `INSERT INTO executions VALUES (...)` — no upsert, no COALESCE
- No `SearchIndexer` / `ExecutionAccumulator` needed — the server just writes what it receives
- Materialized views count correctly (one insert = one execution)
- Works with any append-only store (ClickHouse, Kafka, S3/Parquet)

**Trade-offs:**

- RUNNING executions are not visible on the server until they complete (or are timeout-flushed)
- "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows
- If the agent crashes, in-flight executions are lost (same as current behavior — RUNNING rows become orphans anyway)

### Option B: Event Log with Reconstruction

Send both phases as separate **events** (not records), and let the server reconstruct the current state.

```
Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...}
Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]}
```

**Server-side:**

- Store raw events in an append-only log table
- Reconstruct current state via `SELECT argMax(field, event_time) FROM events WHERE execution_id = ? GROUP BY execution_id`
- Or: use a materialized view with `AggregatingMergeTree` + `argMaxState` to maintain a "latest state" table

**Trade-offs:**

- More complex server-side reconstruction
- Higher storage (two rows per execution instead of one)
- More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED)
- Natural fit for event sourcing architectures

### Option C: Hybrid (Current Cameleer-Server Approach)

Keep the two-phase protocol but handle merging at the server application layer.
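In outline, such application-layer merging can be sketched as follows. This is a simplified illustration with a hypothetical, trimmed-down record shape — not the actual cameleer-server classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccumulatorSketch {

    // Hypothetical, trimmed-down record shape -- only the fields the
    // merge needs, not the full execution payload.
    record ExecutionRecord(String executionId, String status, long startTime,
                           Long durationMs, String outputBody) {}

    // RUNNING records are held in memory until the terminal record arrives.
    private final Map<String, ExecutionRecord> inFlight = new ConcurrentHashMap<>();

    void onRunning(ExecutionRecord r) {
        inFlight.put(r.executionId(), r); // no DB write yet
    }

    // Merge the terminal record with the buffered RUNNING one and return
    // the single row that would then be INSERTed into the store.
    ExecutionRecord onTerminal(ExecutionRecord r) {
        ExecutionRecord running = inFlight.remove(r.executionId());
        long start = (running != null) ? running.startTime() : r.startTime();
        return new ExecutionRecord(r.executionId(), r.status(), start,
                                   r.durationMs(), r.outputBody());
    }
}
```

The key property is that only the merged, terminal record ever reaches the database, so the storage layer stays append-only even though the wire protocol has two phases.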
This is what cameleer-server implements today with the `ExecutionAccumulator`:

- RUNNING POST -> hold in `ConcurrentHashMap` (no DB write)
- COMPLETED POST -> merge with RUNNING in-memory -> single INSERT to DB
- Timeout sweep -> flush stale RUNNING entries for visibility

**Trade-offs:**

- No agent changes required
- Server must be stateful (in-memory accumulator)
- Crash window: active executions are lost if the server restarts
- Adds complexity to the server that wouldn't exist with Option A

## Recommendation

**Option A (single-phase reporting)** is the strongest choice for a new protocol version:

1. **Simplest server implementation**: Pure append, no state, no merging
2. **Works everywhere**: ClickHouse, Kafka, S3, any append-only store
3. **Correct by construction**: MVs, aggregations, and stream processing all see one event per execution
4. **Agent is the natural place to buffer**: The agent already tracks in-flight executions for instrumentation — it just needs to hold the report until completion
5. **Minimal data-loss risk**: An agent crash loses in-flight data regardless of protocol — this doesn't make it worse

### Migration Strategy

1. Add a `protocol_version` field to agent registration
2. v1 agents: the server uses the `ExecutionAccumulator` (current behavior)
3. v2 agents: the server does pure append (no accumulator needed for v2 data)
4. Both can coexist — the server checks the protocol version per agent

### Fields for Single-Phase Record

The complete record sent by a v2 agent:

```json
{
  "executionId": "uuid",
  "routeId": "myRoute",
  "agentId": "agent-1",
  "applicationName": "my-app",
  "correlationId": "corr-123",
  "exchangeId": "exchange-456",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00.000Z",
  "endTime": "2026-03-31T10:00:00.250Z",
  "durationMs": 250,
  "errorMessage": null,
  "errorStackTrace": null,
  "errorType": null,
  "errorCategory": null,
  "rootCauseType": null,
  "rootCauseMessage": null,
  "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}},
  "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}},
  "attributes": {"key": "value"},
  "traceId": "otel-trace-id",
  "spanId": "otel-span-id",
  "replayExchangeId": null,
  "processors": [
    {
      "processorId": "proc-1",
      "processorType": "to",
      "status": "COMPLETED",
      "startTime": "...",
      "endTime": "...",
      "durationMs": 120,
      "inputBody": "...",
      "outputBody": "...",
      "children": []
    }
  ]
}
```

All fields are populated. No second POST is needed. The server does a single INSERT.
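On the agent side, the single-phase buffering that produces this record can be sketched as follows. The names and field set are illustrative assumptions; the real `RouteExecutionTracker` carries all the fields shown in the JSON above (bodies, errors, processor tree):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SinglePhaseTracker {

    // Illustrative shapes only -- a hypothetical subset of the v2 record.
    record InFlight(String routeId, long startTimeMs) {}
    record Report(String executionId, String routeId, String status,
                  long startTimeMs, long durationMs) {}

    private final Map<String, InFlight> inFlight = new ConcurrentHashMap<>();

    // Route start: buffer locally, send nothing to the server.
    void onRouteStart(String executionId, String routeId, long nowMs) {
        inFlight.put(executionId, new InFlight(routeId, nowMs));
    }

    // Terminal state: emit the one complete record. The caller sends it in
    // a single HTTP POST, and the server does a pure append.
    Report onRouteDone(String executionId, String status, long nowMs) {
        InFlight f = inFlight.remove(executionId);
        return new Report(executionId, f.routeId(), status,
                          f.startTimeMs(), nowMs - f.startTimeMs());
    }
}
```

Because the map entry is removed on completion, each execution yields exactly one report; the timeout-flush for stuck routes would be a periodic sweep over the same map.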