Integration tests in 01-02 extend AbstractClickHouseIT from 01-03. Both were wave 2 (parallel), causing potential compilation failure. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
14 KiB
phase, plan, type, wave, depends_on, files_modified, autonomous, requirements, must_haves
| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 01-ingestion-pipeline-api-foundation | 02 | execute | 3 |
|
|
true |
|
|
Purpose: This is the core data pipeline -- agents POST data to endpoints, IngestionService buffers it, ClickHouseFlushScheduler drains and batch-inserts to ClickHouse. Backpressure returns 503 when buffer full. Output: Working ingestion flow verified by integration tests against Testcontainers ClickHouse.
<execution_context> @C:/Users/Hendrik/.claude/get-shit-done/workflows/execute-plan.md @C:/Users/Hendrik/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-ingestion-pipeline-api-foundation/01-RESEARCH.md @.planning/phases/01-ingestion-pipeline-api-foundation/01-01-SUMMARY.md From cameleer3-server-core WriteBuffer.java: ```java public class WriteBuffer { public WriteBuffer(int capacity); public boolean offer(T item); public boolean offerBatch(List items); public List drain(int maxBatch); public int size(); public int capacity(); public boolean isFull(); public int remainingCapacity(); } ```From cameleer3-server-core repository interfaces:
public interface ExecutionRepository {
void insertBatch(List<RouteExecution> executions);
}
public interface DiagramRepository {
void store(RouteGraph graph);
Optional<RouteGraph> findByContentHash(String hash);
Optional<String> findContentHashForRoute(String routeId, String agentId);
}
public interface MetricsRepository {
void insertBatch(List<MetricsSnapshot> metrics);
}
From IngestionConfig:
@ConfigurationProperties("ingestion")
public class IngestionConfig {
int bufferCapacity; // default 50000
int batchSize; // default 5000
long flushIntervalMs; // default 1000
}
2. Create ClickHouseExecutionRepository implements ExecutionRepository:
- @Repository, inject JdbcTemplate
- insertBatch: INSERT INTO route_executions with all columns. Map RouteExecution fields to ClickHouse columns.
For processor execution arrays: extract from RouteExecution.getProcessorExecutions() into parallel arrays (processor_ids, processor_types, etc.)
Use JdbcTemplate.batchUpdate with BatchPreparedStatementSetter.
For Array columns, use java.sql.Array via connection.createArrayOf() or pass as comma-separated and cast.
Note: ClickHouse JDBC V2 handles Array types -- pass Java arrays directly via ps.setObject().
3. Create ClickHouseDiagramRepository implements DiagramRepository:
- @Repository, inject JdbcTemplate
- store(RouteGraph): serialize graph to JSON (Jackson ObjectMapper), compute SHA-256 hex hash of JSON bytes, INSERT INTO route_diagrams (content_hash, route_id, agent_id, definition)
- findByContentHash: SELECT by content_hash, deserialize definition JSON back to RouteGraph
- findContentHashForRoute: SELECT content_hash WHERE route_id=? AND agent_id=? ORDER BY created_at DESC LIMIT 1
4. Create ClickHouseMetricsRepository implements MetricsRepository:
- @Repository, inject JdbcTemplate
- insertBatch: INSERT INTO agent_metrics with batchUpdate
5. Create ClickHouseFlushScheduler:
- @Component, @EnableScheduling on the app config or main class
- Inject three WriteBuffer instances and three repository implementations
- Inject IngestionConfig for batchSize
- @Scheduled(fixedDelayString="${ingestion.flush-interval-ms:1000}") flushAll(): drains each buffer up to batchSize, calls insertBatch if non-empty. Wrap each in try-catch to log errors without stopping the scheduler.
- Implement SmartLifecycle: on stop(), flush all remaining data (loop drain until empty) before returning.
mvn clean compile -q 2>&1 | tail -5
IngestionService routes data to WriteBuffers. ClickHouse repositories implement batch inserts via JdbcTemplate. FlushScheduler drains buffers on interval and flushes remaining on shutdown.
Task 2: Ingestion REST controllers and integration tests
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ExecutionController.java,
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/DiagramController.java,
cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/MetricsController.java,
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ExecutionControllerIT.java,
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/DiagramControllerIT.java,
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/MetricsControllerIT.java,
cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/BackpressureIT.java
- POST /api/v1/data/executions with single RouteExecution JSON returns 202
- POST /api/v1/data/executions with array of RouteExecutions returns 202
- POST /api/v1/data/diagrams with single RouteGraph returns 202
- POST /api/v1/data/diagrams with array of RouteGraphs returns 202
- POST /api/v1/data/metrics with metrics payload returns 202
- After flush interval, posted data is queryable in ClickHouse
- When buffer is full, POST returns 503 with Retry-After header
- Unknown JSON fields in request body are accepted (not rejected)
1. Create ExecutionController:
- @RestController @RequestMapping("/api/v1/data")
- POST /executions: accepts @RequestBody that handles both single RouteExecution and List. Use a custom deserializer or accept Object and check type, OR simply always accept as List (require agents to send arrays). Per protocol, accept both single and array.
- Calls ingestionService.acceptExecutions(). If returns false -> 503 with Retry-After: 5 header. If true -> 202 Accepted.
- Add @Operation annotations for OpenAPI documentation.
2. Create DiagramController:
- @RestController @RequestMapping("/api/v1/data")
- POST /diagrams: same pattern, accepts single or array of RouteGraph, delegates to ingestionService.
3. Create MetricsController:
- @RestController @RequestMapping("/api/v1/data")
- POST /metrics: same pattern.
4. Create ExecutionControllerIT (extends AbstractClickHouseIT):
- Use TestRestTemplate or MockMvc with @AutoConfigureMockMvc
- Test: POST valid RouteExecution JSON with X-Cameleer-Protocol-Version:1 header -> 202
- Test: POST array of executions -> 202
- Test: After post, wait for flush (use Awaitility), query ClickHouse directly via JdbcTemplate to verify data arrived
- Test: POST with unknown JSON fields -> 202 (forward compat, API-05)
5. Create DiagramControllerIT (extends AbstractClickHouseIT):
- Test: POST RouteGraph -> 202
- Test: After flush, diagram stored in ClickHouse with content_hash
6. Create MetricsControllerIT (extends AbstractClickHouseIT):
- Test: POST metrics -> 202
- Test: After flush, metrics in ClickHouse
7. Create BackpressureIT (extends AbstractClickHouseIT):
- Configure test with tiny buffer (ingestion.buffer-capacity=5)
- Fill buffer by posting enough data
- Next POST returns 503 with Retry-After header
- Verify previously buffered data is NOT lost (still flushes to ClickHouse)
Note: All integration tests must include X-Cameleer-Protocol-Version:1 header (API-04 will be enforced by Plan 03's interceptor, but include the header now for forward compatibility).
mvn test -pl cameleer3-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q 2>&1 | tail -15
All three ingestion endpoints return 202 on valid data. Data arrives in ClickHouse after flush. Buffer-full returns 503 with Retry-After. Unknown JSON fields accepted. Integration tests green.
- `mvn test -pl cameleer3-server-app -Dtest="ExecutionControllerIT,DiagramControllerIT,MetricsControllerIT,BackpressureIT" -q` -- all integration tests pass
- POST to /api/v1/data/executions returns 202
- POST to /api/v1/data/diagrams returns 202
- POST to /api/v1/data/metrics returns 202
- Buffer full returns 503
<success_criteria> All four integration test classes green. Data flows from HTTP POST through WriteBuffer through FlushScheduler to ClickHouse. Backpressure returns 503 when buffer full without losing existing data. </success_criteria>
After completion, create `.planning/phases/01-ingestion-pipeline-api-foundation/01-02-SUMMARY.md`