docs(01): create phase plan (3 plans, 2 waves)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,270 @@
|
||||
---
|
||||
phase: 01-ingestion-pipeline-api-foundation
|
||||
plan: 02
|
||||
type: execute
|
||||
wave: 2
|
||||
depends_on: ["01-01"]
|
||||
files_modified:
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java
|
||||
- cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
|
||||
- cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java
|
||||
- cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
|
||||
autonomous: true
|
||||
requirements:
|
||||
- INGST-01
|
||||
- INGST-02
|
||||
- INGST-03
|
||||
- INGST-05
|
||||
|
||||
must_haves:
|
||||
truths:
|
||||
- "POST /api/v1/data/executions with valid RouteExecution payload returns 202 Accepted"
|
||||
- "POST /api/v1/data/diagrams with valid RouteGraph payload returns 202 Accepted"
|
||||
- "POST /api/v1/data/metrics with valid metrics payload returns 202 Accepted"
|
||||
- "Data posted to endpoints appears in ClickHouse after flush interval"
|
||||
- "When buffer is full, endpoints return 503 with Retry-After header"
|
||||
artifacts:
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java"
|
||||
provides: "POST /api/v1/data/executions endpoint"
|
||||
min_lines: 20
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java"
|
||||
provides: "Batch insert to route_executions table via JdbcTemplate"
|
||||
min_lines: 30
|
||||
- path: "cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java"
|
||||
provides: "Scheduled drain of WriteBuffer into ClickHouse"
|
||||
min_lines: 20
|
||||
- path: "cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java"
|
||||
provides: "Routes data to appropriate WriteBuffer instances"
|
||||
min_lines: 20
|
||||
key_links:
|
||||
- from: "ExecutionController.java"
|
||||
to: "IngestionService.java"
|
||||
via: "constructor injection"
|
||||
pattern: "IngestionService"
|
||||
- from: "IngestionService.java"
|
||||
to: "WriteBuffer.java"
|
||||
via: "offer/offerBatch calls"
|
||||
pattern: "writeBuffer\\.offer"
|
||||
- from: "ClickHouseFlushScheduler.java"
|
||||
to: "WriteBuffer.java"
|
||||
via: "drain call on scheduled interval"
|
||||
pattern: "writeBuffer\\.drain"
|
||||
- from: "ClickHouseFlushScheduler.java"
|
||||
to: "ClickHouseExecutionRepository.java"
|
||||
via: "insertBatch call"
|
||||
pattern: "repository\\.insertBatch"
|
||||
---
|
||||
|
||||
<objective>
|
||||
Implement the three ingestion REST endpoints, ClickHouse repository implementations, flush scheduler, and IngestionService that wires controllers to the WriteBuffer.
|
||||
|
||||
Purpose: This is the core data pipeline -- agents POST data to endpoints, IngestionService buffers it, ClickHouseFlushScheduler drains and batch-inserts to ClickHouse. Backpressure returns 503 when buffer full.
|
||||
Output: Working ingestion flow verified by integration tests against Testcontainers ClickHouse.
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@C:/Users/Hendrik/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@.planning/PROJECT.md
|
||||
@.planning/ROADMAP.md
|
||||
@.planning/STATE.md
|
||||
@.planning/phases/01-ingestion-pipeline-api-foundation/01-RESEARCH.md
|
||||
@.planning/phases/01-ingestion-pipeline-api-foundation/01-01-SUMMARY.md
|
||||
|
||||
<!-- Interfaces from Plan 01 that this plan depends on -->
|
||||
<interfaces>
|
||||
From cameleer3-server-core WriteBuffer.java:
|
||||
```java
|
||||
public class WriteBuffer<T> {
|
||||
public WriteBuffer(int capacity);
|
||||
public boolean offer(T item);
|
||||
public boolean offerBatch(List<T> items);
|
||||
public List<T> drain(int maxBatch);
|
||||
public int size();
|
||||
public int capacity();
|
||||
public boolean isFull();
|
||||
public int remainingCapacity();
|
||||
}
|
||||
```
|
||||
|
||||
From cameleer3-server-core repository interfaces:
|
||||
```java
|
||||
public interface ExecutionRepository {
|
||||
void insertBatch(List<RouteExecution> executions);
|
||||
}
|
||||
public interface DiagramRepository {
|
||||
void store(RouteGraph graph);
|
||||
Optional<RouteGraph> findByContentHash(String hash);
|
||||
Optional<String> findContentHashForRoute(String routeId, String agentId);
|
||||
}
|
||||
public interface MetricsRepository {
|
||||
void insertBatch(List<MetricsSnapshot> metrics);
|
||||
}
|
||||
```
|
||||
|
||||
From IngestionConfig:
|
||||
```java
|
||||
@ConfigurationProperties("ingestion")
|
||||
public class IngestionConfig {
|
||||
int bufferCapacity; // default 50000
|
||||
int batchSize; // default 5000
|
||||
long flushIntervalMs; // default 1000
|
||||
}
|
||||
```
|
||||
</interfaces>
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto" tdd="true">
|
||||
<name>Task 1: IngestionService, ClickHouse repositories, and flush scheduler</name>
|
||||
<files>
|
||||
cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/IngestionService.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionRepository.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramRepository.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsRepository.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/ingestion/ClickHouseFlushScheduler.java
|
||||
</files>
|
||||
<behavior>
|
||||
- IngestionService.acceptExecutions(list) delegates to WriteBuffer.offerBatch, returns boolean
|
||||
- IngestionService.acceptDiagram(graph) delegates to WriteBuffer.offer, returns boolean (the content hash is not computed here; ClickHouseDiagramRepository.store computes it at flush time)
|
||||
- IngestionService.acceptMetrics(list) delegates to WriteBuffer.offerBatch, returns boolean
|
||||
- ClickHouseExecutionRepository.insertBatch uses JdbcTemplate.batchUpdate with all columns including Array columns for processor executions
|
||||
- ClickHouseDiagramRepository.store computes SHA-256 content hash and does INSERT with content_hash dedup
|
||||
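- ClickHouseFlushScheduler runs on @Scheduled(fixedDelayString="${ingestion.flush-interval-ms:1000}"), drains each buffer and hands the drained items to the respective repository: insertBatch for executions and metrics, store (once per drained RouteGraph) for diagrams, since DiagramRepository has no insertBatch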
|
||||
- ClickHouseFlushScheduler implements SmartLifecycle for graceful shutdown (flush remaining on stop)
|
||||
</behavior>
|
||||
<action>
|
||||
1. Create IngestionService in core module (no Spring annotations -- it's a plain class):
|
||||
- Constructor takes three WriteBuffer instances (executions, diagrams, metrics)
|
||||
- acceptExecutions(List<RouteExecution>): calls executionBuffer.offerBatch(), returns boolean
|
||||
- acceptExecution(RouteExecution): calls executionBuffer.offer(), returns boolean
|
||||
- acceptDiagram(RouteGraph): calls diagramBuffer.offer(), returns boolean
|
||||
- acceptDiagrams(List<RouteGraph>): calls diagramBuffer.offerBatch(), returns boolean
|
||||
- acceptMetrics(List<MetricsSnapshot>): calls metricsBuffer.offerBatch(), returns boolean
|
||||
- getExecutionBufferDepth(), getDiagramBufferDepth(), getMetricsBufferDepth() for monitoring
|
||||
|
||||
2. Create ClickHouseExecutionRepository implements ExecutionRepository:
|
||||
- @Repository, inject JdbcTemplate
|
||||
- insertBatch: INSERT INTO route_executions with all columns. Map RouteExecution fields to ClickHouse columns.
|
||||
For processor execution arrays: extract from RouteExecution.getProcessorExecutions() into parallel arrays (processor_ids, processor_types, etc.)
|
||||
Use JdbcTemplate.batchUpdate with BatchPreparedStatementSetter.
|
||||
For Array columns, use java.sql.Array via connection.createArrayOf() or pass as comma-separated and cast.
|
||||
Note: ClickHouse JDBC V2 handles Array types -- prefer passing Java arrays directly via ps.setObject(), and fall back to createArrayOf() or the comma-separated cast only if that does not work.
|
||||
|
||||
3. Create ClickHouseDiagramRepository implements DiagramRepository:
|
||||
- @Repository, inject JdbcTemplate
|
||||
- store(RouteGraph): serialize graph to JSON (Jackson ObjectMapper), compute SHA-256 hex hash of JSON bytes, INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
|
||||
- findByContentHash: SELECT by content_hash, deserialize definition JSON back to RouteGraph
|
||||
- findContentHashForRoute: SELECT content_hash WHERE route_id=? AND agent_id=? ORDER BY created_at DESC LIMIT 1
|
||||
|
||||
4. Create ClickHouseMetricsRepository implements MetricsRepository:
|
||||
- @Repository, inject JdbcTemplate
|
||||
- insertBatch: INSERT INTO agent_metrics with batchUpdate
|
||||
|
||||
5. Create ClickHouseFlushScheduler:
|
||||
- Annotate the scheduler with @Component, and add @EnableScheduling to the app config or main class
|
||||
- Inject three WriteBuffer instances and three repository implementations
|
||||
- Inject IngestionConfig for batchSize
|
||||
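- @Scheduled(fixedDelayString="${ingestion.flush-interval-ms:1000}") flushAll(): drains each buffer up to batchSize and, if the drained list is non-empty, passes it to the matching repository (insertBatch for executions and metrics; store per item for diagrams, because DiagramRepository only exposes store). Wrap each buffer's flush in its own try-catch to log errors without stopping the scheduler.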
|
||||
- Implement SmartLifecycle: on stop(), flush all remaining data (loop drain until empty) before returning.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn clean compile -q 2>&1 | tail -5</automated>
|
||||
</verify>
|
||||
<done>IngestionService routes data to WriteBuffers. ClickHouse repositories implement batch inserts via JdbcTemplate. FlushScheduler drains buffers on interval and flushes remaining on shutdown.</done>
|
||||
</task>
|
||||
|
||||
<task type="auto" tdd="true">
|
||||
<name>Task 2: Ingestion REST controllers and integration tests</name>
|
||||
<files>
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java,
|
||||
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java,
|
||||
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
|
||||
</files>
|
||||
<behavior>
|
||||
- POST /api/v1/data/executions with single RouteExecution JSON returns 202
|
||||
- POST /api/v1/data/executions with array of RouteExecutions returns 202
|
||||
- POST /api/v1/data/diagrams with single RouteGraph returns 202
|
||||
- POST /api/v1/data/diagrams with array of RouteGraphs returns 202
|
||||
- POST /api/v1/data/metrics with metrics payload returns 202
|
||||
- After flush interval, posted data is queryable in ClickHouse
|
||||
- When buffer is full, POST returns 503 with Retry-After header
|
||||
- Unknown JSON fields in request body are accepted (not rejected)
|
||||
</behavior>
|
||||
<action>
|
||||
1. Create ExecutionController:
|
||||
- @RestController @RequestMapping("/api/v1/data")
|
||||
- POST /executions: accepts a @RequestBody that may be either a single RouteExecution or a List<RouteExecution>. Per protocol, both forms must be accepted, so use a custom deserializer or accept Object and check the type. Simply always accepting a List (requiring agents to send arrays) would be simpler but does not satisfy the protocol.
|
||||
- Calls ingestionService.acceptExecutions(). If it returns false -> 503 with a Retry-After: 5 header. If it returns true -> 202 Accepted.
|
||||
- Add @Operation annotations for OpenAPI documentation.
|
||||
|
||||
2. Create DiagramController:
|
||||
- @RestController @RequestMapping("/api/v1/data")
|
||||
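- POST /diagrams: same pattern as /executions -- accepts a single RouteGraph or an array of RouteGraphs, delegates to ingestionService.acceptDiagram() / acceptDiagrams(), and returns 202 Accepted on success or 503 with a Retry-After header when the buffer is full.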
|
||||
|
||||
3. Create MetricsController:
|
||||
- @RestController @RequestMapping("/api/v1/data")
|
||||
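- POST /metrics: same pattern as /executions -- accepts the metrics payload, delegates to ingestionService.acceptMetrics(), and returns 202 Accepted on success or 503 with a Retry-After header when the buffer is full.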
|
||||
|
||||
4. Create ExecutionControllerIT (extends AbstractClickHouseIT):
|
||||
- Use TestRestTemplate or MockMvc with @AutoConfigureMockMvc
|
||||
- Test: POST valid RouteExecution JSON with X-Cameleer-Protocol-Version:1 header -> 202
|
||||
- Test: POST array of executions -> 202
|
||||
- Test: After post, wait for flush (use Awaitility), query ClickHouse directly via JdbcTemplate to verify data arrived
|
||||
- Test: POST with unknown JSON fields -> 202 (forward compat, API-05)
|
||||
|
||||
5. Create DiagramControllerIT (extends AbstractClickHouseIT):
|
||||
- Test: POST RouteGraph -> 202
|
||||
- Test: After flush, diagram stored in ClickHouse with content_hash
|
||||
|
||||
6. Create MetricsControllerIT (extends AbstractClickHouseIT):
|
||||
- Test: POST metrics -> 202
|
||||
- Test: After flush, metrics in ClickHouse
|
||||
|
||||
7. Create BackpressureIT (extends AbstractClickHouseIT):
|
||||
- Configure test with tiny buffer (ingestion.buffer-capacity=5)
|
||||
- Fill buffer by posting enough data
|
||||
- Next POST returns 503 with Retry-After header
|
||||
- Verify previously buffered data is NOT lost (still flushes to ClickHouse)
|
||||
|
||||
Note: All integration tests must include X-Cameleer-Protocol-Version:1 header (API-04 will be enforced by Plan 03's interceptor, but include the header now for forward compatibility).
|
||||
</action>
|
||||
<verify>
|
||||
<automated>mvn test -pl cameleer3-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q 2>&1 | tail -15</automated>
|
||||
</verify>
|
||||
<done>All three ingestion endpoints return 202 on valid data. Data arrives in ClickHouse after flush. Buffer-full returns 503 with Retry-After. Unknown JSON fields accepted. Integration tests green.</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<verification>
|
||||
- `mvn test -pl cameleer3-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q` -- all integration tests pass
|
||||
- POST to /api/v1/data/executions returns 202
|
||||
- POST to /api/v1/data/diagrams returns 202
|
||||
- POST to /api/v1/data/metrics returns 202
|
||||
- Buffer full returns 503
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
All four integration test classes green. Data flows from HTTP POST through WriteBuffer through FlushScheduler to ClickHouse. Backpressure returns 503 when buffer full without losing existing data.
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/01-ingestion-pipeline-api-foundation/01-02-SUMMARY.md`
|
||||
</output>
|
||||
Reference in New Issue
Block a user