search: SearchRequest.afterExecutionId — composite (startTime, execId) predicate

Adds an optional afterExecutionId field to SearchRequest. When combined
with a non-null timeFrom, ClickHouseSearchIndex applies a strictly-after
tuple predicate (start_time > ts OR (start_time = ts AND execution_id > id))
so same-millisecond exchanges can be consumed exactly once across ticks.

When afterExecutionId is null, timeFrom keeps its existing >= semantics —
no behaviour change for any current caller.

Also adds the SearchRequest.withCursor(ts, id) wither. Threads the field
through existing withInstanceIds / withEnvironment witheres. All existing
positional call-sites (SearchController, ExchangeMatchEvaluator,
ClickHouseSearchIndexIT, ClickHouseChunkPipelineIT) pass null for the new
slot.

Task 1.2 of docs/superpowers/plans/2026-04-22-per-exchange-exactly-once.md.
The evaluator-side wiring that actually supplies the cursor is Task 1.5.
This commit is contained in:
hsiegeln
2026-04-22 15:49:05 +02:00
parent 6fa8e3aa30
commit b41f34c090
6 changed files with 69 additions and 41 deletions

View File

@@ -110,6 +110,7 @@ public class ExchangeMatchEvaluator implements ConditionEvaluator<ExchangeMatchC
50, 50,
"startTime", "startTime",
"asc", // asc so we process oldest first "asc", // asc so we process oldest first
null, // afterExecutionId (wired in Task 1.5)
envSlug envSlug
); );

View File

@@ -71,6 +71,7 @@ public class SearchController {
application, null, application, null,
offset, limit, offset, limit,
sortField, sortDir, sortField, sortDir,
null,
env.slug() env.slug()
); );

View File

@@ -124,7 +124,13 @@ public class ClickHouseSearchIndex implements SearchIndex {
conditions.add("tenant_id = ?"); conditions.add("tenant_id = ?");
params.add(tenantId); params.add(tenantId);
if (request.timeFrom() != null) { if (request.timeFrom() != null && request.afterExecutionId() != null) {
// composite predicate: strictly-after in (start_time, execution_id) tuple order
conditions.add("(start_time > ? OR (start_time = ? AND execution_id > ?))");
params.add(Timestamp.from(request.timeFrom()));
params.add(Timestamp.from(request.timeFrom()));
params.add(request.afterExecutionId());
} else if (request.timeFrom() != null) {
conditions.add("start_time >= ?"); conditions.add("start_time >= ?");
params.add(Timestamp.from(request.timeFrom())); params.add(Timestamp.from(request.timeFrom()));
} }

View File

@@ -118,7 +118,7 @@ class ClickHouseSearchIndexIT {
void search_withNoFilters_returnsAllExecutions() { void search_withNoFilters_returnsAllExecutions() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -130,7 +130,7 @@ class ClickHouseSearchIndexIT {
void search_byStatus_filtersCorrectly() { void search_byStatus_filtersCorrectly() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
"FAILED", null, null, null, null, null, null, null, null, null, "FAILED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -145,7 +145,7 @@ class ClickHouseSearchIndexIT {
// Time window covering exec-1 and exec-2 but not exec-3 // Time window covering exec-1 and exec-2 but not exec-3
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null, null, baseTime, baseTime.plusMillis(1500), null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -158,7 +158,7 @@ class ClickHouseSearchIndexIT {
void search_fullTextSearch_findsInErrorMessage() { void search_fullTextSearch_findsInErrorMessage() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "NullPointerException", null, null, null, null, null, null, null, null, null, "NullPointerException", null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -170,7 +170,7 @@ class ClickHouseSearchIndexIT {
void search_fullTextSearch_findsInInputBody() { void search_fullTextSearch_findsInInputBody() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "12345", null, null, null, null, null, null, null, null, null, "12345", null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -182,7 +182,7 @@ class ClickHouseSearchIndexIT {
void search_textInBody_searchesProcessorBodies() { void search_textInBody_searchesProcessorBodies() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, "Hello World", null, null, null, null, null, null, null, null, null, "Hello World", null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -194,7 +194,7 @@ class ClickHouseSearchIndexIT {
void search_textInHeaders_searchesProcessorHeaders() { void search_textInHeaders_searchesProcessorHeaders() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, "secret-token", null, null, null, null, null, null, null, null, null, "secret-token", null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -206,7 +206,7 @@ class ClickHouseSearchIndexIT {
void search_textInErrors_searchesErrorFields() { void search_textInErrors_searchesErrorFields() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, "Foo.bar", null, null, null, null, null, null, null, null, null, "Foo.bar",
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -218,7 +218,7 @@ class ClickHouseSearchIndexIT {
void search_withHighlight_returnsSnippet() { void search_withHighlight_returnsSnippet() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, "NullPointerException", null, null, null, null, null, null, null, null, null, "NullPointerException", null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -230,7 +230,7 @@ class ClickHouseSearchIndexIT {
void search_pagination_works() { void search_pagination_works() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 2, null, null, null); null, null, null, null, null, 0, 2, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -244,7 +244,7 @@ class ClickHouseSearchIndexIT {
void search_byApplication_filtersCorrectly() { void search_byApplication_filtersCorrectly() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, "other-app", null, 0, 50, null, null, null); null, null, null, "other-app", null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -256,7 +256,7 @@ class ClickHouseSearchIndexIT {
void search_byAgentIds_filtersCorrectly() { void search_byAgentIds_filtersCorrectly() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, List.of("agent-b"), 0, 50, null, null, null); null, null, null, null, List.of("agent-b"), 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -268,7 +268,7 @@ class ClickHouseSearchIndexIT {
void count_returnsMatchingCount() { void count_returnsMatchingCount() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
"COMPLETED", null, null, null, null, null, null, null, null, null, "COMPLETED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
long count = searchIndex.count(request); long count = searchIndex.count(request);
@@ -279,7 +279,7 @@ class ClickHouseSearchIndexIT {
void search_multipleStatusFilter_works() { void search_multipleStatusFilter_works() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
"COMPLETED,FAILED", null, null, null, null, null, null, null, null, null, "COMPLETED,FAILED", null, null, null, null, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -290,7 +290,7 @@ class ClickHouseSearchIndexIT {
void search_byCorrelationId_filtersCorrectly() { void search_byCorrelationId_filtersCorrectly() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, null, null, "corr-1", null, null, null, null, null, null, null, null, null, "corr-1", null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);
@@ -302,7 +302,7 @@ class ClickHouseSearchIndexIT {
void search_byDurationRange_filtersCorrectly() { void search_byDurationRange_filtersCorrectly() {
SearchRequest request = new SearchRequest( SearchRequest request = new SearchRequest(
null, null, null, 300L, 600L, null, null, null, null, null, null, null, null, 300L, 600L, null, null, null, null, null,
null, null, null, null, null, 0, 50, null, null, null); null, null, null, null, null, 0, 50, null, null, null, null);
SearchResult<ExecutionSummary> result = searchIndex.search(request); SearchResult<ExecutionSummary> result = searchIndex.search(request);

View File

@@ -157,7 +157,7 @@ class ClickHouseChunkPipelineIT {
null, null, null, null, null, null, null, null, null, null, null, null,
"ORD-123", null, null, null, "ORD-123", null, null, null,
null, null, null, null, null, null, null, null, null, null,
0, 50, null, null, null)); 0, 50, null, null, null, null));
assertThat(result.total()).isEqualTo(1); assertThat(result.total()).isEqualTo(1);
assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1"); assertThat(result.data().get(0).executionId()).isEqualTo("pipeline-1");
assertThat(result.data().get(0).status()).isEqualTo("COMPLETED"); assertThat(result.data().get(0).status()).isEqualTo("COMPLETED");
@@ -168,7 +168,7 @@ class ClickHouseChunkPipelineIT {
null, null, null, null, null, null, null, null, null, null, null, null,
null, "ABC-123", null, null, null, "ABC-123", null, null,
null, null, null, null, null, null, null, null, null, null,
0, 50, null, null, null)); 0, 50, null, null, null, null));
assertThat(bodyResult.total()).isEqualTo(1); assertThat(bodyResult.total()).isEqualTo(1);
// Verify iteration data in processor_executions // Verify iteration data in processor_executions

View File

@@ -9,26 +9,29 @@ import java.util.List;
* All filter fields are nullable/optional. When null, the filter is not applied. * All filter fields are nullable/optional. When null, the filter is not applied.
* The compact constructor validates and normalizes pagination parameters. * The compact constructor validates and normalizes pagination parameters.
* *
* @param status execution status filter (COMPLETED, FAILED, RUNNING) * @param status execution status filter (COMPLETED, FAILED, RUNNING)
* @param timeFrom inclusive start of time range * @param timeFrom inclusive start of time range
* @param timeTo exclusive end of time range * @param timeTo exclusive end of time range
* @param durationMin minimum duration in milliseconds (inclusive) * @param durationMin minimum duration in milliseconds (inclusive)
* @param durationMax maximum duration in milliseconds (inclusive) * @param durationMax maximum duration in milliseconds (inclusive)
* @param correlationId exact correlation ID match * @param correlationId exact correlation ID match
* @param text global full-text search across all text fields * @param text global full-text search across all text fields
* @param textInBody full-text search scoped to exchange bodies * @param textInBody full-text search scoped to exchange bodies
* @param textInHeaders full-text search scoped to exchange headers * @param textInHeaders full-text search scoped to exchange headers
* @param textInErrors full-text search scoped to error messages and stack traces * @param textInErrors full-text search scoped to error messages and stack traces
* @param routeId exact match on route_id * @param routeId exact match on route_id
* @param instanceId exact match on instance_id * @param instanceId exact match on instance_id
* @param processorType matches processor_types array via has() * @param processorType matches processor_types array via has()
* @param applicationId exact match on application_id * @param applicationId exact match on application_id
* @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents) * @param instanceIds list of instance IDs for an IN clause (only set when drilling down to specific agents)
* @param offset pagination offset (0-based) * @param offset pagination offset (0-based)
* @param limit page size (default 50, max 500) * @param limit page size (default 50, max 500)
* @param sortField column to sort by (default: startTime) * @param sortField column to sort by (default: startTime)
* @param sortDir sort direction: asc or desc (default: desc) * @param sortDir sort direction: asc or desc (default: desc)
* @param environment optional environment filter (e.g. "dev", "staging", "prod") * @param afterExecutionId when combined with a non-null {@code timeFrom}, applies the composite predicate
* {@code (start_time > timeFrom) OR (start_time = timeFrom AND execution_id > afterExecutionId)}.
* When null, {@code timeFrom} is applied as a plain {@code >=} lower bound (existing behaviour).
* @param environment optional environment filter (e.g. "dev", "staging", "prod")
*/ */
public record SearchRequest( public record SearchRequest(
String status, String status,
@@ -50,6 +53,7 @@ public record SearchRequest(
int limit, int limit,
String sortField, String sortField,
String sortDir, String sortDir,
String afterExecutionId,
String environment String environment
) { ) {
@@ -92,7 +96,7 @@ public record SearchRequest(
status, timeFrom, timeTo, durationMin, durationMax, correlationId, status, timeFrom, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors, text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, resolvedInstanceIds, routeId, instanceId, processorType, applicationId, resolvedInstanceIds,
offset, limit, sortField, sortDir, environment offset, limit, sortField, sortDir, afterExecutionId, environment
); );
} }
@@ -102,7 +106,23 @@ public record SearchRequest(
status, timeFrom, timeTo, durationMin, durationMax, correlationId, status, timeFrom, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors, text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, instanceIds, routeId, instanceId, processorType, applicationId, instanceIds,
offset, limit, sortField, sortDir, env offset, limit, sortField, sortDir, afterExecutionId, env
);
}
/**
* Create a copy with a composite {@code (start_time, execution_id)} cursor.
* <p>
* The resulting request applies a strictly-after tuple predicate
* {@code (start_time > ts) OR (start_time = ts AND execution_id > afterExecutionId)},
* enabling exactly-once consumption of same-millisecond exchanges across scheduler ticks.
*/
public SearchRequest withCursor(Instant ts, String afterExecutionId) {
return new SearchRequest(
status, ts, timeTo, durationMin, durationMax, correlationId,
text, textInBody, textInHeaders, textInErrors,
routeId, instanceId, processorType, applicationId, instanceIds,
offset, limit, sortField, sortDir, afterExecutionId, environment
); );
} }
} }