From b41f34c0902ccf0c0ed1cca4bb6eb7176d24e5a7 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Wed, 22 Apr 2026 15:49:05 +0200 Subject: [PATCH] =?UTF-8?q?search:=20SearchRequest.afterExecutionId=20?= =?UTF-8?q?=E2=80=94=20composite=20(startTime,=20execId)=20predicate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an optional afterExecutionId field to SearchRequest. When combined with a non-null timeFrom, ClickHouseSearchIndex applies a strictly-after tuple predicate (start_time > ts OR (start_time = ts AND execution_id > id)) so same-millisecond exchanges can be consumed exactly once across ticks. When afterExecutionId is null, timeFrom keeps its existing >= semantics — no behaviour change for any current caller. Also adds the SearchRequest.withCursor(ts, id) wither. Threads the field through existing withInstanceIds / withEnvironment witheres. All existing positional call-sites (SearchController, ExchangeMatchEvaluator, ClickHouseSearchIndexIT, ClickHouseChunkPipelineIT) pass null for the new slot. Task 1.2 of docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md. The evaluator-side wiring that actually supplies the cursor is Task 1.5. --- .../alerting/eval/ExchangeMatchEvaluator.java | 1 + .../app/controller/SearchController.java | 1 + .../app/search/ClickHouseSearchIndex.java | 8 ++- .../app/search/ClickHouseSearchIndexIT.java | 32 +++++----- .../storage/ClickHouseChunkPipelineIT.java | 4 +- .../server/core/search/SearchRequest.java | 64 ++++++++++++------- 6 files changed, 69 insertions(+), 41 deletions(-) diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java index d45bdd99..dff12099 100644 --- a/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/alerting/eval/ExchangeMatchEvaluator.java @@ -110,6 +110,7 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator ? OR (start_time = ? AND execution_id > ?))"); + params.add(Timestamp.from(request.timeFrom())); + params.add(Timestamp.from(request.timeFrom())); + params.add(request.afterExecutionId()); + } else if (request.timeFrom() != null) { conditions.add("start_time >= ?"); params.add(Timestamp.from(request.timeFrom())); } diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java index 9ecb2ede..b1269825 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/search/ClickHouseSearchIndexIT.java @@ -118,7 +118,7 @@ class ClickHouseSearchIndexIT { void search_withNoFilters_returnsAllExecutions() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -130,7 +130,7 @@ class ClickHouseSearchIndexIT { void search_byStatus_filtersCorrectly() { SearchRequest request = new SearchRequest( "FAILED", null, null, null, null, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -145,7 +145,7 @@ class ClickHouseSearchIndexIT { // Time window covering exec-1 and exec-2 but not exec-3 SearchRequest request = new SearchRequest( null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -158,7 +158,7 @@ class ClickHouseSearchIndexIT { void search_fullTextSearch_findsInErrorMessage() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, "NullPointerException", null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -170,7 +170,7 @@ class ClickHouseSearchIndexIT { void search_fullTextSearch_findsInInputBody() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, "12345", null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -182,7 +182,7 @@ class ClickHouseSearchIndexIT { void search_textInBody_searchesProcessorBodies() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, "Hello World", null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -194,7 +194,7 @@ class ClickHouseSearchIndexIT { void search_textInHeaders_searchesProcessorHeaders() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, "secret-token", null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -206,7 +206,7 @@ class ClickHouseSearchIndexIT { void search_textInErrors_searchesErrorFields() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, null, "Foo.bar", - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -218,7 +218,7 @@ class ClickHouseSearchIndexIT { void search_withHighlight_returnsSnippet() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, "NullPointerException", null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -230,7 +230,7 @@ class ClickHouseSearchIndexIT { void search_pagination_works() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, 0, 2, null, null, null); + null, null, null, null, null, 0, 2, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -244,7 +244,7 @@ class ClickHouseSearchIndexIT { void search_byApplication_filtersCorrectly() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, null, null, - null, null, null, "other-app", null, 0, 50, null, null, null); + null, null, null, "other-app", null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -256,7 +256,7 @@ class ClickHouseSearchIndexIT { void search_byAgentIds_filtersCorrectly() { SearchRequest request = new SearchRequest( null, null, null, null, null, null, null, null, null, null, - null, null, null, null, List.of("agent-b"), 0, 50, null, null, null); + null, null, null, null, List.of("agent-b"), 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -268,7 +268,7 @@ class ClickHouseSearchIndexIT { void count_returnsMatchingCount() { SearchRequest request = new SearchRequest( "COMPLETED", null, null, null, null, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); long count = searchIndex.count(request); @@ -279,7 +279,7 @@ class ClickHouseSearchIndexIT { void search_multipleStatusFilter_works() { SearchRequest request = new SearchRequest( "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -290,7 +290,7 @@ class ClickHouseSearchIndexIT { void search_byCorrelationId_filtersCorrectly() { SearchRequest request = new SearchRequest( null, null, null, null, null, "corr-1", null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); @@ -302,7 +302,7 @@ class ClickHouseSearchIndexIT { void search_byDurationRange_filtersCorrectly() { SearchRequest request = new SearchRequest( null, null, null, 300L, 600L, null, null, null, null, null, - null, null, null, null, null, 0, 50, null, null, null); + null, null, null, null, null, 0, 50, null, null, null, null); SearchResult result = searchIndex.search(request); diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java index 4cb4c3fd..6403a39a 100644 --- a/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/storage/ClickHouseChunkPipelineIT.java @@ -157,7 +157,7 @@ class ClickHouseChunkPipelineIT { null, null, null, null, null, null, "ORD-123", null, null, null, null, null, null, null, null, - 0, 50, null, null, null)); + 0, 50, null, null, null, null)); assertThat(result.total()).isEqualTo(1); assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); @@ -168,7 +168,7 @@ class ClickHouseChunkPipelineIT { null, null, null, null, null, null, null, "ABC-123", null, null, null, null, null, null, null, - 0, 50, null, null, null)); + 0, 50, null, null, null, null)); assertThat(bodyResult.total()).isEqualTo(1); // Verify iteration data in processor_executions diff --git a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java index 52044916..de15c4b8 100644 --- a/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java +++ b/cameleer-server-core/src/main/java/com/cameleer/server/core/search/SearchRequest.java @@ -9,26 +9,29 @@ import java.util.List; * All filter fields are nullable/optional. When null, the filter is not applied. * The compact constructor validates and normalizes pagination parameters. * - * @param status execution status filter (COMPLETED, FAILED, RUNNING) - * @param timeFrom inclusive start of time range - * @param timeTo exclusive end of time range - * @param durationMin minimum duration in milliseconds (inclusive) - * @param durationMax maximum duration in milliseconds (inclusive) - * @param correlationId exact correlation ID match - * @param text global full-text search across all text fields - * @param textInBody full-text search scoped to exchange bodies - * @param textInHeaders full-text search scoped to exchange headers - * @param textInErrors full-text search scoped to error messages and stack traces - * @param routeId exact match on route_id - * @param instanceId exact match on instance_id - * @param processorType matches processor_types array via has() - * @param applicationId exact match on application_id - * @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents) - * @param offset pagination offset (0-based) - * @param limit page size (default 50, max 500) - * @param sortField column to sort by (default: startTime) - * @param sortDir sort direction: asc or desc (default: desc) - * @param environment optional environment filter (e.g. "dev", "staging", "prod") + * @param status execution status filter (COMPLETED, FAILED, RUNNING) + * @param timeFrom inclusive start of time range + * @param timeTo exclusive end of time range + * @param durationMin minimum duration in milliseconds (inclusive) + * @param durationMax maximum duration in milliseconds (inclusive) + * @param correlationId exact correlation ID match + * @param text global full-text search across all text fields + * @param textInBody full-text search scoped to exchange bodies + * @param textInHeaders full-text search scoped to exchange headers + * @param textInErrors full-text search scoped to error messages and stack traces + * @param routeId exact match on route_id + * @param instanceId exact match on instance_id + * @param processorType matches processor_types array via has() + * @param applicationId exact match on application_id + * @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents) + * @param offset pagination offset (0-based) + * @param limit page size (default 50, max 500) + * @param sortField column to sort by (default: startTime) + * @param sortDir sort direction: asc or desc (default: desc) + * @param afterExecutionId when combined with a non-null {@code timeFrom}, applies the composite predicate + * {@code (start_time > timeFrom) OR (start_time = timeFrom AND execution_id > afterExecutionId)}. + * When null, {@code timeFrom} is applied as a plain {@code >=} lower bound (existing behaviour). + * @param environment optional environment filter (e.g. "dev", "staging", "prod") */ public record SearchRequest( String status, @@ -50,6 +53,7 @@ public record SearchRequest( int limit, String sortField, String sortDir, + String afterExecutionId, String environment ) { @@ -92,7 +96,7 @@ public record SearchRequest( status, timeFrom, timeTo, durationMin, durationMax, correlationId, text, textInBody, textInHeaders, textInErrors, routeId, instanceId, processorType, applicationId, resolvedInstanceIds, - offset, limit, sortField, sortDir, environment + offset, limit, sortField, sortDir, afterExecutionId, environment ); } @@ -102,7 +106,23 @@ public record SearchRequest( status, timeFrom, timeTo, durationMin, durationMax, correlationId, text, textInBody, textInHeaders, textInErrors, routeId, instanceId, processorType, applicationId, instanceIds, - offset, limit, sortField, sortDir, env + offset, limit, sortField, sortDir, afterExecutionId, env + ); + } + + /** + * Create a copy with a composite {@code (start_time, execution_id)} cursor. + *

+ * The resulting request applies a strictly-after tuple predicate + * {@code (start_time > ts) OR (start_time = ts AND execution_id > afterExecutionId)}, + * enabling exactly-once consumption of same-millisecond exchanges across scheduler ticks. + */ + public SearchRequest withCursor(Instant ts, String afterExecutionId) { + return new SearchRequest( + status, ts, timeTo, durationMin, durationMax, correlationId, + text, textInBody, textInHeaders, textInErrors, + routeId, instanceId, processorType, applicationId, instanceIds, + offset, limit, sortField, sortDir, afterExecutionId, environment ); } }