Files
cameleer-server/docs/superpowers/specs/2026-03-31-append-only-execution-protocol.md
hsiegeln cb3ebfea7c
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 18s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped
chore: rename cameleer3 to cameleer
Rename Java packages from com.cameleer3 to com.cameleer, module
directories from cameleer3-* to cameleer-*, and all references
throughout workflows, Dockerfiles, docs, migrations, and pom.xml.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 15:28:42 +02:00

6.4 KiB

Append-Only Execution Data Protocol

A reference document for redesigning the Cameleer agent's data reporting to be append-only, eliminating the need for upserts in the storage layer.

Problem

The current protocol sends execution data in two phases:

  1. RUNNING phase: Agent sends a partial record when a route starts executing (execution_id, route_id, start_time, status=RUNNING). No bodies, no duration, no error info.
  2. COMPLETED/FAILED phase: Agent sends an enriched record when execution finishes (duration, output body, headers, errors, processor tree).

The server uses INSERT ... ON CONFLICT DO UPDATE SET COALESCE(...) to merge these into a single row. This works in PostgreSQL but creates problems for append-only stores like ClickHouse, Kafka topics, or any event-sourced architecture.

Why This Matters

  • ClickHouse: No native upsert. Must use ReplacingMergeTree (eventual consistency, FINAL overhead) or application-side buffering.
  • Event streaming: Kafka/Pulsar topics are append-only. Two-phase lifecycle requires a stateful stream processor to merge.
  • Data lakes: Parquet files are immutable. Updates require read-modify-write of entire files.
  • Materialized views: Insert-triggered aggregations (ClickHouse MVs, Kafka Streams, Flink) double-count if they see both RUNNING and COMPLETED inserts for the same execution.

Proposed Protocol Change

The agent buffers the execution locally and sends a single, complete record only when the execution reaches a terminal state (COMPLETED or FAILED).

Current:  Agent -> [RUNNING] -> Server -> [COMPLETED] -> Server (upsert)
Proposed: Agent -> [buffer locally] -> [COMPLETED with all fields] -> Server (append)

What changes in the agent:

  • RouteExecutionTracker holds in-flight executions in a local ConcurrentHashMap
  • On route start: create tracker entry with start_time, route_id, etc.
  • On route complete: enrich tracker entry with duration, bodies, errors, processor tree
  • On report: send the complete record in one HTTP POST
  • On timeout (configurable, e.g., 5 minutes): flush as RUNNING (for visibility of stuck routes)

What changes in the server:

  • Storage becomes pure append: INSERT INTO executions VALUES (...) — no upsert, no COALESCE
  • No SearchIndexer / ExecutionAccumulator needed — the server just writes what it receives
  • Materialized views count correctly (one insert = one execution)
  • Works with any append-only store (ClickHouse, Kafka, S3/Parquet)

Trade-offs:

  • RUNNING executions are not visible on the server until they complete (or timeout-flush)
  • "Active execution count" must come from agent heartbeat/registry data, not from stored RUNNING rows
  • If the agent crashes, in-flight executions are lost (same as current behavior — RUNNING rows become orphans anyway)

Option B: Event Log with Reconstruction

Send both phases as separate events (not records), and let the server reconstruct the current state.

Event 1: {type: "EXECUTION_STARTED", executionId: "abc", startTime: ..., routeId: ...}
Event 2: {type: "EXECUTION_COMPLETED", executionId: "abc", duration: 250, outputBody: ..., processors: [...]}

Server-side:

  • Store raw events in an append-only log table
  • Reconstruct current state via SELECT argMax(field, event_time) FROM events WHERE execution_id = ? GROUP BY execution_id
  • Or: use a materialized view with AggregatingMergeTree + argMaxState to maintain a "latest state" table

Trade-offs:

  • More complex server-side reconstruction
  • Higher storage (two rows per execution instead of one)
  • More flexible: supports any number of state transitions (RUNNING -> PAUSED -> RUNNING -> COMPLETED)
  • Natural fit for event sourcing architectures

Option C: Hybrid (Current Cameleer-Server Approach)

Keep the two-phase protocol but handle merging at the server application layer. This is what cameleer-server implements today with the ExecutionAccumulator:

  • RUNNING POST -> hold in ConcurrentHashMap (no DB write)
  • COMPLETED POST -> merge with RUNNING in-memory -> single INSERT to DB
  • Timeout sweep -> flush stale RUNNING entries for visibility

Trade-offs:

  • No agent changes required
  • Server must be stateful (in-memory accumulator)
  • Crash window: active executions lost if server restarts
  • Adds complexity to the server that wouldn't exist with Option A

Recommendation

Option A (single-phase reporting) is the strongest choice for a new protocol version:

  1. Simplest server implementation: Pure append, no state, no merging
  2. Works everywhere: ClickHouse, Kafka, S3, any append-only store
  3. Correct by construction: MVs, aggregations, and stream processing all see one event per execution
  4. Agent is the natural place to buffer: The agent already tracks in-flight executions for instrumentation — it just needs to hold the report until completion
  5. Minimal data loss risk: Agent crash loses in-flight data regardless of protocol — this doesn't make it worse

Migration Strategy

  1. Add protocol_version field to agent registration
  2. v1 agents: server uses ExecutionAccumulator (current behavior)
  3. v2 agents: server does pure append (no accumulator needed for v2 data)
  4. Both can coexist — the server checks protocol version per agent

Fields for Single-Phase Record

The complete record sent by a v2 agent:

{
  "executionId": "uuid",
  "routeId": "myRoute",
  "agentId": "agent-1",
  "applicationName": "my-app",
  "correlationId": "corr-123",
  "exchangeId": "exchange-456",
  "status": "COMPLETED",
  "startTime": "2026-03-31T10:00:00.000Z",
  "endTime": "2026-03-31T10:00:00.250Z",
  "durationMs": 250,
  "errorMessage": null,
  "errorStackTrace": null,
  "errorType": null,
  "errorCategory": null,
  "rootCauseType": null,
  "rootCauseMessage": null,
  "inputSnapshot": {"body": "...", "headers": {"Content-Type": "application/json"}},
  "outputSnapshot": {"body": "...", "headers": {"Content-Type": "application/xml"}},
  "attributes": {"key": "value"},
  "traceId": "otel-trace-id",
  "spanId": "otel-span-id",
  "replayExchangeId": null,
  "processors": [
    {
      "processorId": "proc-1",
      "processorType": "to",
      "status": "COMPLETED",
      "startTime": "...",
      "endTime": "...",
      "durationMs": 120,
      "inputBody": "...",
      "outputBody": "...",
      "children": []
    }
  ]
}

All fields populated. No second POST needed. Server does a single INSERT.