diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/runtime/DeploymentExecutor.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/runtime/DeploymentExecutor.java index a39fbad7..87d9b6ab 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/runtime/DeploymentExecutor.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/runtime/DeploymentExecutor.java @@ -96,6 +96,27 @@ public class DeploymentExecutor { return deployment.id().toString().substring(0, 8); } + /** + * Per-deployment context assembled once at the top of executeAsync and passed + * into strategy handlers. Keeps the strategy methods readable instead of + * threading 12 positional args. + */ + private record DeployCtx( + Deployment deployment, + App app, + Environment env, + ResolvedContainerConfig config, + String jarPath, + String resolvedRuntimeType, + String mainClass, + String generation, + String primaryNetwork, + List additionalNets, + Map baseEnvVars, + Map prometheusLabels, + long deployStart + ) {} + @Async("deploymentTaskExecutor") public void executeAsync(Deployment deployment) { long deployStart = System.currentTimeMillis(); @@ -152,7 +173,6 @@ public class DeploymentExecutor { updateStage(deployment.id(), DeployStage.CREATE_NETWORK); // Primary network: use configured CAMELEER_DOCKER_NETWORK (tenant-isolated in SaaS mode) String primaryNetwork = dockerNetwork; - String envNet = null; List additionalNets = new ArrayList<>(); if (networkManager != null) { networkManager.ensureNetwork(primaryNetwork); @@ -160,7 +180,7 @@ public class DeploymentExecutor { networkManager.ensureNetwork(DockerNetworkManager.TRAEFIK_NETWORK); additionalNets.add(DockerNetworkManager.TRAEFIK_NETWORK); // Per-environment network scoped to tenant to prevent cross-tenant collisions - envNet = DockerNetworkManager.envNetworkName(tenantId, env.slug()); + String envNet = DockerNetworkManager.envNetworkName(tenantId, env.slug()); networkManager.ensureNetwork(envNet); additionalNets.add(envNet); } @@ -175,135 +195,21 @@ public class DeploymentExecutor { } } - // === STOP PREVIOUS ACTIVE DEPLOYMENT === - // Container names are deterministic ({tenant}-{env}-{app}-{replica}), so a - // previous active deployment holds the Docker names we need. Stop + remove - // it before starting new replicas to avoid a 409 name conflict. Excluding - // the current deployment id by SQL (not Java) because the newly created - // row already has status=STARTING and would otherwise be picked by - // findActiveByAppIdAndEnvironmentId ORDER BY created_at DESC LIMIT 1. - Optional previous = deploymentRepository.findActiveByAppIdAndEnvironmentIdExcluding( - deployment.appId(), deployment.environmentId(), deployment.id()); - if (previous.isPresent()) { - log.info("Stopping previous deployment {} before starting new replicas", previous.get().id()); - stopDeploymentContainers(previous.get()); - deploymentService.markStopped(previous.get().id()); + DeployCtx ctx = new DeployCtx( + deployment, app, env, config, jarPath, + resolvedRuntimeType, mainClass, generation, + primaryNetwork, additionalNets, + buildEnvVars(app, env, config), + PrometheusLabelBuilder.build(resolvedRuntimeType), + deployStart); + + // Dispatch on strategy. Unknown values fall back to BLUE_GREEN via fromWire. + DeploymentStrategy strategy = DeploymentStrategy.fromWire(config.deploymentStrategy()); + switch (strategy) { + case BLUE_GREEN -> deployBlueGreen(ctx); + case ROLLING -> deployRolling(ctx); } - // === START REPLICAS === - updateStage(deployment.id(), DeployStage.START_REPLICAS); - - Map baseEnvVars = buildEnvVars(app, env, config); - Map prometheusLabels = PrometheusLabelBuilder.build(resolvedRuntimeType); - - List> replicaStates = new ArrayList<>(); - List newContainerIds = new ArrayList<>(); - - for (int i = 0; i < config.replicas(); i++) { - String instanceId = env.slug() + "-" + app.slug() + "-" + i + "-" + generation; - String containerName = tenantId + "-" + instanceId; - - // Per-replica labels (include replica index, generation, and instance-id) - Map labels = TraefikLabelBuilder.build(app.slug(), env.slug(), tenantId, config, i, generation); - labels.putAll(prometheusLabels); - - // Per-replica env vars (set agent instance ID to match container log identity) - Map replicaEnvVars = new LinkedHashMap<>(baseEnvVars); - replicaEnvVars.put("CAMELEER_AGENT_INSTANCEID", instanceId); - - String volumeName = jarDockerVolume != null && !jarDockerVolume.isBlank() ? jarDockerVolume : null; - ContainerRequest request = new ContainerRequest( - containerName, baseImage, jarPath, - volumeName, jarStoragePath, - primaryNetwork, - additionalNets, - replicaEnvVars, labels, - config.memoryLimitBytes(), config.memoryReserveBytes(), - config.dockerCpuShares(), config.dockerCpuQuota(), - config.exposedPorts(), agentHealthPort, - "on-failure", 3, - resolvedRuntimeType, config.customArgs(), mainClass - ); - - String containerId = orchestrator.startContainer(request); - newContainerIds.add(containerId); - - // Connect to additional networks after container is started - for (String net : additionalNets) { - if (networkManager != null) { - networkManager.connectContainer(containerId, net); - } - } - - orchestrator.startLogCapture(containerId, instanceId, app.slug(), env.slug(), tenantId); - - replicaStates.add(Map.of( - "index", i, - "containerId", containerId, - "containerName", containerName, - "status", "STARTING" - )); - } - - pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); - - // === HEALTH CHECK === - updateStage(deployment.id(), DeployStage.HEALTH_CHECK); - int healthyCount = waitForAnyHealthy(newContainerIds, healthCheckTimeout); - - if (healthyCount == 0) { - for (String cid : newContainerIds) { - try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); } - catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); } - } - pgDeployRepo.updateDeployStage(deployment.id(), null); - deploymentService.markFailed(deployment.id(), "No replicas passed health check within " + healthCheckTimeout + "s"); - serverMetrics.recordDeploymentOutcome("FAILED"); - serverMetrics.recordDeploymentDuration(deployStart); - return; - } - - replicaStates = updateReplicaHealth(replicaStates, newContainerIds); - pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); - - // === SWAP TRAFFIC === - // Traffic is routed via Traefik Docker labels, so the "swap" happens - // implicitly once the new replicas are healthy and the old containers - // are gone. The old deployment was already stopped before START_REPLICAS - // to free the deterministic container names. - updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC); - - // === COMPLETE === - updateStage(deployment.id(), DeployStage.COMPLETE); - - // Capture config snapshot before marking RUNNING - ApplicationConfig agentConfig = applicationConfigRepository - .findByApplicationAndEnvironment(app.slug(), env.slug()) - .orElse(null); - List snapshotSensitiveKeys = agentConfig != null ? agentConfig.getSensitiveKeys() : null; - DeploymentConfigSnapshot snapshot = new DeploymentConfigSnapshot( - deployment.appVersionId(), - agentConfig, - app.containerConfig(), - snapshotSensitiveKeys - ); - pgDeployRepo.saveDeployedConfigSnapshot(deployment.id(), snapshot); - - String primaryContainerId = newContainerIds.get(0); - DeploymentStatus finalStatus = healthyCount == config.replicas() - ? DeploymentStatus.RUNNING : DeploymentStatus.DEGRADED; - deploymentService.markRunning(deployment.id(), primaryContainerId); - if (finalStatus == DeploymentStatus.DEGRADED) { - deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.DEGRADED, - primaryContainerId, null); - } - - pgDeployRepo.updateDeployStage(deployment.id(), null); - serverMetrics.recordDeploymentOutcome(finalStatus.name()); - serverMetrics.recordDeploymentDuration(deployStart); - log.info("Deployment {} is {} ({}/{} replicas healthy)", - deployment.id(), finalStatus, healthyCount, config.replicas()); - } catch (Exception e) { log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e); pgDeployRepo.updateDeployStage(deployment.id(), null); @@ -313,6 +219,153 @@ public class DeploymentExecutor { } } + /** + * Blue/green strategy: start all N new replicas (coexisting with the old + * ones thanks to the gen-suffixed container names), wait for ALL healthy, + * then stop the previous deployment. Strict all-healthy — partial failure + * preserves the previous deployment untouched. + */ + private void deployBlueGreen(DeployCtx ctx) { + ResolvedContainerConfig config = ctx.config(); + Deployment deployment = ctx.deployment(); + + // === START REPLICAS === + updateStage(deployment.id(), DeployStage.START_REPLICAS); + List> replicaStates = new ArrayList<>(); + List newContainerIds = new ArrayList<>(); + for (int i = 0; i < config.replicas(); i++) { + Map state = new LinkedHashMap<>(); + String containerId = startReplica(ctx, i, state); + newContainerIds.add(containerId); + replicaStates.add(state); + } + pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); + + // === HEALTH CHECK === + updateStage(deployment.id(), DeployStage.HEALTH_CHECK); + int healthyCount = waitForAllHealthy(newContainerIds, healthCheckTimeout); + + if (healthyCount < config.replicas()) { + // Strict abort: tear down new replicas, leave the previous deployment untouched. + for (String cid : newContainerIds) { + try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); } + catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); } + } + pgDeployRepo.updateDeployStage(deployment.id(), null); + String reason = String.format( + "blue-green: %d/%d replicas healthy within %ds; preserving previous deployment", + healthyCount, config.replicas(), healthCheckTimeout); + deploymentService.markFailed(deployment.id(), reason); + serverMetrics.recordDeploymentOutcome("FAILED"); + serverMetrics.recordDeploymentDuration(ctx.deployStart()); + return; + } + + replicaStates = updateReplicaHealth(replicaStates, newContainerIds); + pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); + + // === SWAP TRAFFIC === + // All new replicas are healthy; Traefik labels are already attracting + // traffic to them. Stop the previous deployment now — the swap is + // implicit in the label-driven load balancer. + updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC); + Optional previous = deploymentRepository.findActiveByAppIdAndEnvironmentIdExcluding( + deployment.appId(), deployment.environmentId(), deployment.id()); + if (previous.isPresent()) { + log.info("blue-green: stopping previous deployment {} now that new replicas are healthy", + previous.get().id()); + stopDeploymentContainers(previous.get()); + deploymentService.markStopped(previous.get().id()); + } + + // === COMPLETE === + updateStage(deployment.id(), DeployStage.COMPLETE); + persistSnapshotAndMarkRunning(ctx, newContainerIds.get(0)); + log.info("Deployment {} is RUNNING (blue-green, {}/{} replicas healthy)", + deployment.id(), healthyCount, config.replicas()); + } + + /** + * Rolling strategy: replace replicas one at a time — start new[i], wait + * healthy, stop old[i]. On any replica's health failure, stop the failing + * new container and abort without touching the remaining (un-replaced) old + * replicas. Implemented in a follow-up phase. + */ + private void deployRolling(DeployCtx ctx) { + throw new UnsupportedOperationException( + "Rolling deployment strategy is not yet implemented; use blue-green"); + } + + /** Start one replica container with the gen-suffixed name and return its + * container id. Fills `stateOut` with the replicaStates JSONB row. */ + private String startReplica(DeployCtx ctx, int i, Map stateOut) { + Environment env = ctx.env(); + App app = ctx.app(); + ResolvedContainerConfig config = ctx.config(); + + String instanceId = env.slug() + "-" + app.slug() + "-" + i + "-" + ctx.generation(); + String containerName = tenantId + "-" + instanceId; + + Map labels = TraefikLabelBuilder.build( + app.slug(), env.slug(), tenantId, config, i, ctx.generation()); + labels.putAll(ctx.prometheusLabels()); + + Map replicaEnvVars = new LinkedHashMap<>(ctx.baseEnvVars()); + replicaEnvVars.put("CAMELEER_AGENT_INSTANCEID", instanceId); + + String volumeName = jarDockerVolume != null && !jarDockerVolume.isBlank() ? jarDockerVolume : null; + ContainerRequest request = new ContainerRequest( + containerName, baseImage, ctx.jarPath(), + volumeName, jarStoragePath, + ctx.primaryNetwork(), + ctx.additionalNets(), + replicaEnvVars, labels, + config.memoryLimitBytes(), config.memoryReserveBytes(), + config.dockerCpuShares(), config.dockerCpuQuota(), + config.exposedPorts(), agentHealthPort, + "on-failure", 3, + ctx.resolvedRuntimeType(), config.customArgs(), ctx.mainClass() + ); + + String containerId = orchestrator.startContainer(request); + + // Connect to additional networks after container is started + for (String net : ctx.additionalNets()) { + if (networkManager != null) { + networkManager.connectContainer(containerId, net); + } + } + + orchestrator.startLogCapture(containerId, instanceId, app.slug(), env.slug(), tenantId); + + stateOut.put("index", i); + stateOut.put("containerId", containerId); + stateOut.put("containerName", containerName); + stateOut.put("status", "STARTING"); + return containerId; + } + + /** Persist the deployment snapshot and mark the deployment RUNNING. + * Finalizes the deploy in a single place shared by all strategy paths. */ + private void persistSnapshotAndMarkRunning(DeployCtx ctx, String primaryContainerId) { + Deployment deployment = ctx.deployment(); + ApplicationConfig agentConfig = applicationConfigRepository + .findByApplicationAndEnvironment(ctx.app().slug(), ctx.env().slug()) + .orElse(null); + List snapshotSensitiveKeys = agentConfig != null ? agentConfig.getSensitiveKeys() : null; + DeploymentConfigSnapshot snapshot = new DeploymentConfigSnapshot( + deployment.appVersionId(), + agentConfig, + ctx.app().containerConfig(), + snapshotSensitiveKeys); + pgDeployRepo.saveDeployedConfigSnapshot(deployment.id(), snapshot); + + deploymentService.markRunning(deployment.id(), primaryContainerId); + pgDeployRepo.updateDeployStage(deployment.id(), null); + serverMetrics.recordDeploymentOutcome("RUNNING"); + serverMetrics.recordDeploymentDuration(ctx.deployStart()); + } + public void stopDeployment(Deployment deployment) { pgDeployRepo.updateTargetState(deployment.id(), "STOPPED"); deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.STOPPING, @@ -378,7 +431,10 @@ public class DeploymentExecutor { return envVars; } - private int waitForAnyHealthy(List containerIds, int timeoutSeconds) { + /** Poll until all containers are healthy or the timeout expires. Returns + * the healthy count at return time — == ids.size() on full success, less + * if the timeout won. */ + private int waitForAllHealthy(List containerIds, int timeoutSeconds) { long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L); int lastHealthy = 0; while (System.currentTimeMillis() < deadline) {