fix: drain all buffered data to server on graceful shutdown
Previously, LogForwarder and ChunkedExporter only flushed a single batch (50 items) per queue on close, silently dropping remaining entries. The collector also shut down after the exporter was already closed, losing any in-flight exchange buffers. Four fixes: - LogForwarder.close() loops until queue is fully drained - ChunkedExporter.close() loops all queues until empty, bypassing backpressure pause - FlatExecutionCollector.shutdown() flushes active exchange buffers before the exporter closes - doGracefulShutdown() reorders collector shutdown before exporter close Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -70,6 +70,7 @@ java -jar cameleer-quarkus-native-app/target/cameleer-quarkus-native-app-1.0-SNA
|
||||
- Extension uses `CamelContextCustomizer` (not `@Observes StartupEvent`) for Phase 1 — Camel Quarkus starts the context from a recorded build step before `StartupEvent` fires. `CamelContextCustomizer.configure()` runs before `start()`, same timing as the agent's `preInstall()`.
|
||||
- **Log forwarding architecture**: early server connection in `preInstall`/`configure` provides a live exporter before the appender registers. No deferred exporter, no early buffer. Unified path for all frameworks (Spring Boot, Quarkus, Plain Java). Pre-agent logs (JVM bootstrap) are the server team's responsibility via Docker/K8s infra.
|
||||
- Auto-recovery: agent re-registers on 404 (heartbeat/data), refreshes token on 401/403
|
||||
- **Graceful shutdown**: on `CamelContextStoppingEvent`, the agent drains all buffered data to the server before deregistering. Shutdown order: stop SSE → stop heartbeat → drain logs → emit AGENT_STOPPED → flush collector buffers → drain exporter queues → deregister. The collector must flush **before** the exporter closes. All queue drains must loop until empty (not just one batch). See `docs/architecture.md` for the full sequence.
|
||||
|
||||
## CI/CD & Deployment
|
||||
|
||||
|
||||
@@ -896,6 +896,8 @@ public class FlatExecutionCollector extends ExecutionCollector {
|
||||
public void shutdown() {
|
||||
periodicFlusher.shutdownNow();
|
||||
evictor.shutdownNow();
|
||||
// Flush any buffered records for in-flight exchanges before the exporter closes
|
||||
flushActiveBuffers();
|
||||
super.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@@ -281,7 +281,6 @@ public class ChunkedExporter implements Exporter {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
flush();
|
||||
scheduler.shutdown();
|
||||
try {
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
@@ -291,6 +290,12 @@ public class ChunkedExporter implements Exporter {
|
||||
scheduler.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
flush();
|
||||
// Drain all queues completely (bypasses backpressure pause)
|
||||
while (!chunkQueue.isEmpty() || !metricsQueue.isEmpty() || !eventQueue.isEmpty() || !logQueue.isEmpty()) {
|
||||
flushChunks();
|
||||
flushMetrics();
|
||||
flushEvents();
|
||||
flushLogs();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +98,10 @@ public class LogForwarder {
|
||||
public void close() {
|
||||
scheduler.shutdown();
|
||||
try {
|
||||
flush();
|
||||
// Drain all remaining log entries (not just one batch)
|
||||
while (!queue.isEmpty()) {
|
||||
flush();
|
||||
}
|
||||
scheduler.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
||||
@@ -193,7 +193,9 @@ public class CameleerEventNotifier extends EventNotifierSupport {
|
||||
stopLogForwarding();
|
||||
// Emit AGENT_STOPPED event before closing the exporter so it's included in the final flush
|
||||
sendEvent("AGENT_STOPPED", Map.of());
|
||||
// Flush exporter
|
||||
// Flush collector buffers while exporter is still open (pushes remaining chunks to exporter queue)
|
||||
collector.shutdown();
|
||||
// Drain exporter queues to server (chunks, metrics, events, logs)
|
||||
if (httpExporter != null) {
|
||||
try { httpExporter.close(); } catch (Exception e) {
|
||||
LOG.debug("Cameleer: Error closing HTTP exporter", e);
|
||||
@@ -209,8 +211,6 @@ public class CameleerEventNotifier extends EventNotifierSupport {
|
||||
} else if (metricsBridge != null) {
|
||||
metricsBridge.stop();
|
||||
}
|
||||
// Stop eviction thread
|
||||
collector.shutdown();
|
||||
}
|
||||
|
||||
private void stopLogForwarding() {
|
||||
|
||||
@@ -164,6 +164,28 @@ Timer-driven HTTP caller that invokes backend-app endpoints. Demonstrates `X-Cam
|
||||
|
||||
3 route classes using the Quarkus extension (no agent). Proves CDI lifecycle hooks work without ByteBuddy and that the extension is GraalVM native-image compatible.
|
||||
|
||||
## Graceful Shutdown
|
||||
|
||||
On `CamelContextStoppingEvent`, the `CameleerEventNotifier` runs a graceful shutdown sequence on a dedicated `cameleer-shutdown` thread (30-second timeout). The order ensures all buffered data reaches the server before deregistration:
|
||||
|
||||
```
|
||||
CamelContextStoppingEvent
|
||||
→ Stop SSE client (no more server commands)
|
||||
→ Stop HeartbeatManager (no more keepalives)
|
||||
→ Stop log forwarding (clear bridge handler, drain LogForwarder queue → exporter)
|
||||
→ Emit AGENT_STOPPED event (queued in exporter for final flush)
|
||||
→ Collector shutdown (flush active exchange buffers → chunks land in exporter queue)
|
||||
→ ChunkedExporter close (drain ALL queues to server: chunks, metrics, events, logs)
|
||||
→ ServerConnection.deregister() (notify server of graceful shutdown)
|
||||
→ Stop MetricsBridge
|
||||
```
|
||||
|
||||
**Key invariants:**
|
||||
- The collector flushes its in-flight exchange buffers **before** the exporter closes, so late chunks are not lost.
|
||||
- `LogForwarder.close()` and `ChunkedExporter.close()` drain their queues completely (looping over all batches), not just a single batch.
|
||||
- The `ChunkedExporter` drain bypasses the backpressure pause — shutdown must send everything regardless of prior 503s.
|
||||
- The Quarkus extension's `@Observes ShutdownEvent` is a safety net; primary cleanup runs via the `CamelContextStoppingEvent` path above.
|
||||
|
||||
## Cross-Service Correlation
|
||||
|
||||
`X-Cameleer-CorrelationId` header is set on exchange creation and propagated across `direct:`, `seda:`, and HTTP boundaries. The caller-app attaches the header when calling backend-app, allowing the server to link executions across services.
|
||||
|
||||
Reference in New Issue
Block a user