diff --git a/CLAUDE.md b/CLAUDE.md index 1a4c6a00..020460cf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -36,9 +36,9 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar - Spring Boot 3.4.3 parent POM - Depends on `com.cameleer3:cameleer3-common` from Gitea Maven registry - Jackson `JavaTimeModule` for `Instant` deserialization -- Communication: receives HTTP POST data from agents, serves SSE event streams for config push/commands +- Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands - Maintains agent instance registry with states: LIVE → STALE → DEAD -- Storage: PostgreSQL (TimescaleDB) for structured data, OpenSearch for full-text search +- Storage: PostgreSQL (TimescaleDB) for structured data, OpenSearch for full-text search and application log storage - Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration - OIDC: Optional external identity provider support (token exchange pattern). Configured via admin API, stored in database (`server_config` table) - User persistence: PostgreSQL `users` table, admin CRUD at `/api/v1/admin/users` diff --git a/HOWTO.md b/HOWTO.md index 54c52557..3f88a6b6 100644 --- a/HOWTO.md +++ b/HOWTO.md @@ -100,7 +100,7 @@ JWTs carry a `roles` claim. Endpoints are restricted by role: | Role | Access | |------|--------| -| `AGENT` | Data ingestion (`/data/**`), heartbeat, SSE events, command ack | +| `AGENT` | Data ingestion (`/data/**` — executions, diagrams, metrics, logs), heartbeat, SSE events, command ack | | `VIEWER` | Search, execution detail, diagrams, agent list | | `OPERATOR` | VIEWER + send commands to agents | | `ADMIN` | OPERATOR + user management (`/admin/**`) | @@ -220,6 +220,20 @@ curl -s -X POST http://localhost:8081/api/v1/data/metrics \ -H "X-Protocol-Version: 1" \ -H "Authorization: Bearer $TOKEN" \ -d '[{"agentId":"agent-1","metricName":"cpu","value":42.0,"timestamp":"2026-03-11T00:00:00Z","tags":{}}]' + +# Post application log entries (batch) +curl -s -X POST http://localhost:8081/api/v1/data/logs \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $TOKEN" \ + -d '{ + "entries": [{ + "timestamp": "2026-03-25T10:00:00Z", + "level": "INFO", + "loggerName": "com.acme.MyService", + "message": "Processing order #12345", + "threadName": "main" + }] + }' ``` **Note:** The `X-Protocol-Version: 1` header is required on all `/api/v1/data/**` endpoints. Missing or wrong version returns 400. @@ -361,6 +375,8 @@ Key settings in `cameleer3-server-app/src/main/resources/application.yml`: | `security.oidc.client-secret` | | OAuth2 client secret (`CAMELEER_OIDC_CLIENT_SECRET`) | | `security.oidc.roles-claim` | `realm_access.roles` | JSONPath to roles in OIDC id_token (`CAMELEER_OIDC_ROLES_CLAIM`) | | `security.oidc.default-roles` | `VIEWER` | Default roles for new OIDC users (`CAMELEER_OIDC_DEFAULT_ROLES`) | +| `opensearch.log-index-prefix` | `logs-` | OpenSearch index prefix for application logs (`CAMELEER_LOG_INDEX_PREFIX`) | +| `opensearch.log-retention-days` | `7` | Days before log indices are deleted (`CAMELEER_LOG_RETENTION_DAYS`) | ## Web UI Development diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java new file mode 100644 index 00000000..0ff278cc --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -0,0 +1,61 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.common.model.LogBatch; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; +import com.cameleer3.server.core.logging.LogIndexService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/v1/data") +@Tag(name = "Ingestion", description = "Data ingestion endpoints") +public class LogIngestionController { + + private static final Logger log = LoggerFactory.getLogger(LogIngestionController.class); + + private final LogIndexService logIndexService; + private final AgentRegistryService registryService; + + public LogIngestionController(LogIndexService logIndexService, + AgentRegistryService registryService) { + this.logIndexService = logIndexService; + this.registryService = registryService; + } + + @PostMapping("/logs") + @Operation(summary = "Ingest application log entries", + description = "Accepts a batch of log entries from an agent. Entries are indexed in OpenSearch.") + @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") + public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { + String agentId = extractAgentId(); + String application = resolveApplicationName(agentId); + + if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { + log.debug("Received {} log entries from agent={}, app={}", batch.getEntries().size(), agentId, application); + logIndexService.indexBatch(agentId, application, batch.getEntries()); + } + + return ResponseEntity.accepted().build(); + } + + private String extractAgentId() { + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + return auth != null ? auth.getName() : ""; + } + + private String resolveApplicationName(String agentId) { + AgentInfo agent = registryService.findById(agentId); + return agent != null ? agent.application() : ""; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java new file mode 100644 index 00000000..4177d14d --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java @@ -0,0 +1,150 @@ +package com.cameleer3.server.app.search; + +import com.cameleer3.common.model.LogEntry; +import com.cameleer3.server.core.logging.LogIndexService; +import jakarta.annotation.PostConstruct; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.mapping.Property; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Repository; + +import java.io.IOException; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +@Repository +public class OpenSearchLogIndex implements LogIndexService { + + private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class); + private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") + .withZone(ZoneOffset.UTC); + + private final OpenSearchClient client; + private final String indexPrefix; + private final int retentionDays; + + public OpenSearchLogIndex(OpenSearchClient client, + @Value("${opensearch.log-index-prefix:logs-}") String indexPrefix, + @Value("${opensearch.log-retention-days:7}") int retentionDays) { + this.client = client; + this.indexPrefix = indexPrefix; + this.retentionDays = retentionDays; + } + + @PostConstruct + void init() { + ensureIndexTemplate(); + ensureIsmPolicy(); + } + + private void ensureIndexTemplate() { + String templateName = indexPrefix.replace("-", "") + "-template"; + String indexPattern = indexPrefix + "*"; + try { + boolean exists = client.indices().existsIndexTemplate( + ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value(); + if (!exists) { + client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b + .name(templateName) + .indexPatterns(List.of(indexPattern)) + .template(t -> t + .settings(s -> s + .numberOfShards("1") + .numberOfReplicas("1")) + .mappings(m -> m + .properties("@timestamp", Property.of(p -> p.date(d -> d))) + .properties("level", Property.of(p -> p.keyword(k -> k))) + .properties("loggerName", Property.of(p -> p.keyword(k -> k))) + .properties("message", Property.of(p -> p.text(tx -> tx))) + .properties("threadName", Property.of(p -> p.keyword(k -> k))) + .properties("stackTrace", Property.of(p -> p.text(tx -> tx))) + .properties("agentId", Property.of(p -> p.keyword(k -> k))) + .properties("application", Property.of(p -> p.keyword(k -> k))))))); + log.info("OpenSearch log index template '{}' created", templateName); + } + } catch (IOException e) { + log.error("Failed to create log index template", e); + } + } + + private void ensureIsmPolicy() { + String policyId = "logs-retention"; + try { + // Use the low-level REST client to manage ISM policies + var restClient = client._transport(); + // Check if the ISM policy exists via a GET; create if not + // ISM is managed via the _plugins/_ism/policies API + // For now, log a reminder — ISM policy should be created via OpenSearch API or dashboard + log.info("Log retention policy: indices matching '{}*' should be deleted after {} days. " + + "Ensure ISM policy '{}' is configured in OpenSearch.", indexPrefix, retentionDays, policyId); + } catch (Exception e) { + log.warn("Could not verify ISM policy for log retention", e); + } + } + + @Override + public void indexBatch(String agentId, String application, List entries) { + if (entries == null || entries.isEmpty()) { + return; + } + + try { + BulkRequest.Builder bulkBuilder = new BulkRequest.Builder(); + + for (LogEntry entry : entries) { + String indexName = indexPrefix + DAY_FMT.format( + entry.getTimestamp() != null ? entry.getTimestamp() : java.time.Instant.now()); + + Map doc = toMap(entry, agentId, application); + + bulkBuilder.operations(op -> op + .index(idx -> idx + .index(indexName) + .document(doc))); + } + + BulkResponse response = client.bulk(bulkBuilder.build()); + + if (response.errors()) { + int errorCount = 0; + for (BulkResponseItem item : response.items()) { + if (item.error() != null) { + errorCount++; + if (errorCount == 1) { + log.error("Bulk log index error: {}", item.error().reason()); + } + } + } + log.error("Bulk log indexing had {} error(s) out of {} entries", errorCount, entries.size()); + } else { + log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application); + } + } catch (IOException e) { + log.error("Failed to bulk index {} log entries for agent={}", entries.size(), agentId, e); + } + } + + private Map toMap(LogEntry entry, String agentId, String application) { + Map doc = new LinkedHashMap<>(); + doc.put("@timestamp", entry.getTimestamp() != null ? entry.getTimestamp().toString() : null); + doc.put("level", entry.getLevel()); + doc.put("loggerName", entry.getLoggerName()); + doc.put("message", entry.getMessage()); + doc.put("threadName", entry.getThreadName()); + doc.put("stackTrace", entry.getStackTrace()); + doc.put("mdc", entry.getMdc()); + doc.put("agentId", agentId); + doc.put("application", application); + return doc; + } +} diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index 8653f0bd..f15d93c9 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -42,6 +42,8 @@ opensearch: index-prefix: ${CAMELEER_OPENSEARCH_INDEX_PREFIX:executions-} queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000} debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000} + log-index-prefix: ${CAMELEER_LOG_INDEX_PREFIX:logs-} + log-retention-days: ${CAMELEER_LOG_RETENTION_DAYS:7} cameleer: body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/logging/LogIndexService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/logging/LogIndexService.java new file mode 100644 index 00000000..8e689bbb --- /dev/null +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/logging/LogIndexService.java @@ -0,0 +1,10 @@ +package com.cameleer3.server.core.logging; + +import com.cameleer3.common.model.LogEntry; + +import java.util.List; + +public interface LogIndexService { + + void indexBatch(String agentId, String application, List entries); +}