feat: add Streamable HTTP transport for MCP server#1830
Conversation
Replace legacy SSE transport with Streamable HTTP transport for the MCP server. This upgrades the MCP SDK from 0.10.0 to 1.1.0 and exposes a single /mcp endpoint instead of separate /mcp/sse + /mcp/message paths. The WebFluxStreamableServerTransportProvider is vendored from Spring AI 2.0.0-M4 with a compatibility fix for Spring Framework 6.2 (replaces HttpHeaders.asMultiValueMap() which requires Spring 7.0). Closes kafbat#1137
When a controller returns ResponseEntity<Flux<T>>, the MCP tool handler serialized the raw Flux object instead of collecting the stream first. Delegate to toCallResult() which already handles Flux/Mono unwrapping. Fixes kafbat#1454
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
✅ Files skipped from review due to trivial changes (2)
📝 WalkthroughWalkthroughThis PR migrates the MCP (Model Context Protocol) server transport mechanism from an SSE-based Spring WebFlux integration (v0.10.0) to a new streamable HTTP transport using MCP v1.1.0. The core change introduces a new ChangesMCP Transport Migration
Sequence DiagramsequenceDiagram
participant Client
participant WebFlux as WebFlux Handler
participant SessionMgr as Session Manager
participant SessionImpl as MCP Session
participant EventBus as Event Stream
Note over Client,EventBus: Initialize MCP Session (POST /mcp)
Client->>WebFlux: POST /mcp<br/>JSON-RPC: initialize
WebFlux->>SessionMgr: sessionFactory.create(context)
SessionMgr->>SessionImpl: new session
SessionMgr->>WebFlux: return sessionId
WebFlux->>Client: JSON-RPC response<br/>+ MCP_SESSION_ID header
Note over Client,EventBus: Stream Events (GET /mcp)
Client->>WebFlux: GET /mcp<br/>Accept: text/event-stream<br/>+ MCP_SESSION_ID header
WebFlux->>SessionMgr: lookup session
WebFlux->>SessionImpl: subscribe to listeningStream
SessionImpl->>EventBus: emit events
EventBus->>Client: SSE: message event
Note over Client,EventBus: Send Tool Call (POST /mcp)
Client->>WebFlux: POST /mcp<br/>JSON-RPC: callTool<br/>+ MCP_SESSION_ID header
WebFlux->>SessionImpl: route callTool request
SessionImpl->>SessionImpl: invoke tool handler
SessionImpl->>SessionImpl: emit response
EventBus->>Client: SSE: JSON-RPC response
Note over Client,EventBus: Close Session (DELETE /mcp)
Client->>WebFlux: DELETE /mcp<br/>+ MCP_SESSION_ID header
WebFlux->>SessionMgr: lookup & delete session
SessionImpl->>SessionImpl: cleanup resources
WebFlux->>Client: 200 OK
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes The new Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
📝 WalkthroughWalkthroughThis PR upgrades the Model Context Protocol (MCP) integration from an SSE-based Spring WebFlux transport to a streamable HTTP transport. Dependencies are updated, a new ChangesMCP Transport Migration
Sequence DiagramsequenceDiagram
participant Client as HTTP Client
participant Transport as WebFluxStreamableServerTransportProvider
participant SessionFactory as Session Factory
participant Session as MCP Session
participant Server as MCP Server
rect rgba(100, 150, 255, 0.5)
Note over Client,Server: Initialize & Establish Session
Client->>Transport: POST /mcp (initialize JSON-RPC)
Transport->>SessionFactory: Create new session
SessionFactory->>Session: new
Transport->>Transport: Store session in map
Transport->>Client: JSON-RPC response + MCP_SESSION_ID header
end
rect rgba(100, 200, 150, 0.5)
Note over Client,Server: Handle Tool Requests
Client->>Transport: POST /mcp (tool request JSON-RPC, with MCP_SESSION_ID)
Transport->>Session: Route JSON-RPC to session
Session->>Server: Execute tool via MCP protocol
Server->>Session: Return result
Transport->>Client: SSE message with response
end
rect rgba(200, 150, 100, 0.5)
Note over Client,Server: Session Streaming
Client->>Transport: GET /mcp (with MCP_SESSION_ID, SSE Accept)
Transport->>Session: Open listening stream
Session-->>Client: Stream SSE events as messages arrive
Client->>Transport: DELETE /mcp (with MCP_SESSION_ID)
Transport->>Session: Delete session
Transport->>Client: 200 OK
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~75 minutes The new Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`:
- Around line 283-297: The code currently inserts the session into sessions via
sessions.put(init.session().getId(), init.session()) before init.initResult()
completes and never removes it on init failure or when a client issues DELETE;
modify the flow around
sessionFactory.startSession()/McpStreamableServerSession.McpStreamableServerSessionInit
so that if init.initResult() completes with an error or the client sends a
DELETE the session is removed from sessions (call
sessions.remove(init.session().getId())), e.g., attach onError/onErrorContinue
or doFinally handlers to the init.initResult() Mono to remove the session on
failure/termination and ensure the DELETE handler explicitly removes the session
from the registry (referencing the DELETE handler method and the sessions map).
- Around line 207-209: The Accept header check currently uses
acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM) which fails on parameters
(e.g., charset or q-values); update both occurrences (the one using
acceptHeaders = request.headers().asHttpHeaders().getAccept() and the later
similar check) to test compatibility by replacing the contains(...) test with a
predicate that uses MediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)
(e.g., check if acceptHeaders.stream().noneMatch(h ->
h.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) and keep returning
ServerResponse.badRequest().build() when no compatible media type is found).
Ensure you reference the acceptHeaders variable and MediaType.TEXT_EVENT_STREAM
in both places.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ad2f3a9b-4690-48ac-87c0-2a21d9fc628d
📒 Files selected for processing (7)
api/build.gradleapi/src/main/java/io/kafbat/ui/config/McpConfig.javaapi/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.javaapi/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.javaapi/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.javaetc/checkstyle/checkstyle-suppressions.xmlgradle/libs.versions.toml
| List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept(); | ||
| if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) { | ||
| return ServerResponse.badRequest().build(); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Find the target file
find . -type f -name "WebFluxStreamableServerTransportProvider.java" 2>/dev/nullRepository: kafbat/kafka-ui
Length of output: 148
🏁 Script executed:
# Check if file exists and get line count
if [ -f "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java" ]; then
wc -l "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"
fiRepository: kafbat/kafka-ui
Length of output: 150
🏁 Script executed:
# Read the relevant lines from the file
sed -n '200,220p' "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"Repository: kafbat/kafka-ui
Length of output: 837
🏁 Script executed:
# Read the second location mentioned (262-265)
sed -n '255,275p' "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"Repository: kafbat/kafka-ui
Length of output: 1097
🌐 Web query:
Spring Framework MediaType equals() isCompatibleWith() difference
💡 Result:
The primary difference between MediaType.equals and MediaType.isCompatibleWith in Spring Framework is that equals performs an exact match including type, subtype, and all parameters (case-insensitive for type/subtype), while isCompatibleWith checks for semantic compatibility, particularly handling wildcards like text/* matching text/plain or text/html, and is symmetric (A compatible with B implies B compatible with A). It delegates to MimeType.isCompatibleWith, which is similar to includes but symmetric.[1][2] equals example from source: returns true only if type.equalsIgnoreCase(other.type), subtype.equalsIgnoreCase(other.subtype), and parameters match exactly.[3][2] isCompatibleWith example: text/* is compatible with text/plain, text/html, and vice versa.[1] This distinction is used in components like MediaTypeRequestMatcher, which defaults to isCompatibleWith but can switch to equals via setUseEquals(true).[4][5]
Citations:
- 1: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/MediaType.html
- 2: https://github.com/spring-projects/spring-framework/blob/master/spring-core/src/main/java/org/springframework/util/MimeType.java
- 3: https://bethecoder.com/open-source/spring/spring-web-3.0.5/org/springframework/http/MediaType.java.html
- 4: https://docs.spring.io/spring-security/site/docs/5.4.6/api/org/springframework/security/web/util/matcher/MediaTypeRequestMatcher.html
- 5: https://github.com/spring-projects/spring-security/blob/master/web/src/main/java/org/springframework/security/web/util/matcher/MediaTypeRequestMatcher.java
🌐 Web query:
Spring MediaType contains() list matching parameters charset q-values
💡 Result:
Spring Framework's MediaType (which extends MimeType) has an includes method that determines if one media type includes another, primarily based on type and subtype hierarchy (e.g., text/* includes text/plain).[1][2][3] This method explicitly ignores parameters like charset and q-values; it only checks type/subtype matching and does not consider parameters for the inclusion decision.[3][4] There is no direct contains method on MediaType for checking if a list contains a matching MediaType. However, in practice, for content negotiation or matching lists of MediaTypes (e.g., from Accept headers), Spring uses: - includes for broad matching (ignores parameters).[3] - isCompatibleWith for symmetric compatibility (also ignores parameters).[1] - equals or exact parameter matching via parametersAreEqual for full matches including parameters like charset (case-insensitive for Charset).[3] q-values (quality factors) are parsed from the 'q' parameter via getQualityValue and used for sorting/prioritizing in content negotiation (higher q preferred), but not for boolean matching in includes.[5][1] For example: - MediaType.TEXT_PLAIN.includes(MediaType.parseMediaType("text/plain;charset=UTF-8")) == true (ignores charset).[3] - To check exact match including charset in a list, use list.stream.anyMatch(mt -> mt.equals(target)) or custom parameter check.[6] MediaType does not have a built-in method to check membership in a list while ignoring/matching specific parameters; custom iteration is needed.[7]
Citations:
- 1: https://docs.spring.io/spring-framework/docs/7.0.4/javadoc-api/org/springframework/http/MediaType.html
- 2: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/MediaType.html
- 3: https://github.com/spring-projects/spring-framework/blob/master/spring-core/src/main/java/org/springframework/util/MimeType.java
- 4: https://spring.pleiades.io/spring-framework/docs/6.2.x/javadoc-api/org/springframework/util/MimeType.html
- 5: https://github.com/spring-projects/spring-framework/blob/master/spring-web/src/main/java/org/springframework/http/MediaType.java
- 6: https://stackoverflow.com/questions/62148127/how-do-i-specify-a-mediatype-of-text-plaincharset-utf-8-in-a-spring-boot-test
- 7: https://github.com/spring-projects/spring-security/blob/master/web/src/main/java/org/springframework/security/web/util/matcher/MediaTypeRequestMatcher.java
🏁 Script executed:
# Search for other contains() patterns with MediaType in the file
grep -n "acceptHeaders.contains\|MediaType" "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java" | head -30Repository: kafbat/kafka-ui
Length of output: 678
🏁 Script executed:
# Check if isCompatibleWith is used elsewhere in the codebase as precedent
rg "isCompatibleWith" --type java 2>/dev/null | head -20Repository: kafbat/kafka-ui
Length of output: 41
🏁 Script executed:
# Search for how Accept headers are being handled elsewhere
rg "getAccept\(\)" --type java -B 2 -A 3 2>/dev/null | head -40Repository: kafbat/kafka-ui
Length of output: 1665
Use isCompatibleWith() for Accept header negotiation to handle parameters correctly.
The contains() check performs exact equality matching, including all parameters. Valid Accept headers like text/event-stream;charset=UTF-8 or with quality factors will be rejected with 400. Replace with isCompatibleWith(), which ignores parameters and properly handles wildcard matching.
Suggested fix
- List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
- if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
+ List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
+ boolean acceptsEventStream = acceptHeaders.stream()
+ .anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM));
+ if (!acceptsEventStream) {
return ServerResponse.badRequest().build();
}- List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
- if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
- && acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
+ List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
+ boolean acceptsJson = acceptHeaders.stream()
+ .anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.APPLICATION_JSON));
+ boolean acceptsEventStream = acceptHeaders.stream()
+ .anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM));
+ if (!(acceptsJson && acceptsEventStream)) {
return ServerResponse.badRequest().build();
}Applies to lines 207-209 and 262-265.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept(); | |
| if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) { | |
| return ServerResponse.badRequest().build(); | |
| List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept(); | |
| boolean acceptsEventStream = acceptHeaders.stream() | |
| .anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)); | |
| if (!acceptsEventStream) { | |
| return ServerResponse.badRequest().build(); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`
around lines 207 - 209, The Accept header check currently uses
acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM) which fails on parameters
(e.g., charset or q-values); update both occurrences (the one using
acceptHeaders = request.headers().asHttpHeaders().getAccept() and the later
similar check) to test compatibility by replacing the contains(...) test with a
predicate that uses MediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)
(e.g., check if acceptHeaders.stream().noneMatch(h ->
h.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) and keep returning
ServerResponse.badRequest().build() when no compatible media type is found).
Ensure you reference the acceptHeaders variable and MediaType.TEXT_EVENT_STREAM
in both places.
| McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory | ||
| .startSession(initializeRequest); | ||
| this.sessions.put(init.session().getId(), init.session()); | ||
| return init.initResult().map(initializeResult -> { | ||
| McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse( | ||
| McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null); | ||
| try { | ||
| return this.jsonMapper.writeValueAsString(jsonrpcResponse); | ||
| } | ||
| catch (IOException e) { | ||
| logger.warn("Failed to serialize initResponse", e); | ||
| throw Exceptions.propagate(e); | ||
| } | ||
| }) | ||
| .flatMap(initResult -> ServerResponse.ok() |
There was a problem hiding this comment.
Remove sessions from the registry when init fails or a client deletes them.
The session is inserted into sessions before initResult() succeeds, and DELETE never removes it. That leaves failed/deleted sessions routable, keeps them in broadcasts/keepalives, and leaks the registry over time.
Suggested fix
- this.sessions.put(init.session().getId(), init.session());
- return init.initResult().map(initializeResult -> {
+ return init.initResult().doOnSuccess(__ ->
+ this.sessions.put(init.session().getId(), init.session())
+ ).map(initializeResult -> {
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
try {
return this.jsonMapper.writeValueAsString(jsonrpcResponse);
}
@@
- return session.delete().then(ServerResponse.ok().build());
+ return session.delete()
+ .doFinally(__ -> this.sessions.remove(sessionId, session))
+ .then(ServerResponse.ok().build());Also applies to: 384-392
🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis
[warning] 292-292: Replace "e" with an unnamed pattern.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`
around lines 283 - 297, The code currently inserts the session into sessions via
sessions.put(init.session().getId(), init.session()) before init.initResult()
completes and never removes it on init failure or when a client issues DELETE;
modify the flow around
sessionFactory.startSession()/McpStreamableServerSession.McpStreamableServerSessionInit
so that if init.initResult() completes with an error or the client sends a
DELETE the session is removed from sessions (call
sessions.remove(init.session().getId())), e.g., attach onError/onErrorContinue
or doFinally handlers to the init.initResult() Mono to remove the session on
failure/termination and ensure the DELETE handler explicitly removes the session
from the registry (referencing the DELETE handler method and the sessions map).
|
The SonarCloud failure is from duplication in the vendored Could a maintainer add |
What changes did you make?
Replaces the legacy SSE transport with Streamable HTTP transport for the MCP server. The endpoint collapses from `/mcp/sse` (GET) + `/mcp/message` (POST) to a single `/mcp` endpoint per the MCP Streamable HTTP spec.
Closes #1137.
Implementation notes
Summary by CodeRabbit
Dependencies
Chores