From 2d6cc4c63498f422f912c6e85635d1a9ca378a4e Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Thu, 26 Mar 2026 18:23:47 +0100 Subject: [PATCH] feat(search): deserialize and surface attributes in detail service and OpenSearch indexing (Task 4) DetailService deserializes attributes JSON from ExecutionRecord/ProcessorRecord and passes them to ExecutionDetail and ProcessorNode constructors. ExecutionDocument and ProcessorDoc carry attributes as a JSON string. SearchIndexer passes attributes when building documents. OpenSearchIndex includes attributes in indexed maps and deserializes them when constructing ExecutionSummary from search hits. Co-Authored-By: Claude Sonnet 4.6 --- .../server/app/search/OpenSearchIndex.java | 25 ++++++++++++++++++- .../server/core/detail/DetailService.java | 20 +++++++++++++-- .../server/core/indexing/SearchIndexer.java | 6 +++-- .../core/storage/model/ExecutionDocument.java | 6 +++-- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java index 2ad0046a..ea913e58 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/OpenSearchIndex.java @@ -6,6 +6,8 @@ import com.cameleer3.server.core.search.SearchResult; import com.cameleer3.server.core.storage.SearchIndex; import com.cameleer3.server.core.storage.model.ExecutionDocument; import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.OpenSearchClient; @@ -33,6 +35,8 @@ public class OpenSearchIndex implements SearchIndex { private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") .withZone(ZoneOffset.UTC); + private static final ObjectMapper JSON = new ObjectMapper(); + private static final TypeReference> STR_MAP = new TypeReference<>() {}; private final OpenSearchClient client; private final String indexPrefix; @@ -314,6 +318,9 @@ public class OpenSearchIndex implements SearchIndex { map.put("duration_ms", doc.durationMs()); map.put("error_message", doc.errorMessage()); map.put("error_stacktrace", doc.errorStacktrace()); + if (doc.attributes() != null) { + map.put("attributes", parseAttributesJson(doc.attributes())); + } if (doc.processors() != null) { map.put("processors", doc.processors().stream().map(p -> { Map pm = new LinkedHashMap<>(); @@ -326,6 +333,9 @@ public class OpenSearchIndex implements SearchIndex { pm.put("output_body", p.outputBody()); pm.put("input_headers", p.inputHeaders()); pm.put("output_headers", p.outputHeaders()); + if (p.attributes() != null) { + pm.put("attributes", parseAttributesJson(p.attributes())); + } return pm; }).toList()); } @@ -336,6 +346,9 @@ public class OpenSearchIndex implements SearchIndex { private ExecutionSummary hitToSummary(Hit hit) { Map src = hit.source(); if (src == null) return null; + @SuppressWarnings("unchecked") + Map attributes = src.get("attributes") instanceof Map + ? (Map) src.get("attributes") : null; return new ExecutionSummary( (String) src.get("execution_id"), (String) src.get("route_id"), @@ -348,7 +361,8 @@ public class OpenSearchIndex implements SearchIndex { (String) src.get("correlation_id"), (String) src.get("error_message"), null, // diagramContentHash not stored in index - extractHighlight(hit) + extractHighlight(hit), + attributes ); } @@ -361,4 +375,13 @@ public class OpenSearchIndex implements SearchIndex { } return null; } + + private static Map parseAttributesJson(String json) { + if (json == null || json.isBlank()) return null; + try { + return JSON.readValue(json, STR_MAP); + } catch (Exception e) { + return null; + } + } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java index eef8fa68..e6161de7 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/detail/DetailService.java @@ -2,11 +2,16 @@ package com.cameleer3.server.core.detail; import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.*; public class DetailService { + private static final ObjectMapper JSON = new ObjectMapper(); + private static final TypeReference> STR_MAP = new TypeReference<>() {}; + private final ExecutionStore executionStore; public DetailService(ExecutionStore executionStore) { @@ -27,7 +32,8 @@ public class DetailService { exec.errorMessage(), exec.errorStacktrace(), exec.diagramContentHash(), roots, exec.inputBody(), exec.outputBody(), - exec.inputHeaders(), exec.outputHeaders() + exec.inputHeaders(), exec.outputHeaders(), + parseAttributes(exec.attributes()) ); }); } @@ -41,7 +47,8 @@ public class DetailService { p.processorId(), p.processorType(), p.status(), p.startTime(), p.endTime(), p.durationMs() != null ? p.durationMs() : 0L, - p.diagramNodeId(), p.errorMessage(), p.errorStacktrace() + p.diagramNodeId(), p.errorMessage(), p.errorStacktrace(), + parseAttributes(p.attributes()) )); } @@ -61,4 +68,13 @@ public class DetailService { } return roots; } + + private static Map parseAttributes(String json) { + if (json == null || json.isBlank()) return null; + try { + return JSON.readValue(json, STR_MAP); + } catch (Exception e) { + return null; + } + } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java index 7341a571..1531f647 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/indexing/SearchIndexer.java @@ -70,14 +70,16 @@ public class SearchIndexer implements SearchIndexerStats { p.processorId(), p.processorType(), p.status(), p.errorMessage(), p.errorStacktrace(), p.inputBody(), p.outputBody(), - p.inputHeaders(), p.outputHeaders())) + p.inputHeaders(), p.outputHeaders(), + p.attributes())) .toList(); searchIndex.index(new ExecutionDocument( exec.executionId(), exec.routeId(), exec.agentId(), exec.applicationName(), exec.status(), exec.correlationId(), exec.exchangeId(), exec.startTime(), exec.endTime(), exec.durationMs(), - exec.errorMessage(), exec.errorStacktrace(), processorDocs)); + exec.errorMessage(), exec.errorStacktrace(), processorDocs, + exec.attributes())); indexedCount.incrementAndGet(); lastIndexedAt = Instant.now(); diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java index cc0ccc32..5b4d0eba 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/storage/model/ExecutionDocument.java @@ -8,12 +8,14 @@ public record ExecutionDocument( String status, String correlationId, String exchangeId, Instant startTime, Instant endTime, Long durationMs, String errorMessage, String errorStacktrace, - List processors + List processors, + String attributes ) { public record ProcessorDoc( String processorId, String processorType, String status, String errorMessage, String errorStacktrace, String inputBody, String outputBody, - String inputHeaders, String outputHeaders + String inputHeaders, String outputHeaders, + String attributes ) {} }