feat: add application log ingestion with OpenSearch storage
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 59s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

Agents can now send application log entries in batches via POST /api/v1/data/logs.
Logs are indexed directly into OpenSearch daily indices (logs-{yyyy-MM-dd}) using
the bulk API. Index template defines explicit mappings for full-text search readiness.

New DTOs (LogEntry, LogBatch) added to cameleer3-common in the agent repo.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-25 11:53:27 +01:00
parent bf600f8c5f
commit 7423e2ca14
6 changed files with 242 additions and 3 deletions

View File

@@ -0,0 +1,150 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.logging.LogIndexService;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.Property;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Repository
public class OpenSearchLogIndex implements LogIndexService {
private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class);
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private final OpenSearchClient client;
private final String indexPrefix;
private final int retentionDays;
public OpenSearchLogIndex(OpenSearchClient client,
@Value("${opensearch.log-index-prefix:logs-}") String indexPrefix,
@Value("${opensearch.log-retention-days:7}") int retentionDays) {
this.client = client;
this.indexPrefix = indexPrefix;
this.retentionDays = retentionDays;
}
@PostConstruct
void init() {
ensureIndexTemplate();
ensureIsmPolicy();
}
private void ensureIndexTemplate() {
String templateName = indexPrefix.replace("-", "") + "-template";
String indexPattern = indexPrefix + "*";
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name(templateName)
.indexPatterns(List.of(indexPattern))
.template(t -> t
.settings(s -> s
.numberOfShards("1")
.numberOfReplicas("1"))
.mappings(m -> m
.properties("@timestamp", Property.of(p -> p.date(d -> d)))
.properties("level", Property.of(p -> p.keyword(k -> k)))
.properties("loggerName", Property.of(p -> p.keyword(k -> k)))
.properties("message", Property.of(p -> p.text(tx -> tx)))
.properties("threadName", Property.of(p -> p.keyword(k -> k)))
.properties("stackTrace", Property.of(p -> p.text(tx -> tx)))
.properties("agentId", Property.of(p -> p.keyword(k -> k)))
.properties("application", Property.of(p -> p.keyword(k -> k)))))));
log.info("OpenSearch log index template '{}' created", templateName);
}
} catch (IOException e) {
log.error("Failed to create log index template", e);
}
}
private void ensureIsmPolicy() {
String policyId = "logs-retention";
try {
// Use the low-level REST client to manage ISM policies
var restClient = client._transport();
// Check if the ISM policy exists via a GET; create if not
// ISM is managed via the _plugins/_ism/policies API
// For now, log a reminder — ISM policy should be created via OpenSearch API or dashboard
log.info("Log retention policy: indices matching '{}*' should be deleted after {} days. " +
"Ensure ISM policy '{}' is configured in OpenSearch.", indexPrefix, retentionDays, policyId);
} catch (Exception e) {
log.warn("Could not verify ISM policy for log retention", e);
}
}
@Override
public void indexBatch(String agentId, String application, List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return;
}
try {
BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
for (LogEntry entry : entries) {
String indexName = indexPrefix + DAY_FMT.format(
entry.getTimestamp() != null ? entry.getTimestamp() : java.time.Instant.now());
Map<String, Object> doc = toMap(entry, agentId, application);
bulkBuilder.operations(op -> op
.index(idx -> idx
.index(indexName)
.document(doc)));
}
BulkResponse response = client.bulk(bulkBuilder.build());
if (response.errors()) {
int errorCount = 0;
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
errorCount++;
if (errorCount == 1) {
log.error("Bulk log index error: {}", item.error().reason());
}
}
}
log.error("Bulk log indexing had {} error(s) out of {} entries", errorCount, entries.size());
} else {
log.debug("Indexed {} log entries for agent={}, app={}", entries.size(), agentId, application);
}
} catch (IOException e) {
log.error("Failed to bulk index {} log entries for agent={}", entries.size(), agentId, e);
}
}
private Map<String, Object> toMap(LogEntry entry, String agentId, String application) {
Map<String, Object> doc = new LinkedHashMap<>();
doc.put("@timestamp", entry.getTimestamp() != null ? entry.getTimestamp().toString() : null);
doc.put("level", entry.getLevel());
doc.put("loggerName", entry.getLoggerName());
doc.put("message", entry.getMessage());
doc.put("threadName", entry.getThreadName());
doc.put("stackTrace", entry.getStackTrace());
doc.put("mdc", entry.getMdc());
doc.put("agentId", agentId);
doc.put("application", application);
return doc;
}
}