Skip to content

Fix non-deterministic notification ordering in AcpClientSession#13

Open
ljiro wants to merge 1 commit into
agentclientprotocol:mainfrom
ljiro:fix/notification-ordering
Open

Fix non-deterministic notification ordering in AcpClientSession#13
ljiro wants to merge 1 commit into
agentclientprotocol:mainfrom
ljiro:fix/notification-ordering

Conversation

@ljiro

@ljiro ljiro commented Jun 22, 2026

Copy link
Copy Markdown

Closes #11

Root Cause

In AcpClientSession.handle(), every incoming JSONRPCNotification was dispatched by calling .subscribe() on an independent Mono:

handleIncomingNotification(notification).onErrorComplete(...).subscribe();

Because each subscription is independent, if two notifications arrive back-to-back and the handler (e.g. sessionUpdateConsumer) does any async work, there is nothing preventing notification B from completing before notification A. The transport delivers messages in order, but that guarantee is lost the moment execution fans out into parallel subscriptions.

This is particularly impactful with Claude Code, which emits many rapid sessionUpdate chunks during streaming. Any consumer that hops schedulers, writes to a reactive sink, or dispatches to shared state (e.g. clientHolder.dispatch(notification.update())) is silently at risk of receiving chunks out of sequence.

Fix

Route all incoming notifications through a Sinks.Many<JSONRPCNotification> drained by concatMap, which ensures each notification's Mono must complete before the next one is picked up:

this.notificationSink = Sinks.many().unicast().onBackpressureBuffer();
this.notificationSubscription = this.notificationSink.asFlux()
    .concatMap(notification -> handleIncomingNotification(notification)
        .onErrorComplete(t -> { ... }))
    .subscribe();

In handle(), notifications are now enqueued rather than subscribed to directly:

Sinks.EmitResult result = notificationSink.tryEmitNext(notification);

The sink is completed and the subscription disposed in both close() and closeGracefully(). The change is contained entirely to AcpClientSession, touches no public API, and does not affect request/response handling.

Tradeoff: serialization means a slow sessionUpdateConsumer will delay delivery of the next notification (head-of-line blocking). The previous behaviour processed notifications in parallel, which was faster but produced incorrect ordering. For the typical consumer (dispatching to a queue, writing to a list, updating reactive state) this has no practical impact.

Verification

A new test testNotificationOrderPreservedWithAsyncHandler was added to AcpClientSessionTest. It sends 5 rapid notifications where earlier ones have longer async delays than later ones — a setup that reliably inverts the order without serialization:

Notification Delay
0 150 ms
1 120 ms
2 90 ms
3 60 ms
4 30 ms

Without the fix: test fails, notifications arrive as [4, 3, 2, 1, 0].
With the fix: test passes, notifications arrive as [0, 1, 2, 3, 4].

All 338 existing tests continue to pass.

Incoming JSONRPCNotifications were processed via independent .subscribe()
calls, allowing async handlers to complete out of arrival order. Route
notifications through a Sinks.Many drained by concatMap so each handler
Mono must complete before the next notification is picked up.

Adds testNotificationOrderPreservedWithAsyncHandler to verify that five
rapid notifications with staggered async delays still arrive in order.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

The order of streamed content from acpsdk is non-deterministic.

1 participant