Fix command palette: agent ID propagation, result selection, and scope tabs
All checks were successful
CI / build (push) Successful in 59s
CI / docker (push) Successful in 46s
CI / deploy (push) Successful in 25s

- Propagate authenticated agent identity through write buffers via
  TaggedExecution/TaggedDiagram wrappers so ClickHouse rows get real
  agent IDs instead of empty strings
- Add execution_id to text search LIKE clause so selecting an execution
  by ID in the palette actually finds it
- Clear status filter to all three statuses on palette selection so the
  chosen execution/agent isn't filtered out
- Add disabled Routes and Exchanges scope tabs with "coming soon" state

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-03-13 17:13:14 +01:00
parent 64b03a4e2f
commit 86e016874a
16 changed files with 151 additions and 69 deletions

View File

@@ -1,8 +1,8 @@
package com.cameleer3.server.app.config;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.model.MetricsSnapshot;
import org.springframework.context.annotation.Bean;
@@ -18,12 +18,12 @@ import org.springframework.context.annotation.Configuration;
public class IngestionBeanConfig {
@Bean
public WriteBuffer<RouteExecution> executionBuffer(IngestionConfig config) {
public WriteBuffer<TaggedExecution> executionBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
@Bean
public WriteBuffer<RouteGraph> diagramBuffer(IngestionConfig config) {
public WriteBuffer<TaggedDiagram> diagramBuffer(IngestionConfig config) {
return new WriteBuffer<>(config.getBufferCapacity());
}
@@ -33,8 +33,8 @@ public class IngestionBeanConfig {
}
@Bean
public IngestionService ingestionService(WriteBuffer<RouteExecution> executionBuffer,
WriteBuffer<RouteGraph> diagramBuffer,
public IngestionService ingestionService(WriteBuffer<TaggedExecution> executionBuffer,
WriteBuffer<TaggedDiagram> diagramBuffer,
WriteBuffer<MetricsSnapshot> metricsBuffer) {
return new IngestionService(executionBuffer, diagramBuffer, metricsBuffer);
}

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -12,6 +13,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -46,13 +49,17 @@ public class DiagramController {
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
@ApiResponse(responseCode = "503", description = "Buffer full, retry later")
public ResponseEntity<Void> ingestDiagrams(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
List<RouteGraph> graphs = parsePayload(body);
boolean accepted;
List<TaggedDiagram> tagged = graphs.stream()
.map(graph -> new TaggedDiagram(agentId, graph))
.toList();
if (graphs.size() == 1) {
accepted = ingestionService.acceptDiagram(graphs.get(0));
boolean accepted;
if (tagged.size() == 1) {
accepted = ingestionService.acceptDiagram(tagged.get(0));
} else {
accepted = ingestionService.acceptDiagrams(graphs);
accepted = ingestionService.acceptDiagrams(tagged);
}
if (!accepted) {
@@ -65,6 +72,11 @@ public class DiagramController {
return ResponseEntity.accepted().build();
}
private String extractAgentId() {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
return auth != null ? auth.getName() : "";
}
private List<RouteGraph> parsePayload(String body) throws JsonProcessingException {
String trimmed = body.strip();
if (trimmed.startsWith("[")) {

View File

@@ -2,6 +2,7 @@ package com.cameleer3.server.app.controller;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.ingestion.IngestionService;
import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -12,6 +13,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -47,13 +50,17 @@ public class ExecutionController {
@ApiResponse(responseCode = "202", description = "Data accepted for processing")
@ApiResponse(responseCode = "503", description = "Buffer full, retry later")
public ResponseEntity<Void> ingestExecutions(@RequestBody String body) throws JsonProcessingException {
String agentId = extractAgentId();
List<RouteExecution> executions = parsePayload(body);
boolean accepted;
List<TaggedExecution> tagged = executions.stream()
.map(exec -> new TaggedExecution(agentId, exec))
.toList();
if (executions.size() == 1) {
accepted = ingestionService.acceptExecution(executions.get(0));
boolean accepted;
if (tagged.size() == 1) {
accepted = ingestionService.acceptExecution(tagged.get(0));
} else {
accepted = ingestionService.acceptExecutions(executions);
accepted = ingestionService.acceptExecutions(tagged);
}
if (!accepted) {
@@ -66,6 +73,11 @@ public class ExecutionController {
return ResponseEntity.accepted().build();
}
private String extractAgentId() {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
return auth != null ? auth.getName() : "";
}
private List<RouteExecution> parsePayload(String body) throws JsonProcessingException {
String trimmed = body.strip();
if (trimmed.startsWith("[")) {

View File

@@ -1,8 +1,8 @@
package com.cameleer3.server.app.ingestion;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.app.config.IngestionConfig;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.cameleer3.server.core.ingestion.WriteBuffer;
import com.cameleer3.server.core.storage.DiagramRepository;
import com.cameleer3.server.core.storage.ExecutionRepository;
@@ -27,8 +27,8 @@ public class ClickHouseFlushScheduler implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(ClickHouseFlushScheduler.class);
private final WriteBuffer<RouteExecution> executionBuffer;
private final WriteBuffer<RouteGraph> diagramBuffer;
private final WriteBuffer<TaggedExecution> executionBuffer;
private final WriteBuffer<TaggedDiagram> diagramBuffer;
private final WriteBuffer<MetricsSnapshot> metricsBuffer;
private final ExecutionRepository executionRepository;
private final DiagramRepository diagramRepository;
@@ -37,8 +37,8 @@ public class ClickHouseFlushScheduler implements SmartLifecycle {
private volatile boolean running = false;
public ClickHouseFlushScheduler(WriteBuffer<RouteExecution> executionBuffer,
WriteBuffer<RouteGraph> diagramBuffer,
public ClickHouseFlushScheduler(WriteBuffer<TaggedExecution> executionBuffer,
WriteBuffer<TaggedDiagram> diagramBuffer,
WriteBuffer<MetricsSnapshot> metricsBuffer,
ExecutionRepository executionRepository,
DiagramRepository diagramRepository,
@@ -62,7 +62,7 @@ public class ClickHouseFlushScheduler implements SmartLifecycle {
private void flushExecutions() {
try {
List<RouteExecution> batch = executionBuffer.drain(batchSize);
List<TaggedExecution> batch = executionBuffer.drain(batchSize);
if (!batch.isEmpty()) {
executionRepository.insertBatch(batch);
log.debug("Flushed {} executions to ClickHouse", batch.size());
@@ -74,9 +74,9 @@ public class ClickHouseFlushScheduler implements SmartLifecycle {
private void flushDiagrams() {
try {
List<RouteGraph> batch = diagramBuffer.drain(batchSize);
for (RouteGraph graph : batch) {
diagramRepository.store(graph);
List<TaggedDiagram> batch = diagramBuffer.drain(batchSize);
for (TaggedDiagram diagram : batch) {
diagramRepository.store(diagram);
}
if (!batch.isEmpty()) {
log.debug("Flushed {} diagrams to ClickHouse", batch.size());
@@ -130,8 +130,8 @@ public class ClickHouseFlushScheduler implements SmartLifecycle {
private void drainAll() {
drainBufferCompletely("executions", executionBuffer, batch -> executionRepository.insertBatch(batch));
drainBufferCompletely("diagrams", diagramBuffer, batch -> {
for (RouteGraph g : batch) {
diagramRepository.store(g);
for (TaggedDiagram d : batch) {
diagramRepository.store(d);
}
});
drainBufferCompletely("metrics", metricsBuffer, batch -> metricsRepository.insertBatch(batch));

View File

@@ -123,13 +123,18 @@ public class ClickHouseSearchEngine implements SearchEngine {
}
if (req.text() != null && !req.text().isBlank()) {
String pattern = "%" + escapeLike(req.text()) + "%";
conditions.add("(route_id LIKE ? OR agent_id LIKE ? OR error_message LIKE ? OR error_stacktrace LIKE ? OR exchange_bodies LIKE ? OR exchange_headers LIKE ?)");
params.add(pattern);
params.add(pattern);
params.add(pattern);
params.add(pattern);
params.add(pattern);
params.add(pattern);
String[] textColumns = {
"execution_id", "route_id", "agent_id",
"error_message", "error_stacktrace",
"exchange_bodies", "exchange_headers"
};
var likeClauses = java.util.Arrays.stream(textColumns)
.map(col -> col + " LIKE ?")
.toList();
conditions.add("(" + String.join(" OR ", likeClauses) + ")");
for (int i = 0; i < textColumns.length; i++) {
params.add(pattern);
}
}
if (req.textInBody() != null && !req.textInBody().isBlank()) {
conditions.add("exchange_bodies LIKE ?");

View File

@@ -1,6 +1,7 @@
package com.cameleer3.server.app.storage;
import com.cameleer3.common.graph.RouteGraph;
import com.cameleer3.server.core.ingestion.TaggedDiagram;
import com.cameleer3.server.core.storage.DiagramRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,16 +55,16 @@ public class ClickHouseDiagramRepository implements DiagramRepository {
}
@Override
public void store(RouteGraph graph) {
public void store(TaggedDiagram diagram) {
try {
RouteGraph graph = diagram.graph();
String agentId = diagram.agentId() != null ? diagram.agentId() : "";
String json = objectMapper.writeValueAsString(graph);
String contentHash = sha256Hex(json);
String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
// agent_id is not part of RouteGraph -- set empty, controllers can enrich
String agentId = "";
jdbcTemplate.update(INSERT_SQL, contentHash, routeId, agentId, json);
log.debug("Stored diagram for route={} with hash={}", routeId, contentHash);
log.debug("Stored diagram for route={} agent={} with hash={}", routeId, agentId, contentHash);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize RouteGraph to JSON", e);
}

View File

@@ -4,6 +4,7 @@ import com.cameleer3.common.model.ExchangeSnapshot;
import com.cameleer3.common.model.ProcessorExecution;
import com.cameleer3.common.model.RouteExecution;
import com.cameleer3.server.core.detail.RawExecutionRow;
import com.cameleer3.server.core.ingestion.TaggedExecution;
import com.cameleer3.server.core.storage.DiagramRepository;
import com.cameleer3.server.core.storage.ExecutionRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -62,7 +63,7 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
}
@Override
public void insertBatch(List<RouteExecution> executions) {
public void insertBatch(List<TaggedExecution> executions) {
if (executions.isEmpty()) {
return;
}
@@ -70,13 +71,15 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
RouteExecution exec = executions.get(i);
TaggedExecution tagged = executions.get(i);
RouteExecution exec = tagged.execution();
String agentId = tagged.agentId() != null ? tagged.agentId() : "";
List<FlatProcessor> flatProcessors = flattenWithMetadata(exec.getProcessors());
int col = 1;
ps.setString(col++, UUID.randomUUID().toString());
ps.setString(col++, nullSafe(exec.getRouteId()));
ps.setString(col++, ""); // agent_id set by controller header or empty
ps.setString(col++, agentId);
ps.setString(col++, exec.getStatus() != null ? exec.getStatus().name() : "RUNNING");
ps.setObject(col++, toTimestamp(exec.getStartTime()));
ps.setObject(col++, toTimestamp(exec.getEndTime()));
@@ -142,7 +145,7 @@ public class ClickHouseExecutionRepository implements ExecutionRepository {
ps.setObject(col++, outputHeaders); // processor_output_headers
ps.setObject(col++, diagramNodeIds); // processor_diagram_node_ids
String diagramHash = diagramRepository
.findContentHashForRoute(exec.getRouteId(), "")
.findContentHashForRoute(exec.getRouteId(), agentId)
.orElse("");
ps.setString(col++, diagramHash); // diagram_content_hash
}