fix(sse): close 4 parked SSE test failures
Three distinct root causes, all reproducible when the classes run
solo — not order-dependent as the triage report suggested. Full
diagnosis in .planning/sse-flakiness-diagnosis.md.
1. AgentSseController.events auto-heal was over-permissive: any valid
JWT allowed registering an arbitrary path-id, a spoofing vector.
Surface symptom was the parked sseConnect_unknownAgent_returns404
test hanging on a 200-with-empty-stream instead of getting 404.
Fix: auto-heal requires JWT subject == path id.
2. SseConnectionManager.pingAll read ${agent-registry.ping-interval-ms}
(unprefixed). AgentRegistryConfig binds cameleer.server.agentregistry.*
— same family of bug as the MetricsFlushScheduler fix in a6944911.
Fix: corrected placeholder prefix.
3. Spring's SseEmitter doesn't flush response headers until the first
emitter.send(); clients on BodyHandlers.ofInputStream blocked on
the first body byte, making awaitConnection(5s) unreliable under a
15s ping cadence. Fix: send an initial ": connected" comment on
connect() so headers hit the wire immediately.
Verified: 9/9 SSE tests green across AgentSseControllerIT + SseSigningIT.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -80,6 +80,17 @@ public class SseConnectionManager implements AgentEventListener {
|
||||
log.debug("SSE connection error for agent {}: {}", agentId, ex.getMessage());
|
||||
});
|
||||
|
||||
// Send an initial keepalive comment so Spring flushes the response
|
||||
// headers immediately. Without this, clients blocking on the first
|
||||
// body byte can hang for a full ping interval before observing the
|
||||
// connection — surface symptom in ITs that assert awaitConnection().
|
||||
try {
|
||||
emitter.send(SseEmitter.event().comment("connected"));
|
||||
} catch (IOException e) {
|
||||
log.debug("Initial keepalive failed for agent {}: {}", agentId, e.getMessage());
|
||||
emitters.remove(agentId, emitter);
|
||||
}
|
||||
|
||||
log.info("SSE connection established for agent {}", agentId);
|
||||
|
||||
return emitter;
|
||||
@@ -169,7 +180,7 @@ public class SseConnectionManager implements AgentEventListener {
|
||||
/**
|
||||
* Scheduled ping keepalive to all connected agents.
|
||||
*/
|
||||
@Scheduled(fixedDelayString = "${agent-registry.ping-interval-ms:15000}")
|
||||
@Scheduled(fixedDelayString = "${cameleer.server.agentregistry.ping-interval-ms:15000}")
|
||||
void pingAll() {
|
||||
if (!emitters.isEmpty()) {
|
||||
sendPingToAll();
|
||||
|
||||
@@ -62,10 +62,13 @@ public class AgentSseController {
|
||||
|
||||
AgentInfo agent = registryService.findById(id);
|
||||
if (agent == null) {
|
||||
// Auto-heal: re-register agent from JWT claims after server restart
|
||||
// Auto-heal re-registers an agent from JWT claims after a server
|
||||
// restart, but only when the JWT subject matches the path id.
|
||||
// Otherwise a holder of any valid agent JWT could spoof an
|
||||
// arbitrary agentId in the URL.
|
||||
var jwtResult = (JwtService.JwtValidationResult) httpRequest.getAttribute(
|
||||
JwtAuthenticationFilter.JWT_RESULT_ATTR);
|
||||
if (jwtResult != null) {
|
||||
if (jwtResult != null && id.equals(jwtResult.subject())) {
|
||||
String application = jwtResult.application() != null ? jwtResult.application() : "default";
|
||||
String env = jwtResult.environment() != null ? jwtResult.environment() : "default";
|
||||
registryService.register(id, id, application, env, "unknown", List.of(), Map.of());
|
||||
|
||||
Reference in New Issue
Block a user