Skip to content

feat: add Streamable HTTP transport for MCP server#1830

Open
sappusaketh wants to merge 5 commits into
kafbat:mainfrom
sappusaketh:feature/streamable-http-transport
Open

feat: add Streamable HTTP transport for MCP server#1830
sappusaketh wants to merge 5 commits into
kafbat:mainfrom
sappusaketh:feature/streamable-http-transport

Conversation

@sappusaketh

@sappusaketh sappusaketh commented May 5, 2026

Copy link
Copy Markdown
  • Breaking change? Endpoint changes from `/mcp/sse` + `/mcp/message` to a single `/mcp`

What changes did you make?

Replaces the legacy SSE transport with Streamable HTTP transport for the MCP server. The endpoint collapses from `/mcp/sse` (GET) + `/mcp/message` (POST) to a single `/mcp` endpoint per the MCP Streamable HTTP spec.

Closes #1137.

Implementation notes

  • Upgraded MCP SDK 0.10.0 → 1.1.0 (`mcp-core` + `mcp-json-jackson3`). Has breaking API changes — `AsyncToolSpecification` callback signature, `Tool` and `CallToolResult` now use builders.
  • Vendored `WebFluxStreamableServerTransportProvider` from Spring AI 2.0-M4 into `io.kafbat.ui.transport`. Spring AI 2.0 targets Spring Framework 7.0, but kafka-ui is on Spring Framework 6.2. The only behavioral change in the vendored copy is replacing `HttpHeaders.asMultiValueMap()` (Spring 7.0+) with `new LinkedHashMap<>(httpHeaders)`. Once kafka-ui upgrades to Spring Boot 4.0, the vendored file can be replaced with the library dependency.
  • Includes the Flux unwrap fix from MCP server doesn't properly handle Flux responses. #1454 / BE: Handle Flux response bodies in MCP tool call results #1491 — `reponseToCallResult` was returning the raw `Flux` object instead of unwrapping it. Fixed by delegating to `toCallResult(body)`. BE: Handle Flux response bodies in MCP tool call results #1491 can be closed once this lands.

Summary by CodeRabbit

  • Dependencies

    • Updated Model Context Protocol libraries to version 1.1.0 for improved server compatibility and reliability.
  • Chores

    • Enhanced code quality and static analysis configurations.

Replace legacy SSE transport with Streamable HTTP transport for the MCP
server. This upgrades the MCP SDK from 0.10.0 to 1.1.0 and exposes a
single /mcp endpoint instead of separate /mcp/sse + /mcp/message paths.

The WebFluxStreamableServerTransportProvider is vendored from Spring AI
2.0.0-M4 with a compatibility fix for Spring Framework 6.2 (replaces
HttpHeaders.asMultiValueMap() which requires Spring 7.0).

Closes kafbat#1137
When a controller returns ResponseEntity<Flux<T>>, the MCP tool handler
serialized the raw Flux object instead of collecting the stream first.
Delegate to toCallResult() which already handles Flux/Mono unwrapping.

Fixes kafbat#1454
@sappusaketh sappusaketh requested review from a team as code owners May 5, 2026 17:34
@kapybro kapybro Bot added status/triage Issues pending maintainers triage status/triage/manual Manual triage in progress status/triage/completed Automatic triage completed and removed status/triage Issues pending maintainers triage labels May 5, 2026
@coderabbitai

coderabbitai Bot commented May 5, 2026

Copy link
Copy Markdown

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 90eb4c3d-53d6-410c-b29d-5d34ff71c759

📥 Commits

Reviewing files that changed from the base of the PR and between 5fd7693 and 79e6b1e.

📒 Files selected for processing (2)
  • build.gradle
  • sonar-project.properties
✅ Files skipped from review due to trivial changes (2)
  • sonar-project.properties
  • build.gradle

📝 Walkthrough

Walkthrough

This PR migrates the MCP (Model Context Protocol) server transport mechanism from an SSE-based Spring WebFlux integration (v0.10.0) to a new streamable HTTP transport using MCP v1.1.0. The core change introduces a new WebFluxStreamableServerTransportProvider implementation that manages HTTP sessions, routing JSON-RPC requests over SSE, and broadcasting notifications. Configuration, tool generation, and test fixtures are updated accordingly.

Changes

MCP Transport Migration

Layer / File(s) Summary
Dependency Upgrade
gradle/libs.versions.toml, api/build.gradle
MCP library switched from modelcontextprotocol-spring-webflux:0.10.0 to mcp-core and mcp-json-jackson, both v1.1.0. Build adds checkstyle suppressions configuration file reference.
Core Transport Implementation
api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java
New 511-line provider implements McpStreamableServerTransportProvider with GET/POST/DELETE handlers for JSON-RPC routing over SSE. GET replays events or opens live streaming; POST initializes sessions and routes messages; DELETE closes sessions. Includes session lifecycle management, keep-alive scheduling, and per-client notification broadcasting.
Config Integration
api/src/main/java/io/kafbat/ui/config/McpConfig.java
Replaced SSE transport bean with new streamable transport provider bean. Updated mcpServer and mcpRouterFunction to accept WebFluxStreamableServerTransportProvider instead of WebFluxSseServerTransportProvider.
Generator Adaptation
api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java
Updated tool creation to use McpSchema.Tool.builder() pattern. Refactored methodCall handler to accept CallToolRequest and extract arguments from request.arguments(). Simplified response handling by directly delegating to toCallResult() without wrapping in Mono.just().
Build & Code Quality Config
build.gradle, sonar-project.properties, etc/checkstyle/checkstyle-suppressions.xml
Added SonarQube project configuration and copy-paste detection exclusion for the new transport provider. Added Checkstyle suppressions file to disable checks for the large new provider class.
Test Updates
api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java
Introduced tool() helper method using builder pattern. Refactored expected tool fixtures in testConvertController to use the helper instead of direct constructor calls.

Sequence Diagram

sequenceDiagram
    participant Client
    participant WebFlux as WebFlux Handler
    participant SessionMgr as Session Manager
    participant SessionImpl as MCP Session
    participant EventBus as Event Stream

    Note over Client,EventBus: Initialize MCP Session (POST /mcp)
    Client->>WebFlux: POST /mcp<br/>JSON-RPC: initialize
    WebFlux->>SessionMgr: sessionFactory.create(context)
    SessionMgr->>SessionImpl: new session
    SessionMgr->>WebFlux: return sessionId
    WebFlux->>Client: JSON-RPC response<br/>+ MCP_SESSION_ID header

    Note over Client,EventBus: Stream Events (GET /mcp)
    Client->>WebFlux: GET /mcp<br/>Accept: text/event-stream<br/>+ MCP_SESSION_ID header
    WebFlux->>SessionMgr: lookup session
    WebFlux->>SessionImpl: subscribe to listeningStream
    SessionImpl->>EventBus: emit events
    EventBus->>Client: SSE: message event

    Note over Client,EventBus: Send Tool Call (POST /mcp)
    Client->>WebFlux: POST /mcp<br/>JSON-RPC: callTool<br/>+ MCP_SESSION_ID header
    WebFlux->>SessionImpl: route callTool request
    SessionImpl->>SessionImpl: invoke tool handler
    SessionImpl->>SessionImpl: emit response
    EventBus->>Client: SSE: JSON-RPC response

    Note over Client,EventBus: Close Session (DELETE /mcp)
    Client->>WebFlux: DELETE /mcp<br/>+ MCP_SESSION_ID header
    WebFlux->>SessionMgr: lookup & delete session
    SessionImpl->>SessionImpl: cleanup resources
    WebFlux->>Client: 200 OK
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

The new WebFluxStreamableServerTransportProvider is a substantial, feature-rich component (511 lines) with multiple responsibilities: HTTP request handling (GET/POST/DELETE), session lifecycle management, JSON-RPC routing, SSE event streaming, and graceful shutdown. The interactions between session factories, concurrent session storage, keep-alive scheduling, and event broadcasting require careful logic verification. Additionally, the integration across config, generator, and test files adds heterogeneity. The builder pattern refactoring in tests and generator adds modest additional cognitive load. No critical algorithmic complexity, but high surface area for integration bugs.

Poem

🐰 A stream of sessions flows so bright,
JSON-RPC bounces left and right,
SSE whispers through the wire,
Transport magic takes us higher,
The rabbit hops—the tools now fly! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 7.69% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: transitioning from SSE to Streamable HTTP transport for the MCP server, which is the primary objective of the PR.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai

coderabbitai Bot commented May 5, 2026

Copy link
Copy Markdown
📝 Walkthrough

Walkthrough

This PR upgrades the Model Context Protocol (MCP) integration from an SSE-based Spring WebFlux transport to a streamable HTTP transport. Dependencies are updated, a new WebFluxStreamableServerTransportProvider is introduced with session management and routing, the MCP config bean wiring is refactored, and tool specification generation is adapted to the new request format.

Changes

MCP Transport Migration

Layer / File(s) Summary
Dependencies
gradle/libs.versions.toml, api/build.gradle
Removes modelcontextprotocol-spring-webflux (v0.10.0); adds mcp-core and mcp-json-jackson (both v1.1.0). Gradle build configuration adds checkstyle suppressions config reference.
Core Transport Implementation
api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java
New streamable HTTP transport provider with GET/POST/DELETE routing on /mcp endpoint. Manages concurrent sessions, supports SSE streaming with replay via Last-Event-Id, handles JSON-RPC initialization and message routing, provides graceful shutdown and client notification broadcasting.
Configuration Wiring
api/src/main/java/io/kafbat/ui/config/McpConfig.java
Replaces SSE transport bean with streamable transport provider bean. Updates mcpRouterFunction and mcpServer bean signatures to accept WebFluxStreamableServerTransportProvider instead of WebFluxSseServerTransportProvider.
Tool Request Handling
api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java
Updates tool specification builder to use McpSchema.Tool.builder() pattern. Refactors methodCall handler to accept CallToolRequest and extract arguments from request.arguments(). Simplifies response handling for ResponseEntity<?> results.
Tests & Configuration
api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java, etc/checkstyle/checkstyle-suppressions.xml
Test helper added to construct McpSchema.Tool via builder pattern. Checkstyle suppressions file created to disable checks for the new WebFluxStreamableServerTransportProvider class.

Sequence Diagram

sequenceDiagram
    participant Client as HTTP Client
    participant Transport as WebFluxStreamableServerTransportProvider
    participant SessionFactory as Session Factory
    participant Session as MCP Session
    participant Server as MCP Server

    rect rgba(100, 150, 255, 0.5)
    Note over Client,Server: Initialize & Establish Session
    Client->>Transport: POST /mcp (initialize JSON-RPC)
    Transport->>SessionFactory: Create new session
    SessionFactory->>Session: new
    Transport->>Transport: Store session in map
    Transport->>Client: JSON-RPC response + MCP_SESSION_ID header
    end

    rect rgba(100, 200, 150, 0.5)
    Note over Client,Server: Handle Tool Requests
    Client->>Transport: POST /mcp (tool request JSON-RPC, with MCP_SESSION_ID)
    Transport->>Session: Route JSON-RPC to session
    Session->>Server: Execute tool via MCP protocol
    Server->>Session: Return result
    Transport->>Client: SSE message with response
    end

    rect rgba(200, 150, 100, 0.5)
    Note over Client,Server: Session Streaming
    Client->>Transport: GET /mcp (with MCP_SESSION_ID, SSE Accept)
    Transport->>Session: Open listening stream
    Session-->>Client: Stream SSE events as messages arrive
    Client->>Transport: DELETE /mcp (with MCP_SESSION_ID)
    Transport->>Session: Delete session
    Transport->>Client: 200 OK
    end
Loading

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~75 minutes

The new WebFluxStreamableServerTransportProvider class introduces dense logic spanning HTTP request routing (GET/POST/DELETE), concurrent session management, SSE streaming with replay support, JSON-RPC error handling, and keep-alive scheduling. Multiple error paths and integration points across bean configuration require careful verification of behavior semantics and edge cases.

Poem

🐰 From gentle SSE streams to channels flowing free,
The transport hops through POST and GET with glee!
Sessions dance in maps, keep-alives tick on time,
The protocol takes flight—a hoppy paradigm! 🚀

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 7.69% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title directly and clearly summarizes the main change: adding Streamable HTTP transport for the MCP server. The title is concise, specific, and accurately reflects the primary objective of the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`:
- Around line 283-297: The code currently inserts the session into sessions via
sessions.put(init.session().getId(), init.session()) before init.initResult()
completes and never removes it on init failure or when a client issues DELETE;
modify the flow around
sessionFactory.startSession()/McpStreamableServerSession.McpStreamableServerSessionInit
so that if init.initResult() completes with an error or the client sends a
DELETE the session is removed from sessions (call
sessions.remove(init.session().getId())), e.g., attach onError/onErrorContinue
or doFinally handlers to the init.initResult() Mono to remove the session on
failure/termination and ensure the DELETE handler explicitly removes the session
from the registry (referencing the DELETE handler method and the sessions map).
- Around line 207-209: The Accept header check currently uses
acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM) which fails on parameters
(e.g., charset or q-values); update both occurrences (the one using
acceptHeaders = request.headers().asHttpHeaders().getAccept() and the later
similar check) to test compatibility by replacing the contains(...) test with a
predicate that uses MediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)
(e.g., check if acceptHeaders.stream().noneMatch(h ->
h.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) and keep returning
ServerResponse.badRequest().build() when no compatible media type is found).
Ensure you reference the acceptHeaders variable and MediaType.TEXT_EVENT_STREAM
in both places.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ad2f3a9b-4690-48ac-87c0-2a21d9fc628d

📥 Commits

Reviewing files that changed from the base of the PR and between 4bf06dd and 5fd7693.

📒 Files selected for processing (7)
  • api/build.gradle
  • api/src/main/java/io/kafbat/ui/config/McpConfig.java
  • api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java
  • api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java
  • api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java
  • etc/checkstyle/checkstyle-suppressions.xml
  • gradle/libs.versions.toml

Comment on lines +207 to +209
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
return ServerResponse.badRequest().build();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

# Find the target file
find . -type f -name "WebFluxStreamableServerTransportProvider.java" 2>/dev/null

Repository: kafbat/kafka-ui

Length of output: 148


🏁 Script executed:

# Check if file exists and get line count
if [ -f "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java" ]; then
  wc -l "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"
fi

Repository: kafbat/kafka-ui

Length of output: 150


🏁 Script executed:

# Read the relevant lines from the file
sed -n '200,220p' "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"

Repository: kafbat/kafka-ui

Length of output: 837


🏁 Script executed:

# Read the second location mentioned (262-265)
sed -n '255,275p' "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java"

Repository: kafbat/kafka-ui

Length of output: 1097


🌐 Web query:

Spring Framework MediaType equals() isCompatibleWith() difference

💡 Result:

The primary difference between MediaType.equals and MediaType.isCompatibleWith in Spring Framework is that equals performs an exact match including type, subtype, and all parameters (case-insensitive for type/subtype), while isCompatibleWith checks for semantic compatibility, particularly handling wildcards like text/* matching text/plain or text/html, and is symmetric (A compatible with B implies B compatible with A). It delegates to MimeType.isCompatibleWith, which is similar to includes but symmetric.[1][2] equals example from source: returns true only if type.equalsIgnoreCase(other.type), subtype.equalsIgnoreCase(other.subtype), and parameters match exactly.[3][2] isCompatibleWith example: text/* is compatible with text/plain, text/html, and vice versa.[1] This distinction is used in components like MediaTypeRequestMatcher, which defaults to isCompatibleWith but can switch to equals via setUseEquals(true).[4][5]

Citations:


🌐 Web query:

Spring MediaType contains() list matching parameters charset q-values

💡 Result:

Spring Framework's MediaType (which extends MimeType) has an includes method that determines if one media type includes another, primarily based on type and subtype hierarchy (e.g., text/* includes text/plain).[1][2][3] This method explicitly ignores parameters like charset and q-values; it only checks type/subtype matching and does not consider parameters for the inclusion decision.[3][4] There is no direct contains method on MediaType for checking if a list contains a matching MediaType. However, in practice, for content negotiation or matching lists of MediaTypes (e.g., from Accept headers), Spring uses: - includes for broad matching (ignores parameters).[3] - isCompatibleWith for symmetric compatibility (also ignores parameters).[1] - equals or exact parameter matching via parametersAreEqual for full matches including parameters like charset (case-insensitive for Charset).[3] q-values (quality factors) are parsed from the 'q' parameter via getQualityValue and used for sorting/prioritizing in content negotiation (higher q preferred), but not for boolean matching in includes.[5][1] For example: - MediaType.TEXT_PLAIN.includes(MediaType.parseMediaType("text/plain;charset=UTF-8")) == true (ignores charset).[3] - To check exact match including charset in a list, use list.stream.anyMatch(mt -> mt.equals(target)) or custom parameter check.[6] MediaType does not have a built-in method to check membership in a list while ignoring/matching specific parameters; custom iteration is needed.[7]

Citations:


🏁 Script executed:

# Search for other contains() patterns with MediaType in the file
grep -n "acceptHeaders.contains\|MediaType" "api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java" | head -30

Repository: kafbat/kafka-ui

Length of output: 678


🏁 Script executed:

# Check if isCompatibleWith is used elsewhere in the codebase as precedent
rg "isCompatibleWith" --type java 2>/dev/null | head -20

Repository: kafbat/kafka-ui

Length of output: 41


🏁 Script executed:

# Search for how Accept headers are being handled elsewhere
rg "getAccept\(\)" --type java -B 2 -A 3 2>/dev/null | head -40

Repository: kafbat/kafka-ui

Length of output: 1665


Use isCompatibleWith() for Accept header negotiation to handle parameters correctly.

The contains() check performs exact equality matching, including all parameters. Valid Accept headers like text/event-stream;charset=UTF-8 or with quality factors will be rejected with 400. Replace with isCompatibleWith(), which ignores parameters and properly handles wildcard matching.

Suggested fix
-			List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
-			if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
+			List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
+			boolean acceptsEventStream = acceptHeaders.stream()
+				.anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM));
+			if (!acceptsEventStream) {
 				return ServerResponse.badRequest().build();
 			}
-		List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
-		if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
-				&& acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
+		List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
+		boolean acceptsJson = acceptHeaders.stream()
+			.anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.APPLICATION_JSON));
+		boolean acceptsEventStream = acceptHeaders.stream()
+			.anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM));
+		if (!(acceptsJson && acceptsEventStream)) {
 			return ServerResponse.badRequest().build();
 		}

Applies to lines 207-209 and 262-265.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
return ServerResponse.badRequest().build();
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
boolean acceptsEventStream = acceptHeaders.stream()
.anyMatch(mediaType -> mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM));
if (!acceptsEventStream) {
return ServerResponse.badRequest().build();
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`
around lines 207 - 209, The Accept header check currently uses
acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM) which fails on parameters
(e.g., charset or q-values); update both occurrences (the one using
acceptHeaders = request.headers().asHttpHeaders().getAccept() and the later
similar check) to test compatibility by replacing the contains(...) test with a
predicate that uses MediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)
(e.g., check if acceptHeaders.stream().noneMatch(h ->
h.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) and keep returning
ServerResponse.badRequest().build() when no compatible media type is found).
Ensure you reference the acceptHeaders variable and MediaType.TEXT_EVENT_STREAM
in both places.

Comment on lines +283 to +297
McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
.startSession(initializeRequest);
this.sessions.put(init.session().getId(), init.session());
return init.initResult().map(initializeResult -> {
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
try {
return this.jsonMapper.writeValueAsString(jsonrpcResponse);
}
catch (IOException e) {
logger.warn("Failed to serialize initResponse", e);
throw Exceptions.propagate(e);
}
})
.flatMap(initResult -> ServerResponse.ok()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remove sessions from the registry when init fails or a client deletes them.

The session is inserted into sessions before initResult() succeeds, and DELETE never removes it. That leaves failed/deleted sessions routable, keeps them in broadcasts/keepalives, and leaks the registry over time.

Suggested fix
-					this.sessions.put(init.session().getId(), init.session());
-					return init.initResult().map(initializeResult -> {
+					return init.initResult().doOnSuccess(__ ->
+						this.sessions.put(init.session().getId(), init.session())
+					).map(initializeResult -> {
 						McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
 								McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
 						try {
 							return this.jsonMapper.writeValueAsString(jsonrpcResponse);
 						}
@@
-			return session.delete().then(ServerResponse.ok().build());
+			return session.delete()
+				.doFinally(__ -> this.sessions.remove(sessionId, session))
+				.then(ServerResponse.ok().build());

Also applies to: 384-392

🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis

[warning] 292-292: Replace "e" with an unnamed pattern.

See more on https://sonarcloud.io/project/issues?id=kafbat_kafka-ui&issues=AZ35NSHv_MejDXXElXPR&open=AZ35NSHv_MejDXXElXPR&pullRequest=1830

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@api/src/main/java/io/kafbat/ui/transport/WebFluxStreamableServerTransportProvider.java`
around lines 283 - 297, The code currently inserts the session into sessions via
sessions.put(init.session().getId(), init.session()) before init.initResult()
completes and never removes it on init failure or when a client issues DELETE;
modify the flow around
sessionFactory.startSession()/McpStreamableServerSession.McpStreamableServerSessionInit
so that if init.initResult() completes with an error or the client sends a
DELETE the session is removed from sessions (call
sessions.remove(init.session().getId())), e.g., attach onError/onErrorContinue
or doFinally handlers to the init.initResult() Mono to remove the session on
failure/termination and ensure the DELETE handler explicitly removes the session
from the registry (referencing the DELETE handler method and the sessions map).

@sappusaketh

Copy link
Copy Markdown
Author

The SonarCloud failure is from duplication in the vendored WebFluxStreamableServerTransportProvider.java. I tried adding sonar.cpd.exclusions via both build.gradle and sonar-project.properties, but the SonarCloud automatic analysis ignores those local configs.

Could a maintainer add **/transport/WebFluxStreamableServerTransportProvider.java to sonar.cpd.exclusions in the SonarCloud project settings?

@Haarolean Haarolean requested a review from germanosin May 7, 2026 12:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

status/triage/completed Automatic triage completed status/triage/manual Manual triage in progress

Projects

None yet

Development

Successfully merging this pull request may close these issues.

streamable-http for MCP server

1 participant