Fix non-deterministic notification ordering in AcpClientSession#13
Open
ljiro wants to merge 1 commit into
Open
Fix non-deterministic notification ordering in AcpClientSession#13ljiro wants to merge 1 commit into
ljiro wants to merge 1 commit into
Conversation
Incoming JSONRPCNotifications were processed via independent .subscribe() calls, allowing async handlers to complete out of arrival order. Route notifications through a Sinks.Many drained by concatMap so each handler Mono must complete before the next notification is picked up. Adds testNotificationOrderPreservedWithAsyncHandler to verify that five rapid notifications with staggered async delays still arrive in order.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #11
Root Cause
In
AcpClientSession.handle(), every incomingJSONRPCNotificationwas dispatched by calling.subscribe()on an independentMono:Because each subscription is independent, if two notifications arrive back-to-back and the handler (e.g.
sessionUpdateConsumer) does any async work, there is nothing preventing notification B from completing before notification A. The transport delivers messages in order, but that guarantee is lost the moment execution fans out into parallel subscriptions.This is particularly impactful with Claude Code, which emits many rapid
sessionUpdatechunks during streaming. Any consumer that hops schedulers, writes to a reactive sink, or dispatches to shared state (e.g.clientHolder.dispatch(notification.update())) is silently at risk of receiving chunks out of sequence.Fix
Route all incoming notifications through a
Sinks.Many<JSONRPCNotification>drained byconcatMap, which ensures each notification'sMonomust complete before the next one is picked up:In
handle(), notifications are now enqueued rather than subscribed to directly:The sink is completed and the subscription disposed in both
close()andcloseGracefully(). The change is contained entirely toAcpClientSession, touches no public API, and does not affect request/response handling.Tradeoff: serialization means a slow
sessionUpdateConsumerwill delay delivery of the next notification (head-of-line blocking). The previous behaviour processed notifications in parallel, which was faster but produced incorrect ordering. For the typical consumer (dispatching to a queue, writing to a list, updating reactive state) this has no practical impact.Verification
A new test
testNotificationOrderPreservedWithAsyncHandlerwas added toAcpClientSessionTest. It sends 5 rapid notifications where earlier ones have longer async delays than later ones — a setup that reliably inverts the order without serialization:Without the fix: test fails, notifications arrive as
[4, 3, 2, 1, 0].With the fix: test passes, notifications arrive as
[0, 1, 2, 3, 4].All 338 existing tests continue to pass.