diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/ContainerLogForwarder.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/ContainerLogForwarder.java new file mode 100644 index 00000000..407918a3 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/runtime/ContainerLogForwarder.java @@ -0,0 +1,260 @@ +package com.cameleer3.server.app.runtime; + +import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.app.search.ClickHouseLogStore; +import com.cameleer3.server.core.ingestion.BufferedLogEntry; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.async.ResultCallback; +import com.github.dockerjava.api.model.Frame; +import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Streams Docker container stdout/stderr to ClickHouse until the agent registers via SSE. + * One capture session per container, managed by container ID. + */ +public class ContainerLogForwarder { + + private static final Logger log = LoggerFactory.getLogger(ContainerLogForwarder.class); + + private static final int FLUSH_BATCH_SIZE = 50; + private static final long FLUSH_INTERVAL_MS = 2_000; + private static final long MAX_CAPTURE_DURATION_MS = 5 * 60 * 1_000; + private static final long CLEANUP_INTERVAL_MS = 30_000; + + // Pattern to match Docker timestamp prefix: "2026-04-14T14:23:01.234567890Z " + private static final Pattern DOCKER_TS_PATTERN = Pattern.compile( + "^(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s(.*)$", Pattern.DOTALL); + + // Pattern to infer log level from Java log output + private static final Pattern LEVEL_PATTERN = Pattern.compile( + "\\b(ERROR|WARN|INFO|DEBUG|TRACE)\\b"); + + private final DockerClient dockerClient; + private final ClickHouseLogStore logStore; + private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + private final ExecutorService executor = Executors.newFixedThreadPool(10, + r -> { Thread t = new Thread(r, "log-capture"); t.setDaemon(true); return t; }); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + r -> { Thread t = new Thread(r, "log-capture-cleanup"); t.setDaemon(true); return t; }); + + public ContainerLogForwarder(DockerClient dockerClient, ClickHouseLogStore logStore) { + this.dockerClient = dockerClient; + this.logStore = logStore; + scheduler.scheduleAtFixedRate(this::cleanupExpiredSessions, + CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + public void startCapture(String containerId, String appSlug, String envSlug, String tenantId) { + if (sessions.containsKey(containerId)) { + log.debug("Already capturing logs for container {}", containerId.substring(0, 12)); + return; + } + + CaptureSession session = new CaptureSession(containerId, appSlug, envSlug, tenantId); + if (sessions.putIfAbsent(containerId, session) != null) { + return; // another thread beat us + } + + Future future = executor.submit(() -> streamLogs(session)); + session.future = future; + log.info("Started log capture for container {} (app={}, env={})", + containerId.substring(0, 12), appSlug, envSlug); + } + + public void stopCapture(String containerId) { + CaptureSession session = sessions.remove(containerId); + if (session == null) return; + + session.cancel(); + flushBuffer(session); + log.info("Stopped log capture for container {} ({} lines captured)", + containerId.substring(0, 12), session.lineCount); + } + + public void stopCaptureByApp(String appSlug, String envSlug) { + List toRemove = new ArrayList<>(); + for (Map.Entry entry : sessions.entrySet()) { + CaptureSession s = entry.getValue(); + if (appSlug.equals(s.appSlug) && envSlug.equals(s.envSlug)) { + toRemove.add(entry.getKey()); + } + } + for (String containerId : toRemove) { + stopCapture(containerId); + } + if (!toRemove.isEmpty()) { + log.info("Stopped log capture for app={} env={} ({} containers)", + appSlug, envSlug, toRemove.size()); + } + } + + @PreDestroy + public void shutdown() { + for (String containerId : new ArrayList<>(sessions.keySet())) { + stopCapture(containerId); + } + scheduler.shutdownNow(); + executor.shutdownNow(); + } + + private void streamLogs(CaptureSession session) { + try { + session.callback = dockerClient.logContainerCmd(session.containerId) + .withFollowStream(true) + .withStdOut(true) + .withStdErr(true) + .withTimestamps(true) + .exec(new ResultCallback.Adapter() { + @Override + public void onNext(Frame frame) { + if (session.cancelled) return; + String line = new String(frame.getPayload()).trim(); + if (line.isEmpty()) return; + + session.buffer.add(line); + session.lineCount++; + + if (session.buffer.size() >= FLUSH_BATCH_SIZE + || System.currentTimeMillis() - session.lastFlush >= FLUSH_INTERVAL_MS) { + flushBuffer(session); + } + } + + @Override + public void onComplete() { + flushBuffer(session); + sessions.remove(session.containerId); + log.debug("Log stream completed for container {}", + session.containerId.substring(0, 12)); + } + + @Override + public void onError(Throwable throwable) { + flushBuffer(session); + sessions.remove(session.containerId); + log.debug("Log stream error for container {}: {}", + session.containerId.substring(0, 12), throwable.getMessage()); + } + }); + } catch (Exception e) { + sessions.remove(session.containerId); + log.warn("Failed to start log capture for container {}: {}", + session.containerId.substring(0, 12), e.getMessage()); + } + } + + private void flushBuffer(CaptureSession session) { + List lines; + synchronized (session.buffer) { + if (session.buffer.isEmpty()) return; + lines = new ArrayList<>(session.buffer); + session.buffer.clear(); + } + session.lastFlush = System.currentTimeMillis(); + + List entries = new ArrayList<>(lines.size()); + for (String line : lines) { + Instant timestamp = Instant.now(); + String message = line; + + Matcher tsMatcher = DOCKER_TS_PATTERN.matcher(line); + if (tsMatcher.matches()) { + try { + timestamp = Instant.parse(tsMatcher.group(1)); + } catch (DateTimeParseException e) { + // keep Instant.now() + } + message = tsMatcher.group(2); + } + + String level = inferLevel(message); + + LogEntry logEntry = new LogEntry(); + logEntry.setTimestamp(timestamp); + logEntry.setLevel(level); + logEntry.setMessage(message); + logEntry.setLoggerName(""); + logEntry.setThreadName(""); + logEntry.setStackTrace(""); + logEntry.setMdc(Collections.emptyMap()); + logEntry.setSource("container"); + + entries.add(new BufferedLogEntry( + session.tenantId, session.envSlug, session.containerId.substring(0, 12), + session.appSlug, logEntry)); + } + + try { + logStore.insertBufferedBatch(entries); + } catch (Exception e) { + log.warn("Failed to flush {} container log entries for {}: {}", + entries.size(), session.appSlug, e.getMessage()); + } + } + + private String inferLevel(String message) { + if (message.startsWith("\tat ") || message.startsWith("Caused by:")) { + return "ERROR"; + } + Matcher m = LEVEL_PATTERN.matcher(message); + if (m.find()) { + return m.group(1); + } + return "INFO"; + } + + private void cleanupExpiredSessions() { + long now = System.currentTimeMillis(); + for (Map.Entry entry : sessions.entrySet()) { + CaptureSession session = entry.getValue(); + if (now - session.startedAt > MAX_CAPTURE_DURATION_MS) { + log.info("Log capture timeout for container {} (app={}), stopping", + entry.getKey().substring(0, 12), session.appSlug); + stopCapture(entry.getKey()); + } + } + } + + private static class CaptureSession { + final String containerId; + final String appSlug; + final String envSlug; + final String tenantId; + final long startedAt = System.currentTimeMillis(); + final List buffer = Collections.synchronizedList(new ArrayList<>()); + volatile long lastFlush = System.currentTimeMillis(); + volatile long lineCount = 0; + volatile boolean cancelled = false; + volatile Future future; + volatile ResultCallback.Adapter callback; + + CaptureSession(String containerId, String appSlug, String envSlug, String tenantId) { + this.containerId = containerId; + this.appSlug = appSlug; + this.envSlug = envSlug; + this.tenantId = tenantId; + } + + void cancel() { + cancelled = true; + if (callback != null) { + try { callback.close(); } catch (Exception e) { /* ignore */ } + } + if (future != null) { + future.cancel(true); + } + } + } +}