---
phase: 01-ingestion-pipeline-api-foundation
plan: 02
type: execute
wave: 3
depends_on: ["01-01", "01-03"]
files_modified:
- cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/controller/DiagramController.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/controller/MetricsController.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionRepository.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramRepository.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsRepository.java
- cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ClickHouseFlushScheduler.java
- cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java
- cameleer-server-app/src/test/java/com/cameleer/server/app/controller/ExecutionControllerIT.java
- cameleer-server-app/src/test/java/com/cameleer/server/app/controller/DiagramControllerIT.java
- cameleer-server-app/src/test/java/com/cameleer/server/app/controller/MetricsControllerIT.java
- cameleer-server-app/src/test/java/com/cameleer/server/app/controller/BackpressureIT.java
autonomous: true
requirements:
- INGST-01
- INGST-02
- INGST-03
- INGST-05
must_haves:
  truths:
    - "POST /api/v1/data/executions with valid RouteExecution payload returns 202 Accepted"
    - "POST /api/v1/data/diagrams with valid RouteGraph payload returns 202 Accepted"
    - "POST /api/v1/data/metrics with valid metrics payload returns 202 Accepted"
    - "Data posted to endpoints appears in ClickHouse after flush interval"
    - "When buffer is full, endpoints return 503 with Retry-After header"
  artifacts:
    - path: "cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java"
      provides: "POST /api/v1/data/executions endpoint"
      min_lines: 20
    - path: "cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionRepository.java"
      provides: "Batch insert to route_executions table via JdbcTemplate"
      min_lines: 30
    - path: "cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ClickHouseFlushScheduler.java"
      provides: "Scheduled drain of WriteBuffer into ClickHouse"
      min_lines: 20
    - path: "cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java"
      provides: "Routes data to appropriate WriteBuffer instances"
      min_lines: 20
  key_links:
    - from: "ExecutionController.java"
      to: "IngestionService.java"
      via: "constructor injection"
      pattern: "IngestionService"
    - from: "IngestionService.java"
      to: "WriteBuffer.java"
      via: "offer/offerBatch calls"
      pattern: "writeBuffer\\.offer"
    - from: "ClickHouseFlushScheduler.java"
      to: "WriteBuffer.java"
      via: "drain call on scheduled interval"
      pattern: "writeBuffer\\.drain"
    - from: "ClickHouseFlushScheduler.java"
      to: "ClickHouseExecutionRepository.java"
      via: "insertBatch call"
      pattern: "executionRepository\\.insertBatch"
    - from: "ClickHouseFlushScheduler.java"
      to: "ClickHouseDiagramRepository.java"
      via: "store call after drain"
      pattern: "diagramRepository\\.store"
    - from: "ClickHouseFlushScheduler.java"
      to: "ClickHouseMetricsRepository.java"
      via: "insertBatch call after drain"
      pattern: "metricsRepository\\.insertBatch"
---
<objective>
Implement the three ingestion REST endpoints, ClickHouse repository implementations, flush scheduler, and IngestionService that wires controllers to the WriteBuffer.
Purpose: This is the core data pipeline -- agents POST data to the endpoints, IngestionService buffers it, and ClickHouseFlushScheduler drains the buffers and batch-inserts into ClickHouse. Backpressure: endpoints return 503 when a buffer is full.
Output: Working ingestion flow verified by integration tests against Testcontainers ClickHouse.
</objective>
<execution_context>
@C:/Users/Hendrik/.claude/get-shit-done/workflows/execute-plan.md
@C:/Users/Hendrik/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-ingestion-pipeline-api-foundation/01-RESEARCH.md
@.planning/phases/01-ingestion-pipeline-api-foundation/01-01-SUMMARY.md
<!-- Interfaces from Plan 01 that this plan depends on -->
<interfaces>
From cameleer-server-core WriteBuffer.java:
```java
public class WriteBuffer<T> {
    public WriteBuffer(int capacity);
    public boolean offer(T item);
    public boolean offerBatch(List<T> items);
    public List<T> drain(int maxBatch);
    public int size();
    public int capacity();
    public boolean isFull();
    public int remainingCapacity();
}
```
From cameleer-server-core repository interfaces:
```java
public interface ExecutionRepository {
    void insertBatch(List<RouteExecution> executions);
}

public interface DiagramRepository {
    void store(RouteGraph graph);
    Optional<RouteGraph> findByContentHash(String hash);
    Optional<String> findContentHashForRoute(String routeId, String agentId);
}

public interface MetricsRepository {
    void insertBatch(List<MetricsSnapshot> metrics);
}
```
From IngestionConfig:
```java
@ConfigurationProperties("ingestion")
public class IngestionConfig {
    int bufferCapacity;    // default 50000
    int batchSize;         // default 5000
    long flushIntervalMs;  // default 1000
}
```
</interfaces>
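To make the offer/drain contract above concrete, here is a toy stand-in backed by ArrayBlockingQueue. This is illustrative only: the real WriteBuffer comes from Plan 01, and ToyWriteBuffer plus the demo values are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Toy stand-in for Plan 01's WriteBuffer, illustrating the contract the
// endpoints and the flush scheduler rely on. Single-threaded demo only;
// the real offerBatch must make its capacity check atomic.
class ToyWriteBuffer<T> {
    private final ArrayBlockingQueue<T> queue;

    ToyWriteBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false instead of waiting when the buffer is full.
    boolean offer(T item) {
        return queue.offer(item);
    }

    // All-or-nothing: a batch is never partially accepted.
    boolean offerBatch(List<T> items) {
        if (queue.remainingCapacity() < items.size()) return false;
        items.forEach(queue::offer);
        return true;
    }

    // Removes and returns up to maxBatch items; empty list when nothing buffered.
    List<T> drain(int maxBatch) {
        List<T> out = new ArrayList<>(maxBatch);
        queue.drainTo(out, maxBatch);
        return out;
    }
}

public class WriteBufferContractDemo {
    public static void main(String[] args) {
        ToyWriteBuffer<String> buffer = new ToyWriteBuffer<>(3);
        System.out.println(buffer.offer("a"));                     // true
        System.out.println(buffer.offerBatch(List.of("b", "c")));  // true
        System.out.println(buffer.offer("d"));                     // false -> controller maps this to 503
        System.out.println(buffer.drain(2));                       // [a, b]
        System.out.println(buffer.drain(10));                      // [c]
    }
}
```

The false return from offer/offerBatch is the backpressure signal the controllers translate into 503 + Retry-After.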
</context>
<tasks>
<task type="auto" tdd="false">
<name>Task 1: IngestionService, ClickHouse repositories, and flush scheduler</name>
<files>
cameleer-server-core/src/main/java/com/cameleer/server/core/ingestion/IngestionService.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseExecutionRepository.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseDiagramRepository.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/storage/ClickHouseMetricsRepository.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/ingestion/ClickHouseFlushScheduler.java
</files>
<action>
1. Create IngestionService in core module (no Spring annotations -- it's a plain class):
- Constructor takes three WriteBuffer instances (executions, diagrams, metrics)
- acceptExecutions(List<RouteExecution>): calls executionBuffer.offerBatch(), returns boolean
- acceptExecution(RouteExecution): calls executionBuffer.offer(), returns boolean
- acceptDiagram(RouteGraph): calls diagramBuffer.offer(), returns boolean
- acceptDiagrams(List<RouteGraph>): calls diagramBuffer.offerBatch(), returns boolean
- acceptMetrics(List<MetricsSnapshot>): calls metricsBuffer.offerBatch(), returns boolean
- getExecutionBufferDepth(), getDiagramBufferDepth(), getMetricsBufferDepth() for monitoring
2. Create ClickHouseExecutionRepository implements ExecutionRepository:
- @Repository, inject JdbcTemplate
- insertBatch: INSERT INTO route_executions with all columns. Map RouteExecution fields to ClickHouse columns.
For processor execution arrays: extract from RouteExecution.getProcessorExecutions() into parallel arrays (processor_ids, processor_types, etc.)
Use JdbcTemplate.batchUpdate with BatchPreparedStatementSetter.
For Array columns, pass Java arrays directly via ps.setObject() -- ClickHouse JDBC V2 handles Array types natively. Only if the driver version in use requires java.sql.Array, fall back to connection.createArrayOf().
3. Create ClickHouseDiagramRepository implements DiagramRepository:
- @Repository, inject JdbcTemplate
- store(RouteGraph): serialize graph to JSON (Jackson ObjectMapper), compute SHA-256 hex hash of JSON bytes, INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
- findByContentHash: SELECT by content_hash, deserialize definition JSON back to RouteGraph
- findContentHashForRoute: SELECT content_hash WHERE route_id=? AND agent_id=? ORDER BY created_at DESC LIMIT 1
4. Create ClickHouseMetricsRepository implements MetricsRepository:
- @Repository, inject JdbcTemplate
- insertBatch: INSERT INTO agent_metrics with batchUpdate
5. Create ClickHouseFlushScheduler:
- @Component, @EnableScheduling on the app config or main class
- Inject three WriteBuffer instances and three repository implementations
- Inject IngestionConfig for batchSize
- @Scheduled(fixedDelayString="${ingestion.flush-interval-ms:1000}") flushAll(): drains each buffer up to batchSize, calls insertBatch if non-empty. Wrap each in try-catch to log errors without stopping the scheduler.
- Implement SmartLifecycle: on stop(), flush all remaining data (loop drain until empty) before returning.
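The drain-and-insert loop from step 5 can be sketched framework-free. The names FlushLoopSketch, flushOnce, and flushRemaining are illustrative, and a Deque stands in for WriteBuffer; in the real scheduler, flushOnce runs under @Scheduled and flushRemaining under SmartLifecycle.stop().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Framework-free sketch of the step-5 flush logic. In ClickHouseFlushScheduler
// this runs under @Scheduled(fixedDelayString = "${ingestion.flush-interval-ms:1000}"),
// with buffers and repositories supplied by constructor injection.
public class FlushLoopSketch {

    // One scheduled tick: drain up to batchSize items and hand them to the sink.
    static <T> void flushOnce(Deque<T> buffer, Consumer<List<T>> insertBatch, int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && !buffer.isEmpty()) {
            batch.add(buffer.poll());        // equivalent of writeBuffer.drain(batchSize)
        }
        if (batch.isEmpty()) return;         // nothing buffered this interval
        try {
            insertBatch.accept(batch);       // e.g. executionRepository.insertBatch(batch)
        } catch (RuntimeException e) {
            // Log and keep going: one failing sink must not stop the scheduler.
            System.err.println("flush failed: " + e.getMessage());
        }
    }

    // stop() semantics: loop until the buffer is fully drained before returning.
    static <T> void flushRemaining(Deque<T> buffer, Consumer<List<T>> insertBatch, int batchSize) {
        while (!buffer.isEmpty()) {
            flushOnce(buffer, insertBatch, batchSize);
        }
    }

    public static void main(String[] args) {
        Deque<Integer> buffer = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        flushOnce(buffer, b -> System.out.println("insertBatch " + b), 2);      // insertBatch [1, 2]
        flushRemaining(buffer, b -> System.out.println("insertBatch " + b), 2); // drains the rest in batches
    }
}
```

Note that items are removed from the buffer before insertBatch is called, so a throwing sink drops that batch rather than looping forever -- the real scheduler should decide explicitly whether to re-offer or drop on failure.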
</action>
<verify>
<automated>mvn clean compile -q 2>&1 | tail -5</automated>
</verify>
<done>IngestionService routes data to WriteBuffers. ClickHouse repositories implement batch inserts via JdbcTemplate. FlushScheduler drains buffers on interval and flushes remaining on shutdown.</done>
</task>
<task type="auto" tdd="true">
<name>Task 2: Ingestion REST controllers and integration tests</name>
<files>
cameleer-server-app/src/main/java/com/cameleer/server/app/controller/ExecutionController.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/controller/DiagramController.java,
cameleer-server-app/src/main/java/com/cameleer/server/app/controller/MetricsController.java,
cameleer-server-app/src/test/java/com/cameleer/server/app/controller/ExecutionControllerIT.java,
cameleer-server-app/src/test/java/com/cameleer/server/app/controller/DiagramControllerIT.java,
cameleer-server-app/src/test/java/com/cameleer/server/app/controller/MetricsControllerIT.java,
cameleer-server-app/src/test/java/com/cameleer/server/app/controller/BackpressureIT.java
</files>
<behavior>
- POST /api/v1/data/executions with single RouteExecution JSON returns 202
- POST /api/v1/data/executions with array of RouteExecutions returns 202
- POST /api/v1/data/diagrams with single RouteGraph returns 202
- POST /api/v1/data/diagrams with array of RouteGraphs returns 202
- POST /api/v1/data/metrics with metrics payload returns 202
- After flush interval, posted data is queryable in ClickHouse
- When buffer is full, POST returns 503 with Retry-After header
- Unknown JSON fields in request body are accepted (not rejected)
</behavior>
<action>
1. Create ExecutionController:
- @RestController @RequestMapping("/api/v1/data")
- POST /executions: accepts a @RequestBody that handles both a single RouteExecution and a List<RouteExecution> -- per protocol, both forms must be accepted. Either bind the body as a JsonNode and branch on isArray(), or register a custom deserializer that normalizes a single object into a one-element list.
- Calls ingestionService.acceptExecutions(). If returns false -> 503 with Retry-After: 5 header. If true -> 202 Accepted.
- Add @Operation annotations for OpenAPI documentation.
2. Create DiagramController:
- @RestController @RequestMapping("/api/v1/data")
- POST /diagrams: same pattern, accepts single or array of RouteGraph, delegates to ingestionService.
3. Create MetricsController:
- @RestController @RequestMapping("/api/v1/data")
- POST /metrics: same pattern.
4. Create ExecutionControllerIT (extends AbstractClickHouseIT):
- Use TestRestTemplate or MockMvc with @AutoConfigureMockMvc
- Test: POST valid RouteExecution JSON with X-Cameleer-Protocol-Version:1 header -> 202
- Test: POST array of executions -> 202
- Test: After post, wait for flush (use Awaitility), query ClickHouse directly via JdbcTemplate to verify data arrived
- Test: POST with unknown JSON fields -> 202 (forward compat, API-05)
5. Create DiagramControllerIT (extends AbstractClickHouseIT):
- Test: POST RouteGraph -> 202
- Test: After flush, diagram stored in ClickHouse with content_hash
6. Create MetricsControllerIT (extends AbstractClickHouseIT):
- Test: POST metrics -> 202
- Test: After flush, metrics in ClickHouse
7. Create BackpressureIT (extends AbstractClickHouseIT):
- Configure test with tiny buffer (ingestion.buffer-capacity=5)
- Fill buffer by posting enough data
- Next POST returns 503 with Retry-After header
- Verify previously buffered data is NOT lost (still flushes to ClickHouse)
Note: All integration tests must include X-Cameleer-Protocol-Version:1 header (API-04 will be enforced by Plan 03's interceptor, but include the header now for forward compatibility).
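The two decisions every controller in steps 1-3 makes -- normalize single-vs-array payloads, then map the buffer result to a status -- can be sketched without the web layer. IngestEndpointSketch, normalize, and statusFor are invented names for this sketch; in the real controller the body arrives via Jackson binding and the 503 response also carries a Retry-After: 5 header.

```java
import java.util.List;

// Sketch of the shared controller pattern from steps 1-3, minus Spring MVC:
// accept one item or an array, normalize to a list, call
// ingestionService.accept...(), then map the boolean to 202 or 503.
public class IngestEndpointSketch {

    // Array payloads bind to a List; a single payload becomes a one-item list.
    @SuppressWarnings("unchecked")
    static <T> List<T> normalize(Object body) {
        return (body instanceof List<?> list) ? (List<T>) list : List.of((T) body);
    }

    // Stand-in for the controller's status decision on the accept... result.
    static int statusFor(boolean accepted) {
        return accepted ? 202 : 503;  // the 503 response also sets Retry-After: 5
    }

    public static void main(String[] args) {
        System.out.println(normalize("one"));             // [one]
        System.out.println(normalize(List.of("a", "b"))); // [a, b]
        System.out.println(statusFor(true));              // 202
        System.out.println(statusFor(false));             // 503
    }
}
```

Keeping this normalization in one place means all three controllers stay thin: parse, normalize, delegate, map status.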
</action>
<verify>
<automated>mvn test -pl cameleer-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q 2>&1 | tail -15</automated>
</verify>
<done>All three ingestion endpoints return 202 on valid data. Data arrives in ClickHouse after flush. Buffer-full returns 503 with Retry-After. Unknown JSON fields accepted. Integration tests green.</done>
</task>
</tasks>
<verification>
- `mvn test -pl cameleer-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q` -- all integration tests pass
- POST to /api/v1/data/executions returns 202
- POST to /api/v1/data/diagrams returns 202
- POST to /api/v1/data/metrics returns 202
- Buffer full returns 503
</verification>
<success_criteria>
All four integration test classes green. Data flows from HTTP POST through WriteBuffer through FlushScheduler to ClickHouse. Backpressure returns 503 when buffer full without losing existing data.
</success_criteria>
<output>
After completion, create `.planning/phases/01-ingestion-pipeline-api-foundation/01-02-SUMMARY.md`
</output>