diff --git a/src/main/java/net/siegeln/cameleer/saas/log/ContainerLogService.java b/src/main/java/net/siegeln/cameleer/saas/log/ContainerLogService.java new file mode 100644 index 0000000..e4fe00c --- /dev/null +++ b/src/main/java/net/siegeln/cameleer/saas/log/ContainerLogService.java @@ -0,0 +1,137 @@ +package net.siegeln.cameleer.saas.log; + +import net.siegeln.cameleer.saas.log.dto.LogEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import javax.sql.DataSource; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class ContainerLogService { + + private static final Logger log = LoggerFactory.getLogger(ContainerLogService.class); + private static final int FLUSH_THRESHOLD = 100; + + private final DataSource clickHouseDataSource; + private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + + @Autowired + public ContainerLogService( + @Autowired(required = false) @Qualifier("clickHouseDataSource") DataSource clickHouseDataSource) { + this.clickHouseDataSource = clickHouseDataSource; + if (clickHouseDataSource == null) { + log.warn("ClickHouse data source not available — ContainerLogService running in no-op mode"); + } else { + initSchema(); + } + } + + void initSchema() { + if (clickHouseDataSource == null) return; + try (var conn = clickHouseDataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.execute(""" + CREATE TABLE IF NOT EXISTS container_logs ( + tenant_id UUID, + environment_id UUID, + app_id UUID, + deployment_id UUID, + timestamp DateTime64(3), + stream String, + message String + ) ENGINE = MergeTree() + ORDER BY (tenant_id, environment_id, app_id, timestamp) + """); + } catch (Exception e) { + log.error("Failed to initialize ClickHouse schema", e); + } + } + + public void write(UUID tenantId, UUID envId, UUID appId, UUID deploymentId, + String stream, String message, long timestampMillis) { + if (clickHouseDataSource == null) return; + buffer.add(new Object[]{tenantId, envId, appId, deploymentId, timestampMillis, stream, message}); + if (buffer.size() >= FLUSH_THRESHOLD) { + flush(); + } + } + + public void flush() { + if (clickHouseDataSource == null || buffer.isEmpty()) return; + List batch = new ArrayList<>(FLUSH_THRESHOLD); + Object[] row; + while ((row = buffer.poll()) != null) { + batch.add(row); + } + if (batch.isEmpty()) return; + String sql = "INSERT INTO container_logs (tenant_id, environment_id, app_id, deployment_id, timestamp, stream, message) VALUES (?, ?, ?, ?, ?, ?, ?)"; + try (var conn = clickHouseDataSource.getConnection(); + var ps = conn.prepareStatement(sql)) { + for (Object[] entry : batch) { + ps.setObject(1, entry[0]); // tenant_id + ps.setObject(2, entry[1]); // environment_id + ps.setObject(3, entry[2]); // app_id + ps.setObject(4, entry[3]); // deployment_id + ps.setTimestamp(5, new Timestamp((Long) entry[4])); + ps.setString(6, (String) entry[5]); + ps.setString(7, (String) entry[6]); + ps.addBatch(); + } + ps.executeBatch(); + } catch (Exception e) { + log.error("Failed to flush log batch to ClickHouse ({} entries)", batch.size(), e); + } + } + + public List query(UUID appId, Instant since, Instant until, int limit, String stream) { + if (clickHouseDataSource == null) return List.of(); + StringBuilder sql = new StringBuilder( + "SELECT app_id, deployment_id, timestamp, stream, message FROM container_logs WHERE app_id = ?"); + List params = new ArrayList<>(); + params.add(appId); + if (since != null) { + sql.append(" AND timestamp >= ?"); + params.add(Timestamp.from(since)); + } + if (until != null) { + sql.append(" AND timestamp <= ?"); + params.add(Timestamp.from(until)); + } + if (stream != null && !"both".equalsIgnoreCase(stream)) { + sql.append(" AND stream = ?"); + params.add(stream); + } + sql.append(" ORDER BY timestamp LIMIT ?"); + params.add(limit); + List results = new ArrayList<>(); + try (var conn = clickHouseDataSource.getConnection(); + var ps = conn.prepareStatement(sql.toString())) { + for (int i = 0; i < params.size(); i++) { + ps.setObject(i + 1, params.get(i)); + } + try (var rs = ps.executeQuery()) { + while (rs.next()) { + results.add(new LogEntry( + UUID.fromString(rs.getString("app_id")), + UUID.fromString(rs.getString("deployment_id")), + rs.getTimestamp("timestamp").toInstant(), + rs.getString("stream"), + rs.getString("message") + )); + } + } + } catch (Exception e) { + log.error("Failed to query container logs for appId={}", appId, e); + } + return results; + } +} diff --git a/src/main/java/net/siegeln/cameleer/saas/log/LogController.java b/src/main/java/net/siegeln/cameleer/saas/log/LogController.java new file mode 100644 index 0000000..14f37b4 --- /dev/null +++ b/src/main/java/net/siegeln/cameleer/saas/log/LogController.java @@ -0,0 +1,36 @@ +package net.siegeln.cameleer.saas.log; + +import net.siegeln.cameleer.saas.log.dto.LogEntry; +import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +@RestController +@RequestMapping("/api/apps/{appId}/logs") +public class LogController { + + private final ContainerLogService containerLogService; + + public LogController(ContainerLogService containerLogService) { + this.containerLogService = containerLogService; + } + + @GetMapping + public ResponseEntity> query( + @PathVariable UUID appId, + @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant since, + @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant until, + @RequestParam(defaultValue = "500") int limit, + @RequestParam(defaultValue = "both") String stream) { + List entries = containerLogService.query(appId, since, until, limit, stream); + return ResponseEntity.ok(entries); + } +} diff --git a/src/main/java/net/siegeln/cameleer/saas/log/dto/LogEntry.java b/src/main/java/net/siegeln/cameleer/saas/log/dto/LogEntry.java new file mode 100644 index 0000000..2c7afd0 --- /dev/null +++ b/src/main/java/net/siegeln/cameleer/saas/log/dto/LogEntry.java @@ -0,0 +1,8 @@ +package net.siegeln.cameleer.saas.log.dto; + +import java.time.Instant; +import java.util.UUID; + +public record LogEntry( + UUID appId, UUID deploymentId, Instant timestamp, String stream, String message +) {} diff --git a/src/test/java/net/siegeln/cameleer/saas/log/ContainerLogServiceTest.java b/src/test/java/net/siegeln/cameleer/saas/log/ContainerLogServiceTest.java new file mode 100644 index 0000000..62c3342 --- /dev/null +++ b/src/test/java/net/siegeln/cameleer/saas/log/ContainerLogServiceTest.java @@ -0,0 +1,20 @@ +package net.siegeln.cameleer.saas.log; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ContainerLogServiceTest { + + @Test + void buffer_shouldAccumulateEntries() { + var buffer = new ConcurrentLinkedQueue(); + buffer.add("entry1"); + buffer.add("entry2"); + assertEquals(2, buffer.size()); + assertEquals("entry1", buffer.poll()); + assertEquals(1, buffer.size()); + } +}