feat: add OpenSearchAdminController with status, pipeline, indices, performance, and delete endpoints
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,248 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.dto.IndexInfoResponse;
|
||||
import com.cameleer3.server.app.dto.IndicesPageResponse;
|
||||
import com.cameleer3.server.app.dto.OpenSearchStatusResponse;
|
||||
import com.cameleer3.server.app.dto.PerformanceResponse;
|
||||
import com.cameleer3.server.app.dto.PipelineStatsResponse;
|
||||
import com.cameleer3.server.core.admin.AuditCategory;
|
||||
import com.cameleer3.server.core.admin.AuditResult;
|
||||
import com.cameleer3.server.core.admin.AuditService;
|
||||
import com.cameleer3.server.core.indexing.SearchIndexerStats;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.opensearch.client.Request;
|
||||
import org.opensearch.client.Response;
|
||||
import org.opensearch.client.RestClient;
|
||||
import org.opensearch.client.opensearch.OpenSearchClient;
|
||||
import org.opensearch.client.opensearch.cluster.HealthResponse;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/admin/opensearch")
|
||||
@PreAuthorize("hasRole('ADMIN')")
|
||||
@Tag(name = "OpenSearch Admin", description = "OpenSearch monitoring and management (ADMIN only)")
|
||||
public class OpenSearchAdminController {
|
||||
|
||||
private final OpenSearchClient client;
|
||||
private final RestClient restClient;
|
||||
private final SearchIndexerStats indexerStats;
|
||||
private final AuditService auditService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final String opensearchUrl;
|
||||
|
||||
public OpenSearchAdminController(OpenSearchClient client, RestClient restClient,
|
||||
SearchIndexerStats indexerStats, AuditService auditService,
|
||||
ObjectMapper objectMapper,
|
||||
@Value("${opensearch.url:http://localhost:9200}") String opensearchUrl) {
|
||||
this.client = client;
|
||||
this.restClient = restClient;
|
||||
this.indexerStats = indexerStats;
|
||||
this.auditService = auditService;
|
||||
this.objectMapper = objectMapper;
|
||||
this.opensearchUrl = opensearchUrl;
|
||||
}
|
||||
|
||||
@GetMapping("/status")
|
||||
@Operation(summary = "Get OpenSearch cluster status and version")
|
||||
public ResponseEntity<OpenSearchStatusResponse> getStatus() {
|
||||
try {
|
||||
HealthResponse health = client.cluster().health();
|
||||
String version = client.info().version().number();
|
||||
return ResponseEntity.ok(new OpenSearchStatusResponse(
|
||||
true,
|
||||
health.status().name(),
|
||||
version,
|
||||
health.numberOfNodes(),
|
||||
opensearchUrl));
|
||||
} catch (Exception e) {
|
||||
return ResponseEntity.ok(new OpenSearchStatusResponse(
|
||||
false, "UNREACHABLE", null, 0, opensearchUrl));
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/pipeline")
|
||||
@Operation(summary = "Get indexing pipeline statistics")
|
||||
public ResponseEntity<PipelineStatsResponse> getPipeline() {
|
||||
return ResponseEntity.ok(new PipelineStatsResponse(
|
||||
indexerStats.getQueueDepth(),
|
||||
indexerStats.getMaxQueueSize(),
|
||||
indexerStats.getFailedCount(),
|
||||
indexerStats.getIndexedCount(),
|
||||
indexerStats.getDebounceMs(),
|
||||
indexerStats.getIndexingRate(),
|
||||
indexerStats.getLastIndexedAt()));
|
||||
}
|
||||
|
||||
@GetMapping("/indices")
|
||||
@Operation(summary = "Get OpenSearch indices with pagination")
|
||||
public ResponseEntity<IndicesPageResponse> getIndices(
|
||||
@RequestParam(defaultValue = "0") int page,
|
||||
@RequestParam(defaultValue = "20") int size,
|
||||
@RequestParam(defaultValue = "") String search) {
|
||||
try {
|
||||
Response response = restClient.performRequest(
|
||||
new Request("GET", "/_cat/indices?format=json&h=index,health,docs.count,store.size,pri,rep&bytes=b"));
|
||||
JsonNode indices;
|
||||
try (InputStream is = response.getEntity().getContent()) {
|
||||
indices = objectMapper.readTree(is);
|
||||
}
|
||||
|
||||
List<IndexInfoResponse> allIndices = new ArrayList<>();
|
||||
for (JsonNode idx : indices) {
|
||||
String name = idx.path("index").asText("");
|
||||
if (!search.isEmpty() && !name.contains(search)) {
|
||||
continue;
|
||||
}
|
||||
allIndices.add(new IndexInfoResponse(
|
||||
name,
|
||||
parseLong(idx.path("docs.count").asText("0")),
|
||||
humanSize(parseLong(idx.path("store.size").asText("0"))),
|
||||
parseLong(idx.path("store.size").asText("0")),
|
||||
idx.path("health").asText("unknown"),
|
||||
parseInt(idx.path("pri").asText("0")),
|
||||
parseInt(idx.path("rep").asText("0"))));
|
||||
}
|
||||
|
||||
allIndices.sort(Comparator.comparing(IndexInfoResponse::name));
|
||||
|
||||
long totalDocs = allIndices.stream().mapToLong(IndexInfoResponse::docCount).sum();
|
||||
long totalBytes = allIndices.stream().mapToLong(IndexInfoResponse::sizeBytes).sum();
|
||||
int totalIndices = allIndices.size();
|
||||
int totalPages = Math.max(1, (int) Math.ceil((double) totalIndices / size));
|
||||
|
||||
int fromIndex = Math.min(page * size, totalIndices);
|
||||
int toIndex = Math.min(fromIndex + size, totalIndices);
|
||||
List<IndexInfoResponse> pageItems = allIndices.subList(fromIndex, toIndex);
|
||||
|
||||
return ResponseEntity.ok(new IndicesPageResponse(
|
||||
pageItems, totalIndices, totalDocs,
|
||||
humanSize(totalBytes), page, size, totalPages));
|
||||
} catch (Exception e) {
|
||||
return ResponseEntity.ok(new IndicesPageResponse(
|
||||
List.of(), 0, 0, "0 B", page, size, 0));
|
||||
}
|
||||
}
|
||||
|
||||
@DeleteMapping("/indices/{name}")
|
||||
@Operation(summary = "Delete an OpenSearch index")
|
||||
public ResponseEntity<Void> deleteIndex(@PathVariable String name, HttpServletRequest request) {
|
||||
try {
|
||||
boolean exists = client.indices().exists(r -> r.index(name)).value();
|
||||
if (!exists) {
|
||||
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Index not found: " + name);
|
||||
}
|
||||
client.indices().delete(r -> r.index(name));
|
||||
auditService.log("delete_index", AuditCategory.INFRA, name, null, AuditResult.SUCCESS, request);
|
||||
return ResponseEntity.ok().build();
|
||||
} catch (ResponseStatusException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to delete index: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/performance")
|
||||
@Operation(summary = "Get OpenSearch performance metrics")
|
||||
public ResponseEntity<PerformanceResponse> getPerformance() {
|
||||
try {
|
||||
Response response = restClient.performRequest(
|
||||
new Request("GET", "/_nodes/stats/jvm,indices"));
|
||||
JsonNode root;
|
||||
try (InputStream is = response.getEntity().getContent()) {
|
||||
root = objectMapper.readTree(is);
|
||||
}
|
||||
|
||||
JsonNode nodes = root.path("nodes");
|
||||
long heapUsed = 0, heapMax = 0;
|
||||
long queryCacheHits = 0, queryCacheMisses = 0;
|
||||
long requestCacheHits = 0, requestCacheMisses = 0;
|
||||
long searchQueryTotal = 0, searchQueryTimeMs = 0;
|
||||
long indexTotal = 0, indexTimeMs = 0;
|
||||
|
||||
var it = nodes.fields();
|
||||
while (it.hasNext()) {
|
||||
var entry = it.next();
|
||||
JsonNode node = entry.getValue();
|
||||
|
||||
JsonNode jvm = node.path("jvm").path("mem");
|
||||
heapUsed += jvm.path("heap_used_in_bytes").asLong(0);
|
||||
heapMax += jvm.path("heap_max_in_bytes").asLong(0);
|
||||
|
||||
JsonNode indicesNode = node.path("indices");
|
||||
JsonNode queryCache = indicesNode.path("query_cache");
|
||||
queryCacheHits += queryCache.path("hit_count").asLong(0);
|
||||
queryCacheMisses += queryCache.path("miss_count").asLong(0);
|
||||
|
||||
JsonNode requestCache = indicesNode.path("request_cache");
|
||||
requestCacheHits += requestCache.path("hit_count").asLong(0);
|
||||
requestCacheMisses += requestCache.path("miss_count").asLong(0);
|
||||
|
||||
JsonNode searchNode = indicesNode.path("search");
|
||||
searchQueryTotal += searchNode.path("query_total").asLong(0);
|
||||
searchQueryTimeMs += searchNode.path("query_time_in_millis").asLong(0);
|
||||
|
||||
JsonNode indexing = indicesNode.path("indexing");
|
||||
indexTotal += indexing.path("index_total").asLong(0);
|
||||
indexTimeMs += indexing.path("index_time_in_millis").asLong(0);
|
||||
}
|
||||
|
||||
double queryCacheHitRate = (queryCacheHits + queryCacheMisses) > 0
|
||||
? (double) queryCacheHits / (queryCacheHits + queryCacheMisses) : 0.0;
|
||||
double requestCacheHitRate = (requestCacheHits + requestCacheMisses) > 0
|
||||
? (double) requestCacheHits / (requestCacheHits + requestCacheMisses) : 0.0;
|
||||
double searchLatency = searchQueryTotal > 0
|
||||
? (double) searchQueryTimeMs / searchQueryTotal : 0.0;
|
||||
double indexingLatency = indexTotal > 0
|
||||
? (double) indexTimeMs / indexTotal : 0.0;
|
||||
|
||||
return ResponseEntity.ok(new PerformanceResponse(
|
||||
queryCacheHitRate, requestCacheHitRate,
|
||||
searchLatency, indexingLatency,
|
||||
heapUsed, heapMax));
|
||||
} catch (Exception e) {
|
||||
return ResponseEntity.ok(new PerformanceResponse(0, 0, 0, 0, 0, 0));
|
||||
}
|
||||
}
|
||||
|
||||
private static long parseLong(String s) {
|
||||
try {
|
||||
return Long.parseLong(s);
|
||||
} catch (NumberFormatException e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static int parseInt(String s) {
|
||||
try {
|
||||
return Integer.parseInt(s);
|
||||
} catch (NumberFormatException e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static String humanSize(long bytes) {
|
||||
if (bytes < 1024) return bytes + " B";
|
||||
if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
|
||||
if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));
|
||||
return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package com.cameleer3.server.app.controller;
|
||||
|
||||
import com.cameleer3.server.app.AbstractPostgresIT;
|
||||
import com.cameleer3.server.app.TestSecurityHelper;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class OpenSearchAdminControllerIT extends AbstractPostgresIT {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Autowired
|
||||
private TestSecurityHelper securityHelper;
|
||||
|
||||
private String adminJwt;
|
||||
private String viewerJwt;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
adminJwt = securityHelper.adminToken();
|
||||
viewerJwt = securityHelper.viewerToken();
|
||||
}
|
||||
|
||||
@Test
|
||||
void getStatus_asAdmin_returns200() throws Exception {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/status", HttpMethod.GET,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.get("reachable").asBoolean()).isTrue();
|
||||
assertThat(body.has("clusterHealth")).isTrue();
|
||||
assertThat(body.has("version")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void getStatus_asViewer_returns403() {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/status", HttpMethod.GET,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(viewerJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getPipeline_asAdmin_returns200() throws Exception {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/pipeline", HttpMethod.GET,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.has("queueDepth")).isTrue();
|
||||
assertThat(body.has("maxQueueSize")).isTrue();
|
||||
assertThat(body.has("indexedCount")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void getIndices_asAdmin_returns200() throws Exception {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/indices", HttpMethod.GET,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.has("indices")).isTrue();
|
||||
assertThat(body.has("totalIndices")).isTrue();
|
||||
assertThat(body.has("page")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteIndex_nonExistent_returns404() {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/indices/nonexistent-index-xyz", HttpMethod.DELETE,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getPerformance_asAdmin_returns200() throws Exception {
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
"/api/v1/admin/opensearch/performance", HttpMethod.GET,
|
||||
new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)),
|
||||
String.class);
|
||||
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
JsonNode body = objectMapper.readTree(response.getBody());
|
||||
assertThat(body.has("queryCacheHitRate")).isTrue();
|
||||
assertThat(body.has("jvmHeapUsedBytes")).isTrue();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user