From f8d42026da7668c2a8ae7dc001c348257f29696f Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 8 Apr 2026 20:27:37 +0200 Subject: [PATCH] feat: rewrite DeploymentExecutor with staged deploy, config merge, replicas Co-Authored-By: Claude Sonnet 4.6 --- .../app/runtime/DeploymentExecutor.java | 316 +++++++++++++----- 1 file changed, 224 insertions(+), 92 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DeploymentExecutor.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DeploymentExecutor.java index d11c3b99..882f5645 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DeploymentExecutor.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/DeploymentExecutor.java @@ -1,26 +1,17 @@ package com.cameleer3.server.app.runtime; -import com.cameleer3.server.core.runtime.App; -import com.cameleer3.server.core.runtime.AppService; -import com.cameleer3.server.core.runtime.ContainerRequest; -import com.cameleer3.server.core.runtime.ContainerStatus; -import com.cameleer3.server.core.runtime.Deployment; -import com.cameleer3.server.core.runtime.DeploymentRepository; -import com.cameleer3.server.core.runtime.DeploymentService; -import com.cameleer3.server.core.runtime.DeploymentStatus; -import com.cameleer3.server.core.runtime.Environment; -import com.cameleer3.server.core.runtime.EnvironmentService; -import com.cameleer3.server.core.runtime.RuntimeOrchestrator; +import com.cameleer3.server.app.storage.PostgresDeploymentRepository; +import com.cameleer3.server.core.runtime.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; @Service public class DeploymentExecutor { @@ -29,9 +20,13 @@ public class DeploymentExecutor { private final RuntimeOrchestrator orchestrator; private final DeploymentService deploymentService; - private final DeploymentRepository deploymentRepository; private final AppService appService; private final EnvironmentService envService; + private final DeploymentRepository deploymentRepository; + private final PostgresDeploymentRepository pgDeployRepo; + + @Autowired(required = false) + private DockerNetworkManager networkManager; @Value("${cameleer.runtime.base-image:cameleer-runtime-base:latest}") private String baseImage; @@ -40,10 +35,10 @@ public class DeploymentExecutor { private String dockerNetwork; @Value("${cameleer.runtime.container-memory-limit:512m}") - private String containerMemoryLimit; + private String globalMemoryLimit; @Value("${cameleer.runtime.container-cpu-shares:512}") - private int containerCpuShares; + private int globalCpuShares; @Value("${cameleer.runtime.health-check-timeout:60}") private int healthCheckTimeout; @@ -54,115 +49,252 @@ public class DeploymentExecutor { @Value("${security.bootstrap-token:}") private String bootstrapToken; - public DeploymentExecutor(RuntimeOrchestrator orchestrator, DeploymentService deploymentService, - DeploymentRepository deploymentRepository, - AppService appService, EnvironmentService envService) { + @Value("${cameleer.runtime.routing-mode:path}") + private String globalRoutingMode; + + @Value("${cameleer.runtime.routing-domain:localhost}") + private String globalRoutingDomain; + + @Value("${cameleer.runtime.server-url:}") + private String globalServerUrl; + + public DeploymentExecutor(RuntimeOrchestrator orchestrator, + DeploymentService deploymentService, + AppService appService, + EnvironmentService envService, + DeploymentRepository deploymentRepository) { this.orchestrator = orchestrator; this.deploymentService = deploymentService; - this.deploymentRepository = deploymentRepository; this.appService = appService; this.envService = envService; + this.deploymentRepository = deploymentRepository; + this.pgDeployRepo = (PostgresDeploymentRepository) deploymentRepository; } @Async("deploymentTaskExecutor") public void executeAsync(Deployment deployment) { try { - // Stop existing deployment in same environment for same app + App app = appService.getById(deployment.appId()); + Environment env = envService.getById(deployment.environmentId()); + String jarPath = appService.resolveJarPath(deployment.appVersionId()); + + var globalDefaults = new ConfigMerger.GlobalRuntimeDefaults( + parseMemoryLimitMb(globalMemoryLimit), + globalCpuShares, + globalRoutingMode, + globalRoutingDomain, + globalServerUrl.isBlank() ? "http://cameleer3-server:8081" : globalServerUrl + ); + ResolvedContainerConfig config = ConfigMerger.resolve( + globalDefaults, env.defaultContainerConfig(), app.containerConfig()); + + pgDeployRepo.updateDeploymentStrategy(deployment.id(), config.deploymentStrategy()); + + // === PRE-FLIGHT === + updateStage(deployment.id(), DeployStage.PRE_FLIGHT); + preFlightChecks(jarPath, config); + + // === PULL IMAGE === + updateStage(deployment.id(), DeployStage.PULL_IMAGE); + // Docker pulls on create if not present locally + + // === CREATE NETWORKS === + updateStage(deployment.id(), DeployStage.CREATE_NETWORK); + String primaryNetwork = dockerNetwork; + String envNet = null; + if (networkManager != null) { + primaryNetwork = DockerNetworkManager.TRAEFIK_NETWORK; + networkManager.ensureNetwork(primaryNetwork); + envNet = DockerNetworkManager.envNetworkName(env.slug()); + networkManager.ensureNetwork(envNet); + } + + // === START REPLICAS === + updateStage(deployment.id(), DeployStage.START_REPLICAS); + + Map baseEnvVars = buildEnvVars(app, env, config); + Map labels = TraefikLabelBuilder.build(app.slug(), env.slug(), config); + + List> replicaStates = new ArrayList<>(); + List newContainerIds = new ArrayList<>(); + + for (int i = 0; i < config.replicas(); i++) { + String containerName = env.slug() + "-" + app.slug() + "-" + i; + Long cpuQuota = config.cpuLimit() != null ? (long) (config.cpuLimit() * 100_000) : null; + + ContainerRequest request = new ContainerRequest( + containerName, baseImage, jarPath, primaryNetwork, + envNet != null ? List.of(envNet) : List.of(), + baseEnvVars, labels, + config.memoryLimitBytes(), config.memoryReserveBytes(), + config.cpuShares(), cpuQuota, + config.exposedPorts(), agentHealthPort, + "on-failure", 3 + ); + + String containerId = orchestrator.startContainer(request); + newContainerIds.add(containerId); + + replicaStates.add(Map.of( + "index", i, + "containerId", containerId, + "containerName", containerName, + "status", "STARTING" + )); + } + + pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); + + // === HEALTH CHECK === + updateStage(deployment.id(), DeployStage.HEALTH_CHECK); + int healthyCount = waitForAnyHealthy(newContainerIds, healthCheckTimeout); + + if (healthyCount == 0) { + for (String cid : newContainerIds) { + try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); } + catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); } + } + pgDeployRepo.updateDeployStage(deployment.id(), null); + deploymentService.markFailed(deployment.id(), "No replicas passed health check within " + healthCheckTimeout + "s"); + return; + } + + replicaStates = updateReplicaHealth(replicaStates, newContainerIds); + pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates); + + // === SWAP TRAFFIC === + updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC); + Optional existing = deploymentRepository.findActiveByAppIdAndEnvironmentId( deployment.appId(), deployment.environmentId()); if (existing.isPresent() && !existing.get().id().equals(deployment.id())) { - Deployment old = existing.get(); - if (old.containerId() != null) { - orchestrator.stopContainer(old.containerId()); - orchestrator.removeContainer(old.containerId()); - } - deploymentService.markStopped(old.id()); - log.info("Stopped previous deployment {} for replacement", old.id()); + stopDeploymentContainers(existing.get()); + deploymentService.markStopped(existing.get().id()); + log.info("Stopped previous deployment {} for replacement", existing.get().id()); } - String jarPath = appService.resolveJarPath(deployment.appVersionId()); - App app = appService.getById(deployment.appId()); - Environment env = envService.getById(deployment.environmentId()); + // === COMPLETE === + updateStage(deployment.id(), DeployStage.COMPLETE); - Map envVars = new HashMap<>(); - envVars.put("CAMELEER_EXPORT_TYPE", "HTTP"); - envVars.put("CAMELEER_APPLICATION_ID", app.slug()); - envVars.put("CAMELEER_ENVIRONMENT_ID", env.slug()); - envVars.put("CAMELEER_DISPLAY_NAME", deployment.containerName()); - if (bootstrapToken != null && !bootstrapToken.isBlank()) { - envVars.put("CAMELEER_AUTH_TOKEN", bootstrapToken); + String primaryContainerId = newContainerIds.get(0); + DeploymentStatus finalStatus = healthyCount == config.replicas() + ? DeploymentStatus.RUNNING : DeploymentStatus.DEGRADED; + deploymentService.markRunning(deployment.id(), primaryContainerId); + if (finalStatus == DeploymentStatus.DEGRADED) { + deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.DEGRADED, + primaryContainerId, null); } - Map labels = buildTraefikLabels(app, env, deployment); - - ContainerRequest request = new ContainerRequest( - deployment.containerName(), - baseImage, - jarPath, - dockerNetwork, - List.of(), - envVars, - labels, - parseMemoryLimitBytes(containerMemoryLimit), - null, - containerCpuShares, - null, - List.of(), - agentHealthPort, - "on-failure", - 3); - - String containerId = orchestrator.startContainer(request); - waitForHealthy(containerId, healthCheckTimeout); - - deploymentService.markRunning(deployment.id(), containerId); - log.info("Deployment {} is RUNNING (container={})", deployment.id(), containerId); + pgDeployRepo.updateDeployStage(deployment.id(), null); + log.info("Deployment {} is {} ({}/{} replicas healthy)", + deployment.id(), finalStatus, healthyCount, config.replicas()); } catch (Exception e) { log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e); + pgDeployRepo.updateDeployStage(deployment.id(), null); deploymentService.markFailed(deployment.id(), e.getMessage()); } } public void stopDeployment(Deployment deployment) { - if (deployment.containerId() != null) { - orchestrator.stopContainer(deployment.containerId()); - orchestrator.removeContainer(deployment.containerId()); - } + pgDeployRepo.updateTargetState(deployment.id(), "STOPPED"); + deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.STOPPING, + deployment.containerId(), null); + + stopDeploymentContainers(deployment); deploymentService.markStopped(deployment.id()); } - private void waitForHealthy(String containerId, int timeoutSeconds) throws InterruptedException { - long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L; + private void stopDeploymentContainers(Deployment deployment) { + List> replicas = deployment.replicaStates() != null + ? deployment.replicaStates() : List.of(); + for (Map replica : replicas) { + String cid = (String) replica.get("containerId"); + if (cid != null) { + try { + orchestrator.stopContainer(cid); + orchestrator.removeContainer(cid); + } catch (Exception e) { + log.warn("Failed to stop replica container {}: {}", cid, e.getMessage()); + } + } + } + if (deployment.containerId() != null && replicas.isEmpty()) { + try { + orchestrator.stopContainer(deployment.containerId()); + orchestrator.removeContainer(deployment.containerId()); + } catch (Exception e) { + log.warn("Failed to stop container {}: {}", deployment.containerId(), e.getMessage()); + } + } + } + + private void preFlightChecks(String jarPath, ResolvedContainerConfig config) { + if (!Files.exists(Path.of(jarPath))) { + throw new IllegalStateException("JAR file not found: " + jarPath); + } + if (config.memoryLimitMb() <= 0) { + throw new IllegalStateException("Memory limit must be positive, got: " + config.memoryLimitMb()); + } + if (config.appPort() <= 0 || config.appPort() > 65535) { + throw new IllegalStateException("Invalid app port: " + config.appPort()); + } + if (config.replicas() < 1) { + throw new IllegalStateException("Replicas must be >= 1, got: " + config.replicas()); + } + } + + private Map buildEnvVars(App app, Environment env, ResolvedContainerConfig config) { + Map envVars = new LinkedHashMap<>(); + envVars.put("CAMELEER_EXPORT_TYPE", "HTTP"); + envVars.put("CAMELEER_APPLICATION_ID", app.slug()); + envVars.put("CAMELEER_ENVIRONMENT_ID", env.slug()); + envVars.put("CAMELEER_SERVER_URL", config.serverUrl()); + if (bootstrapToken != null && !bootstrapToken.isBlank()) { + envVars.put("CAMELEER_AUTH_TOKEN", bootstrapToken); + } + envVars.putAll(config.customEnvVars()); + return envVars; + } + + private int waitForAnyHealthy(List containerIds, int timeoutSeconds) { + long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L); while (System.currentTimeMillis() < deadline) { - ContainerStatus status = orchestrator.getContainerStatus(containerId); - if ("healthy".equalsIgnoreCase(status.state()) || (status.running() && "running".equalsIgnoreCase(status.state()))) { - return; + int healthy = 0; + for (String cid : containerIds) { + ContainerStatus status = orchestrator.getContainerStatus(cid); + if ("healthy".equals(status.state())) healthy++; } - if (!status.running()) { - throw new RuntimeException("Container stopped unexpectedly: " + status.error()); + if (healthy > 0) return healthy; + try { Thread.sleep(2000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return 0; } - Thread.sleep(2000); } - throw new RuntimeException("Container health check timed out after " + timeoutSeconds + "s"); + return 0; } - private Map buildTraefikLabels(App app, Environment env, Deployment deployment) { - Map labels = new HashMap<>(); - labels.put("traefik.enable", "true"); - labels.put("managed-by", "cameleer3-server"); - labels.put("cameleer.app", app.slug()); - labels.put("cameleer.environment", env.slug()); - return labels; + private List> updateReplicaHealth(List> replicas, + List containerIds) { + List> updated = new ArrayList<>(); + for (Map replica : replicas) { + String cid = (String) replica.get("containerId"); + ContainerStatus status = orchestrator.getContainerStatus(cid); + Map copy = new HashMap<>(replica); + copy.put("status", status.running() ? "RUNNING" : "DEAD"); + updated.add(copy); + } + return updated; } - private long parseMemoryLimitBytes(String limit) { - String trimmed = limit.trim().toLowerCase(); - if (trimmed.endsWith("g")) { - return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024 * 1024; - } else if (trimmed.endsWith("m")) { - return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024; - } - return Long.parseLong(trimmed); + private void updateStage(UUID deploymentId, DeployStage stage) { + pgDeployRepo.updateDeployStage(deploymentId, stage.name()); + } + + private int parseMemoryLimitMb(String limit) { + limit = limit.trim().toLowerCase(); + if (limit.endsWith("g")) return (int) (Double.parseDouble(limit.replace("g", "")) * 1024); + if (limit.endsWith("m")) return (int) Double.parseDouble(limit.replace("m", "")); + return Integer.parseInt(limit); } }