deploy: blue-green strategy (start → health-all → stop old)

Phase 3 of deployment-strategies plan. Refactor executeAsync to
dispatch on DeploymentStrategy.fromWire(config.deploymentStrategy()).

Blue-green (default):
- Start all N new replicas (gen-suffixed names coexist with old).
- Wait for ALL healthy (strict — partial-healthy = FAILED, preserves
  previous deployment untouched).
- Only then find + stop the previous deployment.
- Final status is always RUNNING; DEGRADED is now reserved for
  post-deploy replica crashes (set by DockerEventMonitor).

Rolling: stub — throws UnsupportedOperationException for now, gets
its real implementation in Phase 4.

Refactor details:
- Extract DeployCtx record to carry 13 per-deploy values around.
- Extract startReplica(ctx, i, stateOut) — shared by both strategy paths.
- Extract persistSnapshotAndMarkRunning(ctx, primaryCid) — shared finalizer.
- Rename waitForAnyHealthy → waitForAllHealthy (the name was misleading;
  the method already waited for all, just returned partial on timeout).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-23 09:51:24 +02:00
parent 652346dcd4
commit 459cdfe427

View File

@@ -96,6 +96,27 @@ public class DeploymentExecutor {
return deployment.id().toString().substring(0, 8);
}
/**
* Per-deployment context assembled once at the top of executeAsync and passed
* into strategy handlers. Keeps the strategy methods readable instead of
* threading 12 positional args.
*/
private record DeployCtx(
Deployment deployment,
App app,
Environment env,
ResolvedContainerConfig config,
String jarPath,
String resolvedRuntimeType,
String mainClass,
String generation,
String primaryNetwork,
List<String> additionalNets,
Map<String, String> baseEnvVars,
Map<String, String> prometheusLabels,
long deployStart
) {}
@Async("deploymentTaskExecutor")
public void executeAsync(Deployment deployment) {
long deployStart = System.currentTimeMillis();
@@ -152,7 +173,6 @@ public class DeploymentExecutor {
updateStage(deployment.id(), DeployStage.CREATE_NETWORK);
// Primary network: use configured CAMELEER_DOCKER_NETWORK (tenant-isolated in SaaS mode)
String primaryNetwork = dockerNetwork;
String envNet = null;
List<String> additionalNets = new ArrayList<>();
if (networkManager != null) {
networkManager.ensureNetwork(primaryNetwork);
@@ -160,7 +180,7 @@ public class DeploymentExecutor {
networkManager.ensureNetwork(DockerNetworkManager.TRAEFIK_NETWORK);
additionalNets.add(DockerNetworkManager.TRAEFIK_NETWORK);
// Per-environment network scoped to tenant to prevent cross-tenant collisions
envNet = DockerNetworkManager.envNetworkName(tenantId, env.slug());
String envNet = DockerNetworkManager.envNetworkName(tenantId, env.slug());
networkManager.ensureNetwork(envNet);
additionalNets.add(envNet);
}
@@ -175,135 +195,21 @@ public class DeploymentExecutor {
}
}
// === STOP PREVIOUS ACTIVE DEPLOYMENT ===
// Container names are deterministic ({tenant}-{env}-{app}-{replica}), so a
// previous active deployment holds the Docker names we need. Stop + remove
// it before starting new replicas to avoid a 409 name conflict. Excluding
// the current deployment id by SQL (not Java) because the newly created
// row already has status=STARTING and would otherwise be picked by
// findActiveByAppIdAndEnvironmentId ORDER BY created_at DESC LIMIT 1.
Optional<Deployment> previous = deploymentRepository.findActiveByAppIdAndEnvironmentIdExcluding(
deployment.appId(), deployment.environmentId(), deployment.id());
if (previous.isPresent()) {
log.info("Stopping previous deployment {} before starting new replicas", previous.get().id());
stopDeploymentContainers(previous.get());
deploymentService.markStopped(previous.get().id());
DeployCtx ctx = new DeployCtx(
deployment, app, env, config, jarPath,
resolvedRuntimeType, mainClass, generation,
primaryNetwork, additionalNets,
buildEnvVars(app, env, config),
PrometheusLabelBuilder.build(resolvedRuntimeType),
deployStart);
// Dispatch on strategy. Unknown values fall back to BLUE_GREEN via fromWire.
DeploymentStrategy strategy = DeploymentStrategy.fromWire(config.deploymentStrategy());
switch (strategy) {
case BLUE_GREEN -> deployBlueGreen(ctx);
case ROLLING -> deployRolling(ctx);
}
// === START REPLICAS ===
updateStage(deployment.id(), DeployStage.START_REPLICAS);
Map<String, String> baseEnvVars = buildEnvVars(app, env, config);
Map<String, String> prometheusLabels = PrometheusLabelBuilder.build(resolvedRuntimeType);
List<Map<String, Object>> replicaStates = new ArrayList<>();
List<String> newContainerIds = new ArrayList<>();
for (int i = 0; i < config.replicas(); i++) {
String instanceId = env.slug() + "-" + app.slug() + "-" + i + "-" + generation;
String containerName = tenantId + "-" + instanceId;
// Per-replica labels (include replica index, generation, and instance-id)
Map<String, String> labels = TraefikLabelBuilder.build(app.slug(), env.slug(), tenantId, config, i, generation);
labels.putAll(prometheusLabels);
// Per-replica env vars (set agent instance ID to match container log identity)
Map<String, String> replicaEnvVars = new LinkedHashMap<>(baseEnvVars);
replicaEnvVars.put("CAMELEER_AGENT_INSTANCEID", instanceId);
String volumeName = jarDockerVolume != null && !jarDockerVolume.isBlank() ? jarDockerVolume : null;
ContainerRequest request = new ContainerRequest(
containerName, baseImage, jarPath,
volumeName, jarStoragePath,
primaryNetwork,
additionalNets,
replicaEnvVars, labels,
config.memoryLimitBytes(), config.memoryReserveBytes(),
config.dockerCpuShares(), config.dockerCpuQuota(),
config.exposedPorts(), agentHealthPort,
"on-failure", 3,
resolvedRuntimeType, config.customArgs(), mainClass
);
String containerId = orchestrator.startContainer(request);
newContainerIds.add(containerId);
// Connect to additional networks after container is started
for (String net : additionalNets) {
if (networkManager != null) {
networkManager.connectContainer(containerId, net);
}
}
orchestrator.startLogCapture(containerId, instanceId, app.slug(), env.slug(), tenantId);
replicaStates.add(Map.of(
"index", i,
"containerId", containerId,
"containerName", containerName,
"status", "STARTING"
));
}
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === HEALTH CHECK ===
updateStage(deployment.id(), DeployStage.HEALTH_CHECK);
int healthyCount = waitForAnyHealthy(newContainerIds, healthCheckTimeout);
if (healthyCount == 0) {
for (String cid : newContainerIds) {
try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); }
catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); }
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
deploymentService.markFailed(deployment.id(), "No replicas passed health check within " + healthCheckTimeout + "s");
serverMetrics.recordDeploymentOutcome("FAILED");
serverMetrics.recordDeploymentDuration(deployStart);
return;
}
replicaStates = updateReplicaHealth(replicaStates, newContainerIds);
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === SWAP TRAFFIC ===
// Traffic is routed via Traefik Docker labels, so the "swap" happens
// implicitly once the new replicas are healthy and the old containers
// are gone. The old deployment was already stopped before START_REPLICAS
// to free the deterministic container names.
updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC);
// === COMPLETE ===
updateStage(deployment.id(), DeployStage.COMPLETE);
// Capture config snapshot before marking RUNNING
ApplicationConfig agentConfig = applicationConfigRepository
.findByApplicationAndEnvironment(app.slug(), env.slug())
.orElse(null);
List<String> snapshotSensitiveKeys = agentConfig != null ? agentConfig.getSensitiveKeys() : null;
DeploymentConfigSnapshot snapshot = new DeploymentConfigSnapshot(
deployment.appVersionId(),
agentConfig,
app.containerConfig(),
snapshotSensitiveKeys
);
pgDeployRepo.saveDeployedConfigSnapshot(deployment.id(), snapshot);
String primaryContainerId = newContainerIds.get(0);
DeploymentStatus finalStatus = healthyCount == config.replicas()
? DeploymentStatus.RUNNING : DeploymentStatus.DEGRADED;
deploymentService.markRunning(deployment.id(), primaryContainerId);
if (finalStatus == DeploymentStatus.DEGRADED) {
deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.DEGRADED,
primaryContainerId, null);
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
serverMetrics.recordDeploymentOutcome(finalStatus.name());
serverMetrics.recordDeploymentDuration(deployStart);
log.info("Deployment {} is {} ({}/{} replicas healthy)",
deployment.id(), finalStatus, healthyCount, config.replicas());
} catch (Exception e) {
log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e);
pgDeployRepo.updateDeployStage(deployment.id(), null);
@@ -313,6 +219,153 @@ public class DeploymentExecutor {
}
}
/**
* Blue/green strategy: start all N new replicas (coexisting with the old
* ones thanks to the gen-suffixed container names), wait for ALL healthy,
* then stop the previous deployment. Strict all-healthy — partial failure
* preserves the previous deployment untouched.
*/
private void deployBlueGreen(DeployCtx ctx) {
ResolvedContainerConfig config = ctx.config();
Deployment deployment = ctx.deployment();
// === START REPLICAS ===
updateStage(deployment.id(), DeployStage.START_REPLICAS);
List<Map<String, Object>> replicaStates = new ArrayList<>();
List<String> newContainerIds = new ArrayList<>();
for (int i = 0; i < config.replicas(); i++) {
Map<String, Object> state = new LinkedHashMap<>();
String containerId = startReplica(ctx, i, state);
newContainerIds.add(containerId);
replicaStates.add(state);
}
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === HEALTH CHECK ===
updateStage(deployment.id(), DeployStage.HEALTH_CHECK);
int healthyCount = waitForAllHealthy(newContainerIds, healthCheckTimeout);
if (healthyCount < config.replicas()) {
// Strict abort: tear down new replicas, leave the previous deployment untouched.
for (String cid : newContainerIds) {
try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); }
catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); }
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
String reason = String.format(
"blue-green: %d/%d replicas healthy within %ds; preserving previous deployment",
healthyCount, config.replicas(), healthCheckTimeout);
deploymentService.markFailed(deployment.id(), reason);
serverMetrics.recordDeploymentOutcome("FAILED");
serverMetrics.recordDeploymentDuration(ctx.deployStart());
return;
}
replicaStates = updateReplicaHealth(replicaStates, newContainerIds);
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === SWAP TRAFFIC ===
// All new replicas are healthy; Traefik labels are already attracting
// traffic to them. Stop the previous deployment now — the swap is
// implicit in the label-driven load balancer.
updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC);
Optional<Deployment> previous = deploymentRepository.findActiveByAppIdAndEnvironmentIdExcluding(
deployment.appId(), deployment.environmentId(), deployment.id());
if (previous.isPresent()) {
log.info("blue-green: stopping previous deployment {} now that new replicas are healthy",
previous.get().id());
stopDeploymentContainers(previous.get());
deploymentService.markStopped(previous.get().id());
}
// === COMPLETE ===
updateStage(deployment.id(), DeployStage.COMPLETE);
persistSnapshotAndMarkRunning(ctx, newContainerIds.get(0));
log.info("Deployment {} is RUNNING (blue-green, {}/{} replicas healthy)",
deployment.id(), healthyCount, config.replicas());
}
/**
* Rolling strategy: replace replicas one at a time — start new[i], wait
* healthy, stop old[i]. On any replica's health failure, stop the failing
* new container and abort without touching the remaining (un-replaced) old
* replicas. Implemented in a follow-up phase.
*/
private void deployRolling(DeployCtx ctx) {
throw new UnsupportedOperationException(
"Rolling deployment strategy is not yet implemented; use blue-green");
}
/** Start one replica container with the gen-suffixed name and return its
* container id. Fills `stateOut` with the replicaStates JSONB row. */
private String startReplica(DeployCtx ctx, int i, Map<String, Object> stateOut) {
Environment env = ctx.env();
App app = ctx.app();
ResolvedContainerConfig config = ctx.config();
String instanceId = env.slug() + "-" + app.slug() + "-" + i + "-" + ctx.generation();
String containerName = tenantId + "-" + instanceId;
Map<String, String> labels = TraefikLabelBuilder.build(
app.slug(), env.slug(), tenantId, config, i, ctx.generation());
labels.putAll(ctx.prometheusLabels());
Map<String, String> replicaEnvVars = new LinkedHashMap<>(ctx.baseEnvVars());
replicaEnvVars.put("CAMELEER_AGENT_INSTANCEID", instanceId);
String volumeName = jarDockerVolume != null && !jarDockerVolume.isBlank() ? jarDockerVolume : null;
ContainerRequest request = new ContainerRequest(
containerName, baseImage, ctx.jarPath(),
volumeName, jarStoragePath,
ctx.primaryNetwork(),
ctx.additionalNets(),
replicaEnvVars, labels,
config.memoryLimitBytes(), config.memoryReserveBytes(),
config.dockerCpuShares(), config.dockerCpuQuota(),
config.exposedPorts(), agentHealthPort,
"on-failure", 3,
ctx.resolvedRuntimeType(), config.customArgs(), ctx.mainClass()
);
String containerId = orchestrator.startContainer(request);
// Connect to additional networks after container is started
for (String net : ctx.additionalNets()) {
if (networkManager != null) {
networkManager.connectContainer(containerId, net);
}
}
orchestrator.startLogCapture(containerId, instanceId, app.slug(), env.slug(), tenantId);
stateOut.put("index", i);
stateOut.put("containerId", containerId);
stateOut.put("containerName", containerName);
stateOut.put("status", "STARTING");
return containerId;
}
/** Persist the deployment snapshot and mark the deployment RUNNING.
* Finalizes the deploy in a single place shared by all strategy paths. */
private void persistSnapshotAndMarkRunning(DeployCtx ctx, String primaryContainerId) {
Deployment deployment = ctx.deployment();
ApplicationConfig agentConfig = applicationConfigRepository
.findByApplicationAndEnvironment(ctx.app().slug(), ctx.env().slug())
.orElse(null);
List<String> snapshotSensitiveKeys = agentConfig != null ? agentConfig.getSensitiveKeys() : null;
DeploymentConfigSnapshot snapshot = new DeploymentConfigSnapshot(
deployment.appVersionId(),
agentConfig,
ctx.app().containerConfig(),
snapshotSensitiveKeys);
pgDeployRepo.saveDeployedConfigSnapshot(deployment.id(), snapshot);
deploymentService.markRunning(deployment.id(), primaryContainerId);
pgDeployRepo.updateDeployStage(deployment.id(), null);
serverMetrics.recordDeploymentOutcome("RUNNING");
serverMetrics.recordDeploymentDuration(ctx.deployStart());
}
public void stopDeployment(Deployment deployment) {
pgDeployRepo.updateTargetState(deployment.id(), "STOPPED");
deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.STOPPING,
@@ -378,7 +431,10 @@ public class DeploymentExecutor {
return envVars;
}
private int waitForAnyHealthy(List<String> containerIds, int timeoutSeconds) {
/** Poll until all containers are healthy or the timeout expires. Returns
* the healthy count at return time — == ids.size() on full success, less
* if the timeout won. */
private int waitForAllHealthy(List<String> containerIds, int timeoutSeconds) {
long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L);
int lastHealthy = 0;
while (System.currentTimeMillis() < deadline) {