deploy: rolling strategy (per-replica replacement)
Replace the Phase 3 stub with a working rolling implementation.
Flow:
- Capture previous deployment's per-index container ids up front.
- For i = 0..replicas-1:
- Start new[i] (gen-suffixed name, coexists with old[i]).
- Wait for new[i] healthy (new waitForOneHealthy helper).
- On success: stop old[i] if present, continue.
- On failure: stop in-flight new[0..i], leave un-replaced old[i+1..N]
running, mark FAILED. Already-replaced old replicas are not
restored — rolling is not reversible; user redeploys to recover.
- After the loop: sweep any leftover old replicas (when replica count
shrank) and mark the old deployment STOPPED.
Resource peak: replicas + 1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -287,13 +287,122 @@ public class DeploymentExecutor {
|
||||
|
||||
/**
|
||||
* Rolling strategy: replace replicas one at a time — start new[i], wait
|
||||
* healthy, stop old[i]. On any replica's health failure, stop the failing
|
||||
* new container and abort without touching the remaining (un-replaced) old
|
||||
* replicas. Implemented in a follow-up phase.
|
||||
* healthy, stop old[i]. On any replica's health failure, stop the
|
||||
* in-flight new container, leave remaining old replicas serving, mark
|
||||
* FAILED. Already-replaced old containers are not restored (can't unring
|
||||
* that bell) — user redeploys to recover.
|
||||
*
|
||||
* Resource peak: replicas + 1 (briefly while a new replica warms up
|
||||
* before its counterpart is stopped).
|
||||
*/
|
||||
private void deployRolling(DeployCtx ctx) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Rolling deployment strategy is not yet implemented; use blue-green");
|
||||
ResolvedContainerConfig config = ctx.config();
|
||||
Deployment deployment = ctx.deployment();
|
||||
|
||||
// Capture previous deployment's per-index container ids up front.
|
||||
Optional<Deployment> previousOpt = deploymentRepository.findActiveByAppIdAndEnvironmentIdExcluding(
|
||||
deployment.appId(), deployment.environmentId(), deployment.id());
|
||||
Map<Integer, String> oldContainerByIndex = new LinkedHashMap<>();
|
||||
if (previousOpt.isPresent() && previousOpt.get().replicaStates() != null) {
|
||||
for (Map<String, Object> r : previousOpt.get().replicaStates()) {
|
||||
Object idx = r.get("index");
|
||||
Object cid = r.get("containerId");
|
||||
if (idx instanceof Number n && cid instanceof String s) {
|
||||
oldContainerByIndex.put(n.intValue(), s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === START REPLICAS ===
|
||||
updateStage(deployment.id(), DeployStage.START_REPLICAS);
|
||||
List<Map<String, Object>> replicaStates = new ArrayList<>();
|
||||
List<String> newContainerIds = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < config.replicas(); i++) {
|
||||
// Start new replica i (gen-suffixed name; coexists with old[i]).
|
||||
Map<String, Object> state = new LinkedHashMap<>();
|
||||
String newCid = startReplica(ctx, i, state);
|
||||
newContainerIds.add(newCid);
|
||||
replicaStates.add(state);
|
||||
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
|
||||
|
||||
// === HEALTH CHECK (per-replica) ===
|
||||
updateStage(deployment.id(), DeployStage.HEALTH_CHECK);
|
||||
boolean healthy = waitForOneHealthy(newCid, healthCheckTimeout);
|
||||
if (!healthy) {
|
||||
// Abort: stop this in-flight new replica AND any new replicas
|
||||
// started so far. Already-stopped old replicas stay stopped
|
||||
// (rolling is not reversible). Remaining un-replaced old
|
||||
// replicas keep serving traffic.
|
||||
for (String cid : newContainerIds) {
|
||||
try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); }
|
||||
catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); }
|
||||
}
|
||||
pgDeployRepo.updateDeployStage(deployment.id(), null);
|
||||
String reason = String.format(
|
||||
"rolling: replica %d failed to reach healthy within %ds; %d previous replicas still running",
|
||||
i, healthCheckTimeout, oldContainerByIndex.size());
|
||||
deploymentService.markFailed(deployment.id(), reason);
|
||||
serverMetrics.recordDeploymentOutcome("FAILED");
|
||||
serverMetrics.recordDeploymentDuration(ctx.deployStart());
|
||||
return;
|
||||
}
|
||||
|
||||
// Health check passed: update replica status to RUNNING, stop the
|
||||
// corresponding old[i] if present, and continue with replica i+1.
|
||||
replicaStates = updateReplicaHealth(replicaStates, newContainerIds);
|
||||
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
|
||||
|
||||
String oldCid = oldContainerByIndex.remove(i);
|
||||
if (oldCid != null) {
|
||||
try {
|
||||
orchestrator.stopContainer(oldCid);
|
||||
orchestrator.removeContainer(oldCid);
|
||||
log.info("rolling: replaced replica {} (old={}, new={})", i, oldCid, newCid);
|
||||
} catch (Exception e) {
|
||||
log.warn("rolling: failed to stop old replica {} ({}): {}", i, oldCid, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === SWAP TRAFFIC ===
|
||||
// Any old replicas with indices >= new.replicas (e.g., when replica
|
||||
// count shrank) are still running; sweep them now so the old
|
||||
// deployment can be marked STOPPED.
|
||||
updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC);
|
||||
for (Map.Entry<Integer, String> e : oldContainerByIndex.entrySet()) {
|
||||
try {
|
||||
orchestrator.stopContainer(e.getValue());
|
||||
orchestrator.removeContainer(e.getValue());
|
||||
log.info("rolling: stopped leftover old replica {} ({})", e.getKey(), e.getValue());
|
||||
} catch (Exception ex) {
|
||||
log.warn("rolling: failed to stop leftover old replica {}: {}", e.getKey(), ex.getMessage());
|
||||
}
|
||||
}
|
||||
if (previousOpt.isPresent()) {
|
||||
deploymentService.markStopped(previousOpt.get().id());
|
||||
}
|
||||
|
||||
// === COMPLETE ===
|
||||
updateStage(deployment.id(), DeployStage.COMPLETE);
|
||||
persistSnapshotAndMarkRunning(ctx, newContainerIds.get(0));
|
||||
log.info("Deployment {} is RUNNING (rolling, {}/{} replicas replaced)",
|
||||
deployment.id(), config.replicas(), config.replicas());
|
||||
}
|
||||
|
||||
/** Poll a single container until healthy or the timeout expires. Returns
|
||||
* true on healthy, false on timeout or thread interrupt. */
|
||||
private boolean waitForOneHealthy(String containerId, int timeoutSeconds) {
|
||||
long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L);
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
ContainerStatus status = orchestrator.getContainerStatus(containerId);
|
||||
if ("healthy".equals(status.state())) return true;
|
||||
try { Thread.sleep(2000); } catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Start one replica container with the gen-suffixed name and return its
|
||||
|
||||
Reference in New Issue
Block a user