feat: rewrite DeploymentExecutor with staged deploy, config merge, replicas

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-08 20:27:37 +02:00
parent fef3ef6184
commit f8d42026da

View File

@@ -1,26 +1,17 @@
package com.cameleer3.server.app.runtime;
import com.cameleer3.server.core.runtime.App;
import com.cameleer3.server.core.runtime.AppService;
import com.cameleer3.server.core.runtime.ContainerRequest;
import com.cameleer3.server.core.runtime.ContainerStatus;
import com.cameleer3.server.core.runtime.Deployment;
import com.cameleer3.server.core.runtime.DeploymentRepository;
import com.cameleer3.server.core.runtime.DeploymentService;
import com.cameleer3.server.core.runtime.DeploymentStatus;
import com.cameleer3.server.core.runtime.Environment;
import com.cameleer3.server.core.runtime.EnvironmentService;
import com.cameleer3.server.core.runtime.RuntimeOrchestrator;
import com.cameleer3.server.app.storage.PostgresDeploymentRepository;
import com.cameleer3.server.core.runtime.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
@Service
public class DeploymentExecutor {
@@ -29,9 +20,13 @@ public class DeploymentExecutor {
// Collaborators; all final fields below are assigned in the constructor.
// NOTE(review): deploymentRepository is declared twice in this span, which looks like the
// removed and the added side of a diff rendered together — only one declaration can remain.
private final RuntimeOrchestrator orchestrator;
private final DeploymentService deploymentService;
private final DeploymentRepository deploymentRepository;
private final AppService appService;
private final EnvironmentService envService;
private final DeploymentRepository deploymentRepository;
// The same object as deploymentRepository, downcast in the constructor; used for the
// deploy-stage, replica-state, deployment-strategy and target-state updates.
private final PostgresDeploymentRepository pgDeployRepo;
// Injection is optional, so this may be null; executeAsync checks for null before using it.
@Autowired(required = false)
private DockerNetworkManager networkManager;
// Image passed to every ContainerRequest built by executeAsync.
@Value("${cameleer.runtime.base-image:cameleer-runtime-base:latest}")
private String baseImage;
@@ -40,10 +35,10 @@ public class DeploymentExecutor {
// Primary network used for containers when no DockerNetworkManager is available.
private String dockerNetwork;
// NOTE(review): each @Value below is followed by two fields (container* and global*). This looks
// like a rename shown as old + new line; as written the annotation applies only to the first field.
// globalMemoryLimit is parsed with parseMemoryLimitMb when building the global runtime defaults.
@Value("${cameleer.runtime.container-memory-limit:512m}")
private String containerMemoryLimit;
private String globalMemoryLimit;
// globalCpuShares is passed unchanged into the global runtime defaults.
@Value("${cameleer.runtime.container-cpu-shares:512}")
private int containerCpuShares;
private int globalCpuShares;
// Seconds to wait for a replica to report healthy before the deployment is marked failed.
@Value("${cameleer.runtime.health-check-timeout:60}")
private int healthCheckTimeout;
@@ -54,115 +49,252 @@ public class DeploymentExecutor {
// When non-blank, exported to every container as CAMELEER_AUTH_TOKEN (see buildEnvVars).
// The property default is the empty string, in which case no token variable is set.
@Value("${security.bootstrap-token:}")
private String bootstrapToken;
public DeploymentExecutor(RuntimeOrchestrator orchestrator, DeploymentService deploymentService,
DeploymentRepository deploymentRepository,
AppService appService, EnvironmentService envService) {
// Global routing defaults; executeAsync hands these to ConfigMerger.GlobalRuntimeDefaults.
@Value("${cameleer.runtime.routing-mode:path}")
private String globalRoutingMode;
@Value("${cameleer.runtime.routing-domain:localhost}")
private String globalRoutingDomain;
// When blank, executeAsync substitutes "http://cameleer3-server:8081".
@Value("${cameleer.runtime.server-url:}")
private String globalServerUrl;
/**
 * Wires the executor's collaborators.
 *
 * <p>NOTE(review): {@code this.deploymentRepository} is assigned twice below; the first
 * assignment looks like a removed diff line rendered next to its replacement.
 *
 * @param orchestrator         starts, stops, removes and inspects containers
 * @param deploymentService    used to mark deployments running, stopped or failed
 * @param appService           resolves apps and JAR paths
 * @param envService           resolves environments
 * @param deploymentRepository deployment persistence; must actually be a
 *                             {@link PostgresDeploymentRepository} because it is downcast here
 * @throws ClassCastException if {@code deploymentRepository} is any other implementation
 */
public DeploymentExecutor(RuntimeOrchestrator orchestrator,
DeploymentService deploymentService,
AppService appService,
EnvironmentService envService,
DeploymentRepository deploymentRepository) {
this.orchestrator = orchestrator;
this.deploymentService = deploymentService;
this.deploymentRepository = deploymentRepository;
this.appService = appService;
this.envService = envService;
this.deploymentRepository = deploymentRepository;
// NOTE(review): unchecked downcast — injecting PostgresDeploymentRepository directly (or
// promoting the extra methods to the interface) would surface a mismatch at wiring time.
this.pgDeployRepo = (PostgresDeploymentRepository) deploymentRepository;
}
/**
 * Runs one deployment on the "deploymentTaskExecutor" executor.
 *
 * <p>The staged flow visible below is: resolve the merged container config, PRE_FLIGHT,
 * PULL_IMAGE, CREATE_NETWORK, START_REPLICAS, HEALTH_CHECK, SWAP_TRAFFIC, COMPLETE. The current
 * stage is persisted before each step and cleared (set to null) when the run ends, whether it
 * succeeds or fails. Any exception is logged and turned into a FAILED deployment whose reason
 * is the exception message.
 *
 * <p>NOTE(review): this body interleaves two implementations — {@code app}, {@code env},
 * {@code jarPath} and {@code labels} are each declared twice, and a single-container start
 * (old {@code waitForHealthy} path) sits inside the COMPLETE stage. It reads like a diff with
 * the +/- markers stripped and does not compile as shown.
 *
 * @param deployment the deployment record to execute
 */
@Async("deploymentTaskExecutor")
public void executeAsync(Deployment deployment) {
try {
// Stop existing deployment in same environment for same app
App app = appService.getById(deployment.appId());
Environment env = envService.getById(deployment.environmentId());
String jarPath = appService.resolveJarPath(deployment.appVersionId());
// Server-wide defaults; environment and app container configs are merged over them below.
var globalDefaults = new ConfigMerger.GlobalRuntimeDefaults(
parseMemoryLimitMb(globalMemoryLimit),
globalCpuShares,
globalRoutingMode,
globalRoutingDomain,
globalServerUrl.isBlank() ? "http://cameleer3-server:8081" : globalServerUrl
);
ResolvedContainerConfig config = ConfigMerger.resolve(
globalDefaults, env.defaultContainerConfig(), app.containerConfig());
pgDeployRepo.updateDeploymentStrategy(deployment.id(), config.deploymentStrategy());
// === PRE-FLIGHT ===
updateStage(deployment.id(), DeployStage.PRE_FLIGHT);
preFlightChecks(jarPath, config);
// === PULL IMAGE ===
updateStage(deployment.id(), DeployStage.PULL_IMAGE);
// Docker pulls on create if not present locally
// === CREATE NETWORKS ===
updateStage(deployment.id(), DeployStage.CREATE_NETWORK);
// Without a network manager the configured dockerNetwork is used and no env network is attached.
String primaryNetwork = dockerNetwork;
String envNet = null;
if (networkManager != null) {
primaryNetwork = DockerNetworkManager.TRAEFIK_NETWORK;
networkManager.ensureNetwork(primaryNetwork);
envNet = DockerNetworkManager.envNetworkName(env.slug());
networkManager.ensureNetwork(envNet);
}
// === START REPLICAS ===
updateStage(deployment.id(), DeployStage.START_REPLICAS);
Map<String, String> baseEnvVars = buildEnvVars(app, env, config);
Map<String, String> labels = TraefikLabelBuilder.build(app.slug(), env.slug(), config);
List<Map<String, Object>> replicaStates = new ArrayList<>();
List<String> newContainerIds = new ArrayList<>();
// NOTE(review): if startContainer throws part-way through this loop, the replicas that were
// already started are not stopped by the catch block at the end of this method.
for (int i = 0; i < config.replicas(); i++) {
// Name is "<env slug>-<app slug>-<index>" and does not include the deployment id.
// NOTE(review): a previous deployment's replicas would carry the same names and are only
// removed in SWAP_TRAFFIC, after these starts — confirm the orchestrator tolerates that.
String containerName = env.slug() + "-" + app.slug() + "-" + i;
// presumably a CPU quota in microseconds against a 100 ms period — TODO confirm with the orchestrator
Long cpuQuota = config.cpuLimit() != null ? (long) (config.cpuLimit() * 100_000) : null;
ContainerRequest request = new ContainerRequest(
containerName, baseImage, jarPath, primaryNetwork,
envNet != null ? List.of(envNet) : List.of(),
baseEnvVars, labels,
config.memoryLimitBytes(), config.memoryReserveBytes(),
config.cpuShares(), cpuQuota,
config.exposedPorts(), agentHealthPort,
"on-failure", 3
);
String containerId = orchestrator.startContainer(request);
newContainerIds.add(containerId);
replicaStates.add(Map.of(
"index", i,
"containerId", containerId,
"containerName", containerName,
"status", "STARTING"
));
}
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === HEALTH CHECK ===
updateStage(deployment.id(), DeployStage.HEALTH_CHECK);
int healthyCount = waitForAnyHealthy(newContainerIds, healthCheckTimeout);
// No healthy replica in time: best-effort removal of everything just started, then fail.
if (healthyCount == 0) {
for (String cid : newContainerIds) {
try { orchestrator.stopContainer(cid); orchestrator.removeContainer(cid); }
catch (Exception e) { log.warn("Cleanup failed for {}: {}", cid, e.getMessage()); }
}
pgDeployRepo.updateDeployStage(deployment.id(), null);
deploymentService.markFailed(deployment.id(), "No replicas passed health check within " + healthCheckTimeout + "s");
return;
}
replicaStates = updateReplicaHealth(replicaStates, newContainerIds);
pgDeployRepo.updateReplicaStates(deployment.id(), replicaStates);
// === SWAP TRAFFIC ===
updateStage(deployment.id(), DeployStage.SWAP_TRAFFIC);
// Retire whatever deployment is currently active for this app/environment, unless it is this one.
Optional<Deployment> existing = deploymentRepository.findActiveByAppIdAndEnvironmentId(
deployment.appId(), deployment.environmentId());
if (existing.isPresent() && !existing.get().id().equals(deployment.id())) {
Deployment old = existing.get();
if (old.containerId() != null) {
orchestrator.stopContainer(old.containerId());
orchestrator.removeContainer(old.containerId());
}
deploymentService.markStopped(old.id());
log.info("Stopped previous deployment {} for replacement", old.id());
stopDeploymentContainers(existing.get());
deploymentService.markStopped(existing.get().id());
log.info("Stopped previous deployment {} for replacement", existing.get().id());
}
String jarPath = appService.resolveJarPath(deployment.appVersionId());
App app = appService.getById(deployment.appId());
Environment env = envService.getById(deployment.environmentId());
// === COMPLETE ===
updateStage(deployment.id(), DeployStage.COMPLETE);
Map<String, String> envVars = new HashMap<>();
envVars.put("CAMELEER_EXPORT_TYPE", "HTTP");
envVars.put("CAMELEER_APPLICATION_ID", app.slug());
envVars.put("CAMELEER_ENVIRONMENT_ID", env.slug());
envVars.put("CAMELEER_DISPLAY_NAME", deployment.containerName());
if (bootstrapToken != null && !bootstrapToken.isBlank()) {
envVars.put("CAMELEER_AUTH_TOKEN", bootstrapToken);
// The primary container is simply the first replica started; nothing here checks that this
// particular replica is one of the healthy ones.
String primaryContainerId = newContainerIds.get(0);
// NOTE(review): waitForAnyHealthy returns as soon as at least one replica is healthy, so
// healthyCount can undercount replicas that are still starting and yield DEGRADED.
DeploymentStatus finalStatus = healthyCount == config.replicas()
? DeploymentStatus.RUNNING : DeploymentStatus.DEGRADED;
// Always marked running first; a DEGRADED result then overwrites the status.
deploymentService.markRunning(deployment.id(), primaryContainerId);
if (finalStatus == DeploymentStatus.DEGRADED) {
deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.DEGRADED,
primaryContainerId, null);
}
Map<String, String> labels = buildTraefikLabels(app, env, deployment);
ContainerRequest request = new ContainerRequest(
deployment.containerName(),
baseImage,
jarPath,
dockerNetwork,
List.of(),
envVars,
labels,
parseMemoryLimitBytes(containerMemoryLimit),
null,
containerCpuShares,
null,
List.of(),
agentHealthPort,
"on-failure",
3);
String containerId = orchestrator.startContainer(request);
waitForHealthy(containerId, healthCheckTimeout);
deploymentService.markRunning(deployment.id(), containerId);
log.info("Deployment {} is RUNNING (container={})", deployment.id(), containerId);
pgDeployRepo.updateDeployStage(deployment.id(), null);
log.info("Deployment {} is {} ({}/{} replicas healthy)",
deployment.id(), finalStatus, healthyCount, config.replicas());
} catch (Exception e) {
// Boundary catch: any failure clears the stage and marks the deployment failed.
// NOTE(review): e.getMessage() can be null (e.g. for a NullPointerException), so the stored
// failure reason may be null.
log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e);
pgDeployRepo.updateDeployStage(deployment.id(), null);
deploymentService.markFailed(deployment.id(), e.getMessage());
}
}
/**
 * Stops a deployment on request: records the target state "STOPPED", sets the status to
 * STOPPING, stops and removes the deployment's containers, then marks it stopped.
 *
 * <p>NOTE(review): the unguarded stop/remove of {@code deployment.containerId()} at the top
 * duplicates what {@code stopDeploymentContainers} does further down; it looks like the
 * previous implementation rendered next to the new one. If both ran, an exception from the
 * first pair would skip every status update below.
 *
 * @param deployment the deployment to stop
 */
public void stopDeployment(Deployment deployment) {
if (deployment.containerId() != null) {
orchestrator.stopContainer(deployment.containerId());
orchestrator.removeContainer(deployment.containerId());
}
pgDeployRepo.updateTargetState(deployment.id(), "STOPPED");
deploymentRepository.updateStatus(deployment.id(), DeploymentStatus.STOPPING,
deployment.containerId(), null);
// Best-effort: failures for individual containers are logged inside, not propagated.
stopDeploymentContainers(deployment);
deploymentService.markStopped(deployment.id());
}
private void waitForHealthy(String containerId, int timeoutSeconds) throws InterruptedException {
long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L;
/**
 * Stops and removes every container that belongs to the given deployment, best-effort.
 *
 * <p>Containers are taken from the deployment's replica states (the {@code "containerId"} entry
 * of each replica map). Only when there are no replica states at all does the method fall back
 * to the deployment's single {@code containerId()}. Failures are logged and never propagated,
 * so one broken container cannot prevent the others from being cleaned up.
 *
 * @param deployment the deployment whose containers should be torn down
 */
private void stopDeploymentContainers(Deployment deployment) {
    List<Map<String, Object>> recorded = deployment.replicaStates();
    List<Map<String, Object>> replicas = recorded != null ? recorded : List.of();

    for (Map<String, Object> replica : replicas) {
        Object cid = replica.get("containerId");
        if (cid instanceof String) {
            stopAndRemoveQuietly((String) cid, "replica container");
        } else if (cid != null) {
            // Replica states are loosely typed maps; skip a malformed entry instead of
            // aborting the whole teardown with a ClassCastException.
            log.warn("Ignoring replica with non-string containerId: {}", cid);
        }
    }

    if (deployment.containerId() != null && replicas.isEmpty()) {
        stopAndRemoveQuietly(deployment.containerId(), "container");
    }
}

/**
 * Stops and then removes one container, logging failures instead of throwing.
 *
 * <p>Stop and remove are attempted independently: if the stop fails (for example because the
 * container has already exited), the remove is still attempted so the container is not left
 * behind.
 *
 * @param containerId id of the container to tear down
 * @param label       how to describe the container in log messages
 */
private void stopAndRemoveQuietly(String containerId, String label) {
    try {
        orchestrator.stopContainer(containerId);
    } catch (Exception e) {
        log.warn("Failed to stop {} {}: {}", label, containerId, e.getMessage());
    }
    try {
        orchestrator.removeContainer(containerId);
    } catch (Exception e) {
        log.warn("Failed to remove {} {}: {}", label, containerId, e.getMessage());
    }
}
/**
 * Validates the inputs of a deployment before any container is created.
 *
 * @param jarPath path of the application JAR to deploy
 * @param config  the merged container configuration
 * @throws IllegalStateException if the JAR path is missing, does not exist or is not a regular
 *                               file, or if the memory limit, app port or replica count is
 *                               out of range
 */
private void preFlightChecks(String jarPath, ResolvedContainerConfig config) {
    // A null path would otherwise surface as a NullPointerException with a null message,
    // which would then become the deployment's failure reason.
    if (jarPath == null || jarPath.isBlank()) {
        throw new IllegalStateException("JAR path could not be resolved");
    }
    Path jar = Path.of(jarPath);
    if (!Files.exists(jar)) {
        throw new IllegalStateException("JAR file not found: " + jarPath);
    }
    // A directory at the JAR path passes an exists() check but cannot be deployed.
    if (!Files.isRegularFile(jar)) {
        throw new IllegalStateException("JAR path is not a regular file: " + jarPath);
    }
    if (config.memoryLimitMb() <= 0) {
        throw new IllegalStateException("Memory limit must be positive, got: " + config.memoryLimitMb());
    }
    if (config.appPort() <= 0 || config.appPort() > 65535) {
        throw new IllegalStateException("Invalid app port: " + config.appPort());
    }
    if (config.replicas() < 1) {
        throw new IllegalStateException("Replicas must be >= 1, got: " + config.replicas());
    }
}
/**
 * Assembles the environment variables handed to every replica container.
 *
 * <p>The built-in CAMELEER_* entries are inserted first, in a fixed order; the auth token is
 * added only when a non-blank bootstrap token is configured. The custom variables from the
 * resolved config are applied last, so they replace a built-in entry with the same key.
 *
 * @param app    the application being deployed (its slug becomes the application id)
 * @param env    the target environment (its slug becomes the environment id)
 * @param config the merged container configuration
 * @return an insertion-ordered, mutable map of environment variables
 */
private Map<String, String> buildEnvVars(App app, Environment env, ResolvedContainerConfig config) {
    final boolean hasBootstrapToken = bootstrapToken != null && !bootstrapToken.isBlank();

    Map<String, String> result = new LinkedHashMap<>();
    result.put("CAMELEER_EXPORT_TYPE", "HTTP");
    result.put("CAMELEER_APPLICATION_ID", app.slug());
    result.put("CAMELEER_ENVIRONMENT_ID", env.slug());
    result.put("CAMELEER_SERVER_URL", config.serverUrl());
    if (hasBootstrapToken) {
        result.put("CAMELEER_AUTH_TOKEN", bootstrapToken);
    }

    result.putAll(config.customEnvVars());
    return result;
}
/**
 * Polls the given containers until at least one reports the state "healthy" or the timeout
 * elapses.
 *
 * <p>NOTE(review): this body interleaves two implementations. The lines that use an undeclared
 * {@code containerId}, the bare {@code return;}, the two {@code throw new RuntimeException}
 * statements and the unguarded {@code Thread.sleep(2000)} appear to belong to the previous
 * single-container {@code waitForHealthy}; it does not compile as shown.
 *
 * @param containerIds   ids of the containers to poll
 * @param timeoutSeconds maximum time to wait, in seconds
 * @return the number of containers that were healthy in the first poll round that found any;
 *         0 on timeout or if the waiting thread is interrupted
 */
private int waitForAnyHealthy(List<String> containerIds, int timeoutSeconds) {
long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L);
while (System.currentTimeMillis() < deadline) {
ContainerStatus status = orchestrator.getContainerStatus(containerId);
if ("healthy".equalsIgnoreCase(status.state()) || (status.running() && "running".equalsIgnoreCase(status.state()))) {
return;
// One poll round: count the containers currently reporting "healthy".
int healthy = 0;
for (String cid : containerIds) {
ContainerStatus status = orchestrator.getContainerStatus(cid);
// Exact, case-sensitive match. Unlike the equalsIgnoreCase variant above, a container that
// is merely "running" does not count.
if ("healthy".equals(status.state())) healthy++;
}
if (!status.running()) {
throw new RuntimeException("Container stopped unexpectedly: " + status.error());
// Returns on the first round with any healthy container; replicas that are still starting
// at that moment are not waited for, so the count can be lower than the final healthy total.
if (healthy > 0) return healthy;
// An interrupt restores the thread's interrupt flag and is reported as "none healthy".
try { Thread.sleep(2000); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0;
}
Thread.sleep(2000);
}
throw new RuntimeException("Container health check timed out after " + timeoutSeconds + "s");
return 0;
}
private Map<String, String> buildTraefikLabels(App app, Environment env, Deployment deployment) {
Map<String, String> labels = new HashMap<>();
labels.put("traefik.enable", "true");
labels.put("managed-by", "cameleer3-server");
labels.put("cameleer.app", app.slug());
labels.put("cameleer.environment", env.slug());
return labels;
/**
 * Produces a refreshed copy of the replica state list with each replica's "status" entry set
 * from the orchestrator's current view of its container.
 *
 * <p>A replica whose container is running becomes "RUNNING", any other becomes "DEAD". The
 * input list and its maps are left untouched; each returned entry is a new mutable map.
 *
 * @param replicas     the replica state maps, each holding a "containerId" entry
 * @param containerIds not used by this method
 * @return a new list with one updated copy per input replica, in the same order
 */
private List<Map<String, Object>> updateReplicaHealth(List<Map<String, Object>> replicas,
                                                      List<String> containerIds) {
    List<Map<String, Object>> refreshed = new ArrayList<>();
    for (Map<String, Object> previous : replicas) {
        String id = (String) previous.get("containerId");
        boolean alive = orchestrator.getContainerStatus(id).running();

        Map<String, Object> next = new HashMap<>(previous);
        if (alive) {
            next.put("status", "RUNNING");
        } else {
            next.put("status", "DEAD");
        }
        refreshed.add(next);
    }
    return refreshed;
}
private long parseMemoryLimitBytes(String limit) {
String trimmed = limit.trim().toLowerCase();
if (trimmed.endsWith("g")) {
return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024 * 1024;
} else if (trimmed.endsWith("m")) {
return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024;
}
return Long.parseLong(trimmed);
/**
 * Persists the current deploy stage of a deployment, stored as the stage's {@code name()}.
 *
 * @param deploymentId id of the deployment being executed
 * @param stage        the stage that is about to run; must not be null
 */
private void updateStage(UUID deploymentId, DeployStage stage) {
    final String stageName = stage.name();
    pgDeployRepo.updateDeployStage(deploymentId, stageName);
}
/**
 * Parses a memory limit such as {@code "512m"} or {@code "1.5g"} into whole megabytes.
 *
 * <p>Accepted forms (case-insensitive, surrounding whitespace ignored):
 * <ul>
 *   <li>{@code <number>g} — gigabytes, may be fractional; multiplied by 1024</li>
 *   <li>{@code <number>m} — megabytes, may be fractional</li>
 *   <li>{@code <number>k} — kilobytes; divided by 1024</li>
 *   <li>the same units spelled {@code gb}/{@code gib}, {@code mb}/{@code mib},
 *       {@code kb}/{@code kib}</li>
 *   <li>a bare integer, taken as megabytes</li>
 * </ul>
 * Fractional results are truncated toward zero.
 *
 * <p>NOTE(review): a bare number is taken as megabytes here, whereas Docker's own
 * {@code --memory} option reads a bare number as bytes — confirm which is intended.
 *
 * @param limit the configured limit string
 * @return the limit in megabytes
 * @throws NumberFormatException if {@code limit} is null, blank or not in one of the forms above
 */
private int parseMemoryLimitMb(String limit) {
    if (limit == null || limit.isBlank()) {
        throw new NumberFormatException("Memory limit must not be empty");
    }
    // Locale.ROOT keeps the unit comparison independent of the JVM's default locale.
    String value = limit.trim().toLowerCase(Locale.ROOT);

    // Reduce the long spellings ("gb", "gib", ...) to the single-letter unit. A trailing "b" is
    // only dropped when it follows a unit letter, so plain "512b" is still rejected below.
    int end = value.length();
    if (value.endsWith("ib")) {
        end -= 2;
    } else if (value.endsWith("b")) {
        end -= 1;
    }
    if (end < value.length() && end > 0 && "kmg".indexOf(value.charAt(end - 1)) >= 0) {
        value = value.substring(0, end);
    }

    // Strip only the final unit character, not every occurrence of the letter.
    char unit = value.charAt(value.length() - 1);
    String number = value.substring(0, value.length() - 1).trim();
    switch (unit) {
        case 'g':
            return (int) (Double.parseDouble(number) * 1024);
        case 'm':
            return (int) Double.parseDouble(number);
        case 'k':
            return (int) (Double.parseDouble(number) / 1024);
        default:
            return Integer.parseInt(value);
    }
}
}