feat: DockerEventMonitor — persistent event stream for container lifecycle
Listens to Docker daemon events (die, oom, start, stop) for containers labeled managed-by=cameleer3-server, updates replica states in Postgres, and recomputes aggregate deployment status (RUNNING/DEGRADED/FAILED). Bean is wired in RuntimeOrchestratorAutoConfig via instanceof guard so it only activates when Docker is available. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,129 @@
|
||||
package com.cameleer3.server.app.runtime;
|
||||
|
||||
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
|
||||
import com.cameleer3.server.core.runtime.Deployment;
|
||||
import com.cameleer3.server.core.runtime.DeploymentStatus;
|
||||
import com.github.dockerjava.api.DockerClient;
|
||||
import com.github.dockerjava.api.async.ResultCallback;
|
||||
import com.github.dockerjava.api.model.Event;
|
||||
import com.github.dockerjava.api.model.EventType;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
public class DockerEventMonitor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DockerEventMonitor.class);
|
||||
|
||||
private final DockerClient dockerClient;
|
||||
private final PostgresDeploymentRepository deploymentRepository;
|
||||
private Closeable eventStream;
|
||||
|
||||
public DockerEventMonitor(DockerRuntimeOrchestrator orchestrator,
|
||||
PostgresDeploymentRepository deploymentRepository) {
|
||||
this.dockerClient = orchestrator.getDockerClient();
|
||||
this.deploymentRepository = deploymentRepository;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void startListening() {
|
||||
eventStream = dockerClient.eventsCmd()
|
||||
.withEventTypeFilter(EventType.CONTAINER)
|
||||
.withEventFilter("die", "oom", "start", "stop")
|
||||
.exec(new ResultCallback.Adapter<Event>() {
|
||||
@Override
|
||||
public void onNext(Event event) {
|
||||
handleEvent(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.warn("Docker event stream error, reconnecting: {}", throwable.getMessage());
|
||||
reconnect();
|
||||
}
|
||||
});
|
||||
|
||||
log.info("Docker event monitor started");
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (eventStream != null) {
|
||||
try { eventStream.close(); } catch (IOException e) { /* ignore */ }
|
||||
}
|
||||
}
|
||||
|
||||
private void handleEvent(Event event) {
|
||||
String containerId = event.getId();
|
||||
if (containerId == null) return;
|
||||
|
||||
Map<String, String> labels = event.getActor() != null ? event.getActor().getAttributes() : null;
|
||||
if (labels == null || !"cameleer3-server".equals(labels.get("managed-by"))) return;
|
||||
|
||||
String action = event.getAction();
|
||||
log.debug("Docker event: {} for container {} ({})", action, containerId.substring(0, 12),
|
||||
labels.get("cameleer.app"));
|
||||
|
||||
Optional<Deployment> deploymentOpt = deploymentRepository.findByContainerId(containerId);
|
||||
if (deploymentOpt.isEmpty()) return;
|
||||
|
||||
Deployment deployment = deploymentOpt.get();
|
||||
List<Map<String, Object>> replicas = new ArrayList<>(deployment.replicaStates());
|
||||
|
||||
boolean changed = false;
|
||||
for (int i = 0; i < replicas.size(); i++) {
|
||||
Map<String, Object> replica = replicas.get(i);
|
||||
if (containerId.equals(replica.get("containerId"))) {
|
||||
Map<String, Object> updated = new HashMap<>(replica);
|
||||
switch (action) {
|
||||
case "die", "oom", "stop" -> {
|
||||
updated.put("status", "DEAD");
|
||||
if ("oom".equals(action)) {
|
||||
updated.put("oomKilled", true);
|
||||
log.warn("Container {} OOM-killed (app={}, env={})", containerId.substring(0, 12),
|
||||
labels.get("cameleer.app"), labels.get("cameleer.environment"));
|
||||
}
|
||||
}
|
||||
case "start" -> updated.put("status", "RUNNING");
|
||||
}
|
||||
replicas.set(i, updated);
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!changed) return;
|
||||
|
||||
deploymentRepository.updateReplicaStates(deployment.id(), replicas);
|
||||
|
||||
long running = replicas.stream().filter(r -> "RUNNING".equals(r.get("status"))).count();
|
||||
DeploymentStatus newStatus;
|
||||
if (running == replicas.size()) {
|
||||
newStatus = DeploymentStatus.RUNNING;
|
||||
} else if (running > 0) {
|
||||
newStatus = DeploymentStatus.DEGRADED;
|
||||
} else {
|
||||
newStatus = DeploymentStatus.FAILED;
|
||||
}
|
||||
|
||||
if (deployment.status() != newStatus) {
|
||||
deploymentRepository.updateStatus(deployment.id(), newStatus, deployment.containerId(), deployment.errorMessage());
|
||||
log.info("Deployment {} status: {} -> {} ({}/{} replicas running)",
|
||||
deployment.id(), deployment.status(), newStatus, running, replicas.size());
|
||||
}
|
||||
}
|
||||
|
||||
private void reconnect() {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
startListening();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.cameleer3.server.app.runtime;
|
||||
|
||||
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
|
||||
import com.cameleer3.server.core.runtime.DeploymentRepository;
|
||||
import com.cameleer3.server.core.runtime.RuntimeOrchestrator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -33,4 +35,13 @@ public class RuntimeOrchestratorAutoConfig {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DockerEventMonitor dockerEventMonitor(RuntimeOrchestrator orchestrator,
|
||||
DeploymentRepository deploymentRepository) {
|
||||
if (orchestrator instanceof DockerRuntimeOrchestrator docker) {
|
||||
return new DockerEventMonitor(docker, (PostgresDeploymentRepository) deploymentRepository);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user