refactor: extend ProcessorRecord with seq/iteration fields for ClickHouse model

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-01 00:02:03 +02:00
parent 968117c41a
commit 190ae2797d
5 changed files with 22 additions and 6 deletions

View File

@@ -206,7 +206,8 @@ public class PostgresExecutionStore implements ExecutionStore {
rs.getString("error_type"), rs.getString("error_category"), rs.getString("error_type"), rs.getString("error_category"),
rs.getString("root_cause_type"), rs.getString("root_cause_message"), rs.getString("root_cause_type"), rs.getString("root_cause_message"),
rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"), rs.getString("error_handler_type"), rs.getString("circuit_breaker_state"),
rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null); rs.getObject("fallback_triggered") != null ? rs.getBoolean("fallback_triggered") : null,
null, null, null, null, null, null);
private static Instant toInstant(ResultSet rs, String column) throws SQLException { private static Instant toInstant(ResultSet rs, String column) throws SQLException {
Timestamp ts = rs.getTimestamp(column); Timestamp ts = rs.getTimestamp(column);

View File

@@ -77,13 +77,15 @@ class PostgresExecutionStoreIT extends AbstractPostgresIT {
now, now.plusMillis(10), 10L, null, null, now, now.plusMillis(10), 10L, null, null,
"input body", "output body", null, null, null, "input body", "output body", null, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null), null, null, null, null, null, null, null, null,
null, null, null, null, null, null),
new ProcessorRecord("exec-proc", "proc-2", "to", new ProcessorRecord("exec-proc", "proc-2", "to",
"app-1", "route-a", 1, "proc-1", "COMPLETED", "app-1", "route-a", 1, "proc-1", "COMPLETED",
now.plusMillis(10), now.plusMillis(30), 20L, null, null, now.plusMillis(10), now.plusMillis(30), 20L, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null) null, null, null, null, null, null, null, null,
null, null, null, null, null, null)
); );
executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors); executionStore.upsertProcessors("exec-proc", now, "app-1", "route-a", processors);

View File

@@ -159,7 +159,8 @@ public class IngestionService {
p.getErrorType(), p.getErrorCategory(), p.getErrorType(), p.getErrorCategory(),
p.getRootCauseType(), p.getRootCauseMessage(), p.getRootCauseType(), p.getRootCauseMessage(),
p.getErrorHandlerType(), p.getCircuitBreakerState(), p.getErrorHandlerType(), p.getCircuitBreakerState(),
p.getFallbackTriggered() p.getFallbackTriggered(),
null, null, null, null, null, null
)); ));
} }
return flat; return flat;

View File

@@ -49,6 +49,17 @@ public interface ExecutionStore {
String errorType, String errorCategory, String errorType, String errorCategory,
String rootCauseType, String rootCauseMessage, String rootCauseType, String rootCauseMessage,
String errorHandlerType, String circuitBreakerState, String errorHandlerType, String circuitBreakerState,
Boolean fallbackTriggered Boolean fallbackTriggered,
// New fields for ClickHouse seq-based model
Integer seq,
Integer parentSeq,
Integer iteration,
Integer iterationSize,
Boolean filterMatched,
Boolean duplicateMessage
) {} ) {}
default Optional<ProcessorRecord> findProcessorBySeq(String executionId, int seq) {
return Optional.empty();
}
} }

View File

@@ -29,7 +29,8 @@ class TreeReconstructionTest {
status, NOW, NOW, 10L, status, NOW, NOW, 10L,
null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null null, null, null, null, null, null, null, null,
null, null, null, null, null, null
); );
} }