feat(02-03): ClickHouse search engine, search controller, and 13 integration tests

- ClickHouseSearchEngine with dynamic WHERE clause building and LIKE escape
- SearchController with GET (basic filters) and POST (advanced JSON body)
- SearchBeanConfig wiring SearchEngine, SearchService, DetailService beans
- 13 integration tests covering all filter types, combinations, pagination, empty results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-11 16:23:20 +01:00
parent dcae89f404
commit 82a190c8e2
4 changed files with 604 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
package com.cameleer3.server.app.config;
import com.cameleer3.server.app.search.ClickHouseSearchEngine;
import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.search.SearchEngine;
import com.cameleer3.server.core.search.SearchService;
import com.cameleer3.server.core.storage.ExecutionRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
/**
* Creates beans for the search and detail layers.
*/
@Configuration
public class SearchBeanConfig {
@Bean
public SearchEngine searchEngine(JdbcTemplate jdbcTemplate) {
return new ClickHouseSearchEngine(jdbcTemplate);
}
@Bean
public SearchService searchService(SearchEngine searchEngine) {
return new SearchService(searchEngine);
}
@Bean
public DetailService detailService(ExecutionRepository executionRepository) {
return new DetailService(executionRepository);
}
}

View File

@@ -0,0 +1,64 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.search.SearchService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.time.Instant;
/**
* Search endpoints for querying route executions.
* <p>
* GET supports basic filters via query parameters. POST accepts a full
* {@link SearchRequest} JSON body for advanced search with all filter types.
*/
@RestController
@RequestMapping("/api/v1/search")
@Tag(name = "Search", description = "Transaction search endpoints")
public class SearchController {
private final SearchService searchService;
public SearchController(SearchService searchService) {
this.searchService = searchService;
}
@GetMapping("/executions")
@Operation(summary = "Search executions with basic filters")
public ResponseEntity<SearchResult<ExecutionSummary>> searchGet(
@RequestParam(required = false) String status,
@RequestParam(required = false) Instant timeFrom,
@RequestParam(required = false) Instant timeTo,
@RequestParam(required = false) String correlationId,
@RequestParam(required = false) String text,
@RequestParam(defaultValue = "0") int offset,
@RequestParam(defaultValue = "50") int limit) {
SearchRequest request = new SearchRequest(
status, timeFrom, timeTo,
null, null,
correlationId,
text, null, null, null,
offset, limit
);
return ResponseEntity.ok(searchService.search(request));
}
@PostMapping("/executions")
@Operation(summary = "Advanced search with all filters")
public ResponseEntity<SearchResult<ExecutionSummary>> searchPost(
@RequestBody SearchRequest request) {
return ResponseEntity.ok(searchService.search(request));
}
}

View File

@@ -0,0 +1,145 @@
package com.cameleer3.server.app.search;
import com.cameleer3.server.core.search.ExecutionSummary;
import com.cameleer3.server.core.search.SearchEngine;
import com.cameleer3.server.core.search.SearchRequest;
import com.cameleer3.server.core.search.SearchResult;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* ClickHouse implementation of {@link SearchEngine}.
* <p>
* Builds dynamic WHERE clauses from non-null {@link SearchRequest} fields
* and queries the {@code route_executions} table. LIKE patterns are properly
* escaped to prevent injection.
*/
public class ClickHouseSearchEngine implements SearchEngine {
private final JdbcTemplate jdbcTemplate;
public ClickHouseSearchEngine(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public SearchResult<ExecutionSummary> search(SearchRequest request) {
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
buildWhereClause(request, conditions, params);
String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
// Count query
var countParams = params.toArray();
Long total = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where, Long.class, countParams);
if (total == null) total = 0L;
if (total == 0) {
return SearchResult.empty(request.offset(), request.limit());
}
// Data query
params.add(request.limit());
params.add(request.offset());
String dataSql = "SELECT execution_id, route_id, agent_id, status, start_time, end_time, " +
"duration_ms, correlation_id, error_message, diagram_content_hash " +
"FROM route_executions" + where +
" ORDER BY start_time DESC LIMIT ? OFFSET ?";
List<ExecutionSummary> data = jdbcTemplate.query(dataSql, (rs, rowNum) -> {
Timestamp endTs = rs.getTimestamp("end_time");
return new ExecutionSummary(
rs.getString("execution_id"),
rs.getString("route_id"),
rs.getString("agent_id"),
rs.getString("status"),
rs.getTimestamp("start_time").toInstant(),
endTs != null ? endTs.toInstant() : null,
rs.getLong("duration_ms"),
rs.getString("correlation_id"),
rs.getString("error_message"),
rs.getString("diagram_content_hash")
);
}, params.toArray());
return new SearchResult<>(data, total, request.offset(), request.limit());
}
@Override
public long count(SearchRequest request) {
var conditions = new ArrayList<String>();
var params = new ArrayList<Object>();
buildWhereClause(request, conditions, params);
String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
Long result = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions" + where, Long.class, params.toArray());
return result != null ? result : 0L;
}
private void buildWhereClause(SearchRequest req, List<String> conditions, List<Object> params) {
if (req.status() != null && !req.status().isBlank()) {
conditions.add("status = ?");
params.add(req.status());
}
if (req.timeFrom() != null) {
conditions.add("start_time >= ?");
params.add(Timestamp.from(req.timeFrom()));
}
if (req.timeTo() != null) {
conditions.add("start_time <= ?");
params.add(Timestamp.from(req.timeTo()));
}
if (req.durationMin() != null) {
conditions.add("duration_ms >= ?");
params.add(req.durationMin());
}
if (req.durationMax() != null) {
conditions.add("duration_ms <= ?");
params.add(req.durationMax());
}
if (req.correlationId() != null && !req.correlationId().isBlank()) {
conditions.add("correlation_id = ?");
params.add(req.correlationId());
}
if (req.text() != null && !req.text().isBlank()) {
String pattern = "%" + escapeLike(req.text()) + "%";
conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR exchange_bodies LIKE ? OR exchange_headers LIKE ?)");
params.add(pattern);
params.add(pattern);
params.add(pattern);
params.add(pattern);
}
if (req.textInBody() != null && !req.textInBody().isBlank()) {
conditions.add("exchange_bodies LIKE ?");
params.add("%" + escapeLike(req.textInBody()) + "%");
}
if (req.textInHeaders() != null && !req.textInHeaders().isBlank()) {
conditions.add("exchange_headers LIKE ?");
params.add("%" + escapeLike(req.textInHeaders()) + "%");
}
if (req.textInErrors() != null && !req.textInErrors().isBlank()) {
String pattern = "%" + escapeLike(req.textInErrors()) + "%";
conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ?)");
params.add(pattern);
params.add(pattern);
}
}
/**
* Escape special LIKE characters to prevent LIKE injection.
*/
static String escapeLike(String input) {
return input
.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_");
}
}

View File

@@ -0,0 +1,363 @@
package com.cameleer3.server.app.controller;
import com.cameleer3.server.app.AbstractClickHouseIT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Integration tests for the search controller endpoints.
* Tests all filter types independently and in combination.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SearchControllerIT extends AbstractClickHouseIT {
@Autowired
private TestRestTemplate restTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Seed test data: Insert executions with varying statuses, times, durations,
* correlationIds, error messages, and exchange snapshot data.
*/
@BeforeAll
void seedTestData() {
// Execution 1: COMPLETED, short duration, no errors
ingest("""
{
"routeId": "search-route-1",
"exchangeId": "ex-search-1",
"correlationId": "corr-alpha",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00Z",
"endTime": "2026-03-10T10:00:00.050Z",
"durationMs": 50,
"errorMessage": "",
"errorStackTrace": "",
"processors": [
{
"processorId": "proc-1",
"processorType": "log",
"status": "COMPLETED",
"startTime": "2026-03-10T10:00:00Z",
"endTime": "2026-03-10T10:00:00.050Z",
"durationMs": 50,
"inputBody": "customer-123 order data",
"outputBody": "processed customer-123",
"inputHeaders": {"Content-Type": "application/json"},
"outputHeaders": {"X-Trace": "abc"}
}
]
}
""");
// Execution 2: FAILED with NullPointerException, medium duration
ingest("""
{
"routeId": "search-route-2",
"exchangeId": "ex-search-2",
"correlationId": "corr-beta",
"status": "FAILED",
"startTime": "2026-03-10T12:00:00Z",
"endTime": "2026-03-10T12:00:00.200Z",
"durationMs": 200,
"errorMessage": "NullPointerException in OrderService",
"errorStackTrace": "java.lang.NullPointerException\\n at com.example.OrderService.process(OrderService.java:42)",
"processors": []
}
""");
// Execution 3: RUNNING, long duration, different time window
ingest("""
{
"routeId": "search-route-3",
"exchangeId": "ex-search-3",
"correlationId": "corr-gamma",
"status": "RUNNING",
"startTime": "2026-03-11T08:00:00Z",
"endTime": "2026-03-11T08:00:01Z",
"durationMs": 1000,
"errorMessage": "",
"errorStackTrace": "",
"processors": []
}
""");
// Execution 4: FAILED with MyException in stack trace
ingest("""
{
"routeId": "search-route-4",
"exchangeId": "ex-search-4",
"correlationId": "corr-delta",
"status": "FAILED",
"startTime": "2026-03-10T14:00:00Z",
"endTime": "2026-03-10T14:00:00.300Z",
"durationMs": 300,
"errorMessage": "Processing failed",
"errorStackTrace": "com.example.MyException: something broke\\n at com.example.Handler.handle(Handler.java:10)",
"processors": [
{
"processorId": "proc-4",
"processorType": "bean",
"status": "FAILED",
"startTime": "2026-03-10T14:00:00Z",
"endTime": "2026-03-10T14:00:00.300Z",
"durationMs": 300,
"inputBody": "",
"outputBody": "",
"inputHeaders": {"Content-Type": "text/plain"},
"outputHeaders": {}
}
]
}
""");
// Insert 6 more COMPLETED executions for pagination testing (total = 10)
for (int i = 5; i <= 10; i++) {
ingest(String.format("""
{
"routeId": "search-route-%d",
"exchangeId": "ex-search-%d",
"correlationId": "corr-page-%d",
"status": "COMPLETED",
"startTime": "2026-03-10T15:00:%02d.000Z",
"endTime": "2026-03-10T15:00:%02d.100Z",
"durationMs": 100,
"errorMessage": "",
"errorStackTrace": "",
"processors": []
}
""", i, i, i, i, i));
}
// Wait for all data to flush
await().atMost(10, SECONDS).untilAsserted(() -> {
Integer count = jdbcTemplate.queryForObject(
"SELECT count() FROM route_executions WHERE route_id LIKE 'search-route-%'",
Integer.class);
assertThat(count).isEqualTo(10);
});
}
@Test
void searchByStatus_returnsOnlyMatchingExecutions() throws Exception {
ResponseEntity<String> response = searchGet("?status=FAILED");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(2);
assertThat(body.get("offset").asInt()).isEqualTo(0);
assertThat(body.get("limit").asInt()).isEqualTo(50);
assertThat(body.get("data")).isNotNull();
body.get("data").forEach(item ->
assertThat(item.get("status").asText()).isEqualTo("FAILED"));
}
@Test
void searchByTimeRange_returnsOnlyExecutionsInRange() throws Exception {
// Only execution 1 and 2 are on 2026-03-10 before 13:00
ResponseEntity<String> response = searchGet(
"?timeFrom=2026-03-10T09:00:00Z&timeTo=2026-03-10T13:00:00Z");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(2);
}
@Test
void searchByDuration_returnsOnlyMatchingExecutions() throws Exception {
// durationMin=100, durationMax=500 should match executions with 100, 200, 300 ms
ResponseEntity<String> response = searchPost("""
{
"durationMin": 100,
"durationMax": 500
}
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
// Exec 2 (200ms), Exec 4 (300ms), Execs 5-10 (100ms each = 6)
assertThat(body.get("total").asLong()).isEqualTo(8);
}
@Test
void searchByCorrelationId_returnsOnlyMatchingExecution() throws Exception {
ResponseEntity<String> response = searchGet("?correlationId=corr-alpha");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("correlationId").asText()).isEqualTo("corr-alpha");
}
@Test
void fullTextSearchGlobal_findsMatchInErrorMessage() throws Exception {
ResponseEntity<String> response = searchPost("""
{ "text": "NullPointerException" }
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("routeId").asText()).isEqualTo("search-route-2");
}
@Test
void fullTextSearchGlobal_returnsEmptyForNonexistent() throws Exception {
ResponseEntity<String> response = searchPost("""
{ "text": "nonexistent-term-xyz-12345" }
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isZero();
assertThat(body.get("data")).isEmpty();
}
@Test
void fullTextSearchInBody_findsMatchInExchangeBody() throws Exception {
ResponseEntity<String> response = searchPost("""
{ "textInBody": "customer-123" }
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("routeId").asText()).isEqualTo("search-route-1");
}
@Test
void fullTextSearchInHeaders_findsMatchInExchangeHeaders() throws Exception {
// Content-Type appears in exec 1 and exec 4 headers
ResponseEntity<String> response = searchPost("""
{ "textInHeaders": "Content-Type" }
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isGreaterThanOrEqualTo(1);
}
@Test
void fullTextSearchInErrors_findsMatchInStackTrace() throws Exception {
ResponseEntity<String> response = searchPost("""
{ "textInErrors": "MyException" }
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("routeId").asText()).isEqualTo("search-route-4");
}
@Test
void combinedFilters_statusAndText() throws Exception {
// Only FAILED + NullPointer = exec 2
ResponseEntity<String> response = searchPost("""
{
"status": "FAILED",
"text": "NullPointer"
}
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("routeId").asText()).isEqualTo("search-route-2");
}
@Test
void postAdvancedSearch_allFiltersWork() throws Exception {
ResponseEntity<String> response = searchPost("""
{
"status": "COMPLETED",
"timeFrom": "2026-03-10T09:00:00Z",
"timeTo": "2026-03-10T11:00:00Z",
"durationMin": 0,
"durationMax": 100,
"correlationId": "corr-alpha"
}
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(1);
assertThat(body.get("data").get(0).get("correlationId").asText()).isEqualTo("corr-alpha");
}
@Test
void pagination_worksCorrectly() throws Exception {
// Get all 10 executions with pagination: offset=2, limit=3
ResponseEntity<String> response = searchPost("""
{
"offset": 2,
"limit": 3
}
""");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("total").asLong()).isEqualTo(10);
assertThat(body.get("data").size()).isEqualTo(3);
assertThat(body.get("offset").asInt()).isEqualTo(2);
assertThat(body.get("limit").asInt()).isEqualTo(3);
}
@Test
void emptyResults_returnsCorrectEnvelope() throws Exception {
ResponseEntity<String> response = searchGet("?status=NONEXISTENT_STATUS");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
JsonNode body = objectMapper.readTree(response.getBody());
assertThat(body.get("data")).isEmpty();
assertThat(body.get("total").asLong()).isZero();
assertThat(body.get("offset").asInt()).isEqualTo(0);
assertThat(body.get("limit").asInt()).isEqualTo(50);
}
// --- Helper methods ---
private void ingest(String json) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Cameleer-Protocol-Version", "1");
restTemplate.postForEntity("/api/v1/data/executions",
new HttpEntity<>(json, headers), String.class);
}
private ResponseEntity<String> searchGet(String queryString) {
HttpHeaders headers = new HttpHeaders();
headers.set("X-Cameleer-Protocol-Version", "1");
return restTemplate.exchange(
"/api/v1/search/executions" + queryString,
HttpMethod.GET,
new HttpEntity<>(headers),
String.class);
}
private ResponseEntity<String> searchPost(String jsonBody) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Cameleer-Protocol-Version", "1");
return restTemplate.exchange(
"/api/v1/search/executions",
HttpMethod.POST,
new HttpEntity<>(jsonBody, headers),
String.class);
}
}