From 5bc906a024342622dca2f02cdf56a325c1614d78 Mon Sep 17 00:00:00 2001 From: Liam Morales Date: Fri, 19 Jun 2026 22:32:48 +0800 Subject: [PATCH] Fix non-deterministic notification ordering in AcpClientSession Incoming JSONRPCNotifications were processed via independent .subscribe() calls, allowing async handlers to complete out of arrival order. Route notifications through a Sinks.Many drained by concatMap so each handler Mono must complete before the next notification is picked up. Adds testNotificationOrderPreservedWithAsyncHandler to verify that five rapid notifications with staggered async delays still arrive in order. --- .../sdk/spec/AcpClientSession.java | 31 +++++++++++++--- .../sdk/spec/AcpClientSessionTest.java | 35 +++++++++++++++++++ 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java index 2b36831..8140e44 100644 --- a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java +++ b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java @@ -11,8 +11,10 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.Sinks; import java.time.Duration; import java.util.Map; @@ -78,6 +80,12 @@ public class AcpClientSession implements AcpSession { /** Atomic counter for generating unique request IDs */ private final AtomicLong requestCounter = new AtomicLong(0); + /** Sink for serializing notification delivery in arrival order */ + private final Sinks.Many notificationSink; + + /** Subscription draining the notification sink via concatMap */ + private final Disposable notificationSubscription; + /** * Functional interface for handling incoming JSON-RPC requests. Implementations * should process the request parameters and return a response. @@ -148,6 +156,17 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport, return t; }), "acp-timeout-" + sessionPrefix); + // Serialize notification delivery: concatMap ensures each notification's Mono + // completes before the next one starts, preserving arrival order even when + // handlers do async work. + this.notificationSink = Sinks.many().unicast().onBackpressureBuffer(); + this.notificationSubscription = this.notificationSink.asFlux() + .concatMap(notification -> handleIncomingNotification(notification).onErrorComplete(t -> { + logger.error("Error handling notification: {}", t.getMessage()); + return true; + })) + .subscribe(); + this.transport.connect(mono -> mono.doOnNext(this::handle).then(Mono.empty())).transform(connectHook).subscribe(); } @@ -203,10 +222,10 @@ else if (message instanceof AcpSchema.JSONRPCRequest request) { else if (message instanceof AcpSchema.JSONRPCNotification notification) { logger.debug("Received notification: {}", notification); logger.trace("Incoming notification method='{}' params={}", notification.method(), notification.params()); - handleIncomingNotification(notification).onErrorComplete(t -> { - logger.error("Error handling notification: {}", t.getMessage()); - return true; - }).subscribe(); + Sinks.EmitResult result = notificationSink.tryEmitNext(notification); + if (result.isFailure()) { + logger.warn("Failed to enqueue notification for serial processing: {}", result); + } } else { logger.warn("Received unknown message type: {}", message); @@ -352,6 +371,8 @@ public Mono sendNotification(String method, Object params) { public Mono closeGracefully() { return Mono.fromRunnable(() -> { dismissPendingResponses(); + notificationSink.tryEmitComplete(); + notificationSubscription.dispose(); timeoutScheduler.dispose(); }); } @@ -362,6 +383,8 @@ public Mono closeGracefully() { @Override public void close() { dismissPendingResponses(); + notificationSink.tryEmitComplete(); + notificationSubscription.dispose(); timeoutScheduler.dispose(); } diff --git a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java index 58a3799..59b9690 100644 --- a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java +++ b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java @@ -5,7 +5,11 @@ package com.agentclientprotocol.sdk.spec; import java.time.Duration; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.agentclientprotocol.sdk.MockAcpClientTransport; @@ -298,6 +302,37 @@ void testGracefulShutdown() { StepVerifier.create(session.closeGracefully()).verifyComplete(); } + @Test + void testNotificationOrderPreservedWithAsyncHandler() throws InterruptedException { + // Notification i gets a delay of (5 - i) * 30ms so that without serialization + // later notifications would complete first, reversing the observed order. + int count = 5; + List processedOrder = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(count); + + var transport = new MockAcpClientTransport(); + var session = new AcpClientSession(TIMEOUT, transport, Map.of(), + Map.of(TEST_NOTIFICATION, params -> { + int index = (int) ((Map) params).get("index"); + long delayMs = (count - index) * 30L; + return Mono.delay(Duration.ofMillis(delayMs)).then(Mono.fromRunnable(() -> { + processedOrder.add(index); + latch.countDown(); + })); + }), + Function.identity()); + + for (int i = 0; i < count; i++) { + transport.simulateIncomingMessage(new AcpSchema.JSONRPCNotification( + AcpSchema.JSONRPC_VERSION, TEST_NOTIFICATION, Map.of("index", i))); + } + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(processedOrder).containsExactly(0, 1, 2, 3, 4); + + session.close(); + } + @Test void testConcurrentRequests() { var transport = new MockAcpClientTransport();