fix: unify container/agent log identity and fix multi-replica log capture
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m21s
CI / docker (push) Successful in 1m18s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 42s

Four logging pipeline fixes:

1. Multi-replica startup logs: remove stopLogCaptureByApp from
   SseConnectionManager — container log capture now expires naturally
   after 60s instead of being killed when the first agent connects SSE.
   This ensures all replicas' bootstrap output is captured.

2. Unified instance_id: container logs and agent logs now share the same
   instance identity ({envSlug}-{appSlug}-{replicaIndex}). DeploymentExecutor
   sets CAMELEER_AGENT_INSTANCEID per replica so the agent uses the same
   ID as ContainerLogForwarder. Instance-level log views now show both
   container and agent logs.

3. Labels-first container identity: TraefikLabelBuilder emits cameleer.replica
   and cameleer.instance-id labels. Container names are tenant-prefixed
   ({tenantId}-{envSlug}-{appSlug}-{idx}) for global Docker daemon uniqueness.

4. Environment filter on log queries: useApplicationLogs now passes the
   selected environment to the API, preventing log leakage across environments.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-15 10:54:05 +02:00
parent bf2d07f3ba
commit fd806b9f2d
9 changed files with 43 additions and 65 deletions

View File

@@ -3,9 +3,7 @@ package com.cameleer3.server.app.agent;
import com.cameleer3.server.app.config.AgentRegistryConfig;
import com.cameleer3.server.core.agent.AgentCommand;
import com.cameleer3.server.core.agent.AgentEventListener;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.runtime.RuntimeOrchestrator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
@@ -37,16 +35,13 @@ public class SseConnectionManager implements AgentEventListener {
private final AgentRegistryConfig config;
private final SsePayloadSigner ssePayloadSigner;
private final ObjectMapper objectMapper;
private final RuntimeOrchestrator runtimeOrchestrator;
public SseConnectionManager(AgentRegistryService registryService, AgentRegistryConfig config,
SsePayloadSigner ssePayloadSigner, ObjectMapper objectMapper,
RuntimeOrchestrator runtimeOrchestrator) {
SsePayloadSigner ssePayloadSigner, ObjectMapper objectMapper) {
this.registryService = registryService;
this.config = config;
this.ssePayloadSigner = ssePayloadSigner;
this.objectMapper = objectMapper;
this.runtimeOrchestrator = runtimeOrchestrator;
}
@PostConstruct
@@ -87,12 +82,6 @@ public class SseConnectionManager implements AgentEventListener {
log.info("SSE connection established for agent {}", agentId);
// Stop container log capture — agent is now online and will send its own logs
AgentInfo agent = registryService.findById(agentId);
if (agent != null) {
runtimeOrchestrator.stopLogCaptureByApp(agent.applicationId(), agent.environmentId());
}
return emitter;
}

View File

@@ -30,7 +30,7 @@ public class ContainerLogForwarder {
private static final int FLUSH_BATCH_SIZE = 50;
private static final long FLUSH_INTERVAL_MS = 2_000;
private static final long MAX_CAPTURE_DURATION_MS = 5 * 60 * 1_000;
private static final long MAX_CAPTURE_DURATION_MS = 60_000;
private static final long CLEANUP_INTERVAL_MS = 30_000;
// Pattern to match Docker timestamp prefix: "2026-04-14T14:23:01.234567890Z "
@@ -56,21 +56,21 @@ public class ContainerLogForwarder {
CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
public void startCapture(String containerId, String appSlug, String envSlug, String tenantId) {
public void startCapture(String containerId, String instanceId, String appSlug, String envSlug, String tenantId) {
if (sessions.containsKey(containerId)) {
log.debug("Already capturing logs for container {}", containerId.substring(0, 12));
return;
}
CaptureSession session = new CaptureSession(containerId, appSlug, envSlug, tenantId);
CaptureSession session = new CaptureSession(containerId, instanceId, appSlug, envSlug, tenantId);
if (sessions.putIfAbsent(containerId, session) != null) {
return; // another thread beat us
}
Future<?> future = executor.submit(() -> streamLogs(session));
session.future = future;
log.info("Started log capture for container {} (app={}, env={})",
containerId.substring(0, 12), appSlug, envSlug);
log.info("Started log capture for container {} (instance={}, app={}, env={})",
containerId.substring(0, 12), instanceId, appSlug, envSlug);
}
public void stopCapture(String containerId) {
@@ -83,23 +83,6 @@ public class ContainerLogForwarder {
containerId.substring(0, 12), session.lineCount);
}
public void stopCaptureByApp(String appSlug, String envSlug) {
List<String> toRemove = new ArrayList<>();
for (Map.Entry<String, CaptureSession> entry : sessions.entrySet()) {
CaptureSession s = entry.getValue();
if (appSlug.equals(s.appSlug) && envSlug.equals(s.envSlug)) {
toRemove.add(entry.getKey());
}
}
for (String containerId : toRemove) {
stopCapture(containerId);
}
if (!toRemove.isEmpty()) {
log.info("Stopped log capture for app={} env={} ({} containers)",
appSlug, envSlug, toRemove.size());
}
}
@PreDestroy
public void shutdown() {
for (String containerId : new ArrayList<>(sessions.keySet())) {
@@ -192,7 +175,7 @@ public class ContainerLogForwarder {
logEntry.setSource("container");
entries.add(new BufferedLogEntry(
session.tenantId, session.envSlug, session.containerId.substring(0, 12),
session.tenantId, session.envSlug, session.instanceId,
session.appSlug, logEntry));
}
@@ -229,6 +212,7 @@ public class ContainerLogForwarder {
private static class CaptureSession {
final String containerId;
final String instanceId;
final String appSlug;
final String envSlug;
final String tenantId;
@@ -240,8 +224,9 @@ public class ContainerLogForwarder {
volatile Future<?> future;
volatile ResultCallback.Adapter<Frame> callback;
CaptureSession(String containerId, String appSlug, String envSlug, String tenantId) {
CaptureSession(String containerId, String instanceId, String appSlug, String envSlug, String tenantId) {
this.containerId = containerId;
this.instanceId = instanceId;
this.appSlug = appSlug;
this.envSlug = envSlug;
this.tenantId = tenantId;

View File

@@ -166,14 +166,22 @@ public class DeploymentExecutor {
updateStage(deployment.id(), DeployStage.START_REPLICAS);
Map<String, String> baseEnvVars = buildEnvVars(app, env, config);
Map<String, String> labels = TraefikLabelBuilder.build(app.slug(), env.slug(), tenantId, config);
labels.putAll(PrometheusLabelBuilder.build(resolvedRuntimeType));
Map<String, String> prometheusLabels = PrometheusLabelBuilder.build(resolvedRuntimeType);
List<Map<String, Object>> replicaStates = new ArrayList<>();
List<String> newContainerIds = new ArrayList<>();
for (int i = 0; i < config.replicas(); i++) {
String containerName = env.slug() + "-" + app.slug() + "-" + i;
String instanceId = env.slug() + "-" + app.slug() + "-" + i;
String containerName = tenantId + "-" + instanceId;
// Per-replica labels (include replica index and instance-id)
Map<String, String> labels = TraefikLabelBuilder.build(app.slug(), env.slug(), tenantId, config, i);
labels.putAll(prometheusLabels);
// Per-replica env vars (set agent instance ID to match container log identity)
Map<String, String> replicaEnvVars = new LinkedHashMap<>(baseEnvVars);
replicaEnvVars.put("CAMELEER_AGENT_INSTANCEID", instanceId);
String volumeName = jarDockerVolume != null && !jarDockerVolume.isBlank() ? jarDockerVolume : null;
ContainerRequest request = new ContainerRequest(
@@ -181,7 +189,7 @@ public class DeploymentExecutor {
volumeName, jarStoragePath,
primaryNetwork,
additionalNets,
baseEnvVars, labels,
replicaEnvVars, labels,
config.memoryLimitBytes(), config.memoryReserveBytes(),
config.dockerCpuShares(), config.dockerCpuQuota(),
config.exposedPorts(), agentHealthPort,
@@ -199,7 +207,7 @@ public class DeploymentExecutor {
}
}
orchestrator.startLogCapture(containerId, app.slug(), env.slug(), tenantId);
orchestrator.startLogCapture(containerId, instanceId, app.slug(), env.slug(), tenantId);
replicaStates.add(Map.of(
"index", i,

View File

@@ -13,7 +13,6 @@ public class DisabledRuntimeOrchestrator implements RuntimeOrchestrator {
@Override public void removeContainer(String id) { throw new UnsupportedOperationException("Runtime management disabled"); }
@Override public ContainerStatus getContainerStatus(String id) { return ContainerStatus.notFound(); }
@Override public Stream<String> getLogs(String id, int tail) { return Stream.empty(); }
@Override public void startLogCapture(String containerId, String appSlug, String envSlug, String tenantId) {}
@Override public void startLogCapture(String containerId, String instanceId, String appSlug, String envSlug, String tenantId) {}
@Override public void stopLogCapture(String containerId) {}
@Override public void stopLogCaptureByApp(String appSlug, String envSlug) {}
}

View File

@@ -192,9 +192,9 @@ public class DockerRuntimeOrchestrator implements RuntimeOrchestrator {
}
@Override
public void startLogCapture(String containerId, String appSlug, String envSlug, String tenantId) {
public void startLogCapture(String containerId, String instanceId, String appSlug, String envSlug, String tenantId) {
if (logForwarder != null) {
logForwarder.startCapture(containerId, appSlug, envSlug, tenantId);
logForwarder.startCapture(containerId, instanceId, appSlug, envSlug, tenantId);
}
}
@@ -204,11 +204,4 @@ public class DockerRuntimeOrchestrator implements RuntimeOrchestrator {
logForwarder.stopCapture(containerId);
}
}
@Override
public void stopLogCaptureByApp(String appSlug, String envSlug) {
if (logForwarder != null) {
logForwarder.stopCaptureByApp(appSlug, envSlug);
}
}
}

View File

@@ -9,8 +9,10 @@ public final class TraefikLabelBuilder {
private TraefikLabelBuilder() {}
public static Map<String, String> build(String appSlug, String envSlug, String tenantId, ResolvedContainerConfig config) {
public static Map<String, String> build(String appSlug, String envSlug, String tenantId,
ResolvedContainerConfig config, int replicaIndex) {
String svc = envSlug + "-" + appSlug;
String instanceId = envSlug + "-" + appSlug + "-" + replicaIndex;
Map<String, String> labels = new LinkedHashMap<>();
labels.put("traefik.enable", "true");
@@ -18,6 +20,8 @@ public final class TraefikLabelBuilder {
labels.put("cameleer.tenant", tenantId);
labels.put("cameleer.app", appSlug);
labels.put("cameleer.environment", envSlug);
labels.put("cameleer.replica", String.valueOf(replicaIndex));
labels.put("cameleer.instance-id", instanceId);
labels.put("traefik.http.services." + svc + ".loadbalancer.server.port",
String.valueOf(config.appPort()));