From c6da858c2f948d848533405649806a8cdc056da7 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:57:18 +0100 Subject: [PATCH] feat: add OpenSearchAdminController with status, pipeline, indices, performance, and delete endpoints Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controller/OpenSearchAdminController.java | 248 ++++++++++++++++++ .../OpenSearchAdminControllerIT.java | 112 ++++++++ 2 files changed, 360 insertions(+) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java create mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java new file mode 100644 index 00000000..f4f13ff4 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java @@ -0,0 +1,248 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.dto.IndexInfoResponse; +import com.cameleer3.server.app.dto.IndicesPageResponse; +import com.cameleer3.server.app.dto.OpenSearchStatusResponse; +import com.cameleer3.server.app.dto.PerformanceResponse; +import com.cameleer3.server.app.dto.PipelineStatsResponse; +import com.cameleer3.server.core.admin.AuditCategory; +import com.cameleer3.server.core.admin.AuditResult; +import com.cameleer3.server.core.admin.AuditService; +import com.cameleer3.server.core.indexing.SearchIndexerStats; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.servlet.http.HttpServletRequest; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +@RestController +@RequestMapping("/api/v1/admin/opensearch") +@PreAuthorize("hasRole('ADMIN')") +@Tag(name = "OpenSearch Admin", description = "OpenSearch monitoring and management (ADMIN only)") +public class OpenSearchAdminController { + + private final OpenSearchClient client; + private final RestClient restClient; + private final SearchIndexerStats indexerStats; + private final AuditService auditService; + private final ObjectMapper objectMapper; + private final String opensearchUrl; + + public OpenSearchAdminController(OpenSearchClient client, RestClient restClient, + SearchIndexerStats indexerStats, AuditService auditService, + ObjectMapper objectMapper, + @Value("${opensearch.url:http://localhost:9200}") String opensearchUrl) { + this.client = client; + this.restClient = restClient; + this.indexerStats = indexerStats; + this.auditService = auditService; + this.objectMapper = objectMapper; + this.opensearchUrl = opensearchUrl; + } + + @GetMapping("/status") + @Operation(summary = "Get OpenSearch cluster status and version") + public ResponseEntity getStatus() { + try { + HealthResponse health = client.cluster().health(); + String version = client.info().version().number(); + return ResponseEntity.ok(new OpenSearchStatusResponse( + true, + health.status().name(), + version, + health.numberOfNodes(), + opensearchUrl)); + } catch (Exception e) { + return ResponseEntity.ok(new OpenSearchStatusResponse( + false, "UNREACHABLE", null, 0, opensearchUrl)); + } + } + + @GetMapping("/pipeline") + @Operation(summary = "Get indexing pipeline statistics") + public ResponseEntity getPipeline() { + return ResponseEntity.ok(new PipelineStatsResponse( + indexerStats.getQueueDepth(), + indexerStats.getMaxQueueSize(), + indexerStats.getFailedCount(), + indexerStats.getIndexedCount(), + indexerStats.getDebounceMs(), + indexerStats.getIndexingRate(), + indexerStats.getLastIndexedAt())); + } + + @GetMapping("/indices") + @Operation(summary = "Get OpenSearch indices with pagination") + public ResponseEntity getIndices( + @RequestParam(defaultValue = "0") int page, + @RequestParam(defaultValue = "20") int size, + @RequestParam(defaultValue = "") String search) { + try { + Response response = restClient.performRequest( + new Request("GET", "/_cat/indices?format=json&h=index,health,docs.count,store.size,pri,rep&bytes=b")); + JsonNode indices; + try (InputStream is = response.getEntity().getContent()) { + indices = objectMapper.readTree(is); + } + + List allIndices = new ArrayList<>(); + for (JsonNode idx : indices) { + String name = idx.path("index").asText(""); + if (!search.isEmpty() && !name.contains(search)) { + continue; + } + allIndices.add(new IndexInfoResponse( + name, + parseLong(idx.path("docs.count").asText("0")), + humanSize(parseLong(idx.path("store.size").asText("0"))), + parseLong(idx.path("store.size").asText("0")), + idx.path("health").asText("unknown"), + parseInt(idx.path("pri").asText("0")), + parseInt(idx.path("rep").asText("0")))); + } + + allIndices.sort(Comparator.comparing(IndexInfoResponse::name)); + + long totalDocs = allIndices.stream().mapToLong(IndexInfoResponse::docCount).sum(); + long totalBytes = allIndices.stream().mapToLong(IndexInfoResponse::sizeBytes).sum(); + int totalIndices = allIndices.size(); + int totalPages = Math.max(1, (int) Math.ceil((double) totalIndices / size)); + + int fromIndex = Math.min(page * size, totalIndices); + int toIndex = Math.min(fromIndex + size, totalIndices); + List pageItems = allIndices.subList(fromIndex, toIndex); + + return ResponseEntity.ok(new IndicesPageResponse( + pageItems, totalIndices, totalDocs, + humanSize(totalBytes), page, size, totalPages)); + } catch (Exception e) { + return ResponseEntity.ok(new IndicesPageResponse( + List.of(), 0, 0, "0 B", page, size, 0)); + } + } + + @DeleteMapping("/indices/{name}") + @Operation(summary = "Delete an OpenSearch index") + public ResponseEntity deleteIndex(@PathVariable String name, HttpServletRequest request) { + try { + boolean exists = client.indices().exists(r -> r.index(name)).value(); + if (!exists) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Index not found: " + name); + } + client.indices().delete(r -> r.index(name)); + auditService.log("delete_index", AuditCategory.INFRA, name, null, AuditResult.SUCCESS, request); + return ResponseEntity.ok().build(); + } catch (ResponseStatusException e) { + throw e; + } catch (Exception e) { + throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to delete index: " + e.getMessage()); + } + } + + @GetMapping("/performance") + @Operation(summary = "Get OpenSearch performance metrics") + public ResponseEntity getPerformance() { + try { + Response response = restClient.performRequest( + new Request("GET", "/_nodes/stats/jvm,indices")); + JsonNode root; + try (InputStream is = response.getEntity().getContent()) { + root = objectMapper.readTree(is); + } + + JsonNode nodes = root.path("nodes"); + long heapUsed = 0, heapMax = 0; + long queryCacheHits = 0, queryCacheMisses = 0; + long requestCacheHits = 0, requestCacheMisses = 0; + long searchQueryTotal = 0, searchQueryTimeMs = 0; + long indexTotal = 0, indexTimeMs = 0; + + var it = nodes.fields(); + while (it.hasNext()) { + var entry = it.next(); + JsonNode node = entry.getValue(); + + JsonNode jvm = node.path("jvm").path("mem"); + heapUsed += jvm.path("heap_used_in_bytes").asLong(0); + heapMax += jvm.path("heap_max_in_bytes").asLong(0); + + JsonNode indicesNode = node.path("indices"); + JsonNode queryCache = indicesNode.path("query_cache"); + queryCacheHits += queryCache.path("hit_count").asLong(0); + queryCacheMisses += queryCache.path("miss_count").asLong(0); + + JsonNode requestCache = indicesNode.path("request_cache"); + requestCacheHits += requestCache.path("hit_count").asLong(0); + requestCacheMisses += requestCache.path("miss_count").asLong(0); + + JsonNode searchNode = indicesNode.path("search"); + searchQueryTotal += searchNode.path("query_total").asLong(0); + searchQueryTimeMs += searchNode.path("query_time_in_millis").asLong(0); + + JsonNode indexing = indicesNode.path("indexing"); + indexTotal += indexing.path("index_total").asLong(0); + indexTimeMs += indexing.path("index_time_in_millis").asLong(0); + } + + double queryCacheHitRate = (queryCacheHits + queryCacheMisses) > 0 + ? (double) queryCacheHits / (queryCacheHits + queryCacheMisses) : 0.0; + double requestCacheHitRate = (requestCacheHits + requestCacheMisses) > 0 + ? (double) requestCacheHits / (requestCacheHits + requestCacheMisses) : 0.0; + double searchLatency = searchQueryTotal > 0 + ? (double) searchQueryTimeMs / searchQueryTotal : 0.0; + double indexingLatency = indexTotal > 0 + ? (double) indexTimeMs / indexTotal : 0.0; + + return ResponseEntity.ok(new PerformanceResponse( + queryCacheHitRate, requestCacheHitRate, + searchLatency, indexingLatency, + heapUsed, heapMax)); + } catch (Exception e) { + return ResponseEntity.ok(new PerformanceResponse(0, 0, 0, 0, 0, 0)); + } + } + + private static long parseLong(String s) { + try { + return Long.parseLong(s); + } catch (NumberFormatException e) { + return 0; + } + } + + private static int parseInt(String s) { + try { + return Integer.parseInt(s); + } catch (NumberFormatException e) { + return 0; + } + } + + private static String humanSize(long bytes) { + if (bytes < 1024) return bytes + " B"; + if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0); + if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024)); + return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024)); + } +} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java new file mode 100644 index 00000000..0f5284dc --- /dev/null +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java @@ -0,0 +1,112 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.AbstractPostgresIT; +import com.cameleer3.server.app.TestSecurityHelper; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; + +import static org.assertj.core.api.Assertions.assertThat; + +class OpenSearchAdminControllerIT extends AbstractPostgresIT { + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private TestSecurityHelper securityHelper; + + private String adminJwt; + private String viewerJwt; + + @BeforeEach + void setUp() { + adminJwt = securityHelper.adminToken(); + viewerJwt = securityHelper.viewerToken(); + } + + @Test + void getStatus_asAdmin_returns200() throws Exception { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/status", HttpMethod.GET, + new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.get("reachable").asBoolean()).isTrue(); + assertThat(body.has("clusterHealth")).isTrue(); + assertThat(body.has("version")).isTrue(); + } + + @Test + void getStatus_asViewer_returns403() { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/status", HttpMethod.GET, + new HttpEntity<>(securityHelper.authHeadersNoBody(viewerJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN); + } + + @Test + void getPipeline_asAdmin_returns200() throws Exception { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/pipeline", HttpMethod.GET, + new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.has("queueDepth")).isTrue(); + assertThat(body.has("maxQueueSize")).isTrue(); + assertThat(body.has("indexedCount")).isTrue(); + } + + @Test + void getIndices_asAdmin_returns200() throws Exception { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/indices", HttpMethod.GET, + new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.has("indices")).isTrue(); + assertThat(body.has("totalIndices")).isTrue(); + assertThat(body.has("page")).isTrue(); + } + + @Test + void deleteIndex_nonExistent_returns404() { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/indices/nonexistent-index-xyz", HttpMethod.DELETE, + new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + } + + @Test + void getPerformance_asAdmin_returns200() throws Exception { + ResponseEntity response = restTemplate.exchange( + "/api/v1/admin/opensearch/performance", HttpMethod.GET, + new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + JsonNode body = objectMapper.readTree(response.getBody()); + assertThat(body.has("queryCacheHitRate")).isTrue(); + assertThat(body.has("jvmHeapUsedBytes")).isTrue(); + } +}