feat: remove OpenSearch, add ClickHouse admin page
Some checks failed
CI / cleanup-branch (push) Has been skipped
CI / build (push) Failing after 33s
CI / docker (push) Has been skipped
CI / deploy (push) Has been skipped
CI / deploy-feature (push) Has been skipped

Remove all OpenSearch code, dependencies, configuration, deployment
manifests, and CI/CD references. Replace the OpenSearch admin page
with a ClickHouse admin page showing cluster status, table sizes,
performance metrics, and indexer pipeline stats.

- Delete 11 OpenSearch Java files (config, search impl, admin controller, DTOs, tests)
- Delete 3 OpenSearch frontend files (admin page, CSS, query hooks)
- Delete deploy/opensearch.yaml K8s manifest
- Remove opensearch Maven dependencies from pom.xml
- Remove opensearch config from application.yml, Dockerfile, docker-compose
- Remove opensearch from CI workflow (secrets, deploy, cleanup steps)
- Simplify ThresholdConfig (remove OpenSearch thresholds, database-only)
- Change default search backend from opensearch to clickhouse
- Add ClickHouseAdminController with /status, /tables, /performance, /pipeline
- Add ClickHouseAdminPage with StatCards, pipeline ProgressBar, tables DataTable
- Update CLAUDE.md, HOWTO.md, and source comments

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 18:56:06 +02:00
parent 5ed7d38bf7
commit 283e38a20d
49 changed files with 356 additions and 1753 deletions

View File

@@ -1,28 +0,0 @@
package com.cameleer3.server.app.config;
import org.apache.http.HttpHost;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OpenSearchConfig {
@Value("${opensearch.url:http://localhost:9200}")
private String opensearchUrl;
@Bean(destroyMethod = "close")
public RestClient opensearchRestClient() {
return RestClient.builder(HttpHost.create(opensearchUrl)).build();
}
@Bean
public OpenSearchClient openSearchClient(RestClient restClient) {
var transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
return new OpenSearchClient(transport);
}
}

View File

@@ -43,8 +43,8 @@ public class StorageBeanConfig {
@Bean(destroyMethod = "shutdown")
public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex,
@Value("${opensearch.debounce-ms:2000}") long debounceMs,
@Value("${opensearch.queue-size:10000}") int queueSize) {
@Value("${cameleer.indexer.debounce-ms:2000}") long debounceMs,
@Value("${cameleer.indexer.queue-size:10000}") int queueSize) {
return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize);
}
@@ -130,7 +130,7 @@ public class StorageBeanConfig {
}
@Bean
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse")
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse", matchIfMissing = true)
public SearchIndex clickHouseSearchIndex(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseSearchIndex(clickHouseJdbc);

View File

@@ -0,0 +1,119 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.dto.ClickHousePerformanceResponse;
import com.cameleer3.server.app.dto.ClickHouseStatusResponse;
import com.cameleer3.server.app.dto.ClickHouseTableInfo;
import com.cameleer3.server.app.dto.IndexerPipelineResponse;
import com.cameleer3.server.core.indexing.SearchIndexerStats;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/api/v1/admin/clickhouse")
@PreAuthorize("hasRole('ADMIN')")
@Tag(name = "ClickHouse Admin", description = "ClickHouse monitoring and diagnostics (ADMIN only)")
public class ClickHouseAdminController {
private final JdbcTemplate clickHouseJdbc;
private final SearchIndexerStats indexerStats;
private final String clickHouseUrl;
public ClickHouseAdminController(
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc,
SearchIndexerStats indexerStats,
@Value("${clickhouse.url:}") String clickHouseUrl) {
this.clickHouseJdbc = clickHouseJdbc;
this.indexerStats = indexerStats;
this.clickHouseUrl = clickHouseUrl;
}
@GetMapping("/status")
@Operation(summary = "ClickHouse cluster status")
public ClickHouseStatusResponse getStatus() {
try {
var row = clickHouseJdbc.queryForMap(
"SELECT version() AS version, formatReadableTimeDelta(uptime()) AS uptime");
return new ClickHouseStatusResponse(true,
(String) row.get("version"),
(String) row.get("uptime"),
clickHouseUrl);
} catch (Exception e) {
return new ClickHouseStatusResponse(false, null, null, clickHouseUrl);
}
}
@GetMapping("/tables")
@Operation(summary = "List ClickHouse tables with sizes")
public List<ClickHouseTableInfo> getTables() {
return clickHouseJdbc.query("""
SELECT t.name, t.engine,
t.total_rows AS row_count,
formatReadableSize(t.total_bytes) AS data_size,
t.total_bytes AS data_size_bytes,
ifNull(p.partition_count, 0) AS partition_count
FROM system.tables t
LEFT JOIN (
SELECT table, countDistinct(partition) AS partition_count
FROM system.parts
WHERE database = currentDatabase() AND active
GROUP BY table
) p ON t.name = p.table
WHERE t.database = currentDatabase()
ORDER BY t.total_bytes DESC NULLS LAST
""",
(rs, rowNum) -> new ClickHouseTableInfo(
rs.getString("name"),
rs.getString("engine"),
rs.getLong("row_count"),
rs.getString("data_size"),
rs.getLong("data_size_bytes"),
rs.getInt("partition_count")));
}
@GetMapping("/performance")
@Operation(summary = "ClickHouse performance metrics")
public ClickHousePerformanceResponse getPerformance() {
try {
long selectQueries = queryEvent("SelectQuery");
long insertQueries = queryEvent("InsertQuery");
long insertedRows = queryEvent("InsertedRows");
long readRows = queryEvent("SelectedRows");
String memoryUsage = clickHouseJdbc.queryForObject(
"SELECT formatReadableSize(value) FROM system.metrics WHERE metric = 'MemoryTracking'",
String.class);
return new ClickHousePerformanceResponse(selectQueries, insertQueries,
memoryUsage != null ? memoryUsage : "0 B", insertedRows, readRows);
} catch (Exception e) {
return new ClickHousePerformanceResponse(0, 0, "N/A", 0, 0);
}
}
@GetMapping("/pipeline")
@Operation(summary = "Search indexer pipeline statistics")
public IndexerPipelineResponse getPipeline() {
return new IndexerPipelineResponse(
indexerStats.getQueueDepth(),
indexerStats.getMaxQueueSize(),
indexerStats.getFailedCount(),
indexerStats.getIndexedCount(),
indexerStats.getDebounceMs(),
indexerStats.getIndexingRate(),
indexerStats.getLastIndexedAt());
}
private long queryEvent(String eventName) {
Long val = clickHouseJdbc.queryForObject(
"SELECT value FROM system.events WHERE event = ?",
Long.class, eventName);
return val != null ? val : 0;
}
}

View File

@@ -35,7 +35,7 @@ public class LogIngestionController {
@PostMapping("/logs")
@Operation(summary = "Ingest application log entries",
description = "Accepts a batch of log entries from an agent. Entries are indexed in OpenSearch.")
description = "Accepts a batch of log entries from an agent. Entries are stored in the configured log store.")
@ApiResponse(responseCode = "202", description = "Logs accepted for indexing")
public ResponseEntity<Void> ingestLogs(@RequestBody LogBatch batch) {
String instanceId = extractAgentId();

View File

@@ -16,7 +16,7 @@ import java.util.List;
@RestController
@RequestMapping("/api/v1/logs")
@Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch")
@Tag(name = "Application Logs", description = "Query application logs")
public class LogQueryController {
private final LogIndex logIndex;

View File

@@ -1,266 +0,0 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.dto.IndexInfoResponse;
import com.cameleer3.server.app.dto.IndicesPageResponse;
import com.cameleer3.server.app.dto.OpenSearchStatusResponse;
import com.cameleer3.server.app.dto.PerformanceResponse;
import com.cameleer3.server.app.dto.PipelineStatsResponse;
import com.cameleer3.server.core.admin.AuditCategory;
import com.cameleer3.server.core.admin.AuditResult;
import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.indexing.SearchIndexerStats;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.cluster.HealthResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@RestController
@RequestMapping("/api/v1/admin/opensearch")
@PreAuthorize("hasRole('ADMIN')")
@Tag(name = "OpenSearch Admin", description = "OpenSearch monitoring and management (ADMIN only)")
public class OpenSearchAdminController {
private final OpenSearchClient client;
private final RestClient restClient;
private final SearchIndexerStats indexerStats;
private final AuditService auditService;
private final ObjectMapper objectMapper;
private final String opensearchUrl;
private final String indexPrefix;
private final String logIndexPrefix;
public OpenSearchAdminController(OpenSearchClient client, RestClient restClient,
SearchIndexerStats indexerStats, AuditService auditService,
ObjectMapper objectMapper,
@Value("${opensearch.url:http://localhost:9200}") String opensearchUrl,
@Value("${opensearch.index-prefix:executions-}") String indexPrefix,
@Value("${opensearch.log-index-prefix:logs-}") String logIndexPrefix) {
this.client = client;
this.restClient = restClient;
this.indexerStats = indexerStats;
this.auditService = auditService;
this.objectMapper = objectMapper;
this.opensearchUrl = opensearchUrl;
this.indexPrefix = indexPrefix;
this.logIndexPrefix = logIndexPrefix;
}
@GetMapping("/status")
@Operation(summary = "Get OpenSearch cluster status and version")
public ResponseEntity<OpenSearchStatusResponse> getStatus() {
try {
HealthResponse health = client.cluster().health();
String version = client.info().version().number();
return ResponseEntity.ok(new OpenSearchStatusResponse(
true,
health.status().name(),
version,
health.numberOfNodes(),
opensearchUrl));
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(new OpenSearchStatusResponse(
false, "UNREACHABLE", null, 0, opensearchUrl));
}
}
@GetMapping("/pipeline")
@Operation(summary = "Get indexing pipeline statistics")
public ResponseEntity<PipelineStatsResponse> getPipeline() {
return ResponseEntity.ok(new PipelineStatsResponse(
indexerStats.getQueueDepth(),
indexerStats.getMaxQueueSize(),
indexerStats.getFailedCount(),
indexerStats.getIndexedCount(),
indexerStats.getDebounceMs(),
indexerStats.getIndexingRate(),
indexerStats.getLastIndexedAt()));
}
@GetMapping("/indices")
@Operation(summary = "Get OpenSearch indices with pagination")
public ResponseEntity<IndicesPageResponse> getIndices(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(defaultValue = "") String search,
@RequestParam(defaultValue = "executions") String prefix) {
try {
Response response = restClient.performRequest(
new Request("GET", "/_cat/indices?format=json&h=index,health,docs.count,store.size,pri,rep&bytes=b"));
JsonNode indices;
try (InputStream is = response.getEntity().getContent()) {
indices = objectMapper.readTree(is);
}
String filterPrefix = "logs".equals(prefix) ? logIndexPrefix : indexPrefix;
List<IndexInfoResponse> allIndices = new ArrayList<>();
for (JsonNode idx : indices) {
String name = idx.path("index").asText("");
if (!name.startsWith(filterPrefix)) {
continue;
}
if (!search.isEmpty() && !name.contains(search)) {
continue;
}
allIndices.add(new IndexInfoResponse(
name,
parseLong(idx.path("docs.count").asText("0")),
humanSize(parseLong(idx.path("store.size").asText("0"))),
parseLong(idx.path("store.size").asText("0")),
idx.path("health").asText("unknown"),
parseInt(idx.path("pri").asText("0")),
parseInt(idx.path("rep").asText("0"))));
}
allIndices.sort(Comparator.comparing(IndexInfoResponse::name));
long totalDocs = allIndices.stream().mapToLong(IndexInfoResponse::docCount).sum();
long totalBytes = allIndices.stream().mapToLong(IndexInfoResponse::sizeBytes).sum();
int totalIndices = allIndices.size();
int totalPages = Math.max(1, (int) Math.ceil((double) totalIndices / size));
int fromIndex = Math.min(page * size, totalIndices);
int toIndex = Math.min(fromIndex + size, totalIndices);
List<IndexInfoResponse> pageItems = allIndices.subList(fromIndex, toIndex);
return ResponseEntity.ok(new IndicesPageResponse(
pageItems, totalIndices, totalDocs,
humanSize(totalBytes), page, size, totalPages));
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new IndicesPageResponse(
List.of(), 0, 0, "0 B", page, size, 0));
}
}
@DeleteMapping("/indices/{name}")
@Operation(summary = "Delete an OpenSearch index")
public ResponseEntity<Void> deleteIndex(@PathVariable String name, HttpServletRequest request) {
try {
if (!name.startsWith(indexPrefix) && !name.startsWith(logIndexPrefix)) {
throw new ResponseStatusException(HttpStatus.FORBIDDEN, "Cannot delete index outside application scope");
}
boolean exists = client.indices().exists(r -> r.index(name)).value();
if (!exists) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Index not found: " + name);
}
client.indices().delete(r -> r.index(name));
auditService.log("delete_index", AuditCategory.INFRA, name, null, AuditResult.SUCCESS, request);
return ResponseEntity.ok().build();
} catch (ResponseStatusException e) {
throw e;
} catch (Exception e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to delete index: " + e.getMessage());
}
}
@GetMapping("/performance")
@Operation(summary = "Get OpenSearch performance metrics")
public ResponseEntity<PerformanceResponse> getPerformance() {
try {
Response response = restClient.performRequest(
new Request("GET", "/_nodes/stats/jvm,indices"));
JsonNode root;
try (InputStream is = response.getEntity().getContent()) {
root = objectMapper.readTree(is);
}
JsonNode nodes = root.path("nodes");
long heapUsed = 0, heapMax = 0;
long queryCacheHits = 0, queryCacheMisses = 0;
long requestCacheHits = 0, requestCacheMisses = 0;
long searchQueryTotal = 0, searchQueryTimeMs = 0;
long indexTotal = 0, indexTimeMs = 0;
var it = nodes.fields();
while (it.hasNext()) {
var entry = it.next();
JsonNode node = entry.getValue();
JsonNode jvm = node.path("jvm").path("mem");
heapUsed += jvm.path("heap_used_in_bytes").asLong(0);
heapMax += jvm.path("heap_max_in_bytes").asLong(0);
JsonNode indicesNode = node.path("indices");
JsonNode queryCache = indicesNode.path("query_cache");
queryCacheHits += queryCache.path("hit_count").asLong(0);
queryCacheMisses += queryCache.path("miss_count").asLong(0);
JsonNode requestCache = indicesNode.path("request_cache");
requestCacheHits += requestCache.path("hit_count").asLong(0);
requestCacheMisses += requestCache.path("miss_count").asLong(0);
JsonNode searchNode = indicesNode.path("search");
searchQueryTotal += searchNode.path("query_total").asLong(0);
searchQueryTimeMs += searchNode.path("query_time_in_millis").asLong(0);
JsonNode indexing = indicesNode.path("indexing");
indexTotal += indexing.path("index_total").asLong(0);
indexTimeMs += indexing.path("index_time_in_millis").asLong(0);
}
double queryCacheHitRate = (queryCacheHits + queryCacheMisses) > 0
? (double) queryCacheHits / (queryCacheHits + queryCacheMisses) : 0.0;
double requestCacheHitRate = (requestCacheHits + requestCacheMisses) > 0
? (double) requestCacheHits / (requestCacheHits + requestCacheMisses) : 0.0;
double searchLatency = searchQueryTotal > 0
? (double) searchQueryTimeMs / searchQueryTotal : 0.0;
double indexingLatency = indexTotal > 0
? (double) indexTimeMs / indexTotal : 0.0;
return ResponseEntity.ok(new PerformanceResponse(
queryCacheHitRate, requestCacheHitRate,
searchLatency, indexingLatency,
heapUsed, heapMax));
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body(new PerformanceResponse(0, 0, 0, 0, 0, 0));
}
}
private static long parseLong(String s) {
try {
return Long.parseLong(s);
} catch (NumberFormatException e) {
return 0;
}
}
private static int parseInt(String s) {
try {
return Integer.parseInt(s);
} catch (NumberFormatException e) {
return 0;
}
}
private static String humanSize(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));
return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));
}
}

View File

@@ -0,0 +1,12 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "ClickHouse performance metrics")
public record ClickHousePerformanceResponse(
long queryCount,
long insertQueryCount,
String memoryUsage,
long insertedRows,
long readRows
) {}

View File

@@ -0,0 +1,11 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "ClickHouse cluster status")
public record ClickHouseStatusResponse(
boolean reachable,
String version,
String uptime,
String host
) {}

View File

@@ -0,0 +1,13 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "ClickHouse table information")
public record ClickHouseTableInfo(
String name,
String engine,
long rowCount,
String dataSize,
long dataSizeBytes,
int partitionCount
) {}

View File

@@ -1,14 +0,0 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "OpenSearch index information")
public record IndexInfoResponse(
@Schema(description = "Index name") String name,
@Schema(description = "Document count") long docCount,
@Schema(description = "Human-readable index size") String size,
@Schema(description = "Index size in bytes") long sizeBytes,
@Schema(description = "Index health status") String health,
@Schema(description = "Number of primary shards") int primaryShards,
@Schema(description = "Number of replica shards") int replicaShards
) {}

View File

@@ -0,0 +1,16 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.time.Instant;
@Schema(description = "Search indexer pipeline statistics")
public record IndexerPipelineResponse(
int queueDepth,
int maxQueueSize,
long failedCount,
long indexedCount,
long debounceMs,
double indexingRate,
Instant lastIndexedAt
) {}

View File

@@ -1,16 +0,0 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
@Schema(description = "Paginated list of OpenSearch indices")
public record IndicesPageResponse(
@Schema(description = "Index list for current page") List<IndexInfoResponse> indices,
@Schema(description = "Total number of indices") long totalIndices,
@Schema(description = "Total document count across all indices") long totalDocs,
@Schema(description = "Human-readable total size") String totalSize,
@Schema(description = "Current page number (0-based)") int page,
@Schema(description = "Page size") int pageSize,
@Schema(description = "Total number of pages") int totalPages
) {}

View File

@@ -2,7 +2,7 @@ package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "Application log entry from OpenSearch")
@Schema(description = "Application log entry")
public record LogEntryResponse(
@Schema(description = "Log timestamp (ISO-8601)") String timestamp,
@Schema(description = "Log level (INFO, WARN, ERROR, DEBUG)") String level,

View File

@@ -1,12 +0,0 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "OpenSearch cluster status")
public record OpenSearchStatusResponse(
@Schema(description = "Whether the cluster is reachable") boolean reachable,
@Schema(description = "Cluster health status (GREEN, YELLOW, RED)") String clusterHealth,
@Schema(description = "OpenSearch version") String version,
@Schema(description = "Number of nodes in the cluster") int nodeCount,
@Schema(description = "OpenSearch host") String host
) {}

View File

@@ -1,13 +0,0 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "OpenSearch performance metrics")
public record PerformanceResponse(
@Schema(description = "Query cache hit rate (0.0-1.0)") double queryCacheHitRate,
@Schema(description = "Request cache hit rate (0.0-1.0)") double requestCacheHitRate,
@Schema(description = "Average search latency in milliseconds") double searchLatencyMs,
@Schema(description = "Average indexing latency in milliseconds") double indexingLatencyMs,
@Schema(description = "JVM heap used in bytes") long jvmHeapUsedBytes,
@Schema(description = "JVM heap max in bytes") long jvmHeapMaxBytes
) {}

View File

@@ -1,16 +0,0 @@
package com.cameleer3.server.app.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.time.Instant;
@Schema(description = "Search indexing pipeline statistics")
public record PipelineStatsResponse(
@Schema(description = "Current queue depth") int queueDepth,
@Schema(description = "Maximum queue size") int maxQueueSize,
@Schema(description = "Number of failed indexing operations") long failedCount,
@Schema(description = "Number of successfully indexed documents") long indexedCount,
@Schema(description = "Debounce interval in milliseconds") long debounceMs,
@Schema(description = "Current indexing rate (docs/sec)") double indexingRate,
@Schema(description = "Timestamp of last indexed document") Instant lastIndexedAt
) {}

View File

@@ -5,18 +5,15 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Schema(description = "Threshold configuration for admin monitoring")
public record ThresholdConfigRequest(
@Valid @NotNull DatabaseThresholdsRequest database,
@Valid @NotNull OpenSearchThresholdsRequest opensearch
@Valid @NotNull DatabaseThresholdsRequest database
) {
@Schema(description = "Database monitoring thresholds")
@@ -38,41 +35,6 @@ public record ThresholdConfigRequest(
double queryDurationCritical
) {}
@Schema(description = "OpenSearch monitoring thresholds")
public record OpenSearchThresholdsRequest(
@NotBlank
@Schema(description = "Cluster health warning threshold (GREEN, YELLOW, RED)")
String clusterHealthWarning,
@NotBlank
@Schema(description = "Cluster health critical threshold (GREEN, YELLOW, RED)")
String clusterHealthCritical,
@Min(0)
@Schema(description = "Queue depth warning threshold")
int queueDepthWarning,
@Min(0)
@Schema(description = "Queue depth critical threshold")
int queueDepthCritical,
@Min(0) @Max(100)
@Schema(description = "JVM heap usage warning threshold (percentage)")
int jvmHeapWarning,
@Min(0) @Max(100)
@Schema(description = "JVM heap usage critical threshold (percentage)")
int jvmHeapCritical,
@Min(0)
@Schema(description = "Failed document count warning threshold")
int failedDocsWarning,
@Min(0)
@Schema(description = "Failed document count critical threshold")
int failedDocsCritical
) {}
/** Convert to core domain model */
public ThresholdConfig toConfig() {
return new ThresholdConfig(
@@ -81,16 +43,6 @@ public record ThresholdConfigRequest(
database.connectionPoolCritical(),
database.queryDurationWarning(),
database.queryDurationCritical()
),
new ThresholdConfig.OpenSearchThresholds(
opensearch.clusterHealthWarning(),
opensearch.clusterHealthCritical(),
opensearch.queueDepthWarning(),
opensearch.queueDepthCritical(),
opensearch.jvmHeapWarning(),
opensearch.jvmHeapCritical(),
opensearch.failedDocsWarning(),
opensearch.failedDocsCritical()
)
);
}
@@ -108,37 +60,6 @@ public record ThresholdConfigRequest(
}
}
if (opensearch != null) {
if (opensearch.queueDepthWarning() > opensearch.queueDepthCritical()) {
errors.add("opensearch.queueDepthWarning must be <= queueDepthCritical");
}
if (opensearch.jvmHeapWarning() > opensearch.jvmHeapCritical()) {
errors.add("opensearch.jvmHeapWarning must be <= jvmHeapCritical");
}
if (opensearch.failedDocsWarning() > opensearch.failedDocsCritical()) {
errors.add("opensearch.failedDocsWarning must be <= failedDocsCritical");
}
// Validate health severity ordering: GREEN < YELLOW < RED
int warningSeverity = healthSeverity(opensearch.clusterHealthWarning());
int criticalSeverity = healthSeverity(opensearch.clusterHealthCritical());
if (warningSeverity < 0) {
errors.add("opensearch.clusterHealthWarning must be GREEN, YELLOW, or RED");
}
if (criticalSeverity < 0) {
errors.add("opensearch.clusterHealthCritical must be GREEN, YELLOW, or RED");
}
if (warningSeverity >= 0 && criticalSeverity >= 0 && warningSeverity > criticalSeverity) {
errors.add("opensearch.clusterHealthWarning severity must be <= clusterHealthCritical (GREEN < YELLOW < RED)");
}
}
return errors;
}
private static final Map<String, Integer> HEALTH_SEVERITY =
Map.of("GREEN", 0, "YELLOW", 1, "RED", 2);
private static int healthSeverity(String health) {
return HEALTH_SEVERITY.getOrDefault(health != null ? health.toUpperCase() : "", -1);
}
}

View File

@@ -43,6 +43,4 @@ public class RetentionScheduler {
log.error("Retention job failed", e);
}
}
// Note: OpenSearch daily index deletion should be handled via ILM policy
// configured at deployment time, not in application code.
}

View File

@@ -1,435 +0,0 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@Repository
@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true)
public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private static final ObjectMapper JSON = new ObjectMapper();
private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};
private final OpenSearchClient client;
private final String indexPrefix;
public OpenSearchIndex(OpenSearchClient client,
@Value("${opensearch.index-prefix:executions-}") String indexPrefix) {
this.client = client;
this.indexPrefix = indexPrefix;
}
@PostConstruct
void ensureIndexTemplate() {
String templateName = indexPrefix + "template";
String indexPattern = indexPrefix + "*";
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name(templateName)
.indexPatterns(List.of(indexPattern))
.template(t -> t
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1"))
.mappings(m -> m
.properties("processors", p -> p
.nested(n -> n))))));
log.info("OpenSearch index template created");
}
} catch (IOException e) {
log.error("Failed to create index template", e);
}
}
@Override
public void index(ExecutionDocument doc) {
String indexName = indexPrefix + DAY_FMT.format(doc.startTime());
try {
client.index(IndexRequest.of(b -> b
.index(indexName)
.id(doc.executionId())
.document(toMap(doc))));
} catch (IOException e) {
log.error("Failed to index execution {}", doc.executionId(), e);
}
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
try {
var searchReq = buildSearchRequest(request, request.limit());
var response = client.search(searchReq, Map.class);
List<ExecutionSummary> items = response.hits().hits().stream()
.map(this::hitToSummary)
.collect(Collectors.toList());
long total = response.hits().total() != null ? response.hits().total().value() : 0;
return new SearchResult<>(items, total, request.offset(), request.limit());
} catch (IOException e) {
log.error("Search failed", e);
return SearchResult.empty(request.offset(), request.limit());
}
}
@Override
public long count(SearchRequest request) {
try {
var countReq = CountRequest.of(b -> b
.index(indexPrefix + "*")
.query(buildQuery(request)));
return client.count(countReq).count();
} catch (IOException e) {
log.error("Count failed", e);
return 0;
}
}
@Override
public void delete(String executionId) {
try {
client.deleteByQuery(DeleteByQueryRequest.of(b -> b
.index(List.of(indexPrefix + "*"))
.query(Query.of(q -> q.term(t -> t
.field("execution_id")
.value(FieldValue.of(executionId)))))));
} catch (IOException e) {
log.error("Failed to delete execution {}", executionId, e);
}
}
private static final List<String> HIGHLIGHT_FIELDS = List.of(
"error_message", "attributes_text",
"processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.attributes_text");
private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest(
SearchRequest request, int size) {
return org.opensearch.client.opensearch.core.SearchRequest.of(b -> {
b.index(indexPrefix + "*")
.query(buildQuery(request))
.trackTotalHits(th -> th.enabled(true))
.size(size)
.from(request.offset())
.sort(s -> s.field(f -> f
.field(request.sortColumn())
.order("asc".equalsIgnoreCase(request.sortDir())
? SortOrder.Asc : SortOrder.Desc)));
// Add highlight when full-text search is active
if (request.text() != null && !request.text().isBlank()) {
b.highlight(h -> {
for (String field : HIGHLIGHT_FIELDS) {
h.fields(field, hf -> hf
.fragmentSize(120)
.numberOfFragments(1));
}
return h;
});
}
return b;
});
}
private Query buildQuery(SearchRequest request) {
List<Query> must = new ArrayList<>();
List<Query> filter = new ArrayList<>();
// Time range
if (request.timeFrom() != null || request.timeTo() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("start_time");
if (request.timeFrom() != null)
r.gte(JsonData.of(request.timeFrom().toString()));
if (request.timeTo() != null)
r.lte(JsonData.of(request.timeTo().toString()));
return r;
})));
}
// Keyword filters (use .keyword sub-field for exact matching on dynamically mapped text fields)
if (request.status() != null && !request.status().isBlank()) {
String[] statuses = request.status().split(",");
if (statuses.length == 1) {
filter.add(termQuery("status.keyword", statuses[0].trim()));
} else {
filter.add(Query.of(q -> q.terms(t -> t
.field("status.keyword")
.terms(tv -> tv.value(
java.util.Arrays.stream(statuses)
.map(String::trim)
.map(FieldValue::of)
.toList())))));
}
}
if (request.routeId() != null)
filter.add(termQuery("route_id.keyword", request.routeId()));
if (request.instanceId() != null)
filter.add(termQuery("instance_id.keyword", request.instanceId()));
if (request.correlationId() != null)
filter.add(termQuery("correlation_id.keyword", request.correlationId()));
if (request.applicationId() != null && !request.applicationId().isBlank())
filter.add(termQuery("application_id.keyword", request.applicationId()));
// Full-text search across all fields + nested processor fields
if (request.text() != null && !request.text().isBlank()) {
String text = request.text();
String wildcard = "*" + text.toLowerCase() + "*";
List<Query> textQueries = new ArrayList<>();
// Search top-level text fields (analyzed match + wildcard for substring)
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("error_message", "error_stacktrace", "attributes_text"))));
textQueries.add(Query.of(q -> q.wildcard(w -> w
.field("error_message").value(wildcard).caseInsensitive(true))));
textQueries.add(Query.of(q -> q.wildcard(w -> w
.field("error_stacktrace").value(wildcard).caseInsensitive(true))));
textQueries.add(Query.of(q -> q.wildcard(w -> w
.field("attributes_text").value(wildcard).caseInsensitive(true))));
// Search nested processor fields (analyzed match + wildcard)
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.multiMatch(m -> m
.query(text)
.fields("processors.input_body", "processors.output_body",
"processors.input_headers", "processors.output_headers",
"processors.error_message", "processors.error_stacktrace",
"processors.attributes_text"))))));
textQueries.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.bool(nb -> nb.should(
wildcardQuery("processors.input_body", wildcard),
wildcardQuery("processors.output_body", wildcard),
wildcardQuery("processors.input_headers", wildcard),
wildcardQuery("processors.output_headers", wildcard),
wildcardQuery("processors.attributes_text", wildcard)
).minimumShouldMatch("1"))))));
// Also try keyword fields for exact matches
textQueries.add(Query.of(q -> q.multiMatch(m -> m
.query(text)
.fields("execution_id", "route_id", "instance_id", "correlation_id", "exchange_id"))));
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
}
// Scoped text searches (multiMatch + wildcard fallback for substring matching)
if (request.textInBody() != null && !request.textInBody().isBlank()) {
String bodyText = request.textInBody();
String bodyWildcard = "*" + bodyText.toLowerCase() + "*";
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.bool(nb -> nb.should(
Query.of(mq -> mq.multiMatch(m -> m
.query(bodyText)
.fields("processors.input_body", "processors.output_body"))),
wildcardQuery("processors.input_body", bodyWildcard),
wildcardQuery("processors.output_body", bodyWildcard)
).minimumShouldMatch("1"))))));
}
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
String headerText = request.textInHeaders();
String headerWildcard = "*" + headerText.toLowerCase() + "*";
must.add(Query.of(q -> q.nested(n -> n
.path("processors")
.query(nq -> nq.bool(nb -> nb.should(
Query.of(mq -> mq.multiMatch(m -> m
.query(headerText)
.fields("processors.input_headers", "processors.output_headers"))),
wildcardQuery("processors.input_headers", headerWildcard),
wildcardQuery("processors.output_headers", headerWildcard)
).minimumShouldMatch("1"))))));
}
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
String errText = request.textInErrors();
String errWildcard = "*" + errText.toLowerCase() + "*";
must.add(Query.of(q -> q.bool(b -> b.should(
Query.of(sq -> sq.multiMatch(m -> m
.query(errText)
.fields("error_message", "error_stacktrace"))),
wildcardQuery("error_message", errWildcard),
wildcardQuery("error_stacktrace", errWildcard),
Query.of(sq -> sq.nested(n -> n
.path("processors")
.query(nq -> nq.bool(nb -> nb.should(
Query.of(nmq -> nmq.multiMatch(m -> m
.query(errText)
.fields("processors.error_message", "processors.error_stacktrace"))),
wildcardQuery("processors.error_message", errWildcard),
wildcardQuery("processors.error_stacktrace", errWildcard)
).minimumShouldMatch("1")))))
).minimumShouldMatch("1"))));
}
// Duration range
if (request.durationMin() != null || request.durationMax() != null) {
filter.add(Query.of(q -> q.range(r -> {
r.field("duration_ms");
if (request.durationMin() != null)
r.gte(JsonData.of(request.durationMin()));
if (request.durationMax() != null)
r.lte(JsonData.of(request.durationMax()));
return r;
})));
}
return Query.of(q -> q.bool(b -> {
if (!must.isEmpty()) b.must(must);
if (!filter.isEmpty()) b.filter(filter);
if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m)));
return b;
}));
}
private Query termQuery(String field, String value) {
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
}
private Query wildcardQuery(String field, String pattern) {
return Query.of(q -> q.wildcard(w -> w.field(field).value(pattern).caseInsensitive(true)));
}
private Map<String, Object> toMap(ExecutionDocument doc) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("execution_id", doc.executionId());
map.put("route_id", doc.routeId());
map.put("instance_id", doc.instanceId());
map.put("application_id", doc.applicationId());
map.put("status", doc.status());
map.put("correlation_id", doc.correlationId());
map.put("exchange_id", doc.exchangeId());
map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null);
map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null);
map.put("duration_ms", doc.durationMs());
map.put("error_message", doc.errorMessage());
map.put("error_stacktrace", doc.errorStacktrace());
if (doc.attributes() != null) {
Map<String, String> attrs = parseAttributesJson(doc.attributes());
map.put("attributes", attrs);
map.put("attributes_text", flattenAttributes(attrs));
}
if (doc.processors() != null) {
map.put("processors", doc.processors().stream().map(p -> {
Map<String, Object> pm = new LinkedHashMap<>();
pm.put("processor_id", p.processorId());
pm.put("processor_type", p.processorType());
pm.put("status", p.status());
pm.put("error_message", p.errorMessage());
pm.put("error_stacktrace", p.errorStacktrace());
pm.put("input_body", p.inputBody());
pm.put("output_body", p.outputBody());
pm.put("input_headers", p.inputHeaders());
pm.put("output_headers", p.outputHeaders());
if (p.attributes() != null) {
Map<String, String> pAttrs = parseAttributesJson(p.attributes());
pm.put("attributes", pAttrs);
pm.put("attributes_text", flattenAttributes(pAttrs));
}
return pm;
}).toList());
}
map.put("has_trace_data", doc.hasTraceData());
map.put("is_replay", doc.isReplay());
return map;
}
@SuppressWarnings("unchecked")
private ExecutionSummary hitToSummary(Hit<Map> hit) {
Map<String, Object> src = hit.source();
if (src == null) return null;
@SuppressWarnings("unchecked")
Map<String, String> attributes = src.get("attributes") instanceof Map
? new LinkedHashMap<>((Map<String, String>) src.get("attributes")) : null;
// Merge processor-level attributes (execution-level takes precedence)
if (src.get("processors") instanceof List<?> procs) {
for (Object pObj : procs) {
if (pObj instanceof Map<?, ?> pm && pm.get("attributes") instanceof Map<?, ?> pa) {
if (attributes == null) attributes = new LinkedHashMap<>();
for (var entry : pa.entrySet()) {
attributes.putIfAbsent(
String.valueOf(entry.getKey()),
String.valueOf(entry.getValue()));
}
}
}
}
return new ExecutionSummary(
(String) src.get("execution_id"),
(String) src.get("route_id"),
(String) src.get("instance_id"),
(String) src.get("application_id"),
(String) src.get("status"),
src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null,
src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null,
src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L,
(String) src.get("correlation_id"),
(String) src.get("error_message"),
null, // diagramContentHash not stored in index
extractHighlight(hit),
attributes,
Boolean.TRUE.equals(src.get("has_trace_data")),
Boolean.TRUE.equals(src.get("is_replay"))
);
}
private String extractHighlight(Hit<Map> hit) {
if (hit.highlight() == null || hit.highlight().isEmpty()) return null;
for (List<String> fragments : hit.highlight().values()) {
if (fragments != null && !fragments.isEmpty()) {
return fragments.get(0);
}
}
return null;
}
private static Map<String, String> parseAttributesJson(String json) {
if (json == null || json.isBlank()) return null;
try {
return JSON.readValue(json, STR_MAP);
} catch (Exception e) {
return null;
}
}
private static String flattenAttributes(Map<String, String> attrs) {
if (attrs == null || attrs.isEmpty()) return "";
return attrs.entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(" "));
}
}

View File

@@ -1,228 +0,0 @@
package com.cameleer3.server.app.search;
import com.cameleer3.common.model.LogEntry;
import com.cameleer3.server.core.storage.LogEntryResult;
import com.cameleer3.server.core.storage.LogIndex;
import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.mapping.Property;
import org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Repository
@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch")
public class OpenSearchLogIndex implements LogIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class);
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC);
private final OpenSearchClient client;
private final String indexPrefix;
private final int retentionDays;
public OpenSearchLogIndex(OpenSearchClient client,
@Value("${opensearch.log-index-prefix:logs-}") String indexPrefix,
@Value("${opensearch.log-retention-days:7}") int retentionDays) {
this.client = client;
this.indexPrefix = indexPrefix;
this.retentionDays = retentionDays;
}
@PostConstruct
void init() {
ensureIndexTemplate();
ensureIsmPolicy();
}
private void ensureIndexTemplate() {
String templateName = indexPrefix.replace("-", "") + "-template";
String indexPattern = indexPrefix + "*";
try {
boolean exists = client.indices().existsIndexTemplate(
ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value();
if (!exists) {
client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b
.name(templateName)
.indexPatterns(List.of(indexPattern))
.template(t -> t
.settings(s -> s
.numberOfShards("1")
.numberOfReplicas("1"))
.mappings(m -> m
.properties("@timestamp", Property.of(p -> p.date(d -> d)))
.properties("level", Property.of(p -> p.keyword(k -> k)))
.properties("loggerName", Property.of(p -> p.keyword(k -> k)))
.properties("message", Property.of(p -> p.text(tx -> tx)))
.properties("threadName", Property.of(p -> p.keyword(k -> k)))
.properties("stackTrace", Property.of(p -> p.text(tx -> tx)))
.properties("instanceId", Property.of(p -> p.keyword(k -> k)))
.properties("applicationId", Property.of(p -> p.keyword(k -> k)))
.properties("exchangeId", Property.of(p -> p.keyword(k -> k)))))));
log.info("OpenSearch log index template '{}' created", templateName);
}
} catch (IOException e) {
log.error("Failed to create log index template", e);
}
}
private void ensureIsmPolicy() {
String policyId = "logs-retention";
try {
// Use the low-level REST client to manage ISM policies
var restClient = client._transport();
// Check if the ISM policy exists via a GET; create if not
// ISM is managed via the _plugins/_ism/policies API
// For now, log a reminder — ISM policy should be created via OpenSearch API or dashboard
log.info("Log retention policy: indices matching '{}*' should be deleted after {} days. " +
"Ensure ISM policy '{}' is configured in OpenSearch.", indexPrefix, retentionDays, policyId);
} catch (Exception e) {
log.warn("Could not verify ISM policy for log retention", e);
}
}
@Override
public List<LogEntryResult> search(String applicationId, String instanceId, String level,
String query, String exchangeId,
Instant from, Instant to, int limit) {
try {
BoolQuery.Builder bool = new BoolQuery.Builder();
bool.must(Query.of(q -> q.term(t -> t.field("applicationId").value(FieldValue.of(applicationId)))));
if (instanceId != null && !instanceId.isEmpty()) {
bool.must(Query.of(q -> q.term(t -> t.field("instanceId").value(FieldValue.of(instanceId)))));
}
if (exchangeId != null && !exchangeId.isEmpty()) {
// Match on top-level field (new records) or MDC nested field (old records)
bool.must(Query.of(q -> q.bool(b -> b
.should(Query.of(s -> s.term(t -> t.field("exchangeId.keyword").value(FieldValue.of(exchangeId)))))
.should(Query.of(s -> s.term(t -> t.field("mdc.camel.exchangeId.keyword").value(FieldValue.of(exchangeId)))))
.minimumShouldMatch("1"))));
}
if (level != null && !level.isEmpty()) {
bool.must(Query.of(q -> q.term(t -> t.field("level").value(FieldValue.of(level.toUpperCase())))));
}
if (query != null && !query.isEmpty()) {
bool.must(Query.of(q -> q.match(m -> m.field("message").query(FieldValue.of(query)))));
}
if (from != null || to != null) {
bool.must(Query.of(q -> q.range(r -> {
r.field("@timestamp");
if (from != null) r.gte(JsonData.of(from.toString()));
if (to != null) r.lte(JsonData.of(to.toString()));
return r;
})));
}
var response = client.search(s -> s
.index(indexPrefix + "*")
.query(Query.of(q -> q.bool(bool.build())))
.sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
.size(limit), Map.class);
List<LogEntryResult> results = new ArrayList<>();
for (var hit : response.hits().hits()) {
@SuppressWarnings("unchecked")
Map<String, Object> src = (Map<String, Object>) hit.source();
if (src == null) continue;
results.add(new LogEntryResult(
str(src, "@timestamp"),
str(src, "level"),
str(src, "loggerName"),
str(src, "message"),
str(src, "threadName"),
str(src, "stackTrace")));
}
return results;
} catch (IOException e) {
log.error("Failed to search log entries for application={}", applicationId, e);
return List.of();
}
}
private static String str(Map<String, Object> map, String key) {
Object v = map.get(key);
return v != null ? v.toString() : null;
}
@Override
public void indexBatch(String instanceId, String applicationId, List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return;
}
try {
BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
for (LogEntry entry : entries) {
String indexName = indexPrefix + DAY_FMT.format(
entry.getTimestamp() != null ? entry.getTimestamp() : java.time.Instant.now());
Map<String, Object> doc = toMap(entry, instanceId, applicationId);
bulkBuilder.operations(op -> op
.index(idx -> idx
.index(indexName)
.document(doc)));
}
BulkResponse response = client.bulk(bulkBuilder.build());
if (response.errors()) {
int errorCount = 0;
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
errorCount++;
if (errorCount == 1) {
log.error("Bulk log index error: {}", item.error().reason());
}
}
}
log.error("Bulk log indexing had {} error(s) out of {} entries", errorCount, entries.size());
} else {
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
}
} catch (IOException e) {
log.error("Failed to bulk index {} log entries for instance={}", entries.size(), instanceId, e);
}
}
private Map<String, Object> toMap(LogEntry entry, String instanceId, String applicationId) {
Map<String, Object> doc = new LinkedHashMap<>();
doc.put("@timestamp", entry.getTimestamp() != null ? entry.getTimestamp().toString() : null);
doc.put("level", entry.getLevel());
doc.put("loggerName", entry.getLoggerName());
doc.put("message", entry.getMessage());
doc.put("threadName", entry.getThreadName());
doc.put("stackTrace", entry.getStackTrace());
doc.put("mdc", entry.getMdc());
doc.put("instanceId", instanceId);
doc.put("applicationId", applicationId);
if (entry.getMdc() != null) {
String exId = entry.getMdc().get("camel.exchangeId");
if (exId != null) doc.put("exchangeId", exId);
}
return doc;
}
}

View File

@@ -37,20 +37,15 @@ ingestion:
batch-size: 5000
flush-interval-ms: 1000
opensearch:
url: ${OPENSEARCH_URL:http://localhost:9200}
index-prefix: ${CAMELEER_OPENSEARCH_INDEX_PREFIX:executions-}
queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000}
debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000}
log-index-prefix: ${CAMELEER_LOG_INDEX_PREFIX:logs-}
log-retention-days: ${CAMELEER_LOG_RETENTION_DAYS:7}
cameleer:
body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
indexer:
debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000}
queue-size: ${CAMELEER_INDEXER_QUEUE_SIZE:10000}
retention-days: ${CAMELEER_RETENTION_DAYS:30}
storage:
metrics: ${CAMELEER_STORAGE_METRICS:postgres}
search: ${CAMELEER_STORAGE_SEARCH:opensearch}
search: ${CAMELEER_STORAGE_SEARCH:clickhouse}
stats: ${CAMELEER_STORAGE_STATS:clickhouse}
diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse}
events: ${CAMELEER_STORAGE_EVENTS:clickhouse}