feat: add ContainerLogForwarder for Docker log streaming to ClickHouse

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-14 23:15:49 +02:00
parent 9c65a3c3b9
commit 729944d3ac

View File

@@ -0,0 +1,260 @@
package com.cameleer3.server.app.runtime;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.app.search.ClickHouseLogStore;
import com.cameleer3.server.core.ingestion.BufferedLogEntry;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.model.Frame;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Streams Docker container stdout/stderr to ClickHouse until the agent registers via SSE.
* One capture session per container, managed by container ID.
*/
public class ContainerLogForwarder {

    private static final Logger log = LoggerFactory.getLogger(ContainerLogForwarder.class);

    /** Flush the per-session buffer once it holds this many lines. */
    private static final int FLUSH_BATCH_SIZE = 50;
    /** ...or once this much time has passed since the last flush, whichever comes first. */
    private static final long FLUSH_INTERVAL_MS = 2_000;
    /** Hard cap per capture session; the agent is expected to register via SSE well before this. */
    private static final long MAX_CAPTURE_DURATION_MS = 5 * 60 * 1_000;
    /** How often the background sweep looks for sessions past MAX_CAPTURE_DURATION_MS. */
    private static final long CLEANUP_INTERVAL_MS = 30_000;

    // With withTimestamps(true), Docker prefixes each line with an RFC 3339 timestamp,
    // e.g. "2026-04-14T14:23:01.234567890Z <message>". DOTALL lets group(2) span embedded newlines.
    private static final Pattern DOCKER_TS_PATTERN = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s(.*)$", Pattern.DOTALL);

    // Infers a log level from typical Java log output; the first level token found wins.
    private static final Pattern LEVEL_PATTERN = Pattern.compile(
            "\\b(ERROR|WARN|INFO|DEBUG|TRACE)\\b");

    private final DockerClient dockerClient;
    private final ClickHouseLogStore logStore;
    /** Active capture sessions keyed by full container ID. */
    private final ConcurrentHashMap<String, CaptureSession> sessions = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(10,
            r -> { Thread t = new Thread(r, "log-capture"); t.setDaemon(true); return t; });
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            r -> { Thread t = new Thread(r, "log-capture-cleanup"); t.setDaemon(true); return t; });

    public ContainerLogForwarder(DockerClient dockerClient, ClickHouseLogStore logStore) {
        this.dockerClient = dockerClient;
        this.logStore = logStore;
        scheduler.scheduleAtFixedRate(this::cleanupExpiredSessions,
                CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Begins streaming stdout/stderr of the given container to ClickHouse.
     * Idempotent per container: a second call while a session is active is a no-op.
     *
     * @param containerId full Docker container ID
     * @param appSlug     application slug used for attribution in the log store
     * @param envSlug     environment slug used for attribution in the log store
     * @param tenantId    tenant the entries are stored under
     */
    public void startCapture(String containerId, String appSlug, String envSlug, String tenantId) {
        if (sessions.containsKey(containerId)) {
            log.debug("Already capturing logs for container {}", shortId(containerId));
            return;
        }
        CaptureSession session = new CaptureSession(containerId, appSlug, envSlug, tenantId);
        // putIfAbsent closes the check-then-act race left open by the containsKey fast path.
        if (sessions.putIfAbsent(containerId, session) != null) {
            return; // another thread beat us
        }
        Future<?> future = executor.submit(() -> streamLogs(session));
        session.future = future;
        log.info("Started log capture for container {} (app={}, env={})",
                shortId(containerId), appSlug, envSlug);
    }

    /**
     * Stops capture for a single container, flushing any buffered lines first.
     * Safe to call for containers that were never captured.
     */
    public void stopCapture(String containerId) {
        CaptureSession session = sessions.remove(containerId);
        if (session == null) return;
        session.cancel();
        // Final flush so lines received since the last periodic flush are not lost.
        flushBuffer(session);
        log.info("Stopped log capture for container {} ({} lines captured)",
                shortId(containerId), session.lineCount.get());
    }

    /**
     * Stops capture for every container belonging to the given app/environment pair,
     * typically once the agent has registered via SSE and takes over log shipping.
     */
    public void stopCaptureByApp(String appSlug, String envSlug) {
        List<String> toRemove = new ArrayList<>();
        for (Map.Entry<String, CaptureSession> entry : sessions.entrySet()) {
            CaptureSession s = entry.getValue();
            if (appSlug.equals(s.appSlug) && envSlug.equals(s.envSlug)) {
                toRemove.add(entry.getKey());
            }
        }
        for (String containerId : toRemove) {
            stopCapture(containerId);
        }
        if (!toRemove.isEmpty()) {
            log.info("Stopped log capture for app={} env={} ({} containers)",
                    appSlug, envSlug, toRemove.size());
        }
    }

    /** Stops all sessions (flushing each) and tears down the worker pools. */
    @PreDestroy
    public void shutdown() {
        // Copy the key set: stopCapture mutates `sessions` while we iterate.
        for (String containerId : new ArrayList<>(sessions.keySet())) {
            stopCapture(containerId);
        }
        scheduler.shutdownNow();
        executor.shutdownNow();
    }

    /**
     * Attaches to the container's log stream. docker-java delivers frames asynchronously
     * to the callback below; this method itself returns shortly after exec().
     */
    private void streamLogs(CaptureSession session) {
        try {
            session.callback = dockerClient.logContainerCmd(session.containerId)
                    .withFollowStream(true)
                    .withStdOut(true)
                    .withStdErr(true)
                    .withTimestamps(true)
                    .exec(new ResultCallback.Adapter<Frame>() {
                        @Override
                        public void onNext(Frame frame) {
                            if (session.cancelled) return;
                            // Container logs are text; decode explicitly as UTF-8 rather than
                            // the JVM's platform default charset.
                            String line = new String(frame.getPayload(), StandardCharsets.UTF_8).trim();
                            if (line.isEmpty()) return;
                            session.buffer.add(line);
                            session.lineCount.incrementAndGet();
                            if (session.buffer.size() >= FLUSH_BATCH_SIZE
                                    || System.currentTimeMillis() - session.lastFlush >= FLUSH_INTERVAL_MS) {
                                flushBuffer(session);
                            }
                        }

                        @Override
                        public void onComplete() {
                            flushBuffer(session);
                            sessions.remove(session.containerId);
                            log.debug("Log stream completed for container {}",
                                    shortId(session.containerId));
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            // Expected when the container stops mid-stream; debug, not warn.
                            flushBuffer(session);
                            sessions.remove(session.containerId);
                            log.debug("Log stream error for container {}: {}",
                                    shortId(session.containerId), throwable.getMessage());
                        }
                    });
        } catch (Exception e) {
            sessions.remove(session.containerId);
            log.warn("Failed to start log capture for container {}: {}",
                    shortId(session.containerId), e.getMessage());
        }
    }

    /**
     * Drains the session buffer, converts each raw line into a LogEntry (parsing Docker's
     * timestamp prefix and inferring a level), and writes the batch to ClickHouse.
     * Insert failures are logged and the batch is dropped — best-effort by design.
     */
    private void flushBuffer(CaptureSession session) {
        List<String> lines;
        // Snapshot-and-clear under the buffer's own lock so concurrent onNext adds are safe.
        synchronized (session.buffer) {
            if (session.buffer.isEmpty()) return;
            lines = new ArrayList<>(session.buffer);
            session.buffer.clear();
        }
        session.lastFlush = System.currentTimeMillis();
        List<BufferedLogEntry> entries = new ArrayList<>(lines.size());
        for (String line : lines) {
            Instant timestamp = Instant.now();
            String message = line;
            Matcher tsMatcher = DOCKER_TS_PATTERN.matcher(line);
            if (tsMatcher.matches()) {
                try {
                    timestamp = Instant.parse(tsMatcher.group(1));
                } catch (DateTimeParseException e) {
                    // Unparseable prefix: keep Instant.now() but still strip the prefix below.
                }
                message = tsMatcher.group(2);
            }
            String level = inferLevel(message);
            LogEntry logEntry = new LogEntry();
            logEntry.setTimestamp(timestamp);
            logEntry.setLevel(level);
            logEntry.setMessage(message);
            // Container output carries no structured metadata — fill with empty values.
            logEntry.setLoggerName("");
            logEntry.setThreadName("");
            logEntry.setStackTrace("");
            logEntry.setMdc(Collections.emptyMap());
            logEntry.setSource("container");
            entries.add(new BufferedLogEntry(
                    session.tenantId, session.envSlug, shortId(session.containerId),
                    session.appSlug, logEntry));
        }
        try {
            logStore.insertBufferedBatch(entries);
        } catch (Exception e) {
            log.warn("Failed to flush {} container log entries for {}: {}",
                    entries.size(), session.appSlug, e.getMessage());
        }
    }

    /**
     * Best-effort level inference: stack-trace continuation lines are ERROR, otherwise
     * the first recognizable level token in the line wins, defaulting to INFO.
     */
    private String inferLevel(String message) {
        if (message.startsWith("\tat ") || message.startsWith("Caused by:")) {
            return "ERROR";
        }
        Matcher m = LEVEL_PATTERN.matcher(message);
        if (m.find()) {
            return m.group(1);
        }
        return "INFO";
    }

    /** Stops any session that has been running longer than MAX_CAPTURE_DURATION_MS. */
    private void cleanupExpiredSessions() {
        long now = System.currentTimeMillis();
        // ConcurrentHashMap's weakly consistent iterator tolerates stopCapture's removals.
        for (Map.Entry<String, CaptureSession> entry : sessions.entrySet()) {
            CaptureSession session = entry.getValue();
            if (now - session.startedAt > MAX_CAPTURE_DURATION_MS) {
                log.info("Log capture timeout for container {} (app={}), stopping",
                        shortId(entry.getKey()), session.appSlug);
                stopCapture(entry.getKey());
            }
        }
    }

    /** Docker's conventional 12-character short ID; tolerates IDs shorter than 12 chars. */
    private static String shortId(String containerId) {
        return containerId.length() > 12 ? containerId.substring(0, 12) : containerId;
    }

    /** Mutable state for one container's capture; fields written across threads are volatile. */
    private static class CaptureSession {
        final String containerId;
        final String appSlug;
        final String envSlug;
        final String tenantId;
        final long startedAt = System.currentTimeMillis();
        /** Raw lines awaiting flush; synchronized list, snapshot-cleared in flushBuffer. */
        final List<String> buffer = Collections.synchronizedList(new ArrayList<>());
        // AtomicLong: incremented on the docker-java callback thread, read on the
        // stopCapture caller's thread — a volatile long++ would not be atomic.
        final AtomicLong lineCount = new AtomicLong();
        volatile long lastFlush = System.currentTimeMillis();
        volatile boolean cancelled = false;
        volatile Future<?> future;
        volatile ResultCallback.Adapter<Frame> callback;

        CaptureSession(String containerId, String appSlug, String envSlug, String tenantId) {
            this.containerId = containerId;
            this.appSlug = appSlug;
            this.envSlug = envSlug;
            this.tenantId = tenantId;
        }

        /** Signals the stream callback to stop and releases the underlying connection. */
        void cancel() {
            cancelled = true;
            if (callback != null) {
                try { callback.close(); } catch (Exception e) { /* best-effort teardown */ }
            }
            if (future != null) {
                future.cancel(true);
            }
        }
    }
}