feat: add Logs tab with cursor-paginated search, level filters, and live tail
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m3s
CI / docker (push) Successful in 1m11s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 49s

- Extend GET /api/v1/logs with cursor pagination, multi-level filtering,
  optional application scoping, and level count aggregation
- Add exchangeId, instanceId, application, mdc fields to log responses
- Refactor ClickHouseLogStore with keyset pagination (N+1 pattern)
- Add LogSearchRequest/LogSearchResponse core domain records
- Create LogSearchPageResponse wrapper DTO
- Add Logs as 4th content tab (Exchanges | Dashboard | Runtime | Logs)
- Implement LogSearch component with debounced search, level filter bar,
  expandable log entries, cursor pagination, and live tail mode
- Add cross-navigation: exchange header → logs, log tab → logs tab
- Update ClickHouseLogStoreIT with cursor, multi-level, cross-app tests

Closes: #104

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-02 08:47:16 +02:00
parent a52751da1b
commit b73f5e6dd4
22 changed files with 1405 additions and 119 deletions

View File

@@ -1,7 +1,9 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.dto.LogEntryResponse;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.app.dto.LogSearchPageResponse;
import com.cameleer3.server.core.search.LogSearchRequest;
import com.cameleer3.server.core.search.LogSearchResponse;
import com.cameleer3.server.core.storage.LogIndex;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -12,6 +14,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
@RestController
@@ -27,30 +30,52 @@ public class LogQueryController {
@GetMapping
@Operation(summary = "Search application log entries",
description = "Returns log entries for a given application, optionally filtered by agent, level, time range, and text query")
public ResponseEntity<List<LogEntryResponse>> searchLogs(
@RequestParam String application,
@RequestParam(name = "agentId", required = false) String instanceId,
@RequestParam(required = false) String level,
description = "Returns log entries with cursor-based pagination and level count aggregation. " +
"Supports free-text search, multi-level filtering, and optional application scoping.")
public ResponseEntity<LogSearchPageResponse> searchLogs(
@RequestParam(required = false) String q,
@RequestParam(required = false) String query,
@RequestParam(required = false) String level,
@RequestParam(required = false) String application,
@RequestParam(name = "agentId", required = false) String instanceId,
@RequestParam(required = false) String exchangeId,
@RequestParam(required = false) String logger,
@RequestParam(required = false) String from,
@RequestParam(required = false) String to,
@RequestParam(defaultValue = "200") int limit) {
@RequestParam(required = false) String cursor,
@RequestParam(defaultValue = "100") int limit,
@RequestParam(defaultValue = "desc") String sort) {
limit = Math.min(limit, 1000);
// q takes precedence over deprecated query param
String searchText = q != null ? q : query;
// Parse CSV levels
List<String> levels = List.of();
if (level != null && !level.isEmpty()) {
levels = Arrays.stream(level.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.toList();
}
Instant fromInstant = from != null ? Instant.parse(from) : null;
Instant toInstant = to != null ? Instant.parse(to) : null;
List<LogEntryResult> results = logIndex.search(
application, instanceId, level, query, exchangeId, fromInstant, toInstant, limit);
LogSearchRequest request = new LogSearchRequest(
searchText, levels, application, instanceId, exchangeId,
logger, fromInstant, toInstant, cursor, limit, sort);
List<LogEntryResponse> entries = results.stream()
.map(r -> new LogEntryResponse(r.timestamp(), r.level(), r.loggerName(),
r.message(), r.threadName(), r.stackTrace()))
LogSearchResponse result = logIndex.search(request);
List<LogEntryResponse> entries = result.data().stream()
.map(r -> new LogEntryResponse(
r.timestamp(), r.level(), r.loggerName(),
r.message(), r.threadName(), r.stackTrace(),
r.exchangeId(), r.instanceId(), r.application(),
r.mdc()))
.toList();
return ResponseEntity.ok(entries);
return ResponseEntity.ok(new LogSearchPageResponse(
entries, result.nextCursor(), result.hasMore(), result.levelCounts()));
}
}

View File

@@ -2,12 +2,18 @@ package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Map;
@Schema(description = "Application log entry")
public record LogEntryResponse(
@Schema(description = "Log timestamp (ISO-8601)") String timestamp,
@Schema(description = "Log level (INFO, WARN, ERROR, DEBUG)") String level,
@Schema(description = "Log level (INFO, WARN, ERROR, DEBUG, TRACE)") String level,
@Schema(description = "Logger name") String loggerName,
@Schema(description = "Log message") String message,
@Schema(description = "Thread name") String threadName,
@Schema(description = "Stack trace (if present)") String stackTrace
@Schema(description = "Stack trace (if present)") String stackTrace,
@Schema(description = "Camel exchange ID (if present)") String exchangeId,
@Schema(description = "Agent instance ID") String instanceId,
@Schema(description = "Application ID") String application,
@Schema(description = "MDC context map") Map<String, String> mdc
) {}

View File

@@ -0,0 +1,14 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
import java.util.Map;
// Wrapper DTO for one page of log search results (GET /api/v1/logs):
// the page data plus the keyset-pagination cursor and aggregate level counts.
@Schema(description = "Log search response with cursor pagination and level counts")
public record LogSearchPageResponse(
@Schema(description = "Log entries for the current page") List<LogEntryResponse> data,
// nextCursor is the timestamp of the last returned entry; clients pass it back
// via the "cursor" query param to fetch the next page.
@Schema(description = "Cursor for next page (null if no more results)") String nextCursor,
@Schema(description = "Whether more results exist beyond this page") boolean hasMore,
// Counts are computed without the level filter so a filter bar can show
// stable totals while levels are toggled.
@Schema(description = "Count of logs per level (unaffected by level filter)") Map<String, Long> levelCounts
) {}

View File

@@ -1,6 +1,8 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.search.LogSearchRequest;
import com.cameleer3.server.core.search.LogSearchResponse;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.core.storage.LogIndex;
import org.slf4j.Logger;
@@ -14,6 +16,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -55,12 +58,9 @@ public class ClickHouseLogStore implements LogIndex {
ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : "");
ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : "");
// Extract camel.exchangeId from MDC into top-level column
Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap();
String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
ps.setString(9, exchangeId);
// ClickHouse JDBC handles java.util.Map natively for Map columns
ps.setObject(10, mdc);
});
@@ -68,62 +68,140 @@ public class ClickHouseLogStore implements LogIndex {
}
@Override
public List<LogEntryResult> search(String applicationId, String instanceId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
StringBuilder sql = new StringBuilder(
"SELECT timestamp, level, logger_name, message, thread_name, stack_trace " +
"FROM logs WHERE tenant_id = 'default' AND application = ?");
List<Object> params = new ArrayList<>();
params.add(applicationId);
public LogSearchResponse search(LogSearchRequest request) {
// Build shared WHERE conditions (used by both data and count queries)
List<String> baseConditions = new ArrayList<>();
List<Object> baseParams = new ArrayList<>();
baseConditions.add("tenant_id = 'default'");
if (instanceId != null && !instanceId.isEmpty()) {
sql.append(" AND instance_id = ?");
params.add(instanceId);
if (request.application() != null && !request.application().isEmpty()) {
baseConditions.add("application = ?");
baseParams.add(request.application());
}
if (level != null && !level.isEmpty()) {
sql.append(" AND level = ?");
params.add(level.toUpperCase());
if (request.instanceId() != null && !request.instanceId().isEmpty()) {
baseConditions.add("instance_id = ?");
baseParams.add(request.instanceId());
}
if (exchangeId != null && !exchangeId.isEmpty()) {
sql.append(" AND (exchange_id = ? OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))");
params.add(exchangeId);
params.add(exchangeId);
if (request.exchangeId() != null && !request.exchangeId().isEmpty()) {
baseConditions.add("(exchange_id = ? OR (mapContains(mdc, 'camel.exchangeId') AND mdc['camel.exchangeId'] = ?))");
baseParams.add(request.exchangeId());
baseParams.add(request.exchangeId());
}
if (query != null && !query.isEmpty()) {
sql.append(" AND message LIKE ?");
params.add("%" + query + "%");
if (request.q() != null && !request.q().isEmpty()) {
String term = "%" + escapeLike(request.q()) + "%";
baseConditions.add("(message LIKE ? OR stack_trace LIKE ?)");
baseParams.add(term);
baseParams.add(term);
}
if (from != null) {
sql.append(" AND timestamp >= ?");
params.add(Timestamp.from(from));
if (request.logger() != null && !request.logger().isEmpty()) {
baseConditions.add("logger_name LIKE ?");
baseParams.add("%" + escapeLike(request.logger()) + "%");
}
if (to != null) {
sql.append(" AND timestamp <= ?");
params.add(Timestamp.from(to));
if (request.from() != null) {
baseConditions.add("timestamp >= ?");
baseParams.add(Timestamp.from(request.from()));
}
sql.append(" ORDER BY timestamp DESC LIMIT ?");
params.add(limit);
if (request.to() != null) {
baseConditions.add("timestamp <= ?");
baseParams.add(Timestamp.from(request.to()));
}
return jdbc.query(sql.toString(), params.toArray(), (rs, rowNum) -> {
// Level counts query: uses base conditions WITHOUT level filter and cursor
String baseWhere = String.join(" AND ", baseConditions);
Map<String, Long> levelCounts = queryLevelCounts(baseWhere, baseParams);
// Data query conditions: add level filter and cursor on top of base
List<String> dataConditions = new ArrayList<>(baseConditions);
List<Object> dataParams = new ArrayList<>(baseParams);
if (request.levels() != null && !request.levels().isEmpty()) {
String placeholders = String.join(", ", Collections.nCopies(request.levels().size(), "?"));
dataConditions.add("level IN (" + placeholders + ")");
for (String lvl : request.levels()) {
dataParams.add(lvl.toUpperCase());
}
}
if (request.cursor() != null && !request.cursor().isEmpty()) {
Instant cursorTs = Instant.parse(request.cursor());
if ("asc".equalsIgnoreCase(request.sort())) {
dataConditions.add("timestamp > ?");
} else {
dataConditions.add("timestamp < ?");
}
dataParams.add(Timestamp.from(cursorTs));
}
String dataWhere = String.join(" AND ", dataConditions);
String orderDir = "asc".equalsIgnoreCase(request.sort()) ? "ASC" : "DESC";
int fetchLimit = request.limit() + 1; // fetch N+1 to detect hasMore
String dataSql = "SELECT timestamp, level, logger_name, message, thread_name, stack_trace, " +
"exchange_id, instance_id, application, mdc " +
"FROM logs WHERE " + dataWhere +
" ORDER BY timestamp " + orderDir + " LIMIT ?";
dataParams.add(fetchLimit);
List<LogEntryResult> results = jdbc.query(dataSql, dataParams.toArray(), (rs, rowNum) -> {
Timestamp ts = rs.getTimestamp("timestamp");
String timestampStr = ts != null
? ts.toInstant().atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)
? ts.toInstant().atOffset(ZoneOffset.UTC).format(ISO_FMT)
: null;
@SuppressWarnings("unchecked")
Map<String, String> mdc = (Map<String, String>) rs.getObject("mdc");
if (mdc == null) mdc = Collections.emptyMap();
return new LogEntryResult(
timestampStr,
rs.getString("level"),
rs.getString("logger_name"),
rs.getString("message"),
rs.getString("thread_name"),
rs.getString("stack_trace")
rs.getString("stack_trace"),
rs.getString("exchange_id"),
rs.getString("instance_id"),
rs.getString("application"),
mdc
);
});
boolean hasMore = results.size() > request.limit();
if (hasMore) {
results = new ArrayList<>(results.subList(0, request.limit()));
}
String nextCursor = null;
if (hasMore && !results.isEmpty()) {
nextCursor = results.get(results.size() - 1).timestamp();
}
return new LogSearchResponse(results, nextCursor, hasMore, levelCounts);
}
/**
 * Aggregates the number of log rows per level for the given base WHERE clause.
 *
 * <p>Best-effort: if the count query fails, a warning is logged and whatever was
 * accumulated so far (possibly an empty map) is returned, so a counts failure
 * never breaks the main search.
 */
private Map<String, Long> queryLevelCounts(String baseWhere, List<Object> baseParams) {
    String sql = "SELECT level, count() AS cnt FROM logs WHERE %s GROUP BY level".formatted(baseWhere);
    Map<String, Long> byLevel = new LinkedHashMap<>();
    try {
        jdbc.query(sql, baseParams.toArray(), (rs, rowNum) -> {
            byLevel.put(rs.getString("level"), rs.getLong("cnt"));
            return null;
        });
    } catch (Exception e) {
        log.warn("Failed to query level counts", e);
    }
    return byLevel;
}
/**
 * Escapes SQL LIKE wildcard characters so a user-supplied term matches literally.
 * Backslash is escaped first, then {@code %} and {@code _}.
 */
private static String escapeLike(String term) {
    String escaped = term.replace("\\", "\\\\");
    escaped = escaped.replace("%", "\\%");
    return escaped.replace("_", "\\_");
}
}