diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DockerEventMonitor.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DockerEventMonitor.java index 333139c1..ff97afa0 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DockerEventMonitor.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DockerEventMonitor.java @@ -1,8 +1,10 @@ package com.cameleer3.server.app.runtime; import com.cameleer3.server.app.storage.PostgresDeploymentRepository; +import com.cameleer3.server.core.runtime.ContainerStatus; import com.cameleer3.server.core.runtime.Deployment; import com.cameleer3.server.core.runtime.DeploymentStatus; +import com.cameleer3.server.core.runtime.RuntimeOrchestrator; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.model.Event; @@ -11,6 +13,7 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; import java.io.Closeable; import java.io.IOException; @@ -21,12 +24,14 @@ public class DockerEventMonitor { private static final Logger log = LoggerFactory.getLogger(DockerEventMonitor.class); private final DockerClient dockerClient; + private final RuntimeOrchestrator runtimeOrchestrator; private final PostgresDeploymentRepository deploymentRepository; private Closeable eventStream; public DockerEventMonitor(DockerRuntimeOrchestrator orchestrator, PostgresDeploymentRepository deploymentRepository) { this.dockerClient = orchestrator.getDockerClient(); + this.runtimeOrchestrator = orchestrator; this.deploymentRepository = deploymentRepository; } @@ -118,6 +123,65 @@ public class DockerEventMonitor { } } + /** + * Periodic reconciliation: inspects actual container state for active deployments + * and corrects status mismatches caused by missed Docker events. + */ + @Scheduled(fixedDelay = 30_000, initialDelay = 60_000) + public void reconcile() { + List active = deploymentRepository.findByStatus( + List.of(DeploymentStatus.RUNNING, DeploymentStatus.DEGRADED, DeploymentStatus.STARTING)); + + for (Deployment deployment : active) { + if (deployment.replicaStates() == null || deployment.replicaStates().isEmpty()) continue; + + List> replicas = new ArrayList<>(deployment.replicaStates()); + boolean changed = false; + + for (int i = 0; i < replicas.size(); i++) { + Map replica = replicas.get(i); + String containerId = (String) replica.get("containerId"); + if (containerId == null) continue; + + ContainerStatus actual = runtimeOrchestrator.getContainerStatus(containerId); + String currentStatus = (String) replica.get("status"); + String actualStatus = actual.running() ? "RUNNING" : "DEAD"; + + if (!actualStatus.equals(currentStatus)) { + Map updated = new HashMap<>(replica); + updated.put("status", actualStatus); + replicas.set(i, updated); + changed = true; + } + } + + if (!changed) { + // Even if replica states haven't changed, check if deployment status is correct + long running = replicas.stream().filter(r -> "RUNNING".equals(r.get("status"))).count(); + DeploymentStatus expected = running == replicas.size() ? DeploymentStatus.RUNNING + : running > 0 ? DeploymentStatus.DEGRADED : DeploymentStatus.FAILED; + if (deployment.status() != expected) { + deploymentRepository.updateStatus(deployment.id(), expected, deployment.containerId(), deployment.errorMessage()); + log.info("Reconcile: deployment {} status corrected {} -> {} ({}/{} running)", + deployment.id(), deployment.status(), expected, running, replicas.size()); + } + continue; + } + + deploymentRepository.updateReplicaStates(deployment.id(), replicas); + + long running = replicas.stream().filter(r -> "RUNNING".equals(r.get("status"))).count(); + DeploymentStatus newStatus = running == replicas.size() ? DeploymentStatus.RUNNING + : running > 0 ? DeploymentStatus.DEGRADED : DeploymentStatus.FAILED; + + if (deployment.status() != newStatus) { + deploymentRepository.updateStatus(deployment.id(), newStatus, deployment.containerId(), deployment.errorMessage()); + log.info("Reconcile: deployment {} status {} -> {} ({}/{} replicas running)", + deployment.id(), deployment.status(), newStatus, running, replicas.size()); + } + } + } + private void reconnect() { try { Thread.sleep(5000); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDeploymentRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDeploymentRepository.java index 69533761..a478bd1c 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDeploymentRepository.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/PostgresDeploymentRepository.java @@ -62,6 +62,13 @@ public class PostgresDeploymentRepository implements DeploymentRepository { return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } + public List findByStatus(List statuses) { + String placeholders = String.join(",", statuses.stream().map(s -> "'" + s.name() + "'").toList()); + return jdbc.query( + "SELECT " + SELECT_COLS + " FROM deployments WHERE status IN (" + placeholders + ") ORDER BY created_at DESC", + (rs, rowNum) -> mapRow(rs)); + } + @Override public UUID create(UUID appId, UUID appVersionId, UUID environmentId, String containerName) { UUID id = UUID.randomUUID();