feat: implement async DeploymentExecutor pipeline
- Async container deployment with health check polling - Stops previous deployment before starting new one - Configurable memory, CPU, health timeout via application properties - @EnableAsync on application class for Spring async proxy Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -5,6 +5,7 @@ import com.cameleer3.server.app.config.IngestionConfig;
|
|||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -16,6 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||||||
"com.cameleer3.server.app",
|
"com.cameleer3.server.app",
|
||||||
"com.cameleer3.server.core"
|
"com.cameleer3.server.core"
|
||||||
})
|
})
|
||||||
|
@EnableAsync
|
||||||
@EnableScheduling
|
@EnableScheduling
|
||||||
@EnableConfigurationProperties({IngestionConfig.class, AgentRegistryConfig.class})
|
@EnableConfigurationProperties({IngestionConfig.class, AgentRegistryConfig.class})
|
||||||
public class Cameleer3ServerApplication {
|
public class Cameleer3ServerApplication {
|
||||||
|
|||||||
@@ -0,0 +1,161 @@
|
|||||||
|
package com.cameleer3.server.app.runtime;
|
||||||
|
|
||||||
|
import com.cameleer3.server.core.runtime.App;
|
||||||
|
import com.cameleer3.server.core.runtime.AppService;
|
||||||
|
import com.cameleer3.server.core.runtime.ContainerRequest;
|
||||||
|
import com.cameleer3.server.core.runtime.ContainerStatus;
|
||||||
|
import com.cameleer3.server.core.runtime.Deployment;
|
||||||
|
import com.cameleer3.server.core.runtime.DeploymentRepository;
|
||||||
|
import com.cameleer3.server.core.runtime.DeploymentService;
|
||||||
|
import com.cameleer3.server.core.runtime.DeploymentStatus;
|
||||||
|
import com.cameleer3.server.core.runtime.Environment;
|
||||||
|
import com.cameleer3.server.core.runtime.EnvironmentService;
|
||||||
|
import com.cameleer3.server.core.runtime.RuntimeOrchestrator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class DeploymentExecutor {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(DeploymentExecutor.class);
|
||||||
|
|
||||||
|
private final RuntimeOrchestrator orchestrator;
|
||||||
|
private final DeploymentService deploymentService;
|
||||||
|
private final DeploymentRepository deploymentRepository;
|
||||||
|
private final AppService appService;
|
||||||
|
private final EnvironmentService envService;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.base-image:cameleer-runtime-base:latest}")
|
||||||
|
private String baseImage;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.docker-network:cameleer}")
|
||||||
|
private String dockerNetwork;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.container-memory-limit:512m}")
|
||||||
|
private String containerMemoryLimit;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.container-cpu-shares:512}")
|
||||||
|
private int containerCpuShares;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.health-check-timeout:60}")
|
||||||
|
private int healthCheckTimeout;
|
||||||
|
|
||||||
|
@Value("${cameleer.runtime.agent-health-port:9464}")
|
||||||
|
private int agentHealthPort;
|
||||||
|
|
||||||
|
@Value("${security.bootstrap-token:}")
|
||||||
|
private String bootstrapToken;
|
||||||
|
|
||||||
|
public DeploymentExecutor(RuntimeOrchestrator orchestrator, DeploymentService deploymentService,
|
||||||
|
DeploymentRepository deploymentRepository,
|
||||||
|
AppService appService, EnvironmentService envService) {
|
||||||
|
this.orchestrator = orchestrator;
|
||||||
|
this.deploymentService = deploymentService;
|
||||||
|
this.deploymentRepository = deploymentRepository;
|
||||||
|
this.appService = appService;
|
||||||
|
this.envService = envService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async("deploymentExecutor")
|
||||||
|
public void executeAsync(Deployment deployment) {
|
||||||
|
try {
|
||||||
|
// Stop existing deployment in same environment for same app
|
||||||
|
Optional<Deployment> existing = deploymentRepository.findActiveByAppIdAndEnvironmentId(
|
||||||
|
deployment.appId(), deployment.environmentId());
|
||||||
|
if (existing.isPresent() && !existing.get().id().equals(deployment.id())) {
|
||||||
|
Deployment old = existing.get();
|
||||||
|
if (old.containerId() != null) {
|
||||||
|
orchestrator.stopContainer(old.containerId());
|
||||||
|
orchestrator.removeContainer(old.containerId());
|
||||||
|
}
|
||||||
|
deploymentService.markStopped(old.id());
|
||||||
|
log.info("Stopped previous deployment {} for replacement", old.id());
|
||||||
|
}
|
||||||
|
|
||||||
|
String jarPath = appService.resolveJarPath(deployment.appVersionId());
|
||||||
|
App app = appService.getById(deployment.appId());
|
||||||
|
Environment env = envService.getById(deployment.environmentId());
|
||||||
|
|
||||||
|
Map<String, String> envVars = new HashMap<>();
|
||||||
|
envVars.put("CAMELEER_EXPORT_TYPE", "HTTP");
|
||||||
|
envVars.put("CAMELEER_APPLICATION_ID", app.slug());
|
||||||
|
envVars.put("CAMELEER_ENVIRONMENT_ID", env.slug());
|
||||||
|
envVars.put("CAMELEER_DISPLAY_NAME", deployment.containerName());
|
||||||
|
if (bootstrapToken != null && !bootstrapToken.isBlank()) {
|
||||||
|
envVars.put("CAMELEER_AUTH_TOKEN", bootstrapToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> labels = buildTraefikLabels(app, env, deployment);
|
||||||
|
|
||||||
|
ContainerRequest request = new ContainerRequest(
|
||||||
|
deployment.containerName(),
|
||||||
|
baseImage,
|
||||||
|
jarPath,
|
||||||
|
dockerNetwork,
|
||||||
|
envVars,
|
||||||
|
labels,
|
||||||
|
parseMemoryLimitBytes(containerMemoryLimit),
|
||||||
|
containerCpuShares,
|
||||||
|
agentHealthPort);
|
||||||
|
|
||||||
|
String containerId = orchestrator.startContainer(request);
|
||||||
|
waitForHealthy(containerId, healthCheckTimeout);
|
||||||
|
|
||||||
|
deploymentService.markRunning(deployment.id(), containerId);
|
||||||
|
log.info("Deployment {} is RUNNING (container={})", deployment.id(), containerId);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Deployment {} FAILED: {}", deployment.id(), e.getMessage(), e);
|
||||||
|
deploymentService.markFailed(deployment.id(), e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopDeployment(Deployment deployment) {
|
||||||
|
if (deployment.containerId() != null) {
|
||||||
|
orchestrator.stopContainer(deployment.containerId());
|
||||||
|
orchestrator.removeContainer(deployment.containerId());
|
||||||
|
}
|
||||||
|
deploymentService.markStopped(deployment.id());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForHealthy(String containerId, int timeoutSeconds) throws InterruptedException {
|
||||||
|
long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L;
|
||||||
|
while (System.currentTimeMillis() < deadline) {
|
||||||
|
ContainerStatus status = orchestrator.getContainerStatus(containerId);
|
||||||
|
if ("healthy".equalsIgnoreCase(status.state()) || (status.running() && "running".equalsIgnoreCase(status.state()))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!status.running()) {
|
||||||
|
throw new RuntimeException("Container stopped unexpectedly: " + status.error());
|
||||||
|
}
|
||||||
|
Thread.sleep(2000);
|
||||||
|
}
|
||||||
|
throw new RuntimeException("Container health check timed out after " + timeoutSeconds + "s");
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> buildTraefikLabels(App app, Environment env, Deployment deployment) {
|
||||||
|
Map<String, String> labels = new HashMap<>();
|
||||||
|
labels.put("traefik.enable", "true");
|
||||||
|
labels.put("managed-by", "cameleer3-server");
|
||||||
|
labels.put("cameleer.app", app.slug());
|
||||||
|
labels.put("cameleer.environment", env.slug());
|
||||||
|
return labels;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long parseMemoryLimitBytes(String limit) {
|
||||||
|
String trimmed = limit.trim().toLowerCase();
|
||||||
|
if (trimmed.endsWith("g")) {
|
||||||
|
return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024 * 1024;
|
||||||
|
} else if (trimmed.endsWith("m")) {
|
||||||
|
return Long.parseLong(trimmed.substring(0, trimmed.length() - 1)) * 1024 * 1024;
|
||||||
|
}
|
||||||
|
return Long.parseLong(trimmed);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user