feat(search): deserialize and surface attributes in detail service and OpenSearch indexing (Task 4)

DetailService deserializes attributes JSON from ExecutionRecord/ProcessorRecord and
passes them to ExecutionDetail and ProcessorNode constructors. ExecutionDocument and
ProcessorDoc carry attributes as a JSON string. SearchIndexer passes attributes when
building documents. OpenSearchIndex includes attributes in indexed maps and
deserializes them when constructing ExecutionSummary from search hits.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-26 18:23:47 +01:00
parent ca5250c134
commit 2d6cc4c634
4 changed files with 50 additions and 7 deletions

View File

@@ -6,6 +6,8 @@ import com.cameleer3.server.core.search.SearchResult;
import com.cameleer3.server.core.storage.SearchIndex; import com.cameleer3.server.core.storage.SearchIndex;
import com.cameleer3.server.core.storage.model.ExecutionDocument; import com.cameleer3.server.core.storage.model.ExecutionDocument;
import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc; import com.cameleer3.server.core.storage.model.ExecutionDocument.ProcessorDoc;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import org.opensearch.client.json.JsonData; import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.OpenSearchClient;
@@ -33,6 +35,8 @@ public class OpenSearchIndex implements SearchIndex {
private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class); private static final Logger log = LoggerFactory.getLogger(OpenSearchIndex.class);
private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd") private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd")
.withZone(ZoneOffset.UTC); .withZone(ZoneOffset.UTC);
private static final ObjectMapper JSON = new ObjectMapper();
private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};
private final OpenSearchClient client; private final OpenSearchClient client;
private final String indexPrefix; private final String indexPrefix;
@@ -314,6 +318,9 @@ public class OpenSearchIndex implements SearchIndex {
map.put("duration_ms", doc.durationMs()); map.put("duration_ms", doc.durationMs());
map.put("error_message", doc.errorMessage()); map.put("error_message", doc.errorMessage());
map.put("error_stacktrace", doc.errorStacktrace()); map.put("error_stacktrace", doc.errorStacktrace());
if (doc.attributes() != null) {
map.put("attributes", parseAttributesJson(doc.attributes()));
}
if (doc.processors() != null) { if (doc.processors() != null) {
map.put("processors", doc.processors().stream().map(p -> { map.put("processors", doc.processors().stream().map(p -> {
Map<String, Object> pm = new LinkedHashMap<>(); Map<String, Object> pm = new LinkedHashMap<>();
@@ -326,6 +333,9 @@ public class OpenSearchIndex implements SearchIndex {
pm.put("output_body", p.outputBody()); pm.put("output_body", p.outputBody());
pm.put("input_headers", p.inputHeaders()); pm.put("input_headers", p.inputHeaders());
pm.put("output_headers", p.outputHeaders()); pm.put("output_headers", p.outputHeaders());
if (p.attributes() != null) {
pm.put("attributes", parseAttributesJson(p.attributes()));
}
return pm; return pm;
}).toList()); }).toList());
} }
@@ -336,6 +346,9 @@ public class OpenSearchIndex implements SearchIndex {
private ExecutionSummary hitToSummary(Hit<Map> hit) { private ExecutionSummary hitToSummary(Hit<Map> hit) {
Map<String, Object> src = hit.source(); Map<String, Object> src = hit.source();
if (src == null) return null; if (src == null) return null;
@SuppressWarnings("unchecked")
Map<String, String> attributes = src.get("attributes") instanceof Map
? (Map<String, String>) src.get("attributes") : null;
return new ExecutionSummary( return new ExecutionSummary(
(String) src.get("execution_id"), (String) src.get("execution_id"),
(String) src.get("route_id"), (String) src.get("route_id"),
@@ -348,7 +361,8 @@ public class OpenSearchIndex implements SearchIndex {
(String) src.get("correlation_id"), (String) src.get("correlation_id"),
(String) src.get("error_message"), (String) src.get("error_message"),
null, // diagramContentHash not stored in index null, // diagramContentHash not stored in index
extractHighlight(hit) extractHighlight(hit),
attributes
); );
} }
@@ -361,4 +375,13 @@ public class OpenSearchIndex implements SearchIndex {
} }
return null; return null;
} }
private static Map<String, String> parseAttributesJson(String json) {
if (json == null || json.isBlank()) return null;
try {
return JSON.readValue(json, STR_MAP);
} catch (Exception e) {
return null;
}
}
} }

View File

@@ -2,11 +2,16 @@ package com.cameleer3.server.core.detail;
import com.cameleer3.server.core.storage.ExecutionStore; import com.cameleer3.server.core.storage.ExecutionStore;
import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord; import com.cameleer3.server.core.storage.ExecutionStore.ProcessorRecord;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*; import java.util.*;
public class DetailService { public class DetailService {
private static final ObjectMapper JSON = new ObjectMapper();
private static final TypeReference<Map<String, String>> STR_MAP = new TypeReference<>() {};
private final ExecutionStore executionStore; private final ExecutionStore executionStore;
public DetailService(ExecutionStore executionStore) { public DetailService(ExecutionStore executionStore) {
@@ -27,7 +32,8 @@ public class DetailService {
exec.errorMessage(), exec.errorStacktrace(), exec.errorMessage(), exec.errorStacktrace(),
exec.diagramContentHash(), roots, exec.diagramContentHash(), roots,
exec.inputBody(), exec.outputBody(), exec.inputBody(), exec.outputBody(),
exec.inputHeaders(), exec.outputHeaders() exec.inputHeaders(), exec.outputHeaders(),
parseAttributes(exec.attributes())
); );
}); });
} }
@@ -41,7 +47,8 @@ public class DetailService {
p.processorId(), p.processorType(), p.status(), p.processorId(), p.processorType(), p.status(),
p.startTime(), p.endTime(), p.startTime(), p.endTime(),
p.durationMs() != null ? p.durationMs() : 0L, p.durationMs() != null ? p.durationMs() : 0L,
p.diagramNodeId(), p.errorMessage(), p.errorStacktrace() p.diagramNodeId(), p.errorMessage(), p.errorStacktrace(),
parseAttributes(p.attributes())
)); ));
} }
@@ -61,4 +68,13 @@ public class DetailService {
} }
return roots; return roots;
} }
private static Map<String, String> parseAttributes(String json) {
if (json == null || json.isBlank()) return null;
try {
return JSON.readValue(json, STR_MAP);
} catch (Exception e) {
return null;
}
}
} }

View File

@@ -70,14 +70,16 @@ public class SearchIndexer implements SearchIndexerStats {
p.processorId(), p.processorType(), p.status(), p.processorId(), p.processorType(), p.status(),
p.errorMessage(), p.errorStacktrace(), p.errorMessage(), p.errorStacktrace(),
p.inputBody(), p.outputBody(), p.inputBody(), p.outputBody(),
p.inputHeaders(), p.outputHeaders())) p.inputHeaders(), p.outputHeaders(),
p.attributes()))
.toList(); .toList();
searchIndex.index(new ExecutionDocument( searchIndex.index(new ExecutionDocument(
exec.executionId(), exec.routeId(), exec.agentId(), exec.applicationName(), exec.executionId(), exec.routeId(), exec.agentId(), exec.applicationName(),
exec.status(), exec.correlationId(), exec.exchangeId(), exec.status(), exec.correlationId(), exec.exchangeId(),
exec.startTime(), exec.endTime(), exec.durationMs(), exec.startTime(), exec.endTime(), exec.durationMs(),
exec.errorMessage(), exec.errorStacktrace(), processorDocs)); exec.errorMessage(), exec.errorStacktrace(), processorDocs,
exec.attributes()));
indexedCount.incrementAndGet(); indexedCount.incrementAndGet();
lastIndexedAt = Instant.now(); lastIndexedAt = Instant.now();

View File

@@ -8,12 +8,14 @@ public record ExecutionDocument(
String status, String correlationId, String exchangeId, String status, String correlationId, String exchangeId,
Instant startTime, Instant endTime, Long durationMs, Instant startTime, Instant endTime, Long durationMs,
String errorMessage, String errorStacktrace, String errorMessage, String errorStacktrace,
List<ProcessorDoc> processors List<ProcessorDoc> processors,
String attributes
) { ) {
public record ProcessorDoc( public record ProcessorDoc(
String processorId, String processorType, String status, String processorId, String processorType, String status,
String errorMessage, String errorStacktrace, String errorMessage, String errorStacktrace,
String inputBody, String outputBody, String inputBody, String outputBody,
String inputHeaders, String outputHeaders String inputHeaders, String outputHeaders,
String attributes
) {} ) {}
} }