From 283e38a20da56a388e061bed3d438a1a44d818b0 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 1 Apr 2026 18:56:06 +0200 Subject: [PATCH] feat: remove OpenSearch, add ClickHouse admin page Remove all OpenSearch code, dependencies, configuration, deployment manifests, and CI/CD references. Replace the OpenSearch admin page with a ClickHouse admin page showing cluster status, table sizes, performance metrics, and indexer pipeline stats. - Delete 11 OpenSearch Java files (config, search impl, admin controller, DTOs, tests) - Delete 3 OpenSearch frontend files (admin page, CSS, query hooks) - Delete deploy/opensearch.yaml K8s manifest - Remove opensearch Maven dependencies from pom.xml - Remove opensearch config from application.yml, Dockerfile, docker-compose - Remove opensearch from CI workflow (secrets, deploy, cleanup steps) - Simplify ThresholdConfig (remove OpenSearch thresholds, database-only) - Change default search backend from opensearch to clickhouse - Add ClickHouseAdminController with /status, /tables, /performance, /pipeline - Add ClickHouseAdminPage with StatCards, pipeline ProgressBar, tables DataTable - Update CLAUDE.md, HOWTO.md, and source comments Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitea/workflows/ci.yml | 22 +- CLAUDE.md | 10 +- Dockerfile | 1 - HOWTO.md | 19 +- cameleer3-server-app/pom.xml | 16 - .../server/app/config/OpenSearchConfig.java | 28 -- .../server/app/config/StorageBeanConfig.java | 6 +- .../controller/ClickHouseAdminController.java | 119 +++++ .../controller/LogIngestionController.java | 2 +- .../app/controller/LogQueryController.java | 2 +- .../controller/OpenSearchAdminController.java | 266 ----------- .../dto/ClickHousePerformanceResponse.java | 12 + .../app/dto/ClickHouseStatusResponse.java | 11 + .../server/app/dto/ClickHouseTableInfo.java | 13 + .../server/app/dto/IndexInfoResponse.java | 14 - .../app/dto/IndexerPipelineResponse.java | 16 + .../server/app/dto/IndicesPageResponse.java | 16 - .../server/app/dto/LogEntryResponse.java | 2 +- .../app/dto/OpenSearchStatusResponse.java | 12 - .../server/app/dto/PerformanceResponse.java | 13 - .../server/app/dto/PipelineStatsResponse.java | 16 - .../app/dto/ThresholdConfigRequest.java | 81 +--- .../app/retention/RetentionScheduler.java | 2 - .../server/app/search/OpenSearchIndex.java | 435 ------------------ .../server/app/search/OpenSearchLogIndex.java | 228 --------- .../src/main/resources/application.yml | 13 +- .../server/app/AbstractPostgresIT.java | 6 - .../OpenSearchAdminControllerIT.java | 112 ----- .../app/controller/SearchControllerIT.java | 2 +- .../ThresholdAdminControllerIT.java | 21 - .../server/app/search/OpenSearchIndexIT.java | 84 ---- .../src/test/resources/application-test.yml | 6 +- .../server/core/admin/ThresholdConfig.java | 20 +- .../server/core/search/SearchRequest.java | 5 +- deploy/base/server.yaml | 4 - deploy/opensearch.yaml | 98 ---- deploy/overlays/feature/kustomization.yaml | 2 - deploy/overlays/main/kustomization.yaml | 2 - docker-compose.yml | 13 - ui/src/api/queries/admin/clickhouse.ts | 77 ++++ ui/src/api/queries/admin/opensearch.ts | 109 ----- ui/src/api/queries/admin/thresholds.ts | 12 - ui/src/components/LayoutShell.tsx | 2 +- ui/src/pages/Admin/AdminLayout.tsx | 2 +- ...ule.css => ClickHouseAdminPage.module.css} | 8 +- ui/src/pages/Admin/ClickHouseAdminPage.tsx | 65 +++ ui/src/pages/Admin/OpenSearchAdminPage.tsx | 78 ---- ui/src/pages/AgentInstance/AgentInstance.tsx | 2 +- ui/src/router.tsx | 4 +- 49 files changed, 356 insertions(+), 1753 deletions(-) delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ClickHouseAdminController.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHousePerformanceResponse.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseStatusResponse.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseTableInfo.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexInfoResponse.java create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexerPipelineResponse.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndicesPageResponse.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/OpenSearchStatusResponse.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PerformanceResponse.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PipelineStatsResponse.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java delete mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java delete mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java delete mode 100644 cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java delete mode 100644 deploy/opensearch.yaml create mode 100644 ui/src/api/queries/admin/clickhouse.ts delete mode 100644 ui/src/api/queries/admin/opensearch.ts rename ui/src/pages/Admin/{OpenSearchAdminPage.module.css => ClickHouseAdminPage.module.css} (94%) create mode 100644 ui/src/pages/Admin/ClickHouseAdminPage.tsx delete mode 100644 ui/src/pages/Admin/OpenSearchAdminPage.tsx diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 259f8684..558694c4 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -209,12 +209,6 @@ jobs: --from-literal=POSTGRES_DB="${POSTGRES_DB:-cameleer}" \ --dry-run=client -o yaml | kubectl apply -f - - kubectl create secret generic opensearch-credentials \ - --namespace=cameleer \ - --from-literal=OPENSEARCH_USER="${OPENSEARCH_USER:-admin}" \ - --from-literal=OPENSEARCH_PASSWORD="$OPENSEARCH_PASSWORD" \ - --dry-run=client -o yaml | kubectl apply -f - - kubectl create secret generic authentik-credentials \ --namespace=cameleer \ --from-literal=PG_USER="${AUTHENTIK_PG_USER:-authentik}" \ @@ -231,9 +225,6 @@ jobs: kubectl apply -f deploy/postgres.yaml kubectl -n cameleer rollout status statefulset/postgres --timeout=120s - kubectl apply -f deploy/opensearch.yaml - kubectl -n cameleer rollout status statefulset/opensearch --timeout=180s - kubectl apply -f deploy/clickhouse.yaml kubectl -n cameleer rollout status statefulset/clickhouse --timeout=180s @@ -257,8 +248,6 @@ jobs: POSTGRES_USER: ${{ secrets.POSTGRES_USER }} POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }} POSTGRES_DB: ${{ secrets.POSTGRES_DB }} - OPENSEARCH_USER: ${{ secrets.OPENSEARCH_USER }} - OPENSEARCH_PASSWORD: ${{ secrets.OPENSEARCH_PASSWORD }} AUTHENTIK_PG_USER: ${{ secrets.AUTHENTIK_PG_USER }} AUTHENTIK_PG_PASSWORD: ${{ secrets.AUTHENTIK_PG_PASSWORD }} AUTHENTIK_SECRET_KEY: ${{ secrets.AUTHENTIK_SECRET_KEY }} @@ -303,7 +292,7 @@ jobs: run: kubectl create namespace "$BRANCH_NS" --dry-run=client -o yaml | kubectl apply -f - - name: Copy secrets from cameleer namespace run: | - for SECRET in gitea-registry postgres-credentials opensearch-credentials clickhouse-credentials cameleer-auth; do + for SECRET in gitea-registry postgres-credentials clickhouse-credentials cameleer-auth; do kubectl get secret "$SECRET" -n cameleer -o json \ | jq 'del(.metadata.namespace, .metadata.resourceVersion, .metadata.uid, .metadata.creationTimestamp, .metadata.managedFields)' \ | kubectl apply -n "$BRANCH_NS" -f - @@ -383,15 +372,6 @@ jobs: kubectl wait --for=condition=Ready pod/cleanup-schema-${BRANCH_SLUG} -n cameleer --timeout=30s || true kubectl wait --for=jsonpath='{.status.phase}'=Succeeded pod/cleanup-schema-${BRANCH_SLUG} -n cameleer --timeout=60s || true kubectl delete pod cleanup-schema-${BRANCH_SLUG} -n cameleer --ignore-not-found - - name: Delete OpenSearch indices - run: | - kubectl run cleanup-indices-${BRANCH_SLUG} \ - --namespace=cameleer \ - --image=curlimages/curl:latest \ - --restart=Never \ - --command -- curl -sf -X DELETE "http://opensearch:9200/cam-${BRANCH_SLUG}-*" - kubectl wait --for=jsonpath='{.status.phase}'=Succeeded pod/cleanup-indices-${BRANCH_SLUG} -n cameleer --timeout=60s || true - kubectl delete pod cleanup-indices-${BRANCH_SLUG} -n cameleer --ignore-not-found - name: Cleanup Docker images run: | API="https://gitea.siegeln.net/api/v1" diff --git a/CLAUDE.md b/CLAUDE.md index de52438b..9beec93d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,7 +38,7 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar - Jackson `JavaTimeModule` for `Instant` deserialization - Communication: receives HTTP POST data from agents (executions, diagrams, metrics, logs), serves SSE event streams for config push/commands (config-update, deep-trace, replay, route-control) - Maintains agent instance registry with states: LIVE → STALE → DEAD -- Storage: PostgreSQL (TimescaleDB) for structured data, OpenSearch for full-text search and application log storage +- Storage: PostgreSQL (TimescaleDB) for structured data, ClickHouse for analytics, search, logs, and time-series - Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration - OIDC: Optional external identity provider support (token exchange pattern). Configured via admin API, stored in database (`server_config` table) - User persistence: PostgreSQL `users` table, admin CRUD at `/api/v1/admin/users` @@ -50,11 +50,11 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar - Docker: multi-stage build (`Dockerfile`), `$BUILDPLATFORM` for native Maven on ARM64 runner, amd64 runtime - `REGISTRY_TOKEN` build arg required for `cameleer3-common` dependency resolution - Registry: `gitea.siegeln.net/cameleer/cameleer3-server` (container images) -- K8s manifests in `deploy/` — Kustomize base + overlays (main/feature), shared infra (PostgreSQL, OpenSearch, Authentik) as top-level manifests +- K8s manifests in `deploy/` — Kustomize base + overlays (main/feature), shared infra (PostgreSQL, ClickHouse, Authentik) as top-level manifests - Deployment target: k3s at 192.168.50.86, namespace `cameleer` (main), `cam-` (feature branches) -- Feature branches: isolated namespace, PG schema, OpenSearch index prefix; Traefik Ingress at `-api.cameleer.siegeln.net` -- Secrets managed in CI deploy step (idempotent `--dry-run=client | kubectl apply`): `cameleer-auth`, `postgres-credentials`, `opensearch-credentials` -- K8s probes: server uses `/api/v1/health`, PostgreSQL uses `pg_isready`, OpenSearch uses `/_cluster/health` +- Feature branches: isolated namespace, PG schema; Traefik Ingress at `-api.cameleer.siegeln.net` +- Secrets managed in CI deploy step (idempotent `--dry-run=client | kubectl apply`): `cameleer-auth`, `postgres-credentials`, `clickhouse-credentials` +- K8s probes: server uses `/api/v1/health`, PostgreSQL uses `pg_isready` - Docker build uses buildx registry cache + `--provenance=false` for Gitea compatibility ## UI Styling diff --git a/Dockerfile b/Dockerfile index bf9eb6c2..ee27cbce 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,6 @@ COPY --from=build /build/cameleer3-server-app/target/cameleer3-server-app-*.jar ENV SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/cameleer3 ENV SPRING_DATASOURCE_USERNAME=cameleer ENV SPRING_DATASOURCE_PASSWORD=cameleer_dev -ENV OPENSEARCH_URL=http://opensearch:9200 EXPOSE 8081 ENV TZ=UTC diff --git a/HOWTO.md b/HOWTO.md index 8cc74f76..0d2367fb 100644 --- a/HOWTO.md +++ b/HOWTO.md @@ -21,18 +21,17 @@ mvn clean verify # compile + run all tests (needs Docker for integrati ## Infrastructure Setup -Start PostgreSQL and OpenSearch: +Start PostgreSQL: ```bash docker compose up -d ``` -This starts TimescaleDB (PostgreSQL 16) and OpenSearch 2.19. The database schema is applied automatically via Flyway migrations on server startup. +This starts TimescaleDB (PostgreSQL 16). The database schema is applied automatically via Flyway migrations on server startup. ClickHouse tables are created by the schema initializer on startup. | Service | Port | Purpose | |------------|------|----------------------| | PostgreSQL | 5432 | JDBC (Spring JDBC) | -| OpenSearch | 9200 | REST API (full-text) | PostgreSQL credentials: `cameleer` / `cameleer_dev`, database `cameleer3`. @@ -381,8 +380,8 @@ Key settings in `cameleer3-server-app/src/main/resources/application.yml`: | `security.oidc.client-secret` | | OAuth2 client secret (`CAMELEER_OIDC_CLIENT_SECRET`) | | `security.oidc.roles-claim` | `realm_access.roles` | JSONPath to roles in OIDC id_token (`CAMELEER_OIDC_ROLES_CLAIM`) | | `security.oidc.default-roles` | `VIEWER` | Default roles for new OIDC users (`CAMELEER_OIDC_DEFAULT_ROLES`) | -| `opensearch.log-index-prefix` | `logs-` | OpenSearch index prefix for application logs (`CAMELEER_LOG_INDEX_PREFIX`) | -| `opensearch.log-retention-days` | `7` | Days before log indices are deleted (`CAMELEER_LOG_RETENTION_DAYS`) | +| `cameleer.indexer.debounce-ms` | `2000` | Search indexer debounce delay (`CAMELEER_INDEXER_DEBOUNCE_MS`) | +| `cameleer.indexer.queue-size` | `10000` | Search indexer queue capacity (`CAMELEER_INDEXER_QUEUE_SIZE`) | ## Web UI Development @@ -407,7 +406,7 @@ npm run generate-api # Requires backend running on :8081 ## Running Tests -Integration tests use Testcontainers (starts PostgreSQL and OpenSearch automatically — requires Docker): +Integration tests use Testcontainers (starts PostgreSQL automatically — requires Docker): ```bash # All tests @@ -438,7 +437,7 @@ The full stack is deployed to k3s via CI/CD on push to `main`. K8s manifests are ``` cameleer namespace: PostgreSQL (StatefulSet, 10Gi PVC) ← postgres:5432 (ClusterIP) - OpenSearch (StatefulSet, 10Gi PVC) ← opensearch:9200 (ClusterIP) + ClickHouse (StatefulSet, 10Gi PVC) ← clickhouse:8123 (ClusterIP) cameleer3-server (Deployment) ← NodePort 30081 cameleer3-ui (Deployment, Nginx) ← NodePort 30090 Authentik Server (Deployment) ← NodePort 30950 @@ -460,7 +459,7 @@ cameleer namespace: Push to `main` triggers: **build** (UI npm + Maven, unit tests) → **docker** (buildx amd64 for server + UI, push to Gitea registry) → **deploy** (kubectl apply + rolling update). -Required Gitea org secrets: `REGISTRY_TOKEN`, `KUBECONFIG_BASE64`, `CAMELEER_AUTH_TOKEN`, `CAMELEER_JWT_SECRET`, `POSTGRES_USER`, `POSTGRES_PASSWORD`, `POSTGRES_DB`, `OPENSEARCH_USER`, `OPENSEARCH_PASSWORD`, `CAMELEER_UI_USER` (optional), `CAMELEER_UI_PASSWORD` (optional), `AUTHENTIK_PG_USER`, `AUTHENTIK_PG_PASSWORD`, `AUTHENTIK_SECRET_KEY`, `CAMELEER_OIDC_ENABLED`, `CAMELEER_OIDC_ISSUER`, `CAMELEER_OIDC_CLIENT_ID`, `CAMELEER_OIDC_CLIENT_SECRET`. +Required Gitea org secrets: `REGISTRY_TOKEN`, `KUBECONFIG_BASE64`, `CAMELEER_AUTH_TOKEN`, `CAMELEER_JWT_SECRET`, `POSTGRES_USER`, `POSTGRES_PASSWORD`, `POSTGRES_DB`, `CLICKHOUSE_USER`, `CLICKHOUSE_PASSWORD`, `CAMELEER_UI_USER` (optional), `CAMELEER_UI_PASSWORD` (optional), `AUTHENTIK_PG_USER`, `AUTHENTIK_PG_PASSWORD`, `AUTHENTIK_SECRET_KEY`, `CAMELEER_OIDC_ENABLED`, `CAMELEER_OIDC_ISSUER`, `CAMELEER_OIDC_CLIENT_ID`, `CAMELEER_OIDC_CLIENT_SECRET`. ### Manual K8s Commands @@ -474,8 +473,8 @@ kubectl -n cameleer logs -f deploy/cameleer3-server # View PostgreSQL logs kubectl -n cameleer logs -f statefulset/postgres -# View OpenSearch logs -kubectl -n cameleer logs -f statefulset/opensearch +# View ClickHouse logs +kubectl -n cameleer logs -f statefulset/clickhouse # Restart server kubectl -n cameleer rollout restart deployment/cameleer3-server diff --git a/cameleer3-server-app/pom.xml b/cameleer3-server-app/pom.xml index 4691e969..d3e89e03 100644 --- a/cameleer3-server-app/pom.xml +++ b/cameleer3-server-app/pom.xml @@ -47,16 +47,6 @@ org.flywaydb flyway-database-postgresql - - org.opensearch.client - opensearch-java - 2.19.0 - - - org.opensearch.client - opensearch-rest-client - 2.19.0 - com.clickhouse clickhouse-jdbc @@ -126,12 +116,6 @@ testcontainers-junit-jupiter test - - org.opensearch - opensearch-testcontainers - 2.1.1 - test - org.testcontainers testcontainers-clickhouse diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java deleted file mode 100644 index 3ff7edea..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/OpenSearchConfig.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.cameleer3.server.app.config; - -import org.apache.http.HttpHost; -import org.opensearch.client.RestClient; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.transport.rest_client.RestClientTransport; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class OpenSearchConfig { - - @Value("${opensearch.url:http://localhost:9200}") - private String opensearchUrl; - - @Bean(destroyMethod = "close") - public RestClient opensearchRestClient() { - return RestClient.builder(HttpHost.create(opensearchUrl)).build(); - } - - @Bean - public OpenSearchClient openSearchClient(RestClient restClient) { - var transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); - return new OpenSearchClient(transport); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 1fc8254e..94c4c11d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -43,8 +43,8 @@ public class StorageBeanConfig { @Bean(destroyMethod = "shutdown") public SearchIndexer searchIndexer(ExecutionStore executionStore, SearchIndex searchIndex, - @Value("${opensearch.debounce-ms:2000}") long debounceMs, - @Value("${opensearch.queue-size:10000}") int queueSize) { + @Value("${cameleer.indexer.debounce-ms:2000}") long debounceMs, + @Value("${cameleer.indexer.queue-size:10000}") int queueSize) { return new SearchIndexer(executionStore, searchIndex, debounceMs, queueSize); } @@ -130,7 +130,7 @@ public class StorageBeanConfig { } @Bean - @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse") + @ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "clickhouse", matchIfMissing = true) public SearchIndex clickHouseSearchIndex( @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { return new ClickHouseSearchIndex(clickHouseJdbc); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ClickHouseAdminController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ClickHouseAdminController.java new file mode 100644 index 00000000..78d3a447 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/ClickHouseAdminController.java @@ -0,0 +1,119 @@ +package com.cameleer3.server.app.controller; + +import com.cameleer3.server.app.dto.ClickHousePerformanceResponse; +import com.cameleer3.server.app.dto.ClickHouseStatusResponse; +import com.cameleer3.server.app.dto.ClickHouseTableInfo; +import com.cameleer3.server.app.dto.IndexerPipelineResponse; +import com.cameleer3.server.core.indexing.SearchIndexerStats; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/api/v1/admin/clickhouse") +@PreAuthorize("hasRole('ADMIN')") +@Tag(name = "ClickHouse Admin", description = "ClickHouse monitoring and diagnostics (ADMIN only)") +public class ClickHouseAdminController { + + private final JdbcTemplate clickHouseJdbc; + private final SearchIndexerStats indexerStats; + private final String clickHouseUrl; + + public ClickHouseAdminController( + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc, + SearchIndexerStats indexerStats, + @Value("${clickhouse.url:}") String clickHouseUrl) { + this.clickHouseJdbc = clickHouseJdbc; + this.indexerStats = indexerStats; + this.clickHouseUrl = clickHouseUrl; + } + + @GetMapping("/status") + @Operation(summary = "ClickHouse cluster status") + public ClickHouseStatusResponse getStatus() { + try { + var row = clickHouseJdbc.queryForMap( + "SELECT version() AS version, formatReadableTimeDelta(uptime()) AS uptime"); + return new ClickHouseStatusResponse(true, + (String) row.get("version"), + (String) row.get("uptime"), + clickHouseUrl); + } catch (Exception e) { + return new ClickHouseStatusResponse(false, null, null, clickHouseUrl); + } + } + + @GetMapping("/tables") + @Operation(summary = "List ClickHouse tables with sizes") + public List getTables() { + return clickHouseJdbc.query(""" + SELECT t.name, t.engine, + t.total_rows AS row_count, + formatReadableSize(t.total_bytes) AS data_size, + t.total_bytes AS data_size_bytes, + ifNull(p.partition_count, 0) AS partition_count + FROM system.tables t + LEFT JOIN ( + SELECT table, countDistinct(partition) AS partition_count + FROM system.parts + WHERE database = currentDatabase() AND active + GROUP BY table + ) p ON t.name = p.table + WHERE t.database = currentDatabase() + ORDER BY t.total_bytes DESC NULLS LAST + """, + (rs, rowNum) -> new ClickHouseTableInfo( + rs.getString("name"), + rs.getString("engine"), + rs.getLong("row_count"), + rs.getString("data_size"), + rs.getLong("data_size_bytes"), + rs.getInt("partition_count"))); + } + + @GetMapping("/performance") + @Operation(summary = "ClickHouse performance metrics") + public ClickHousePerformanceResponse getPerformance() { + try { + long selectQueries = queryEvent("SelectQuery"); + long insertQueries = queryEvent("InsertQuery"); + long insertedRows = queryEvent("InsertedRows"); + long readRows = queryEvent("SelectedRows"); + String memoryUsage = clickHouseJdbc.queryForObject( + "SELECT formatReadableSize(value) FROM system.metrics WHERE metric = 'MemoryTracking'", + String.class); + return new ClickHousePerformanceResponse(selectQueries, insertQueries, + memoryUsage != null ? memoryUsage : "0 B", insertedRows, readRows); + } catch (Exception e) { + return new ClickHousePerformanceResponse(0, 0, "N/A", 0, 0); + } + } + + @GetMapping("/pipeline") + @Operation(summary = "Search indexer pipeline statistics") + public IndexerPipelineResponse getPipeline() { + return new IndexerPipelineResponse( + indexerStats.getQueueDepth(), + indexerStats.getMaxQueueSize(), + indexerStats.getFailedCount(), + indexerStats.getIndexedCount(), + indexerStats.getDebounceMs(), + indexerStats.getIndexingRate(), + indexerStats.getLastIndexedAt()); + } + + private long queryEvent(String eventName) { + Long val = clickHouseJdbc.queryForObject( + "SELECT value FROM system.events WHERE event = ?", + Long.class, eventName); + return val != null ? val : 0; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 25eca814..5fe718d5 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -35,7 +35,7 @@ public class LogIngestionController { @PostMapping("/logs") @Operation(summary = "Ingest application log entries", - description = "Accepts a batch of log entries from an agent. Entries are indexed in OpenSearch.") + description = "Accepts a batch of log entries from an agent. Entries are stored in the configured log store.") @ApiResponse(responseCode = "202", description = "Logs accepted for indexing") public ResponseEntity ingestLogs(@RequestBody LogBatch batch) { String instanceId = extractAgentId(); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java index b8759698..32500c17 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogQueryController.java @@ -16,7 +16,7 @@ import java.util.List; @RestController @RequestMapping("/api/v1/logs") -@Tag(name = "Application Logs", description = "Query application logs stored in OpenSearch") +@Tag(name = "Application Logs", description = "Query application logs") public class LogQueryController { private final LogIndex logIndex; diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java deleted file mode 100644 index aadf602c..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/OpenSearchAdminController.java +++ /dev/null @@ -1,266 +0,0 @@ -package com.cameleer3.server.app.controller; - -import com.cameleer3.server.app.dto.IndexInfoResponse; -import com.cameleer3.server.app.dto.IndicesPageResponse; -import com.cameleer3.server.app.dto.OpenSearchStatusResponse; -import com.cameleer3.server.app.dto.PerformanceResponse; -import com.cameleer3.server.app.dto.PipelineStatsResponse; -import com.cameleer3.server.core.admin.AuditCategory; -import com.cameleer3.server.core.admin.AuditResult; -import com.cameleer3.server.core.admin.AuditService; -import com.cameleer3.server.core.indexing.SearchIndexerStats; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.tags.Tag; -import jakarta.servlet.http.HttpServletRequest; -import org.opensearch.client.Request; -import org.opensearch.client.Response; -import org.opensearch.client.RestClient; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch.cluster.HealthResponse; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.security.access.prepost.PreAuthorize; -import org.springframework.web.bind.annotation.DeleteMapping; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.server.ResponseStatusException; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; - -@RestController -@RequestMapping("/api/v1/admin/opensearch") -@PreAuthorize("hasRole('ADMIN')") -@Tag(name = "OpenSearch Admin", description = "OpenSearch monitoring and management (ADMIN only)") -public class OpenSearchAdminController { - - private final OpenSearchClient client; - private final RestClient restClient; - private final SearchIndexerStats indexerStats; - private final AuditService auditService; - private final ObjectMapper objectMapper; - private final String opensearchUrl; - private final String indexPrefix; - private final String logIndexPrefix; - - public OpenSearchAdminController(OpenSearchClient client, RestClient restClient, - SearchIndexerStats indexerStats, AuditService auditService, - ObjectMapper objectMapper, - @Value("${opensearch.url:http://localhost:9200}") String opensearchUrl, - @Value("${opensearch.index-prefix:executions-}") String indexPrefix, - @Value("${opensearch.log-index-prefix:logs-}") String logIndexPrefix) { - this.client = client; - this.restClient = restClient; - this.indexerStats = indexerStats; - this.auditService = auditService; - this.objectMapper = objectMapper; - this.opensearchUrl = opensearchUrl; - this.indexPrefix = indexPrefix; - this.logIndexPrefix = logIndexPrefix; - } - - @GetMapping("/status") - @Operation(summary = "Get OpenSearch cluster status and version") - public ResponseEntity getStatus() { - try { - HealthResponse health = client.cluster().health(); - String version = client.info().version().number(); - return ResponseEntity.ok(new OpenSearchStatusResponse( - true, - health.status().name(), - version, - health.numberOfNodes(), - opensearchUrl)); - } catch (Exception e) { - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) - .body(new OpenSearchStatusResponse( - false, "UNREACHABLE", null, 0, opensearchUrl)); - } - } - - @GetMapping("/pipeline") - @Operation(summary = "Get indexing pipeline statistics") - public ResponseEntity getPipeline() { - return ResponseEntity.ok(new PipelineStatsResponse( - indexerStats.getQueueDepth(), - indexerStats.getMaxQueueSize(), - indexerStats.getFailedCount(), - indexerStats.getIndexedCount(), - indexerStats.getDebounceMs(), - indexerStats.getIndexingRate(), - indexerStats.getLastIndexedAt())); - } - - @GetMapping("/indices") - @Operation(summary = "Get OpenSearch indices with pagination") - public ResponseEntity getIndices( - @RequestParam(defaultValue = "0") int page, - @RequestParam(defaultValue = "20") int size, - @RequestParam(defaultValue = "") String search, - @RequestParam(defaultValue = "executions") String prefix) { - try { - Response response = restClient.performRequest( - new Request("GET", "/_cat/indices?format=json&h=index,health,docs.count,store.size,pri,rep&bytes=b")); - JsonNode indices; - try (InputStream is = response.getEntity().getContent()) { - indices = objectMapper.readTree(is); - } - - String filterPrefix = "logs".equals(prefix) ? logIndexPrefix : indexPrefix; - - List allIndices = new ArrayList<>(); - for (JsonNode idx : indices) { - String name = idx.path("index").asText(""); - if (!name.startsWith(filterPrefix)) { - continue; - } - if (!search.isEmpty() && !name.contains(search)) { - continue; - } - allIndices.add(new IndexInfoResponse( - name, - parseLong(idx.path("docs.count").asText("0")), - humanSize(parseLong(idx.path("store.size").asText("0"))), - parseLong(idx.path("store.size").asText("0")), - idx.path("health").asText("unknown"), - parseInt(idx.path("pri").asText("0")), - parseInt(idx.path("rep").asText("0")))); - } - - allIndices.sort(Comparator.comparing(IndexInfoResponse::name)); - - long totalDocs = allIndices.stream().mapToLong(IndexInfoResponse::docCount).sum(); - long totalBytes = allIndices.stream().mapToLong(IndexInfoResponse::sizeBytes).sum(); - int totalIndices = allIndices.size(); - int totalPages = Math.max(1, (int) Math.ceil((double) totalIndices / size)); - - int fromIndex = Math.min(page * size, totalIndices); - int toIndex = Math.min(fromIndex + size, totalIndices); - List pageItems = allIndices.subList(fromIndex, toIndex); - - return ResponseEntity.ok(new IndicesPageResponse( - pageItems, totalIndices, totalDocs, - humanSize(totalBytes), page, size, totalPages)); - } catch (Exception e) { - return ResponseEntity.status(HttpStatus.BAD_GATEWAY) - .body(new IndicesPageResponse( - List.of(), 0, 0, "0 B", page, size, 0)); - } - } - - @DeleteMapping("/indices/{name}") - @Operation(summary = "Delete an OpenSearch index") - public ResponseEntity deleteIndex(@PathVariable String name, HttpServletRequest request) { - try { - if (!name.startsWith(indexPrefix) && !name.startsWith(logIndexPrefix)) { - throw new ResponseStatusException(HttpStatus.FORBIDDEN, "Cannot delete index outside application scope"); - } - boolean exists = client.indices().exists(r -> r.index(name)).value(); - if (!exists) { - throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Index not found: " + name); - } - client.indices().delete(r -> r.index(name)); - auditService.log("delete_index", AuditCategory.INFRA, name, null, AuditResult.SUCCESS, request); - return ResponseEntity.ok().build(); - } catch (ResponseStatusException e) { - throw e; - } catch (Exception e) { - throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to delete index: " + e.getMessage()); - } - } - - @GetMapping("/performance") - @Operation(summary = "Get OpenSearch performance metrics") - public ResponseEntity getPerformance() { - try { - Response response = restClient.performRequest( - new Request("GET", "/_nodes/stats/jvm,indices")); - JsonNode root; - try (InputStream is = response.getEntity().getContent()) { - root = objectMapper.readTree(is); - } - - JsonNode nodes = root.path("nodes"); - long heapUsed = 0, heapMax = 0; - long queryCacheHits = 0, queryCacheMisses = 0; - long requestCacheHits = 0, requestCacheMisses = 0; - long searchQueryTotal = 0, searchQueryTimeMs = 0; - long indexTotal = 0, indexTimeMs = 0; - - var it = nodes.fields(); - while (it.hasNext()) { - var entry = it.next(); - JsonNode node = entry.getValue(); - - JsonNode jvm = node.path("jvm").path("mem"); - heapUsed += jvm.path("heap_used_in_bytes").asLong(0); - heapMax += jvm.path("heap_max_in_bytes").asLong(0); - - JsonNode indicesNode = node.path("indices"); - JsonNode queryCache = indicesNode.path("query_cache"); - queryCacheHits += queryCache.path("hit_count").asLong(0); - queryCacheMisses += queryCache.path("miss_count").asLong(0); - - JsonNode requestCache = indicesNode.path("request_cache"); - requestCacheHits += requestCache.path("hit_count").asLong(0); - requestCacheMisses += requestCache.path("miss_count").asLong(0); - - JsonNode searchNode = indicesNode.path("search"); - searchQueryTotal += searchNode.path("query_total").asLong(0); - searchQueryTimeMs += searchNode.path("query_time_in_millis").asLong(0); - - JsonNode indexing = indicesNode.path("indexing"); - indexTotal += indexing.path("index_total").asLong(0); - indexTimeMs += indexing.path("index_time_in_millis").asLong(0); - } - - double queryCacheHitRate = (queryCacheHits + queryCacheMisses) > 0 - ? (double) queryCacheHits / (queryCacheHits + queryCacheMisses) : 0.0; - double requestCacheHitRate = (requestCacheHits + requestCacheMisses) > 0 - ? (double) requestCacheHits / (requestCacheHits + requestCacheMisses) : 0.0; - double searchLatency = searchQueryTotal > 0 - ? (double) searchQueryTimeMs / searchQueryTotal : 0.0; - double indexingLatency = indexTotal > 0 - ? (double) indexTimeMs / indexTotal : 0.0; - - return ResponseEntity.ok(new PerformanceResponse( - queryCacheHitRate, requestCacheHitRate, - searchLatency, indexingLatency, - heapUsed, heapMax)); - } catch (Exception e) { - return ResponseEntity.status(HttpStatus.BAD_GATEWAY) - .body(new PerformanceResponse(0, 0, 0, 0, 0, 0)); - } - } - - private static long parseLong(String s) { - try { - return Long.parseLong(s); - } catch (NumberFormatException e) { - return 0; - } - } - - private static int parseInt(String s) { - try { - return Integer.parseInt(s); - } catch (NumberFormatException e) { - return 0; - } - } - - private static String humanSize(long bytes) { - if (bytes < 1024) return bytes + " B"; - if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0); - if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024)); - return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024)); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHousePerformanceResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHousePerformanceResponse.java new file mode 100644 index 00000000..5c85c2c0 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHousePerformanceResponse.java @@ -0,0 +1,12 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "ClickHouse performance metrics") +public record ClickHousePerformanceResponse( + long queryCount, + long insertQueryCount, + String memoryUsage, + long insertedRows, + long readRows +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseStatusResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseStatusResponse.java new file mode 100644 index 00000000..863c47de --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseStatusResponse.java @@ -0,0 +1,11 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "ClickHouse cluster status") +public record ClickHouseStatusResponse( + boolean reachable, + String version, + String uptime, + String host +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseTableInfo.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseTableInfo.java new file mode 100644 index 00000000..b64adeaf --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ClickHouseTableInfo.java @@ -0,0 +1,13 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "ClickHouse table information") +public record ClickHouseTableInfo( + String name, + String engine, + long rowCount, + String dataSize, + long dataSizeBytes, + int partitionCount +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexInfoResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexInfoResponse.java deleted file mode 100644 index 6ab5dcd3..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexInfoResponse.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.cameleer3.server.app.dto; - -import io.swagger.v3.oas.annotations.media.Schema; - -@Schema(description = "OpenSearch index information") -public record IndexInfoResponse( - @Schema(description = "Index name") String name, - @Schema(description = "Document count") long docCount, - @Schema(description = "Human-readable index size") String size, - @Schema(description = "Index size in bytes") long sizeBytes, - @Schema(description = "Index health status") String health, - @Schema(description = "Number of primary shards") int primaryShards, - @Schema(description = "Number of replica shards") int replicaShards -) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexerPipelineResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexerPipelineResponse.java new file mode 100644 index 00000000..eb7d25da --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndexerPipelineResponse.java @@ -0,0 +1,16 @@ +package com.cameleer3.server.app.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +import java.time.Instant; + +@Schema(description = "Search indexer pipeline statistics") +public record IndexerPipelineResponse( + int queueDepth, + int maxQueueSize, + long failedCount, + long indexedCount, + long debounceMs, + double indexingRate, + Instant lastIndexedAt +) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndicesPageResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndicesPageResponse.java deleted file mode 100644 index 469ab84e..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/IndicesPageResponse.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.cameleer3.server.app.dto; - -import io.swagger.v3.oas.annotations.media.Schema; - -import java.util.List; - -@Schema(description = "Paginated list of OpenSearch indices") -public record IndicesPageResponse( - @Schema(description = "Index list for current page") List indices, - @Schema(description = "Total number of indices") long totalIndices, - @Schema(description = "Total document count across all indices") long totalDocs, - @Schema(description = "Human-readable total size") String totalSize, - @Schema(description = "Current page number (0-based)") int page, - @Schema(description = "Page size") int pageSize, - @Schema(description = "Total number of pages") int totalPages -) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java index b373a311..7d5f3e9e 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/LogEntryResponse.java @@ -2,7 +2,7 @@ package com.cameleer3.server.app.dto; import io.swagger.v3.oas.annotations.media.Schema; -@Schema(description = "Application log entry from OpenSearch") +@Schema(description = "Application log entry") public record LogEntryResponse( @Schema(description = "Log timestamp (ISO-8601)") String timestamp, @Schema(description = "Log level (INFO, WARN, ERROR, DEBUG)") String level, diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/OpenSearchStatusResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/OpenSearchStatusResponse.java deleted file mode 100644 index 612982fe..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/OpenSearchStatusResponse.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.cameleer3.server.app.dto; - -import io.swagger.v3.oas.annotations.media.Schema; - -@Schema(description = "OpenSearch cluster status") -public record OpenSearchStatusResponse( - @Schema(description = "Whether the cluster is reachable") boolean reachable, - @Schema(description = "Cluster health status (GREEN, YELLOW, RED)") String clusterHealth, - @Schema(description = "OpenSearch version") String version, - @Schema(description = "Number of nodes in the cluster") int nodeCount, - @Schema(description = "OpenSearch host") String host -) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PerformanceResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PerformanceResponse.java deleted file mode 100644 index d34a3fad..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PerformanceResponse.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.cameleer3.server.app.dto; - -import io.swagger.v3.oas.annotations.media.Schema; - -@Schema(description = "OpenSearch performance metrics") -public record PerformanceResponse( - @Schema(description = "Query cache hit rate (0.0-1.0)") double queryCacheHitRate, - @Schema(description = "Request cache hit rate (0.0-1.0)") double requestCacheHitRate, - @Schema(description = "Average search latency in milliseconds") double searchLatencyMs, - @Schema(description = "Average indexing latency in milliseconds") double indexingLatencyMs, - @Schema(description = "JVM heap used in bytes") long jvmHeapUsedBytes, - @Schema(description = "JVM heap max in bytes") long jvmHeapMaxBytes -) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PipelineStatsResponse.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PipelineStatsResponse.java deleted file mode 100644 index f4285dc5..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/PipelineStatsResponse.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.cameleer3.server.app.dto; - -import io.swagger.v3.oas.annotations.media.Schema; - -import java.time.Instant; - -@Schema(description = "Search indexing pipeline statistics") -public record PipelineStatsResponse( - @Schema(description = "Current queue depth") int queueDepth, - @Schema(description = "Maximum queue size") int maxQueueSize, - @Schema(description = "Number of failed indexing operations") long failedCount, - @Schema(description = "Number of successfully indexed documents") long indexedCount, - @Schema(description = "Debounce interval in milliseconds") long debounceMs, - @Schema(description = "Current indexing rate (docs/sec)") double indexingRate, - @Schema(description = "Timestamp of last indexed document") Instant lastIndexedAt -) {} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ThresholdConfigRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ThresholdConfigRequest.java index 736210cb..5b4ac2d9 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ThresholdConfigRequest.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/ThresholdConfigRequest.java @@ -5,18 +5,15 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.Valid; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Positive; import java.util.ArrayList; import java.util.List; -import java.util.Map; @Schema(description = "Threshold configuration for admin monitoring") public record ThresholdConfigRequest( - @Valid @NotNull DatabaseThresholdsRequest database, - @Valid @NotNull OpenSearchThresholdsRequest opensearch + @Valid @NotNull DatabaseThresholdsRequest database ) { @Schema(description = "Database monitoring thresholds") @@ -38,41 +35,6 @@ public record ThresholdConfigRequest( double queryDurationCritical ) {} - @Schema(description = "OpenSearch monitoring thresholds") - public record OpenSearchThresholdsRequest( - @NotBlank - @Schema(description = "Cluster health warning threshold (GREEN, YELLOW, RED)") - String clusterHealthWarning, - - @NotBlank - @Schema(description = "Cluster health critical threshold (GREEN, YELLOW, RED)") - String clusterHealthCritical, - - @Min(0) - @Schema(description = "Queue depth warning threshold") - int queueDepthWarning, - - @Min(0) - @Schema(description = "Queue depth critical threshold") - int queueDepthCritical, - - @Min(0) @Max(100) - @Schema(description = "JVM heap usage warning threshold (percentage)") - int jvmHeapWarning, - - @Min(0) @Max(100) - @Schema(description = "JVM heap usage critical threshold (percentage)") - int jvmHeapCritical, - - @Min(0) - @Schema(description = "Failed document count warning threshold") - int failedDocsWarning, - - @Min(0) - @Schema(description = "Failed document count critical threshold") - int failedDocsCritical - ) {} - /** Convert to core domain model */ public ThresholdConfig toConfig() { return new ThresholdConfig( @@ -81,16 +43,6 @@ public record ThresholdConfigRequest( database.connectionPoolCritical(), database.queryDurationWarning(), database.queryDurationCritical() - ), - new ThresholdConfig.OpenSearchThresholds( - opensearch.clusterHealthWarning(), - opensearch.clusterHealthCritical(), - opensearch.queueDepthWarning(), - opensearch.queueDepthCritical(), - opensearch.jvmHeapWarning(), - opensearch.jvmHeapCritical(), - opensearch.failedDocsWarning(), - opensearch.failedDocsCritical() ) ); } @@ -108,37 +60,6 @@ public record ThresholdConfigRequest( } } - if (opensearch != null) { - if (opensearch.queueDepthWarning() > opensearch.queueDepthCritical()) { - errors.add("opensearch.queueDepthWarning must be <= queueDepthCritical"); - } - if (opensearch.jvmHeapWarning() > opensearch.jvmHeapCritical()) { - errors.add("opensearch.jvmHeapWarning must be <= jvmHeapCritical"); - } - if (opensearch.failedDocsWarning() > opensearch.failedDocsCritical()) { - errors.add("opensearch.failedDocsWarning must be <= failedDocsCritical"); - } - // Validate health severity ordering: GREEN < YELLOW < RED - int warningSeverity = healthSeverity(opensearch.clusterHealthWarning()); - int criticalSeverity = healthSeverity(opensearch.clusterHealthCritical()); - if (warningSeverity < 0) { - errors.add("opensearch.clusterHealthWarning must be GREEN, YELLOW, or RED"); - } - if (criticalSeverity < 0) { - errors.add("opensearch.clusterHealthCritical must be GREEN, YELLOW, or RED"); - } - if (warningSeverity >= 0 && criticalSeverity >= 0 && warningSeverity > criticalSeverity) { - errors.add("opensearch.clusterHealthWarning severity must be <= clusterHealthCritical (GREEN < YELLOW < RED)"); - } - } - return errors; } - - private static final Map HEALTH_SEVERITY = - Map.of("GREEN", 0, "YELLOW", 1, "RED", 2); - - private static int healthSeverity(String health) { - return HEALTH_SEVERITY.getOrDefault(health != null ? health.toUpperCase() : "", -1); - } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java index 152bb1c9..1a48d3fd 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/retention/RetentionScheduler.java @@ -43,6 +43,4 @@ public class RetentionScheduler { log.error("Retention job failed", e); } } - // Note: OpenSearch daily index deletion should be handled via ILM policy - // configured at deployment time, not in application code. } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java deleted file mode 100644 index 3df7982e..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ /dev/null @@ -1,435 +0,0 @@ -package com.cameleer3.server.app.search; - -import com.cameleer3.server.core.search.ExecutionSummary; -import com.cameleer3.server.core.search.SearchRequest; -import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.SearchIndex; -import com.cameleer3.server.core.storage.model.ExecutionDocument; -import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch._types.FieldValue; -import org.opensearch.client.opensearch._types.SortOrder; -import org.opensearch.client.opensearch._types.query_dsl.*; -import org.opensearch.client.opensearch.core.*; -import org.opensearch.client.opensearch.core.search.Hit; -import org.opensearch.client.opensearch.indices.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Repository; - -import java.io.IOException; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.stream.Collectors; - -@Repository -@ConditionalOnProperty(name = "cameleer.storage.search", havingValue = "opensearch", matchIfMissing = true) -public class OpenSearchIndex implements SearchIndex { - - private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); - private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") - .withZone(ZoneOffset.UTC); - private static final ObjectMapper JSON = new ObjectMapper(); - private static final TypeReference> STR_MAP = new TypeReference<>() {}; - - private final OpenSearchClient client; - private final String indexPrefix; - - public OpenSearchIndex(OpenSearchClient client, - @Value("${opensearch.index-prefix:executions-}") String indexPrefix) { - this.client = client; - this.indexPrefix = indexPrefix; - } - - @PostConstruct - void ensureIndexTemplate() { - String templateName = indexPrefix + "template"; - String indexPattern = indexPrefix + "*"; - try { - boolean exists = client.indices().existsIndexTemplate( - ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value(); - if (!exists) { - client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b - .name(templateName) - .indexPatterns(List.of(indexPattern)) - .template(t -> t - .settings(s -> s - .numberOfShards("3") - .numberOfReplicas("1")) - .mappings(m -> m - .properties("processors", p -> p - .nested(n -> n)))))); - log.info("OpenSearch index template created"); - } - } catch (IOException e) { - log.error("Failed to create index template", e); - } - } - - @Override - public void index(ExecutionDocument doc) { - String indexName = indexPrefix + DAY_FMT.format(doc.startTime()); - try { - client.index(IndexRequest.of(b -> b - .index(indexName) - .id(doc.executionId()) - .document(toMap(doc)))); - } catch (IOException e) { - log.error("Failed to index execution {}", doc.executionId(), e); - } - } - - @Override - public SearchResult search(SearchRequest request) { - try { - var searchReq = buildSearchRequest(request, request.limit()); - var response = client.search(searchReq, Map.class); - - List items = response.hits().hits().stream() - .map(this::hitToSummary) - .collect(Collectors.toList()); - - long total = response.hits().total() != null ? response.hits().total().value() : 0; - return new SearchResult<>(items, total, request.offset(), request.limit()); - } catch (IOException e) { - log.error("Search failed", e); - return SearchResult.empty(request.offset(), request.limit()); - } - } - - @Override - public long count(SearchRequest request) { - try { - var countReq = CountRequest.of(b -> b - .index(indexPrefix + "*") - .query(buildQuery(request))); - return client.count(countReq).count(); - } catch (IOException e) { - log.error("Count failed", e); - return 0; - } - } - - @Override - public void delete(String executionId) { - try { - client.deleteByQuery(DeleteByQueryRequest.of(b -> b - .index(List.of(indexPrefix + "*")) - .query(Query.of(q -> q.term(t -> t - .field("execution_id") - .value(FieldValue.of(executionId))))))); - } catch (IOException e) { - log.error("Failed to delete execution {}", executionId, e); - } - } - - private static final List HIGHLIGHT_FIELDS = List.of( - "error_message", "attributes_text", - "processors.input_body", "processors.output_body", - "processors.input_headers", "processors.output_headers", - "processors.attributes_text"); - - private org.opensearch.client.opensearch.core.SearchRequest buildSearchRequest( - SearchRequest request, int size) { - return org.opensearch.client.opensearch.core.SearchRequest.of(b -> { - b.index(indexPrefix + "*") - .query(buildQuery(request)) - .trackTotalHits(th -> th.enabled(true)) - .size(size) - .from(request.offset()) - .sort(s -> s.field(f -> f - .field(request.sortColumn()) - .order("asc".equalsIgnoreCase(request.sortDir()) - ? SortOrder.Asc : SortOrder.Desc))); - // Add highlight when full-text search is active - if (request.text() != null && !request.text().isBlank()) { - b.highlight(h -> { - for (String field : HIGHLIGHT_FIELDS) { - h.fields(field, hf -> hf - .fragmentSize(120) - .numberOfFragments(1)); - } - return h; - }); - } - return b; - }); - } - - private Query buildQuery(SearchRequest request) { - List must = new ArrayList<>(); - List filter = new ArrayList<>(); - - // Time range - if (request.timeFrom() != null || request.timeTo() != null) { - filter.add(Query.of(q -> q.range(r -> { - r.field("start_time"); - if (request.timeFrom() != null) - r.gte(JsonData.of(request.timeFrom().toString())); - if (request.timeTo() != null) - r.lte(JsonData.of(request.timeTo().toString())); - return r; - }))); - } - - // Keyword filters (use .keyword sub-field for exact matching on dynamically mapped text fields) - if (request.status() != null && !request.status().isBlank()) { - String[] statuses = request.status().split(","); - if (statuses.length == 1) { - filter.add(termQuery("status.keyword", statuses[0].trim())); - } else { - filter.add(Query.of(q -> q.terms(t -> t - .field("status.keyword") - .terms(tv -> tv.value( - java.util.Arrays.stream(statuses) - .map(String::trim) - .map(FieldValue::of) - .toList()))))); - } - } - if (request.routeId() != null) - filter.add(termQuery("route_id.keyword", request.routeId())); - if (request.instanceId() != null) - filter.add(termQuery("instance_id.keyword", request.instanceId())); - if (request.correlationId() != null) - filter.add(termQuery("correlation_id.keyword", request.correlationId())); - if (request.applicationId() != null && !request.applicationId().isBlank()) - filter.add(termQuery("application_id.keyword", request.applicationId())); - - // Full-text search across all fields + nested processor fields - if (request.text() != null && !request.text().isBlank()) { - String text = request.text(); - String wildcard = "*" + text.toLowerCase() + "*"; - List textQueries = new ArrayList<>(); - - // Search top-level text fields (analyzed match + wildcard for substring) - textQueries.add(Query.of(q -> q.multiMatch(m -> m - .query(text) - .fields("error_message", "error_stacktrace", "attributes_text")))); - textQueries.add(Query.of(q -> q.wildcard(w -> w - .field("error_message").value(wildcard).caseInsensitive(true)))); - textQueries.add(Query.of(q -> q.wildcard(w -> w - .field("error_stacktrace").value(wildcard).caseInsensitive(true)))); - textQueries.add(Query.of(q -> q.wildcard(w -> w - .field("attributes_text").value(wildcard).caseInsensitive(true)))); - - // Search nested processor fields (analyzed match + wildcard) - textQueries.add(Query.of(q -> q.nested(n -> n - .path("processors") - .query(nq -> nq.multiMatch(m -> m - .query(text) - .fields("processors.input_body", "processors.output_body", - "processors.input_headers", "processors.output_headers", - "processors.error_message", "processors.error_stacktrace", - "processors.attributes_text")))))); - textQueries.add(Query.of(q -> q.nested(n -> n - .path("processors") - .query(nq -> nq.bool(nb -> nb.should( - wildcardQuery("processors.input_body", wildcard), - wildcardQuery("processors.output_body", wildcard), - wildcardQuery("processors.input_headers", wildcard), - wildcardQuery("processors.output_headers", wildcard), - wildcardQuery("processors.attributes_text", wildcard) - ).minimumShouldMatch("1")))))); - - // Also try keyword fields for exact matches - textQueries.add(Query.of(q -> q.multiMatch(m -> m - .query(text) - .fields("execution_id", "route_id", "instance_id", "correlation_id", "exchange_id")))); - - must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1")))); - } - - // Scoped text searches (multiMatch + wildcard fallback for substring matching) - if (request.textInBody() != null && !request.textInBody().isBlank()) { - String bodyText = request.textInBody(); - String bodyWildcard = "*" + bodyText.toLowerCase() + "*"; - must.add(Query.of(q -> q.nested(n -> n - .path("processors") - .query(nq -> nq.bool(nb -> nb.should( - Query.of(mq -> mq.multiMatch(m -> m - .query(bodyText) - .fields("processors.input_body", "processors.output_body"))), - wildcardQuery("processors.input_body", bodyWildcard), - wildcardQuery("processors.output_body", bodyWildcard) - ).minimumShouldMatch("1")))))); - } - if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) { - String headerText = request.textInHeaders(); - String headerWildcard = "*" + headerText.toLowerCase() + "*"; - must.add(Query.of(q -> q.nested(n -> n - .path("processors") - .query(nq -> nq.bool(nb -> nb.should( - Query.of(mq -> mq.multiMatch(m -> m - .query(headerText) - .fields("processors.input_headers", "processors.output_headers"))), - wildcardQuery("processors.input_headers", headerWildcard), - wildcardQuery("processors.output_headers", headerWildcard) - ).minimumShouldMatch("1")))))); - } - if (request.textInErrors() != null && !request.textInErrors().isBlank()) { - String errText = request.textInErrors(); - String errWildcard = "*" + errText.toLowerCase() + "*"; - must.add(Query.of(q -> q.bool(b -> b.should( - Query.of(sq -> sq.multiMatch(m -> m - .query(errText) - .fields("error_message", "error_stacktrace"))), - wildcardQuery("error_message", errWildcard), - wildcardQuery("error_stacktrace", errWildcard), - Query.of(sq -> sq.nested(n -> n - .path("processors") - .query(nq -> nq.bool(nb -> nb.should( - Query.of(nmq -> nmq.multiMatch(m -> m - .query(errText) - .fields("processors.error_message", "processors.error_stacktrace"))), - wildcardQuery("processors.error_message", errWildcard), - wildcardQuery("processors.error_stacktrace", errWildcard) - ).minimumShouldMatch("1"))))) - ).minimumShouldMatch("1")))); - } - - // Duration range - if (request.durationMin() != null || request.durationMax() != null) { - filter.add(Query.of(q -> q.range(r -> { - r.field("duration_ms"); - if (request.durationMin() != null) - r.gte(JsonData.of(request.durationMin())); - if (request.durationMax() != null) - r.lte(JsonData.of(request.durationMax())); - return r; - }))); - } - - return Query.of(q -> q.bool(b -> { - if (!must.isEmpty()) b.must(must); - if (!filter.isEmpty()) b.filter(filter); - if (must.isEmpty() && filter.isEmpty()) b.must(Query.of(mq -> mq.matchAll(m -> m))); - return b; - })); - } - - private Query termQuery(String field, String value) { - return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value)))); - } - - private Query wildcardQuery(String field, String pattern) { - return Query.of(q -> q.wildcard(w -> w.field(field).value(pattern).caseInsensitive(true))); - } - - private Map toMap(ExecutionDocument doc) { - Map map = new LinkedHashMap<>(); - map.put("execution_id", doc.executionId()); - map.put("route_id", doc.routeId()); - map.put("instance_id", doc.instanceId()); - map.put("application_id", doc.applicationId()); - map.put("status", doc.status()); - map.put("correlation_id", doc.correlationId()); - map.put("exchange_id", doc.exchangeId()); - map.put("start_time", doc.startTime() != null ? doc.startTime().toString() : null); - map.put("end_time", doc.endTime() != null ? doc.endTime().toString() : null); - map.put("duration_ms", doc.durationMs()); - map.put("error_message", doc.errorMessage()); - map.put("error_stacktrace", doc.errorStacktrace()); - if (doc.attributes() != null) { - Map attrs = parseAttributesJson(doc.attributes()); - map.put("attributes", attrs); - map.put("attributes_text", flattenAttributes(attrs)); - } - if (doc.processors() != null) { - map.put("processors", doc.processors().stream().map(p -> { - Map pm = new LinkedHashMap<>(); - pm.put("processor_id", p.processorId()); - pm.put("processor_type", p.processorType()); - pm.put("status", p.status()); - pm.put("error_message", p.errorMessage()); - pm.put("error_stacktrace", p.errorStacktrace()); - pm.put("input_body", p.inputBody()); - pm.put("output_body", p.outputBody()); - pm.put("input_headers", p.inputHeaders()); - pm.put("output_headers", p.outputHeaders()); - if (p.attributes() != null) { - Map pAttrs = parseAttributesJson(p.attributes()); - pm.put("attributes", pAttrs); - pm.put("attributes_text", flattenAttributes(pAttrs)); - } - return pm; - }).toList()); - } - map.put("has_trace_data", doc.hasTraceData()); - map.put("is_replay", doc.isReplay()); - return map; - } - - @SuppressWarnings("unchecked") - private ExecutionSummary hitToSummary(Hit hit) { - Map src = hit.source(); - if (src == null) return null; - @SuppressWarnings("unchecked") - Map attributes = src.get("attributes") instanceof Map - ? new LinkedHashMap<>((Map) src.get("attributes")) : null; - // Merge processor-level attributes (execution-level takes precedence) - if (src.get("processors") instanceof List procs) { - for (Object pObj : procs) { - if (pObj instanceof Map pm && pm.get("attributes") instanceof Map pa) { - if (attributes == null) attributes = new LinkedHashMap<>(); - for (var entry : pa.entrySet()) { - attributes.putIfAbsent( - String.valueOf(entry.getKey()), - String.valueOf(entry.getValue())); - } - } - } - } - return new ExecutionSummary( - (String) src.get("execution_id"), - (String) src.get("route_id"), - (String) src.get("instance_id"), - (String) src.get("application_id"), - (String) src.get("status"), - src.get("start_time") != null ? Instant.parse((String) src.get("start_time")) : null, - src.get("end_time") != null ? Instant.parse((String) src.get("end_time")) : null, - src.get("duration_ms") != null ? ((Number) src.get("duration_ms")).longValue() : 0L, - (String) src.get("correlation_id"), - (String) src.get("error_message"), - null, // diagramContentHash not stored in index - extractHighlight(hit), - attributes, - Boolean.TRUE.equals(src.get("has_trace_data")), - Boolean.TRUE.equals(src.get("is_replay")) - ); - } - - private String extractHighlight(Hit hit) { - if (hit.highlight() == null || hit.highlight().isEmpty()) return null; - for (List fragments : hit.highlight().values()) { - if (fragments != null && !fragments.isEmpty()) { - return fragments.get(0); - } - } - return null; - } - - private static Map parseAttributesJson(String json) { - if (json == null || json.isBlank()) return null; - try { - return JSON.readValue(json, STR_MAP); - } catch (Exception e) { - return null; - } - } - - private static String flattenAttributes(Map attrs) { - if (attrs == null || attrs.isEmpty()) return ""; - return attrs.entrySet().stream() - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining(" ")); - } -} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java deleted file mode 100644 index a2402f9b..00000000 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchLogIndex.java +++ /dev/null @@ -1,228 +0,0 @@ -package com.cameleer3.server.app.search; - -import com.cameleer3.common.model.LogEntry; -import com.cameleer3.server.core.storage.LogEntryResult; -import com.cameleer3.server.core.storage.LogIndex; -import jakarta.annotation.PostConstruct; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch._types.FieldValue; -import org.opensearch.client.opensearch._types.SortOrder; -import org.opensearch.client.opensearch._types.mapping.Property; -import org.opensearch.client.opensearch._types.query_dsl.BoolQuery; -import org.opensearch.client.opensearch._types.query_dsl.Query; -import org.opensearch.client.opensearch.core.BulkRequest; -import org.opensearch.client.opensearch.core.BulkResponse; -import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; -import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; -import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Repository; - -import java.io.IOException; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -@Repository -@ConditionalOnProperty(name = "cameleer.storage.logs", havingValue = "opensearch") -public class OpenSearchLogIndex implements LogIndex { - - private static final Logger log = LoggerFactory.getLogger(OpenSearchLogIndex.class); - private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") - .withZone(ZoneOffset.UTC); - - private final OpenSearchClient client; - private final String indexPrefix; - private final int retentionDays; - - public OpenSearchLogIndex(OpenSearchClient client, - @Value("${opensearch.log-index-prefix:logs-}") String indexPrefix, - @Value("${opensearch.log-retention-days:7}") int retentionDays) { - this.client = client; - this.indexPrefix = indexPrefix; - this.retentionDays = retentionDays; - } - - @PostConstruct - void init() { - ensureIndexTemplate(); - ensureIsmPolicy(); - } - - private void ensureIndexTemplate() { - String templateName = indexPrefix.replace("-", "") + "-template"; - String indexPattern = indexPrefix + "*"; - try { - boolean exists = client.indices().existsIndexTemplate( - ExistsIndexTemplateRequest.of(b -> b.name(templateName))).value(); - if (!exists) { - client.indices().putIndexTemplate(PutIndexTemplateRequest.of(b -> b - .name(templateName) - .indexPatterns(List.of(indexPattern)) - .template(t -> t - .settings(s -> s - .numberOfShards("1") - .numberOfReplicas("1")) - .mappings(m -> m - .properties("@timestamp", Property.of(p -> p.date(d -> d))) - .properties("level", Property.of(p -> p.keyword(k -> k))) - .properties("loggerName", Property.of(p -> p.keyword(k -> k))) - .properties("message", Property.of(p -> p.text(tx -> tx))) - .properties("threadName", Property.of(p -> p.keyword(k -> k))) - .properties("stackTrace", Property.of(p -> p.text(tx -> tx))) - .properties("instanceId", Property.of(p -> p.keyword(k -> k))) - .properties("applicationId", Property.of(p -> p.keyword(k -> k))) - .properties("exchangeId", Property.of(p -> p.keyword(k -> k))))))); - log.info("OpenSearch log index template '{}' created", templateName); - } - } catch (IOException e) { - log.error("Failed to create log index template", e); - } - } - - private void ensureIsmPolicy() { - String policyId = "logs-retention"; - try { - // Use the low-level REST client to manage ISM policies - var restClient = client._transport(); - // Check if the ISM policy exists via a GET; create if not - // ISM is managed via the _plugins/_ism/policies API - // For now, log a reminder — ISM policy should be created via OpenSearch API or dashboard - log.info("Log retention policy: indices matching '{}*' should be deleted after {} days. " + - "Ensure ISM policy '{}' is configured in OpenSearch.", indexPrefix, retentionDays, policyId); - } catch (Exception e) { - log.warn("Could not verify ISM policy for log retention", e); - } - } - - @Override - public List search(String applicationId, String instanceId, String level, - String query, String exchangeId, - Instant from, Instant to, int limit) { - try { - BoolQuery.Builder bool = new BoolQuery.Builder(); - bool.must(Query.of(q -> q.term(t -> t.field("applicationId").value(FieldValue.of(applicationId))))); - if (instanceId != null && !instanceId.isEmpty()) { - bool.must(Query.of(q -> q.term(t -> t.field("instanceId").value(FieldValue.of(instanceId))))); - } - if (exchangeId != null && !exchangeId.isEmpty()) { - // Match on top-level field (new records) or MDC nested field (old records) - bool.must(Query.of(q -> q.bool(b -> b - .should(Query.of(s -> s.term(t -> t.field("exchangeId.keyword").value(FieldValue.of(exchangeId))))) - .should(Query.of(s -> s.term(t -> t.field("mdc.camel.exchangeId.keyword").value(FieldValue.of(exchangeId))))) - .minimumShouldMatch("1")))); - } - if (level != null && !level.isEmpty()) { - bool.must(Query.of(q -> q.term(t -> t.field("level").value(FieldValue.of(level.toUpperCase()))))); - } - if (query != null && !query.isEmpty()) { - bool.must(Query.of(q -> q.match(m -> m.field("message").query(FieldValue.of(query))))); - } - if (from != null || to != null) { - bool.must(Query.of(q -> q.range(r -> { - r.field("@timestamp"); - if (from != null) r.gte(JsonData.of(from.toString())); - if (to != null) r.lte(JsonData.of(to.toString())); - return r; - }))); - } - - var response = client.search(s -> s - .index(indexPrefix + "*") - .query(Query.of(q -> q.bool(bool.build()))) - .sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc))) - .size(limit), Map.class); - - List results = new ArrayList<>(); - for (var hit : response.hits().hits()) { - @SuppressWarnings("unchecked") - Map src = (Map) hit.source(); - if (src == null) continue; - results.add(new LogEntryResult( - str(src, "@timestamp"), - str(src, "level"), - str(src, "loggerName"), - str(src, "message"), - str(src, "threadName"), - str(src, "stackTrace"))); - } - return results; - } catch (IOException e) { - log.error("Failed to search log entries for application={}", applicationId, e); - return List.of(); - } - } - - private static String str(Map map, String key) { - Object v = map.get(key); - return v != null ? v.toString() : null; - } - - @Override - public void indexBatch(String instanceId, String applicationId, List entries) { - if (entries == null || entries.isEmpty()) { - return; - } - - try { - BulkRequest.Builder bulkBuilder = new BulkRequest.Builder(); - - for (LogEntry entry : entries) { - String indexName = indexPrefix + DAY_FMT.format( - entry.getTimestamp() != null ? entry.getTimestamp() : java.time.Instant.now()); - - Map doc = toMap(entry, instanceId, applicationId); - - bulkBuilder.operations(op -> op - .index(idx -> idx - .index(indexName) - .document(doc))); - } - - BulkResponse response = client.bulk(bulkBuilder.build()); - - if (response.errors()) { - int errorCount = 0; - for (BulkResponseItem item : response.items()) { - if (item.error() != null) { - errorCount++; - if (errorCount == 1) { - log.error("Bulk log index error: {}", item.error().reason()); - } - } - } - log.error("Bulk log indexing had {} error(s) out of {} entries", errorCount, entries.size()); - } else { - log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); - } - } catch (IOException e) { - log.error("Failed to bulk index {} log entries for instance={}", entries.size(), instanceId, e); - } - } - - private Map toMap(LogEntry entry, String instanceId, String applicationId) { - Map doc = new LinkedHashMap<>(); - doc.put("@timestamp", entry.getTimestamp() != null ? entry.getTimestamp().toString() : null); - doc.put("level", entry.getLevel()); - doc.put("loggerName", entry.getLoggerName()); - doc.put("message", entry.getMessage()); - doc.put("threadName", entry.getThreadName()); - doc.put("stackTrace", entry.getStackTrace()); - doc.put("mdc", entry.getMdc()); - doc.put("instanceId", instanceId); - doc.put("applicationId", applicationId); - if (entry.getMdc() != null) { - String exId = entry.getMdc().get("camel.exchangeId"); - if (exId != null) doc.put("exchangeId", exId); - } - return doc; - } -} diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index dd0553b8..8abf9284 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -37,20 +37,15 @@ ingestion: batch-size: 5000 flush-interval-ms: 1000 -opensearch: - url: ${OPENSEARCH_URL:http://localhost:9200} - index-prefix: ${CAMELEER_OPENSEARCH_INDEX_PREFIX:executions-} - queue-size: ${CAMELEER_OPENSEARCH_QUEUE_SIZE:10000} - debounce-ms: ${CAMELEER_OPENSEARCH_DEBOUNCE_MS:2000} - log-index-prefix: ${CAMELEER_LOG_INDEX_PREFIX:logs-} - log-retention-days: ${CAMELEER_LOG_RETENTION_DAYS:7} - cameleer: body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} + indexer: + debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000} + queue-size: ${CAMELEER_INDEXER_QUEUE_SIZE:10000} retention-days: ${CAMELEER_RETENTION_DAYS:30} storage: metrics: ${CAMELEER_STORAGE_METRICS:postgres} - search: ${CAMELEER_STORAGE_SEARCH:opensearch} + search: ${CAMELEER_STORAGE_SEARCH:clickhouse} stats: ${CAMELEER_STORAGE_STATS:clickhouse} diagrams: ${CAMELEER_STORAGE_DIAGRAMS:clickhouse} events: ${CAMELEER_STORAGE_EVENTS:clickhouse} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java index cf7d8c38..8a59a6db 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/AbstractPostgresIT.java @@ -1,6 +1,5 @@ package com.cameleer3.server.app; -import org.opensearch.testcontainers.OpensearchContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.jdbc.core.JdbcTemplate; @@ -20,7 +19,6 @@ public abstract class AbstractPostgresIT { .asCompatibleSubstituteFor("postgres"); static final PostgreSQLContainer postgres; - static final OpensearchContainer opensearch; static final ClickHouseContainer clickhouse; static { @@ -30,9 +28,6 @@ public abstract class AbstractPostgresIT { .withPassword("test"); postgres.start(); - opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0"); - opensearch.start(); - clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12"); clickhouse.start(); } @@ -50,7 +45,6 @@ public abstract class AbstractPostgresIT { registry.add("spring.flyway.url", postgres::getJdbcUrl); registry.add("spring.flyway.user", postgres::getUsername); registry.add("spring.flyway.password", postgres::getPassword); - registry.add("opensearch.url", opensearch::getHttpHostAddress); registry.add("clickhouse.enabled", () -> "true"); registry.add("clickhouse.url", clickhouse::getJdbcUrl); registry.add("clickhouse.username", clickhouse::getUsername); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java deleted file mode 100644 index 0f5284dc..00000000 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/OpenSearchAdminControllerIT.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.cameleer3.server.app.controller; - -import com.cameleer3.server.app.AbstractPostgresIT; -import com.cameleer3.server.app.TestSecurityHelper; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; - -import static org.assertj.core.api.Assertions.assertThat; - -class OpenSearchAdminControllerIT extends AbstractPostgresIT { - - @Autowired - private TestRestTemplate restTemplate; - - @Autowired - private ObjectMapper objectMapper; - - @Autowired - private TestSecurityHelper securityHelper; - - private String adminJwt; - private String viewerJwt; - - @BeforeEach - void setUp() { - adminJwt = securityHelper.adminToken(); - viewerJwt = securityHelper.viewerToken(); - } - - @Test - void getStatus_asAdmin_returns200() throws Exception { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/status", HttpMethod.GET, - new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); - JsonNode body = objectMapper.readTree(response.getBody()); - assertThat(body.get("reachable").asBoolean()).isTrue(); - assertThat(body.has("clusterHealth")).isTrue(); - assertThat(body.has("version")).isTrue(); - } - - @Test - void getStatus_asViewer_returns403() { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/status", HttpMethod.GET, - new HttpEntity<>(securityHelper.authHeadersNoBody(viewerJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN); - } - - @Test - void getPipeline_asAdmin_returns200() throws Exception { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/pipeline", HttpMethod.GET, - new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); - JsonNode body = objectMapper.readTree(response.getBody()); - assertThat(body.has("queueDepth")).isTrue(); - assertThat(body.has("maxQueueSize")).isTrue(); - assertThat(body.has("indexedCount")).isTrue(); - } - - @Test - void getIndices_asAdmin_returns200() throws Exception { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/indices", HttpMethod.GET, - new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); - JsonNode body = objectMapper.readTree(response.getBody()); - assertThat(body.has("indices")).isTrue(); - assertThat(body.has("totalIndices")).isTrue(); - assertThat(body.has("page")).isTrue(); - } - - @Test - void deleteIndex_nonExistent_returns404() { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/indices/nonexistent-index-xyz", HttpMethod.DELETE, - new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); - } - - @Test - void getPerformance_asAdmin_returns200() throws Exception { - ResponseEntity response = restTemplate.exchange( - "/api/v1/admin/opensearch/performance", HttpMethod.GET, - new HttpEntity<>(securityHelper.authHeadersNoBody(adminJwt)), - String.class); - - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); - JsonNode body = objectMapper.readTree(response.getBody()); - assertThat(body.has("queryCacheHitRate")).isTrue(); - assertThat(body.has("jvmHeapUsedBytes")).isTrue(); - } -} diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java index 6a21552f..b77d869f 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/SearchControllerIT.java @@ -161,7 +161,7 @@ class SearchControllerIT extends AbstractPostgresIT { Integer.class); assertThat(count).isEqualTo(10); - // Wait for async OpenSearch indexing (debounce + index time) + // Wait for async search indexing (debounce + index time) // Check for last seeded execution specifically to avoid false positives from other test classes await().atMost(30, SECONDS).untilAsserted(() -> { ResponseEntity r = searchGet("?correlationId=corr-page-10"); diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ThresholdAdminControllerIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ThresholdAdminControllerIT.java index 0329e880..a8f4365b 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ThresholdAdminControllerIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/controller/ThresholdAdminControllerIT.java @@ -46,7 +46,6 @@ class ThresholdAdminControllerIT extends AbstractPostgresIT { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); JsonNode body = objectMapper.readTree(response.getBody()); assertThat(body.has("database")).isTrue(); - assertThat(body.has("opensearch")).isTrue(); assertThat(body.path("database").path("connectionPoolWarning").asInt()).isEqualTo(80); } @@ -69,16 +68,6 @@ class ThresholdAdminControllerIT extends AbstractPostgresIT { "connectionPoolCritical": 90, "queryDurationWarning": 2.0, "queryDurationCritical": 15.0 - }, - "opensearch": { - "clusterHealthWarning": "YELLOW", - "clusterHealthCritical": "RED", - "queueDepthWarning": 200, - "queueDepthCritical": 1000, - "jvmHeapWarning": 80, - "jvmHeapCritical": 95, - "failedDocsWarning": 5, - "failedDocsCritical": 20 } } """; @@ -102,16 +91,6 @@ class ThresholdAdminControllerIT extends AbstractPostgresIT { "connectionPoolCritical": 80, "queryDurationWarning": 2.0, "queryDurationCritical": 15.0 - }, - "opensearch": { - "clusterHealthWarning": "YELLOW", - "clusterHealthCritical": "RED", - "queueDepthWarning": 100, - "queueDepthCritical": 500, - "jvmHeapWarning": 75, - "jvmHeapCritical": 90, - "failedDocsWarning": 1, - "failedDocsCritical": 10 } } """; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java deleted file mode 100644 index 894b2246..00000000 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/OpenSearchIndexIT.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.cameleer3.server.app.search; - -import com.cameleer3.server.app.AbstractPostgresIT; -import com.cameleer3.server.core.search.ExecutionSummary; -import com.cameleer3.server.core.search.SearchRequest; -import com.cameleer3.server.core.search.SearchResult; -import com.cameleer3.server.core.storage.SearchIndex; -import com.cameleer3.server.core.storage.model.ExecutionDocument; -import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; -import org.junit.jupiter.api.Test; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch.indices.RefreshRequest; -import org.springframework.beans.factory.annotation.Autowired; - -import java.time.Instant; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.*; - -// Extends AbstractPostgresIT which provides both PostgreSQL and OpenSearch testcontainers -class OpenSearchIndexIT extends AbstractPostgresIT { - - @Autowired - SearchIndex searchIndex; - - @Autowired - OpenSearchClient openSearchClient; - - @Test - void indexAndSearchByText() throws Exception { - Instant now = Instant.now(); - ExecutionDocument doc = new ExecutionDocument( - "search-1", "route-a", "agent-1", "app-1", - "FAILED", "corr-1", "exch-1", - now, now.plusMillis(100), 100L, - "OrderNotFoundException: order-12345 not found", null, - List.of(new ProcessorDoc("proc-1", "log", "COMPLETED", - null, null, "request body with customer-99", null, null, null, null)), - null, false, false); - - searchIndex.index(doc); - refreshOpenSearchIndices(); - - SearchRequest request = new SearchRequest( - null, now.minusSeconds(60), now.plusSeconds(60), - null, null, null, - "OrderNotFoundException", null, null, null, - null, null, null, null, null, - 0, 50, "startTime", "desc"); - - SearchResult result = searchIndex.search(request); - assertTrue(result.total() > 0); - assertEquals("search-1", result.data().get(0).executionId()); - } - - @Test - void wildcardSearchFindsSubstring() throws Exception { - Instant now = Instant.now(); - ExecutionDocument doc = new ExecutionDocument( - "wild-1", "route-b", "agent-1", "app-1", - "COMPLETED", null, null, - now, now.plusMillis(50), 50L, null, null, - List.of(new ProcessorDoc("proc-1", "bean", "COMPLETED", - null, null, "UniquePayloadIdentifier12345", null, null, null, null)), - null, false, false); - - searchIndex.index(doc); - refreshOpenSearchIndices(); - - SearchRequest request = new SearchRequest( - null, now.minusSeconds(60), now.plusSeconds(60), - null, null, null, - "PayloadIdentifier", null, null, null, - null, null, null, null, null, - 0, 50, "startTime", "desc"); - - SearchResult result = searchIndex.search(request); - assertTrue(result.total() > 0); - } - - private void refreshOpenSearchIndices() throws Exception { - openSearchClient.indices().refresh(RefreshRequest.of(r -> r.index("executions-*"))); - } -} diff --git a/cameleer3-server-app/src/test/resources/application-test.yml b/cameleer3-server-app/src/test/resources/application-test.yml index 8a6708b5..e17b8027 100644 --- a/cameleer3-server-app/src/test/resources/application-test.yml +++ b/cameleer3-server-app/src/test/resources/application-test.yml @@ -2,9 +2,9 @@ spring: flyway: enabled: true -opensearch: - url: http://localhost:9200 - debounce-ms: 100 +cameleer: + indexer: + debounce-ms: 100 ingestion: buffer-capacity: 100 diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java index 58714c28..babcd89f 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/admin/ThresholdConfig.java @@ -1,8 +1,7 @@ package com.cameleer3.server.core.admin; public record ThresholdConfig( - DatabaseThresholds database, - OpenSearchThresholds opensearch + DatabaseThresholds database ) { public record DatabaseThresholds( int connectionPoolWarning, @@ -15,22 +14,7 @@ public record ThresholdConfig( } } - public record OpenSearchThresholds( - String clusterHealthWarning, - String clusterHealthCritical, - int queueDepthWarning, - int queueDepthCritical, - int jvmHeapWarning, - int jvmHeapCritical, - int failedDocsWarning, - int failedDocsCritical - ) { - public static OpenSearchThresholds defaults() { - return new OpenSearchThresholds("YELLOW", "RED", 100, 500, 75, 90, 1, 10); - } - } - public static ThresholdConfig defaults() { - return new ThresholdConfig(DatabaseThresholds.defaults(), OpenSearchThresholds.defaults()); + return new ThresholdConfig(DatabaseThresholds.defaults()); } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java index db4e6718..38f315d8 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/search/SearchRequest.java @@ -59,8 +59,7 @@ public record SearchRequest( "durationMs", "executionId", "applicationId" ); - /** Maps camelCase API sort field names to OpenSearch field names. - * Text fields use .keyword subfield; date/numeric fields are used directly. */ + /** Maps camelCase API sort field names to storage column names. */ private static final java.util.Map SORT_FIELD_TO_COLUMN = java.util.Map.ofEntries( java.util.Map.entry("startTime", "start_time"), java.util.Map.entry("durationMs", "duration_ms"), @@ -80,7 +79,7 @@ public record SearchRequest( if (!"asc".equalsIgnoreCase(sortDir)) sortDir = "desc"; } - /** Returns the snake_case column name for OpenSearch/DB ORDER BY. */ + /** Returns the snake_case column name for ORDER BY. */ public String sortColumn() { return SORT_FIELD_TO_COLUMN.getOrDefault(sortField, "start_time"); } diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index ceaf7676..6f8fc23b 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -46,10 +46,6 @@ spec: secretKeyRef: name: postgres-credentials key: POSTGRES_PASSWORD - - name: OPENSEARCH_URL - value: "http://opensearch.cameleer.svc.cluster.local:9200" - - name: CAMELEER_OPENSEARCH_INDEX_PREFIX - value: "executions-" - name: CAMELEER_AUTH_TOKEN valueFrom: secretKeyRef: diff --git a/deploy/opensearch.yaml b/deploy/opensearch.yaml deleted file mode 100644 index c89f3110..00000000 --- a/deploy/opensearch.yaml +++ /dev/null @@ -1,98 +0,0 @@ -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: opensearch - namespace: cameleer -spec: - serviceName: opensearch - replicas: 1 - selector: - matchLabels: - app: opensearch - template: - metadata: - labels: - app: opensearch - spec: - containers: - - name: opensearch - image: opensearchproject/opensearch:2.19.0 - ports: - - containerPort: 9200 - name: http - - containerPort: 9300 - name: transport - env: - - name: discovery.type - value: single-node - - name: DISABLE_SECURITY_PLUGIN - value: "true" - volumeMounts: - - name: data - mountPath: /usr/share/opensearch/data - resources: - requests: - memory: "1Gi" - cpu: "200m" - limits: - memory: "4Gi" - cpu: "1000m" - livenessProbe: - exec: - command: - - sh - - -c - - curl -s http://localhost:9200/_cluster/health - initialDelaySeconds: 30 - periodSeconds: 10 - timeoutSeconds: 5 - failureThreshold: 3 - readinessProbe: - exec: - command: - - sh - - -c - - curl -s http://localhost:9200/_cluster/health - initialDelaySeconds: 15 - periodSeconds: 10 - timeoutSeconds: 5 - failureThreshold: 3 - volumeClaimTemplates: - - metadata: - name: data - spec: - accessModes: ["ReadWriteOnce"] - resources: - requests: - storage: 10Gi ---- -apiVersion: v1 -kind: Service -metadata: - name: opensearch - namespace: cameleer -spec: - clusterIP: None - selector: - app: opensearch - ports: - - port: 9200 - targetPort: 9200 - name: http - - port: 9300 - targetPort: 9300 - name: transport ---- -apiVersion: v1 -kind: Service -metadata: - name: opensearch-external - namespace: cameleer -spec: - type: NodePort - selector: - app: opensearch - ports: - - port: 9200 - targetPort: 9200 - nodePort: 30920 diff --git a/deploy/overlays/feature/kustomization.yaml b/deploy/overlays/feature/kustomization.yaml index cc957721..f3eec3b2 100644 --- a/deploy/overlays/feature/kustomization.yaml +++ b/deploy/overlays/feature/kustomization.yaml @@ -25,8 +25,6 @@ patches: env: - name: CAMELEER_DB_SCHEMA value: "BRANCH_SCHEMA" - - name: CAMELEER_OPENSEARCH_INDEX_PREFIX - value: "cam-BRANCH_SLUG-executions-" - name: CAMELEER_UI_ORIGIN value: "http://BRANCH_SLUG.cameleer.siegeln.net" # UI ConfigMap: branch-specific API URL diff --git a/deploy/overlays/main/kustomization.yaml b/deploy/overlays/main/kustomization.yaml index b9c86579..5d397714 100644 --- a/deploy/overlays/main/kustomization.yaml +++ b/deploy/overlays/main/kustomization.yaml @@ -40,8 +40,6 @@ patches: env: - name: SPRING_DATASOURCE_URL value: "jdbc:postgresql://postgres:5432/cameleer3?currentSchema=public" - - name: OPENSEARCH_URL - value: "http://opensearch:9200" - name: CAMELEER_UI_ORIGIN value: "http://192.168.50.86:30090" # UI ConfigMap: production API URL diff --git a/docker-compose.yml b/docker-compose.yml index c5698b23..1cd5d6d5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,18 +10,5 @@ services: volumes: - pgdata:/home/postgres/pgdata/data - opensearch: - image: opensearchproject/opensearch:2.19.0 - ports: - - "9200:9200" - - "9300:9300" - environment: - discovery.type: single-node - DISABLE_SECURITY_PLUGIN: "true" - OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" - volumes: - - osdata:/usr/share/opensearch/data - volumes: pgdata: - osdata: diff --git a/ui/src/api/queries/admin/clickhouse.ts b/ui/src/api/queries/admin/clickhouse.ts new file mode 100644 index 00000000..d8aeb687 --- /dev/null +++ b/ui/src/api/queries/admin/clickhouse.ts @@ -0,0 +1,77 @@ +import { useQuery } from '@tanstack/react-query'; +import { adminFetch } from './admin-api'; +import { useRefreshInterval } from '../use-refresh-interval'; + +// ── Types ────────────────────────────────────────────────────────────── + +export interface ClickHouseStatus { + reachable: boolean; + version: string | null; + uptime: string | null; + host: string | null; +} + +export interface ClickHouseTableInfo { + name: string; + engine: string; + rowCount: number; + dataSize: string; + dataSizeBytes: number; + partitionCount: number; +} + +export interface ClickHousePerformance { + queryCount: number; + insertQueryCount: number; + memoryUsage: string; + insertedRows: number; + readRows: number; +} + +export interface IndexerPipeline { + queueDepth: number; + maxQueueSize: number; + failedCount: number; + indexedCount: number; + debounceMs: number; + indexingRate: number; + lastIndexedAt: string | null; +} + +// ── Query Hooks ──────────────────────────────────────────────────────── + +export function useClickHouseStatus() { + const refetchInterval = useRefreshInterval(30_000); + return useQuery({ + queryKey: ['admin', 'clickhouse', 'status'], + queryFn: () => adminFetch('/clickhouse/status'), + refetchInterval, + }); +} + +export function useClickHouseTables() { + const refetchInterval = useRefreshInterval(60_000); + return useQuery({ + queryKey: ['admin', 'clickhouse', 'tables'], + queryFn: () => adminFetch('/clickhouse/tables'), + refetchInterval, + }); +} + +export function useClickHousePerformance() { + const refetchInterval = useRefreshInterval(30_000); + return useQuery({ + queryKey: ['admin', 'clickhouse', 'performance'], + queryFn: () => adminFetch('/clickhouse/performance'), + refetchInterval, + }); +} + +export function useIndexerPipeline() { + const refetchInterval = useRefreshInterval(10_000); + return useQuery({ + queryKey: ['admin', 'clickhouse', 'pipeline'], + queryFn: () => adminFetch('/clickhouse/pipeline'), + refetchInterval, + }); +} diff --git a/ui/src/api/queries/admin/opensearch.ts b/ui/src/api/queries/admin/opensearch.ts deleted file mode 100644 index 19f91b10..00000000 --- a/ui/src/api/queries/admin/opensearch.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; -import { adminFetch } from './admin-api'; -import { useRefreshInterval } from '../use-refresh-interval'; - -// ── Types ────────────────────────────────────────────────────────────── - -export interface OpenSearchStatus { - reachable: boolean; - clusterHealth: string; - version: string | null; - nodeCount: number; - host: string; -} - -export interface PipelineStats { - queueDepth: number; - maxQueueSize: number; - failedCount: number; - indexedCount: number; - debounceMs: number; - indexingRate: number; - lastIndexedAt: string | null; -} - -export interface IndexInfo { - name: string; - docCount: number; - size: string; - sizeBytes: number; - health: string; - primaryShards: number; - replicas: number; -} - -export interface IndicesPage { - indices: IndexInfo[]; - totalIndices: number; - totalDocs: number; - totalSize: string; - page: number; - pageSize: number; - totalPages: number; -} - -export interface PerformanceStats { - queryCacheHitRate: number; - requestCacheHitRate: number; - searchLatencyMs: number; - indexingLatencyMs: number; - heapUsedBytes: number; - heapMaxBytes: number; -} - -// ── Query Hooks ──────────────────────────────────────────────────────── - -export function useOpenSearchStatus() { - const refetchInterval = useRefreshInterval(30_000); - return useQuery({ - queryKey: ['admin', 'opensearch', 'status'], - queryFn: () => adminFetch('/opensearch/status'), - refetchInterval, - }); -} - -export function usePipelineStats() { - const refetchInterval = useRefreshInterval(10_000); - return useQuery({ - queryKey: ['admin', 'opensearch', 'pipeline'], - queryFn: () => adminFetch('/opensearch/pipeline'), - refetchInterval, - }); -} - -export function useOpenSearchIndices(page = 0, size = 20, search = '', prefix = 'executions') { - return useQuery({ - queryKey: ['admin', 'opensearch', 'indices', prefix, page, size, search], - queryFn: () => { - const params = new URLSearchParams(); - params.set('page', String(page)); - params.set('size', String(size)); - params.set('prefix', prefix); - if (search) params.set('search', search); - return adminFetch(`/opensearch/indices?${params}`); - }, - placeholderData: (prev) => prev, - }); -} - -export function useOpenSearchPerformance() { - const refetchInterval = useRefreshInterval(30_000); - return useQuery({ - queryKey: ['admin', 'opensearch', 'performance'], - queryFn: () => adminFetch('/opensearch/performance'), - refetchInterval, - }); -} - -// ── Mutation Hooks ───────────────────────────────────────────────────── - -export function useDeleteIndex() { - const qc = useQueryClient(); - return useMutation({ - mutationFn: (indexName: string) => - adminFetch(`/opensearch/indices/${indexName}`, { method: 'DELETE' }), - onSuccess: () => { - qc.invalidateQueries({ queryKey: ['admin', 'opensearch', 'indices'] }); - }, - }); -} diff --git a/ui/src/api/queries/admin/thresholds.ts b/ui/src/api/queries/admin/thresholds.ts index 02aa03f8..c477139d 100644 --- a/ui/src/api/queries/admin/thresholds.ts +++ b/ui/src/api/queries/admin/thresholds.ts @@ -10,20 +10,8 @@ export interface DatabaseThresholds { queryDurationCritical: number; } -export interface OpenSearchThresholds { - clusterHealthWarning: string; - clusterHealthCritical: string; - queueDepthWarning: number; - queueDepthCritical: number; - jvmHeapWarning: number; - jvmHeapCritical: number; - failedDocsWarning: number; - failedDocsCritical: number; -} - export interface ThresholdConfig { database: DatabaseThresholds; - opensearch: OpenSearchThresholds; } // ── Query Hooks ──────────────────────────────────────────────────────── diff --git a/ui/src/components/LayoutShell.tsx b/ui/src/components/LayoutShell.tsx index 6cf5a5bd..0ac5e843 100644 --- a/ui/src/components/LayoutShell.tsx +++ b/ui/src/components/LayoutShell.tsx @@ -184,7 +184,7 @@ function LayoutContent() { audit: 'Audit Log', oidc: 'OIDC', database: 'Database', - opensearch: 'OpenSearch', + clickhouse: 'ClickHouse', appconfig: 'App Config', }; const parts = location.pathname.split('/').filter(Boolean); diff --git a/ui/src/pages/Admin/AdminLayout.tsx b/ui/src/pages/Admin/AdminLayout.tsx index 70e4f4a6..19453f21 100644 --- a/ui/src/pages/Admin/AdminLayout.tsx +++ b/ui/src/pages/Admin/AdminLayout.tsx @@ -7,7 +7,7 @@ const ADMIN_TABS = [ { label: 'OIDC', value: '/admin/oidc' }, { label: 'App Config', value: '/admin/appconfig' }, { label: 'Database', value: '/admin/database' }, - { label: 'OpenSearch', value: '/admin/opensearch' }, + { label: 'ClickHouse', value: '/admin/clickhouse' }, ]; export default function AdminLayout() { diff --git a/ui/src/pages/Admin/OpenSearchAdminPage.module.css b/ui/src/pages/Admin/ClickHouseAdminPage.module.css similarity index 94% rename from ui/src/pages/Admin/OpenSearchAdminPage.module.css rename to ui/src/pages/Admin/ClickHouseAdminPage.module.css index 93403ced..ed91ecd4 100644 --- a/ui/src/pages/Admin/OpenSearchAdminPage.module.css +++ b/ui/src/pages/Admin/ClickHouseAdminPage.module.css @@ -33,7 +33,7 @@ font-family: var(--font-mono); } -.indexSection { +.tableSection { background: var(--bg-surface); border: 1px solid var(--border-subtle); border-radius: var(--radius-lg); @@ -42,7 +42,7 @@ overflow: hidden; } -.indexHeader { +.tableHeader { display: flex; align-items: center; justify-content: space-between; @@ -50,13 +50,13 @@ border-bottom: 1px solid var(--border-subtle); } -.indexTitle { +.tableTitle { font-size: 13px; font-weight: 600; color: var(--text-primary); } -.indexMeta { +.tableMeta { font-size: 11px; color: var(--text-muted); font-family: var(--font-mono); diff --git a/ui/src/pages/Admin/ClickHouseAdminPage.tsx b/ui/src/pages/Admin/ClickHouseAdminPage.tsx new file mode 100644 index 00000000..7bf4672f --- /dev/null +++ b/ui/src/pages/Admin/ClickHouseAdminPage.tsx @@ -0,0 +1,65 @@ +import { StatCard, DataTable, ProgressBar } from '@cameleer/design-system'; +import type { Column } from '@cameleer/design-system'; +import { useClickHouseStatus, useClickHouseTables, useClickHousePerformance, useIndexerPipeline } from '../../api/queries/admin/clickhouse'; +import styles from './ClickHouseAdminPage.module.css'; + +export default function ClickHouseAdminPage() { + const { data: status, isError: statusError } = useClickHouseStatus(); + const { data: tables } = useClickHouseTables(); + const { data: perf } = useClickHousePerformance(); + const { data: pipeline } = useIndexerPipeline(); + const unreachable = statusError || (status && !status.reachable); + + const tableColumns: Column[] = [ + { key: 'name', header: 'Table', sortable: true }, + { key: 'engine', header: 'Engine' }, + { key: 'rowCount', header: 'Rows', sortable: true, render: (v) => Number(v).toLocaleString() }, + { key: 'dataSize', header: 'Size', sortable: true }, + { key: 'partitionCount', header: 'Partitions', sortable: true }, + ]; + + return ( +
+
+ + + +
+ + {pipeline && ( +
+
Indexer Pipeline
+ 0 ? (pipeline.queueDepth / pipeline.maxQueueSize) * 100 : 0} /> +
+ Queue: {pipeline.queueDepth}/{pipeline.maxQueueSize} + Indexed: {pipeline.indexedCount.toLocaleString()} + Failed: {pipeline.failedCount} + Rate: {pipeline.indexingRate.toFixed(1)}/s +
+
+ )} + + {perf && ( +
+ + + + +
+ )} + +
+
+ Tables ({(tables || []).length}) +
+ ({ ...t, id: t.name }))} + sortable + pageSize={20} + flush + /> +
+
+ ); +} diff --git a/ui/src/pages/Admin/OpenSearchAdminPage.tsx b/ui/src/pages/Admin/OpenSearchAdminPage.tsx deleted file mode 100644 index ad441367..00000000 --- a/ui/src/pages/Admin/OpenSearchAdminPage.tsx +++ /dev/null @@ -1,78 +0,0 @@ -import { StatCard, DataTable, Badge, ProgressBar } from '@cameleer/design-system'; -import type { Column } from '@cameleer/design-system'; -import { useOpenSearchStatus, usePipelineStats, useOpenSearchIndices, useOpenSearchPerformance, useDeleteIndex } from '../../api/queries/admin/opensearch'; -import styles from './OpenSearchAdminPage.module.css'; - -export default function OpenSearchAdminPage() { - const { data: status, isError: statusError } = useOpenSearchStatus(); - const { data: pipeline } = usePipelineStats(); - const { data: perf } = useOpenSearchPerformance(); - const { data: execIndices } = useOpenSearchIndices(0, 50, '', 'executions'); - const { data: logIndices } = useOpenSearchIndices(0, 50, '', 'logs'); - const unreachable = statusError || (status && !status.reachable); - const deleteIndex = useDeleteIndex(); - - const indexColumns: Column[] = [ - { key: 'name', header: 'Index' }, - { key: 'health', header: 'Health', render: (v) => }, - { key: 'docCount', header: 'Documents', sortable: true, render: (v) => Number(v).toLocaleString() }, - { key: 'size', header: 'Size' }, - { key: 'primaryShards', header: 'Shards' }, - ]; - - return ( -
-
- - - - -
- - {pipeline && ( -
-
Indexing Pipeline
- -
- Queue: {pipeline.queueDepth}/{pipeline.maxQueueSize} - Indexed: {pipeline.indexedCount.toLocaleString()} - Failed: {pipeline.failedCount} - Rate: {pipeline.indexingRate}/s -
-
- )} - -
-
- Execution Indices ({execIndices?.totalIndices ?? 0}) - - {execIndices ? `${execIndices.totalDocs.toLocaleString()} docs \u00b7 ${execIndices.totalSize}` : ''} - -
- ({ ...i, id: i.name }))} - sortable - pageSize={20} - flush - /> -
- -
-
- Log Indices ({logIndices?.totalIndices ?? 0}) - - {logIndices ? `${logIndices.totalDocs.toLocaleString()} docs \u00b7 ${logIndices.totalSize}` : ''} - -
- ({ ...i, id: i.name }))} - sortable - pageSize={20} - flush - /> -
-
- ); -} diff --git a/ui/src/pages/AgentInstance/AgentInstance.tsx b/ui/src/pages/AgentInstance/AgentInstance.tsx index 77817c2b..3182cfe3 100644 --- a/ui/src/pages/AgentInstance/AgentInstance.tsx +++ b/ui/src/pages/AgentInstance/AgentInstance.tsx @@ -141,7 +141,7 @@ export default function AgentInstance() { [chartData], ); - // Application logs from OpenSearch + // Application logs const { data: rawLogs } = useApplicationLogs(appId, instanceId, { toOverride: logRefreshTo }); const logEntries = useMemo(() => { const mapped = (rawLogs || []).map((l) => ({ diff --git a/ui/src/router.tsx b/ui/src/router.tsx index 8c9383f8..6c9296f2 100644 --- a/ui/src/router.tsx +++ b/ui/src/router.tsx @@ -14,7 +14,7 @@ const RbacPage = lazy(() => import('./pages/Admin/RbacPage')); const AuditLogPage = lazy(() => import('./pages/Admin/AuditLogPage')); const OidcConfigPage = lazy(() => import('./pages/Admin/OidcConfigPage')); const DatabaseAdminPage = lazy(() => import('./pages/Admin/DatabaseAdminPage')); -const OpenSearchAdminPage = lazy(() => import('./pages/Admin/OpenSearchAdminPage')); +const ClickHouseAdminPage = lazy(() => import('./pages/Admin/ClickHouseAdminPage')); const AppConfigPage = lazy(() => import('./pages/Admin/AppConfigPage')); const SwaggerPage = lazy(() => import('./pages/Swagger/SwaggerPage')); @@ -87,7 +87,7 @@ export const router = createBrowserRouter([ { path: 'oidc', element: }, { path: 'appconfig', element: }, { path: 'database', element: }, - { path: 'opensearch', element: }, + { path: 'clickhouse', element: }, ], }, { path: 'api-docs', element: },