fix: add periodic deployment status reconciliation
The DockerEventMonitor only reacted to Docker events. If an event was missed (e.g., during a reconnect or a startup race), a DEGRADED deployment whose replicas were all healthy would never be promoted back to RUNNING. Add a @Scheduled reconciliation (every 30s, after a 60s initial delay) that inspects actual container state and corrects deployment status mismatches.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,10 @@
|
|||||||
package com.cameleer3.server.app.runtime;
|
package com.cameleer3.server.app.runtime;
|
||||||
|
|
||||||
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
|
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
|
||||||
|
import com.cameleer3.server.core.runtime.ContainerStatus;
|
||||||
import com.cameleer3.server.core.runtime.Deployment;
|
import com.cameleer3.server.core.runtime.Deployment;
|
||||||
import com.cameleer3.server.core.runtime.DeploymentStatus;
|
import com.cameleer3.server.core.runtime.DeploymentStatus;
|
||||||
|
import com.cameleer3.server.core.runtime.RuntimeOrchestrator;
|
||||||
import com.github.dockerjava.api.DockerClient;
|
import com.github.dockerjava.api.DockerClient;
|
||||||
import com.github.dockerjava.api.async.ResultCallback;
|
import com.github.dockerjava.api.async.ResultCallback;
|
||||||
import com.github.dockerjava.api.model.Event;
|
import com.github.dockerjava.api.model.Event;
|
||||||
@@ -11,6 +13,7 @@ import jakarta.annotation.PostConstruct;
|
|||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -21,12 +24,14 @@ public class DockerEventMonitor {
|
|||||||
private static final Logger log = LoggerFactory.getLogger(DockerEventMonitor.class);
|
private static final Logger log = LoggerFactory.getLogger(DockerEventMonitor.class);
|
||||||
|
|
||||||
private final DockerClient dockerClient;
|
private final DockerClient dockerClient;
|
||||||
|
private final RuntimeOrchestrator runtimeOrchestrator;
|
||||||
private final PostgresDeploymentRepository deploymentRepository;
|
private final PostgresDeploymentRepository deploymentRepository;
|
||||||
private Closeable eventStream;
|
private Closeable eventStream;
|
||||||
|
|
||||||
/**
 * Wires the monitor to its collaborators.
 *
 * @param orchestrator         source of the Docker client; also retained as the
 *                             {@link RuntimeOrchestrator} used to query container status
 * @param deploymentRepository repository this monitor reads deployments from and writes
 *                             status changes to
 */
public DockerEventMonitor(DockerRuntimeOrchestrator orchestrator,
                          PostgresDeploymentRepository deploymentRepository) {
    this.deploymentRepository = deploymentRepository;
    this.runtimeOrchestrator = orchestrator;
    this.dockerClient = orchestrator.getDockerClient();
}
|
||||||
|
|
||||||
@@ -118,6 +123,65 @@ public class DockerEventMonitor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodic reconciliation: inspects actual container state for active deployments
|
||||||
|
* and corrects status mismatches caused by missed Docker events.
|
||||||
|
*/
|
||||||
|
@Scheduled(fixedDelay = 30_000, initialDelay = 60_000)
|
||||||
|
public void reconcile() {
|
||||||
|
List<Deployment> active = deploymentRepository.findByStatus(
|
||||||
|
List.of(DeploymentStatus.RUNNING, DeploymentStatus.DEGRADED, DeploymentStatus.STARTING));
|
||||||
|
|
||||||
|
for (Deployment deployment : active) {
|
||||||
|
if (deployment.replicaStates() == null || deployment.replicaStates().isEmpty()) continue;
|
||||||
|
|
||||||
|
List<Map<String, Object>> replicas = new ArrayList<>(deployment.replicaStates());
|
||||||
|
boolean changed = false;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicas.size(); i++) {
|
||||||
|
Map<String, Object> replica = replicas.get(i);
|
||||||
|
String containerId = (String) replica.get("containerId");
|
||||||
|
if (containerId == null) continue;
|
||||||
|
|
||||||
|
ContainerStatus actual = runtimeOrchestrator.getContainerStatus(containerId);
|
||||||
|
String currentStatus = (String) replica.get("status");
|
||||||
|
String actualStatus = actual.running() ? "RUNNING" : "DEAD";
|
||||||
|
|
||||||
|
if (!actualStatus.equals(currentStatus)) {
|
||||||
|
Map<String, Object> updated = new HashMap<>(replica);
|
||||||
|
updated.put("status", actualStatus);
|
||||||
|
replicas.set(i, updated);
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!changed) {
|
||||||
|
// Even if replica states haven't changed, check if deployment status is correct
|
||||||
|
long running = replicas.stream().filter(r -> "RUNNING".equals(r.get("status"))).count();
|
||||||
|
DeploymentStatus expected = running == replicas.size() ? DeploymentStatus.RUNNING
|
||||||
|
: running > 0 ? DeploymentStatus.DEGRADED : DeploymentStatus.FAILED;
|
||||||
|
if (deployment.status() != expected) {
|
||||||
|
deploymentRepository.updateStatus(deployment.id(), expected, deployment.containerId(), deployment.errorMessage());
|
||||||
|
log.info("Reconcile: deployment {} status corrected {} -> {} ({}/{} running)",
|
||||||
|
deployment.id(), deployment.status(), expected, running, replicas.size());
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
deploymentRepository.updateReplicaStates(deployment.id(), replicas);
|
||||||
|
|
||||||
|
long running = replicas.stream().filter(r -> "RUNNING".equals(r.get("status"))).count();
|
||||||
|
DeploymentStatus newStatus = running == replicas.size() ? DeploymentStatus.RUNNING
|
||||||
|
: running > 0 ? DeploymentStatus.DEGRADED : DeploymentStatus.FAILED;
|
||||||
|
|
||||||
|
if (deployment.status() != newStatus) {
|
||||||
|
deploymentRepository.updateStatus(deployment.id(), newStatus, deployment.containerId(), deployment.errorMessage());
|
||||||
|
log.info("Reconcile: deployment {} status {} -> {} ({}/{} replicas running)",
|
||||||
|
deployment.id(), deployment.status(), newStatus, running, replicas.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void reconnect() {
|
private void reconnect() {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
|
|||||||
@@ -62,6 +62,13 @@ public class PostgresDeploymentRepository implements DeploymentRepository {
|
|||||||
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<Deployment> findByStatus(List<DeploymentStatus> statuses) {
|
||||||
|
String placeholders = String.join(",", statuses.stream().map(s -> "'" + s.name() + "'").toList());
|
||||||
|
return jdbc.query(
|
||||||
|
"SELECT " + SELECT_COLS + " FROM deployments WHERE status IN (" + placeholders + ") ORDER BY created_at DESC",
|
||||||
|
(rs, rowNum) -> mapRow(rs));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public UUID create(UUID appId, UUID appVersionId, UUID environmentId, String containerName) {
|
public UUID create(UUID appId, UUID appVersionId, UUID environmentId, String containerName) {
|
||||||
UUID id = UUID.randomUUID();
|
UUID id = UUID.randomUUID();
|
||||||
|
|||||||
Reference in New Issue
Block a user