feat: add container log service with ClickHouse storage and log API
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,137 @@
|
||||
package net.siegeln.cameleer.saas.log;

import net.siegeln.cameleer.saas.log.dto.LogEntry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Service
|
||||
public class ContainerLogService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ContainerLogService.class);
|
||||
private static final int FLUSH_THRESHOLD = 100;
|
||||
|
||||
private final DataSource clickHouseDataSource;
|
||||
private final ConcurrentLinkedQueue<Object[]> buffer = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Autowired
|
||||
public ContainerLogService(
|
||||
@Autowired(required = false) @Qualifier("clickHouseDataSource") DataSource clickHouseDataSource) {
|
||||
this.clickHouseDataSource = clickHouseDataSource;
|
||||
if (clickHouseDataSource == null) {
|
||||
log.warn("ClickHouse data source not available — ContainerLogService running in no-op mode");
|
||||
} else {
|
||||
initSchema();
|
||||
}
|
||||
}
|
||||
|
||||
void initSchema() {
|
||||
if (clickHouseDataSource == null) return;
|
||||
try (var conn = clickHouseDataSource.getConnection();
|
||||
var stmt = conn.createStatement()) {
|
||||
stmt.execute("""
|
||||
CREATE TABLE IF NOT EXISTS container_logs (
|
||||
tenant_id UUID,
|
||||
environment_id UUID,
|
||||
app_id UUID,
|
||||
deployment_id UUID,
|
||||
timestamp DateTime64(3),
|
||||
stream String,
|
||||
message String
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY (tenant_id, environment_id, app_id, timestamp)
|
||||
""");
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to initialize ClickHouse schema", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void write(UUID tenantId, UUID envId, UUID appId, UUID deploymentId,
|
||||
String stream, String message, long timestampMillis) {
|
||||
if (clickHouseDataSource == null) return;
|
||||
buffer.add(new Object[]{tenantId, envId, appId, deploymentId, timestampMillis, stream, message});
|
||||
if (buffer.size() >= FLUSH_THRESHOLD) {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
if (clickHouseDataSource == null || buffer.isEmpty()) return;
|
||||
List<Object[]> batch = new ArrayList<>(FLUSH_THRESHOLD);
|
||||
Object[] row;
|
||||
while ((row = buffer.poll()) != null) {
|
||||
batch.add(row);
|
||||
}
|
||||
if (batch.isEmpty()) return;
|
||||
String sql = "INSERT INTO container_logs (tenant_id, environment_id, app_id, deployment_id, timestamp, stream, message) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
try (var conn = clickHouseDataSource.getConnection();
|
||||
var ps = conn.prepareStatement(sql)) {
|
||||
for (Object[] entry : batch) {
|
||||
ps.setObject(1, entry[0]); // tenant_id
|
||||
ps.setObject(2, entry[1]); // environment_id
|
||||
ps.setObject(3, entry[2]); // app_id
|
||||
ps.setObject(4, entry[3]); // deployment_id
|
||||
ps.setTimestamp(5, new Timestamp((Long) entry[4]));
|
||||
ps.setString(6, (String) entry[5]);
|
||||
ps.setString(7, (String) entry[6]);
|
||||
ps.addBatch();
|
||||
}
|
||||
ps.executeBatch();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to flush log batch to ClickHouse ({} entries)", batch.size(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<LogEntry> query(UUID appId, Instant since, Instant until, int limit, String stream) {
|
||||
if (clickHouseDataSource == null) return List.of();
|
||||
StringBuilder sql = new StringBuilder(
|
||||
"SELECT app_id, deployment_id, timestamp, stream, message FROM container_logs WHERE app_id = ?");
|
||||
List<Object> params = new ArrayList<>();
|
||||
params.add(appId);
|
||||
if (since != null) {
|
||||
sql.append(" AND timestamp >= ?");
|
||||
params.add(Timestamp.from(since));
|
||||
}
|
||||
if (until != null) {
|
||||
sql.append(" AND timestamp <= ?");
|
||||
params.add(Timestamp.from(until));
|
||||
}
|
||||
if (stream != null && !"both".equalsIgnoreCase(stream)) {
|
||||
sql.append(" AND stream = ?");
|
||||
params.add(stream);
|
||||
}
|
||||
sql.append(" ORDER BY timestamp LIMIT ?");
|
||||
params.add(limit);
|
||||
List<LogEntry> results = new ArrayList<>();
|
||||
try (var conn = clickHouseDataSource.getConnection();
|
||||
var ps = conn.prepareStatement(sql.toString())) {
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
ps.setObject(i + 1, params.get(i));
|
||||
}
|
||||
try (var rs = ps.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
results.add(new LogEntry(
|
||||
UUID.fromString(rs.getString("app_id")),
|
||||
UUID.fromString(rs.getString("deployment_id")),
|
||||
rs.getTimestamp("timestamp").toInstant(),
|
||||
rs.getString("stream"),
|
||||
rs.getString("message")
|
||||
));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to query container logs for appId={}", appId, e);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package net.siegeln.cameleer.saas.log;
|
||||
|
||||
import net.siegeln.cameleer.saas.log.dto.LogEntry;
|
||||
import org.springframework.format.annotation.DateTimeFormat;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/apps/{appId}/logs")
|
||||
public class LogController {
|
||||
|
||||
private final ContainerLogService containerLogService;
|
||||
|
||||
public LogController(ContainerLogService containerLogService) {
|
||||
this.containerLogService = containerLogService;
|
||||
}
|
||||
|
||||
@GetMapping
|
||||
public ResponseEntity<List<LogEntry>> query(
|
||||
@PathVariable UUID appId,
|
||||
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant since,
|
||||
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant until,
|
||||
@RequestParam(defaultValue = "500") int limit,
|
||||
@RequestParam(defaultValue = "both") String stream) {
|
||||
List<LogEntry> entries = containerLogService.query(appId, since, until, limit, stream);
|
||||
return ResponseEntity.ok(entries);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package net.siegeln.cameleer.saas.log.dto;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.UUID;
|
||||
|
||||
public record LogEntry(
|
||||
UUID appId, UUID deploymentId, Instant timestamp, String stream, String message
|
||||
) {}
|
||||
@@ -0,0 +1,20 @@
|
||||
package net.siegeln.cameleer.saas.log;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class ContainerLogServiceTest {
|
||||
|
||||
@Test
|
||||
void buffer_shouldAccumulateEntries() {
|
||||
var buffer = new ConcurrentLinkedQueue<String>();
|
||||
buffer.add("entry1");
|
||||
buffer.add("entry2");
|
||||
assertEquals(2, buffer.size());
|
||||
assertEquals("entry1", buffer.poll());
|
||||
assertEquals(1, buffer.size());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user