Compare commits
2 Commits
26f5a2ce3b
...
c316e80d7f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c316e80d7f | ||
|
|
796be06a09 |
10
CLAUDE.md
10
CLAUDE.md
@@ -38,10 +38,10 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar
|
|||||||
- Jackson `JavaTimeModule` for `Instant` deserialization
|
- Jackson `JavaTimeModule` for `Instant` deserialization
|
||||||
- Communication: receives HTTP POST data from agents, serves SSE event streams for config push/commands
|
- Communication: receives HTTP POST data from agents, serves SSE event streams for config push/commands
|
||||||
- Maintains agent instance registry with states: LIVE → STALE → DEAD
|
- Maintains agent instance registry with states: LIVE → STALE → DEAD
|
||||||
- Storage: ClickHouse for structured data, text index for full-text search
|
- Storage: PostgreSQL (TimescaleDB) for structured data, OpenSearch for full-text search
|
||||||
- Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration
|
- Security: JWT auth with RBAC (AGENT/VIEWER/OPERATOR/ADMIN roles), Ed25519 config signing, bootstrap token for registration
|
||||||
- OIDC: Optional external identity provider support (token exchange pattern). Configured via `CAMELEER_OIDC_*` env vars
|
- OIDC: Optional external identity provider support (token exchange pattern). Configured via `CAMELEER_OIDC_*` env vars
|
||||||
- User persistence: ClickHouse `users` table, admin CRUD at `/api/v1/admin/users`
|
- User persistence: PostgreSQL `users` table, admin CRUD at `/api/v1/admin/users`
|
||||||
|
|
||||||
## CI/CD & Deployment
|
## CI/CD & Deployment
|
||||||
|
|
||||||
@@ -50,8 +50,8 @@ java -jar cameleer3-server-app/target/cameleer3-server-app-1.0-SNAPSHOT.jar
|
|||||||
- Docker: multi-stage build (`Dockerfile`), `$BUILDPLATFORM` for native Maven on ARM64 runner, amd64 runtime
|
- Docker: multi-stage build (`Dockerfile`), `$BUILDPLATFORM` for native Maven on ARM64 runner, amd64 runtime
|
||||||
- `REGISTRY_TOKEN` build arg required for `cameleer3-common` dependency resolution
|
- `REGISTRY_TOKEN` build arg required for `cameleer3-common` dependency resolution
|
||||||
- Registry: `gitea.siegeln.net/cameleer/cameleer3-server` (container images)
|
- Registry: `gitea.siegeln.net/cameleer/cameleer3-server` (container images)
|
||||||
- K8s manifests in `deploy/` — ClickHouse StatefulSet + server Deployment + NodePort Service (30081)
|
- K8s manifests in `deploy/` — PostgreSQL + OpenSearch StatefulSets, server Deployment + NodePort Service (30081)
|
||||||
- Deployment target: k3s at 192.168.50.86, namespace `cameleer`
|
- Deployment target: k3s at 192.168.50.86, namespace `cameleer`
|
||||||
- Secrets managed in CI deploy step (idempotent `--dry-run=client | kubectl apply`): `cameleer-auth`, `clickhouse-credentials`, `CAMELEER_JWT_SECRET`
|
- Secrets managed in CI deploy step (idempotent `--dry-run=client | kubectl apply`): `cameleer-auth`, `postgres-credentials`, `opensearch-credentials`, `CAMELEER_JWT_SECRET`
|
||||||
- K8s probes: server uses `/api/v1/health`, ClickHouse uses `/ping`
|
- K8s probes: server uses `/api/v1/health`, PostgreSQL uses `pg_isready`, OpenSearch uses `/_cluster/health`
|
||||||
- Docker build uses buildx registry cache + `--provenance=false` for Gitea compatibility
|
- Docker build uses buildx registry cache + `--provenance=false` for Gitea compatibility
|
||||||
|
|||||||
40
HOWTO.md
40
HOWTO.md
@@ -21,20 +21,20 @@ mvn clean verify # compile + run all tests (needs Docker for integrati
|
|||||||
|
|
||||||
## Infrastructure Setup
|
## Infrastructure Setup
|
||||||
|
|
||||||
Start ClickHouse:
|
Start PostgreSQL and OpenSearch:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker compose up -d
|
docker compose up -d
|
||||||
```
|
```
|
||||||
|
|
||||||
This starts ClickHouse 25.3 and automatically runs the schema init scripts (`clickhouse/init/01-schema.sql`, `clickhouse/init/02-search-columns.sql`, `clickhouse/init/03-users.sql`).
|
This starts TimescaleDB (PostgreSQL 16) and OpenSearch 2.19. The database schema is applied automatically via Flyway migrations on server startup.
|
||||||
|
|
||||||
| Service | Port | Purpose |
|
| Service | Port | Purpose |
|
||||||
|------------|------|------------------|
|
|------------|------|----------------------|
|
||||||
| ClickHouse | 8123 | HTTP API (JDBC) |
|
| PostgreSQL | 5432 | JDBC (Spring JDBC) |
|
||||||
| ClickHouse | 9000 | Native protocol |
|
| OpenSearch | 9200 | REST API (full-text) |
|
||||||
|
|
||||||
ClickHouse credentials: `cameleer` / `cameleer_dev`, database `cameleer3`.
|
PostgreSQL credentials: `cameleer` / `cameleer_dev`, database `cameleer3`.
|
||||||
|
|
||||||
## Run the Server
|
## Run the Server
|
||||||
|
|
||||||
@@ -109,7 +109,7 @@ The env-var local user gets `ADMIN` role. Agents get `AGENT` role at registratio
|
|||||||
|
|
||||||
### OIDC Login (Optional)
|
### OIDC Login (Optional)
|
||||||
|
|
||||||
OIDC configuration is stored in ClickHouse and managed via the admin API or UI. The SPA checks if OIDC is available:
|
OIDC configuration is stored in PostgreSQL and managed via the admin API or UI. The SPA checks if OIDC is available:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# 1. SPA checks if OIDC is available (returns 404 if not configured)
|
# 1. SPA checks if OIDC is available (returns 404 if not configured)
|
||||||
@@ -340,9 +340,8 @@ Key settings in `cameleer3-server-app/src/main/resources/application.yml`:
|
|||||||
|---------|---------|-------------|
|
|---------|---------|-------------|
|
||||||
| `server.port` | 8081 | Server port |
|
| `server.port` | 8081 | Server port |
|
||||||
| `ingestion.buffer-capacity` | 50000 | Max items in write buffer |
|
| `ingestion.buffer-capacity` | 50000 | Max items in write buffer |
|
||||||
| `ingestion.batch-size` | 5000 | Items per ClickHouse batch insert |
|
| `ingestion.batch-size` | 5000 | Items per batch insert |
|
||||||
| `ingestion.flush-interval-ms` | 1000 | Buffer flush interval (ms) |
|
| `ingestion.flush-interval-ms` | 1000 | Buffer flush interval (ms) |
|
||||||
| `ingestion.data-ttl-days` | 30 | ClickHouse TTL for auto-deletion |
|
|
||||||
| `agent-registry.heartbeat-interval-seconds` | 30 | Expected heartbeat interval |
|
| `agent-registry.heartbeat-interval-seconds` | 30 | Expected heartbeat interval |
|
||||||
| `agent-registry.stale-threshold-seconds` | 90 | Time before agent marked STALE |
|
| `agent-registry.stale-threshold-seconds` | 90 | Time before agent marked STALE |
|
||||||
| `agent-registry.dead-threshold-seconds` | 300 | Time after STALE before DEAD |
|
| `agent-registry.dead-threshold-seconds` | 300 | Time after STALE before DEAD |
|
||||||
@@ -386,7 +385,7 @@ npm run generate-api # Requires backend running on :8081
|
|||||||
|
|
||||||
## Running Tests
|
## Running Tests
|
||||||
|
|
||||||
Integration tests use Testcontainers (starts ClickHouse automatically — requires Docker):
|
Integration tests use Testcontainers (starts PostgreSQL and OpenSearch automatically — requires Docker):
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# All tests
|
# All tests
|
||||||
@@ -399,14 +398,13 @@ mvn test -pl cameleer3-server-core
|
|||||||
mvn test -pl cameleer3-server-app -Dtest=ExecutionControllerIT
|
mvn test -pl cameleer3-server-app -Dtest=ExecutionControllerIT
|
||||||
```
|
```
|
||||||
|
|
||||||
## Verify ClickHouse Data
|
## Verify Database Data
|
||||||
|
|
||||||
After posting data and waiting for the flush interval (1s default):
|
After posting data and waiting for the flush interval (1s default):
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker exec -it cameleer3-server-clickhouse-1 clickhouse-client \
|
docker exec -it cameleer3-server-postgres-1 psql -U cameleer -d cameleer3 \
|
||||||
--user cameleer --password cameleer_dev -d cameleer3 \
|
-c "SELECT count(*) FROM route_executions"
|
||||||
-q "SELECT count() FROM route_executions"
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Kubernetes Deployment
|
## Kubernetes Deployment
|
||||||
@@ -417,7 +415,8 @@ The full stack is deployed to k3s via CI/CD on push to `main`. K8s manifests are
|
|||||||
|
|
||||||
```
|
```
|
||||||
cameleer namespace:
|
cameleer namespace:
|
||||||
ClickHouse (StatefulSet, 2Gi PVC) ← clickhouse:8123 (ClusterIP)
|
PostgreSQL (StatefulSet, 10Gi PVC) ← postgres:5432 (ClusterIP)
|
||||||
|
OpenSearch (StatefulSet, 10Gi PVC) ← opensearch:9200 (ClusterIP)
|
||||||
cameleer3-server (Deployment) ← NodePort 30081
|
cameleer3-server (Deployment) ← NodePort 30081
|
||||||
cameleer3-ui (Deployment, Nginx) ← NodePort 30090
|
cameleer3-ui (Deployment, Nginx) ← NodePort 30090
|
||||||
Authentik Server (Deployment) ← NodePort 30950
|
Authentik Server (Deployment) ← NodePort 30950
|
||||||
@@ -439,7 +438,7 @@ cameleer namespace:
|
|||||||
|
|
||||||
Push to `main` triggers: **build** (UI npm + Maven, unit tests) → **docker** (buildx amd64 for server + UI, push to Gitea registry) → **deploy** (kubectl apply + rolling update).
|
Push to `main` triggers: **build** (UI npm + Maven, unit tests) → **docker** (buildx amd64 for server + UI, push to Gitea registry) → **deploy** (kubectl apply + rolling update).
|
||||||
|
|
||||||
Required Gitea org secrets: `REGISTRY_TOKEN`, `KUBECONFIG_BASE64`, `CAMELEER_AUTH_TOKEN`, `CAMELEER_JWT_SECRET`, `CLICKHOUSE_USER`, `CLICKHOUSE_PASSWORD`, `CAMELEER_UI_USER` (optional), `CAMELEER_UI_PASSWORD` (optional), `AUTHENTIK_PG_PASSWORD`, `AUTHENTIK_SECRET_KEY`, `CAMELEER_OIDC_ENABLED`, `CAMELEER_OIDC_ISSUER`, `CAMELEER_OIDC_CLIENT_ID`, `CAMELEER_OIDC_CLIENT_SECRET`.
|
Required Gitea org secrets: `REGISTRY_TOKEN`, `KUBECONFIG_BASE64`, `CAMELEER_AUTH_TOKEN`, `CAMELEER_JWT_SECRET`, `POSTGRES_USER`, `POSTGRES_PASSWORD`, `POSTGRES_DB`, `OPENSEARCH_USER`, `OPENSEARCH_PASSWORD`, `CAMELEER_UI_USER` (optional), `CAMELEER_UI_PASSWORD` (optional), `AUTHENTIK_PG_USER`, `AUTHENTIK_PG_PASSWORD`, `AUTHENTIK_SECRET_KEY`, `CAMELEER_OIDC_ENABLED`, `CAMELEER_OIDC_ISSUER`, `CAMELEER_OIDC_CLIENT_ID`, `CAMELEER_OIDC_CLIENT_SECRET`.
|
||||||
|
|
||||||
### Manual K8s Commands
|
### Manual K8s Commands
|
||||||
|
|
||||||
@@ -450,8 +449,11 @@ kubectl -n cameleer get pods
|
|||||||
# View server logs
|
# View server logs
|
||||||
kubectl -n cameleer logs -f deploy/cameleer3-server
|
kubectl -n cameleer logs -f deploy/cameleer3-server
|
||||||
|
|
||||||
# View ClickHouse logs
|
# View PostgreSQL logs
|
||||||
kubectl -n cameleer logs -f statefulset/clickhouse
|
kubectl -n cameleer logs -f statefulset/postgres
|
||||||
|
|
||||||
|
# View OpenSearch logs
|
||||||
|
kubectl -n cameleer logs -f statefulset/opensearch
|
||||||
|
|
||||||
# Restart server
|
# Restart server
|
||||||
kubectl -n cameleer rollout restart deployment/cameleer3-server
|
kubectl -n cameleer rollout restart deployment/cameleer3-server
|
||||||
|
|||||||
@@ -174,7 +174,7 @@
|
|||||||
<artifactId>maven-failsafe-plugin</artifactId>
|
<artifactId>maven-failsafe-plugin</artifactId>
|
||||||
<configuration>
|
<configuration>
|
||||||
<forkCount>1</forkCount>
|
<forkCount>1</forkCount>
|
||||||
<reuseForks>false</reuseForks>
|
<reuseForks>true</reuseForks>
|
||||||
</configuration>
|
</configuration>
|
||||||
<executions>
|
<executions>
|
||||||
<execution>
|
<execution>
|
||||||
|
|||||||
@@ -52,7 +52,10 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
.template(t -> t
|
.template(t -> t
|
||||||
.settings(s -> s
|
.settings(s -> s
|
||||||
.numberOfShards("3")
|
.numberOfShards("3")
|
||||||
.numberOfReplicas("1")))));
|
.numberOfReplicas("1"))
|
||||||
|
.mappings(m -> m
|
||||||
|
.properties("processors", p -> p
|
||||||
|
.nested(n -> n))))));
|
||||||
log.info("OpenSearch index template created");
|
log.info("OpenSearch index template created");
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@@ -148,27 +151,32 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
})));
|
})));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keyword filters
|
// Keyword filters (use .keyword sub-field for exact matching on dynamically mapped text fields)
|
||||||
if (request.status() != null)
|
if (request.status() != null)
|
||||||
filter.add(termQuery("status", request.status()));
|
filter.add(termQuery("status.keyword", request.status()));
|
||||||
if (request.routeId() != null)
|
if (request.routeId() != null)
|
||||||
filter.add(termQuery("route_id", request.routeId()));
|
filter.add(termQuery("route_id.keyword", request.routeId()));
|
||||||
if (request.agentId() != null)
|
if (request.agentId() != null)
|
||||||
filter.add(termQuery("agent_id", request.agentId()));
|
filter.add(termQuery("agent_id.keyword", request.agentId()));
|
||||||
if (request.correlationId() != null)
|
if (request.correlationId() != null)
|
||||||
filter.add(termQuery("correlation_id", request.correlationId()));
|
filter.add(termQuery("correlation_id.keyword", request.correlationId()));
|
||||||
|
|
||||||
// Full-text search across all fields + nested processor fields
|
// Full-text search across all fields + nested processor fields
|
||||||
if (request.text() != null && !request.text().isBlank()) {
|
if (request.text() != null && !request.text().isBlank()) {
|
||||||
String text = request.text();
|
String text = request.text();
|
||||||
|
String wildcard = "*" + text.toLowerCase() + "*";
|
||||||
List<Query> textQueries = new ArrayList<>();
|
List<Query> textQueries = new ArrayList<>();
|
||||||
|
|
||||||
// Search top-level text fields
|
// Search top-level text fields (analyzed match + wildcard for substring)
|
||||||
textQueries.add(Query.of(q -> q.multiMatch(m -> m
|
textQueries.add(Query.of(q -> q.multiMatch(m -> m
|
||||||
.query(text)
|
.query(text)
|
||||||
.fields("error_message", "error_stacktrace"))));
|
.fields("error_message", "error_stacktrace"))));
|
||||||
|
textQueries.add(Query.of(q -> q.wildcard(w -> w
|
||||||
|
.field("error_message").value(wildcard).caseInsensitive(true))));
|
||||||
|
textQueries.add(Query.of(q -> q.wildcard(w -> w
|
||||||
|
.field("error_stacktrace").value(wildcard).caseInsensitive(true))));
|
||||||
|
|
||||||
// Search nested processor fields
|
// Search nested processor fields (analyzed match + wildcard)
|
||||||
textQueries.add(Query.of(q -> q.nested(n -> n
|
textQueries.add(Query.of(q -> q.nested(n -> n
|
||||||
.path("processors")
|
.path("processors")
|
||||||
.query(nq -> nq.multiMatch(m -> m
|
.query(nq -> nq.multiMatch(m -> m
|
||||||
@@ -176,6 +184,14 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
.fields("processors.input_body", "processors.output_body",
|
.fields("processors.input_body", "processors.output_body",
|
||||||
"processors.input_headers", "processors.output_headers",
|
"processors.input_headers", "processors.output_headers",
|
||||||
"processors.error_message", "processors.error_stacktrace"))))));
|
"processors.error_message", "processors.error_stacktrace"))))));
|
||||||
|
textQueries.add(Query.of(q -> q.nested(n -> n
|
||||||
|
.path("processors")
|
||||||
|
.query(nq -> nq.bool(nb -> nb.should(
|
||||||
|
wildcardQuery("processors.input_body", wildcard),
|
||||||
|
wildcardQuery("processors.output_body", wildcard),
|
||||||
|
wildcardQuery("processors.input_headers", wildcard),
|
||||||
|
wildcardQuery("processors.output_headers", wildcard)
|
||||||
|
).minimumShouldMatch("1"))))));
|
||||||
|
|
||||||
// Also try keyword fields for exact matches
|
// Also try keyword fields for exact matches
|
||||||
textQueries.add(Query.of(q -> q.multiMatch(m -> m
|
textQueries.add(Query.of(q -> q.multiMatch(m -> m
|
||||||
@@ -185,32 +201,51 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
|
must.add(Query.of(q -> q.bool(b -> b.should(textQueries).minimumShouldMatch("1"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scoped text searches
|
// Scoped text searches (multiMatch + wildcard fallback for substring matching)
|
||||||
if (request.textInBody() != null && !request.textInBody().isBlank()) {
|
if (request.textInBody() != null && !request.textInBody().isBlank()) {
|
||||||
|
String bodyText = request.textInBody();
|
||||||
|
String bodyWildcard = "*" + bodyText.toLowerCase() + "*";
|
||||||
must.add(Query.of(q -> q.nested(n -> n
|
must.add(Query.of(q -> q.nested(n -> n
|
||||||
.path("processors")
|
.path("processors")
|
||||||
.query(nq -> nq.multiMatch(m -> m
|
.query(nq -> nq.bool(nb -> nb.should(
|
||||||
.query(request.textInBody())
|
Query.of(mq -> mq.multiMatch(m -> m
|
||||||
.fields("processors.input_body", "processors.output_body"))))));
|
.query(bodyText)
|
||||||
|
.fields("processors.input_body", "processors.output_body"))),
|
||||||
|
wildcardQuery("processors.input_body", bodyWildcard),
|
||||||
|
wildcardQuery("processors.output_body", bodyWildcard)
|
||||||
|
).minimumShouldMatch("1"))))));
|
||||||
}
|
}
|
||||||
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
|
if (request.textInHeaders() != null && !request.textInHeaders().isBlank()) {
|
||||||
|
String headerText = request.textInHeaders();
|
||||||
|
String headerWildcard = "*" + headerText.toLowerCase() + "*";
|
||||||
must.add(Query.of(q -> q.nested(n -> n
|
must.add(Query.of(q -> q.nested(n -> n
|
||||||
.path("processors")
|
.path("processors")
|
||||||
.query(nq -> nq.multiMatch(m -> m
|
.query(nq -> nq.bool(nb -> nb.should(
|
||||||
.query(request.textInHeaders())
|
Query.of(mq -> mq.multiMatch(m -> m
|
||||||
.fields("processors.input_headers", "processors.output_headers"))))));
|
.query(headerText)
|
||||||
|
.fields("processors.input_headers", "processors.output_headers"))),
|
||||||
|
wildcardQuery("processors.input_headers", headerWildcard),
|
||||||
|
wildcardQuery("processors.output_headers", headerWildcard)
|
||||||
|
).minimumShouldMatch("1"))))));
|
||||||
}
|
}
|
||||||
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
|
if (request.textInErrors() != null && !request.textInErrors().isBlank()) {
|
||||||
String errText = request.textInErrors();
|
String errText = request.textInErrors();
|
||||||
|
String errWildcard = "*" + errText.toLowerCase() + "*";
|
||||||
must.add(Query.of(q -> q.bool(b -> b.should(
|
must.add(Query.of(q -> q.bool(b -> b.should(
|
||||||
Query.of(sq -> sq.multiMatch(m -> m
|
Query.of(sq -> sq.multiMatch(m -> m
|
||||||
.query(errText)
|
.query(errText)
|
||||||
.fields("error_message", "error_stacktrace"))),
|
.fields("error_message", "error_stacktrace"))),
|
||||||
|
wildcardQuery("error_message", errWildcard),
|
||||||
|
wildcardQuery("error_stacktrace", errWildcard),
|
||||||
Query.of(sq -> sq.nested(n -> n
|
Query.of(sq -> sq.nested(n -> n
|
||||||
.path("processors")
|
.path("processors")
|
||||||
.query(nq -> nq.multiMatch(m -> m
|
.query(nq -> nq.bool(nb -> nb.should(
|
||||||
.query(errText)
|
Query.of(nmq -> nmq.multiMatch(m -> m
|
||||||
.fields("processors.error_message", "processors.error_stacktrace")))))
|
.query(errText)
|
||||||
|
.fields("processors.error_message", "processors.error_stacktrace"))),
|
||||||
|
wildcardQuery("processors.error_message", errWildcard),
|
||||||
|
wildcardQuery("processors.error_stacktrace", errWildcard)
|
||||||
|
).minimumShouldMatch("1")))))
|
||||||
).minimumShouldMatch("1"))));
|
).minimumShouldMatch("1"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -238,6 +273,10 @@ public class OpenSearchIndex implements SearchIndex {
|
|||||||
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
|
return Query.of(q -> q.term(t -> t.field(field).value(FieldValue.of(value))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Query wildcardQuery(String field, String pattern) {
|
||||||
|
return Query.of(q -> q.wildcard(w -> w.field(field).value(pattern).caseInsensitive(true)));
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Object> toMap(ExecutionDocument doc) {
|
private Map<String, Object> toMap(ExecutionDocument doc) {
|
||||||
Map<String, Object> map = new LinkedHashMap<>();
|
Map<String, Object> map = new LinkedHashMap<>();
|
||||||
map.put("execution_id", doc.executionId());
|
map.put("execution_id", doc.executionId());
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.cameleer3.server.app;
|
package com.cameleer3.server.app;
|
||||||
|
|
||||||
|
import org.opensearch.testcontainers.OpensearchContainer;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
@@ -7,25 +8,29 @@ import org.springframework.test.context.ActiveProfiles;
|
|||||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
import org.springframework.test.context.DynamicPropertyRegistry;
|
||||||
import org.springframework.test.context.DynamicPropertySource;
|
import org.springframework.test.context.DynamicPropertySource;
|
||||||
import org.testcontainers.containers.PostgreSQLContainer;
|
import org.testcontainers.containers.PostgreSQLContainer;
|
||||||
import org.testcontainers.junit.jupiter.Container;
|
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
|
||||||
import org.testcontainers.utility.DockerImageName;
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||||
@ActiveProfiles("test")
|
@ActiveProfiles("test")
|
||||||
@Testcontainers
|
|
||||||
public abstract class AbstractPostgresIT {
|
public abstract class AbstractPostgresIT {
|
||||||
|
|
||||||
private static final DockerImageName TIMESCALEDB_IMAGE =
|
private static final DockerImageName TIMESCALEDB_IMAGE =
|
||||||
DockerImageName.parse("timescale/timescaledb-ha:pg16")
|
DockerImageName.parse("timescale/timescaledb-ha:pg16")
|
||||||
.asCompatibleSubstituteFor("postgres");
|
.asCompatibleSubstituteFor("postgres");
|
||||||
|
|
||||||
@Container
|
static final PostgreSQLContainer<?> postgres;
|
||||||
static final PostgreSQLContainer<?> postgres =
|
static final OpensearchContainer<?> opensearch;
|
||||||
new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
|
|
||||||
.withDatabaseName("cameleer3")
|
static {
|
||||||
.withUsername("cameleer")
|
postgres = new PostgreSQLContainer<>(TIMESCALEDB_IMAGE)
|
||||||
.withPassword("test");
|
.withDatabaseName("cameleer3")
|
||||||
|
.withUsername("cameleer")
|
||||||
|
.withPassword("test");
|
||||||
|
postgres.start();
|
||||||
|
|
||||||
|
opensearch = new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
|
||||||
|
opensearch.start();
|
||||||
|
}
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
protected JdbcTemplate jdbcTemplate;
|
protected JdbcTemplate jdbcTemplate;
|
||||||
@@ -37,5 +42,6 @@ public abstract class AbstractPostgresIT {
|
|||||||
registry.add("spring.datasource.password", postgres::getPassword);
|
registry.add("spring.datasource.password", postgres::getPassword);
|
||||||
registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
|
registry.add("spring.datasource.driver-class-name", () -> "org.postgresql.Driver");
|
||||||
registry.add("spring.flyway.enabled", () -> "true");
|
registry.add("spring.flyway.enabled", () -> "true");
|
||||||
|
registry.add("opensearch.url", opensearch::getHttpHostAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -123,13 +123,13 @@ class DetailControllerIT extends AbstractPostgresIT {
|
|||||||
// Wait for flush and get the execution_id
|
// Wait for flush and get the execution_id
|
||||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||||
Integer count = jdbcTemplate.queryForObject(
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
"SELECT count(*) FROM route_executions WHERE route_id = 'detail-test-route'",
|
"SELECT count(*) FROM executions WHERE route_id = 'detail-test-route'",
|
||||||
Integer.class);
|
Integer.class);
|
||||||
assertThat(count).isGreaterThanOrEqualTo(1);
|
assertThat(count).isGreaterThanOrEqualTo(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
seededExecutionId = jdbcTemplate.queryForObject(
|
seededExecutionId = jdbcTemplate.queryForObject(
|
||||||
"SELECT execution_id FROM route_executions WHERE route_id = 'detail-test-route' LIMIT 1",
|
"SELECT execution_id FROM executions WHERE route_id = 'detail-test-route' LIMIT 1",
|
||||||
String.class);
|
String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -111,7 +111,7 @@ class ExecutionControllerIT extends AbstractPostgresIT {
|
|||||||
|
|
||||||
await().atMost(10, SECONDS).untilAsserted(() -> {
|
await().atMost(10, SECONDS).untilAsserted(() -> {
|
||||||
Integer count = jdbcTemplate.queryForObject(
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
"SELECT count(*) FROM route_executions WHERE route_id = 'flush-test-route'",
|
"SELECT count(*) FROM executions WHERE route_id = 'flush-test-route'",
|
||||||
Integer.class);
|
Integer.class);
|
||||||
assertThat(count).isGreaterThanOrEqualTo(1);
|
assertThat(count).isGreaterThanOrEqualTo(1);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -14,7 +14,9 @@ import org.springframework.http.HttpMethod;
|
|||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Integration tests for the search controller endpoints.
|
* Integration tests for the search controller endpoints.
|
||||||
@@ -153,11 +155,19 @@ class SearchControllerIT extends AbstractPostgresIT {
|
|||||||
""", i, i, i, i, i));
|
""", i, i, i, i, i));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify all data is available (synchronous writes)
|
// Verify all data is in PostgreSQL (synchronous writes)
|
||||||
Integer count = jdbcTemplate.queryForObject(
|
Integer count = jdbcTemplate.queryForObject(
|
||||||
"SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'",
|
"SELECT count(*) FROM executions WHERE route_id LIKE 'search-route-%'",
|
||||||
Integer.class);
|
Integer.class);
|
||||||
assertThat(count).isEqualTo(10);
|
assertThat(count).isEqualTo(10);
|
||||||
|
|
||||||
|
// Wait for async OpenSearch indexing (debounce + index time)
|
||||||
|
// Check for last seeded execution specifically to avoid false positives from other test classes
|
||||||
|
await().atMost(30, SECONDS).untilAsserted(() -> {
|
||||||
|
ResponseEntity<String> r = searchGet("?correlationId=corr-page-10");
|
||||||
|
JsonNode body = objectMapper.readTree(r.getBody());
|
||||||
|
assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -10,29 +10,16 @@ import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.opensearch.client.opensearch.OpenSearchClient;
|
import org.opensearch.client.opensearch.OpenSearchClient;
|
||||||
import org.opensearch.client.opensearch.indices.RefreshRequest;
|
import org.opensearch.client.opensearch.indices.RefreshRequest;
|
||||||
import org.opensearch.testcontainers.OpensearchContainer;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
|
||||||
import org.springframework.test.context.DynamicPropertySource;
|
|
||||||
import org.testcontainers.junit.jupiter.Container;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
// Extends AbstractPostgresIT for PostgreSQL datasource needed by Spring context
|
// Extends AbstractPostgresIT which provides both PostgreSQL and OpenSearch testcontainers
|
||||||
class OpenSearchIndexIT extends AbstractPostgresIT {
|
class OpenSearchIndexIT extends AbstractPostgresIT {
|
||||||
|
|
||||||
@Container
|
|
||||||
static final OpensearchContainer<?> opensearch =
|
|
||||||
new OpensearchContainer<>("opensearchproject/opensearch:2.19.0");
|
|
||||||
|
|
||||||
@DynamicPropertySource
|
|
||||||
static void configureOpenSearch(DynamicPropertyRegistry registry) {
|
|
||||||
registry.add("opensearch.url", opensearch::getHttpHostAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
SearchIndex searchIndex;
|
SearchIndex searchIndex;
|
||||||
|
|
||||||
|
|||||||
@@ -23,30 +23,33 @@ class PostgresStatsStoreIT extends AbstractPostgresIT {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void statsReturnsCountsForTimeWindow() {
|
void statsReturnsCountsForTimeWindow() {
|
||||||
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
|
// Use a unique route + statsForRoute to avoid data contamination from other tests
|
||||||
insertExecution("stats-1", "route-a", "app-1", "COMPLETED", now, 100L);
|
String uniqueRoute = "stats-route-" + System.nanoTime();
|
||||||
insertExecution("stats-2", "route-a", "app-1", "FAILED", now.plusSeconds(10), 200L);
|
Instant base = Instant.now().minus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
|
||||||
insertExecution("stats-3", "route-b", "app-1", "COMPLETED", now.plusSeconds(20), 50L);
|
insertExecution("stats-1-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base, 100L);
|
||||||
|
insertExecution("stats-2-" + uniqueRoute, uniqueRoute, "app-stats", "FAILED", base.plusSeconds(10), 200L);
|
||||||
|
insertExecution("stats-3-" + uniqueRoute, uniqueRoute, "app-stats", "COMPLETED", base.plusSeconds(20), 50L);
|
||||||
|
|
||||||
// Force continuous aggregate refresh
|
// Force continuous aggregate refresh
|
||||||
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
|
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
|
||||||
|
|
||||||
ExecutionStats stats = statsStore.stats(now.minusSeconds(60), now.plusSeconds(60));
|
ExecutionStats stats = statsStore.statsForRoute(base.minusSeconds(60), base.plusSeconds(60), uniqueRoute, null);
|
||||||
assertEquals(3, stats.totalCount());
|
assertEquals(3, stats.totalCount());
|
||||||
assertEquals(1, stats.failedCount());
|
assertEquals(1, stats.failedCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void timeseriesReturnsBuckets() {
|
void timeseriesReturnsBuckets() {
|
||||||
Instant now = Instant.now().truncatedTo(ChronoUnit.MINUTES);
|
String uniqueRoute = "ts-route-" + System.nanoTime();
|
||||||
|
Instant base = Instant.now().minus(10, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MINUTES);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
insertExecution("ts-" + i, "route-a", "app-1", "COMPLETED",
|
insertExecution("ts-" + i + "-" + uniqueRoute, uniqueRoute, "app-ts", "COMPLETED",
|
||||||
now.plusSeconds(i * 30), 100L + i);
|
base.plusSeconds(i * 30), 100L + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_all', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
|
jdbc.execute("CALL refresh_continuous_aggregate('stats_1m_route', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour')");
|
||||||
|
|
||||||
StatsTimeseries ts = statsStore.timeseries(now.minus(1, ChronoUnit.MINUTES), now.plus(10, ChronoUnit.MINUTES), 5);
|
StatsTimeseries ts = statsStore.timeseriesForRoute(base.minus(1, ChronoUnit.MINUTES), base.plus(10, ChronoUnit.MINUTES), 5, uniqueRoute, null);
|
||||||
assertNotNull(ts);
|
assertNotNull(ts);
|
||||||
assertFalse(ts.buckets().isEmpty());
|
assertFalse(ts.buckets().isEmpty());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ spring:
|
|||||||
|
|
||||||
opensearch:
|
opensearch:
|
||||||
url: http://localhost:9200
|
url: http://localhost:9200
|
||||||
|
debounce-ms: 100
|
||||||
|
|
||||||
ingestion:
|
ingestion:
|
||||||
buffer-capacity: 100
|
buffer-capacity: 100
|
||||||
|
|||||||
@@ -8,13 +8,18 @@ import com.cameleer3.server.core.storage.ExecutionStore;
|
|||||||
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
|
import com.cameleer3.server.core.storage.ExecutionStore.ExecutionRecord;
|
||||||
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
|
||||||
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
public class IngestionService {
|
public class IngestionService {
|
||||||
|
|
||||||
|
private static final ObjectMapper JSON = new ObjectMapper();
|
||||||
|
|
||||||
private final ExecutionStore executionStore;
|
private final ExecutionStore executionStore;
|
||||||
private final DiagramStore diagramStore;
|
private final DiagramStore diagramStore;
|
||||||
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
|
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
|
||||||
@@ -99,8 +104,7 @@ public class IngestionService {
|
|||||||
p.getDurationMs(),
|
p.getDurationMs(),
|
||||||
p.getErrorMessage(), p.getErrorStackTrace(),
|
p.getErrorMessage(), p.getErrorStackTrace(),
|
||||||
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
truncateBody(p.getInputBody()), truncateBody(p.getOutputBody()),
|
||||||
p.getInputHeaders() != null ? p.getInputHeaders().toString() : null,
|
toJson(p.getInputHeaders()), toJson(p.getOutputHeaders())
|
||||||
p.getOutputHeaders() != null ? p.getOutputHeaders().toString() : null
|
|
||||||
));
|
));
|
||||||
if (p.getChildren() != null) {
|
if (p.getChildren() != null) {
|
||||||
flat.addAll(flattenProcessors(
|
flat.addAll(flattenProcessors(
|
||||||
@@ -116,4 +120,13 @@ public class IngestionService {
|
|||||||
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
|
if (body.length() > bodySizeLimit) return body.substring(0, bodySizeLimit);
|
||||||
return body;
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String toJson(Map<String, String> headers) {
|
||||||
|
if (headers == null) return null;
|
||||||
|
try {
|
||||||
|
return JSON.writeValueAsString(headers);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
return "{}";
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,29 +0,0 @@
|
|||||||
version: '3.8'
|
|
||||||
|
|
||||||
services:
|
|
||||||
postgres:
|
|
||||||
image: timescale/timescaledb-ha:pg16
|
|
||||||
ports:
|
|
||||||
- "5432:5432"
|
|
||||||
environment:
|
|
||||||
POSTGRES_DB: cameleer3
|
|
||||||
POSTGRES_USER: cameleer
|
|
||||||
POSTGRES_PASSWORD: cameleer_dev
|
|
||||||
volumes:
|
|
||||||
- pgdata:/home/postgres/pgdata/data
|
|
||||||
|
|
||||||
opensearch:
|
|
||||||
image: opensearchproject/opensearch:2.19.0
|
|
||||||
ports:
|
|
||||||
- "9200:9200"
|
|
||||||
- "9300:9300"
|
|
||||||
environment:
|
|
||||||
discovery.type: single-node
|
|
||||||
DISABLE_SECURITY_PLUGIN: "true"
|
|
||||||
OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m"
|
|
||||||
volumes:
|
|
||||||
- osdata:/usr/share/opensearch/data
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
pgdata:
|
|
||||||
osdata:
|
|
||||||
@@ -1,20 +1,27 @@
|
|||||||
services:
|
services:
|
||||||
clickhouse:
|
postgres:
|
||||||
image: clickhouse/clickhouse-server:25.3
|
image: timescale/timescaledb-ha:pg16
|
||||||
ports:
|
ports:
|
||||||
- "8123:8123"
|
- "5432:5432"
|
||||||
- "9000:9000"
|
|
||||||
volumes:
|
|
||||||
- clickhouse-data:/var/lib/clickhouse
|
|
||||||
- ./clickhouse/init:/docker-entrypoint-initdb.d
|
|
||||||
environment:
|
environment:
|
||||||
CLICKHOUSE_USER: cameleer
|
POSTGRES_DB: cameleer3
|
||||||
CLICKHOUSE_PASSWORD: cameleer_dev
|
POSTGRES_USER: cameleer
|
||||||
CLICKHOUSE_DB: cameleer3
|
POSTGRES_PASSWORD: cameleer_dev
|
||||||
ulimits:
|
volumes:
|
||||||
nofile:
|
- pgdata:/home/postgres/pgdata/data
|
||||||
soft: 262144
|
|
||||||
hard: 262144
|
opensearch:
|
||||||
|
image: opensearchproject/opensearch:2.19.0
|
||||||
|
ports:
|
||||||
|
- "9200:9200"
|
||||||
|
- "9300:9300"
|
||||||
|
environment:
|
||||||
|
discovery.type: single-node
|
||||||
|
DISABLE_SECURITY_PLUGIN: "true"
|
||||||
|
OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m"
|
||||||
|
volumes:
|
||||||
|
- osdata:/usr/share/opensearch/data
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
clickhouse-data:
|
pgdata:
|
||||||
|
osdata:
|
||||||
|
|||||||
Reference in New Issue
Block a user