diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 0a8dff363..3b411fdfe 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -466,19 +466,21 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); })).flatMap(responseEvent -> { - if (transportSession.markInitialized( - responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - - reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); - } - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); int statusCode = responseEvent.responseInfo().statusCode(); if (statusCode >= 200 && statusCode < 300) { + // Only initialize session and open async stream for successful + // responses + if (transportSession.markInitialized(responseEvent.responseInfo() + .headers() + .firstValue("mcp-session-id") + .orElseGet(() -> null))) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); + } String contentType = responseEvent.responseInfo() .headers() diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 5af98985d..6f90bde5b 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -310,18 +310,19 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { }) .bodyValue(jsonText) .exchangeToFlux(response -> { - if (transportSession - .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - reconnect(null).contextWrite(sink.contextView()).subscribe(); - } - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); // The spec mentions only ACCEPTED, but the existing SDKs can return // 200 OK for notifications if (response.statusCode().is2xxSuccessful()) { + // Only initialize session and open async stream for successful + // responses + if (transportSession + .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(sink.contextView()).subscribe(); + } Optional contentType = response.headers().contentType(); long contentLength = response.headers().contentLength().orElse(-1); // Existing SDKs consume notifications with no response body nor