diff --git a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java index 2b36831..8140e44 100644 --- a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java +++ b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java @@ -11,8 +11,10 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.Sinks; import java.time.Duration; import java.util.Map; @@ -78,6 +80,12 @@ public class AcpClientSession implements AcpSession { /** Atomic counter for generating unique request IDs */ private final AtomicLong requestCounter = new AtomicLong(0); + /** Sink for serializing notification delivery in arrival order */ + private final Sinks.Many notificationSink; + + /** Subscription draining the notification sink via concatMap */ + private final Disposable notificationSubscription; + /** * Functional interface for handling incoming JSON-RPC requests. Implementations * should process the request parameters and return a response. @@ -148,6 +156,17 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport, return t; }), "acp-timeout-" + sessionPrefix); + // Serialize notification delivery: concatMap ensures each notification's Mono + // completes before the next one starts, preserving arrival order even when + // handlers do async work. + this.notificationSink = Sinks.many().unicast().onBackpressureBuffer(); + this.notificationSubscription = this.notificationSink.asFlux() + .concatMap(notification -> handleIncomingNotification(notification).onErrorComplete(t -> { + logger.error("Error handling notification: {}", t.getMessage()); + return true; + })) + .subscribe(); + this.transport.connect(mono -> mono.doOnNext(this::handle).then(Mono.empty())).transform(connectHook).subscribe(); } @@ -203,10 +222,10 @@ else if (message instanceof AcpSchema.JSONRPCRequest request) { else if (message instanceof AcpSchema.JSONRPCNotification notification) { logger.debug("Received notification: {}", notification); logger.trace("Incoming notification method='{}' params={}", notification.method(), notification.params()); - handleIncomingNotification(notification).onErrorComplete(t -> { - logger.error("Error handling notification: {}", t.getMessage()); - return true; - }).subscribe(); + Sinks.EmitResult result = notificationSink.tryEmitNext(notification); + if (result.isFailure()) { + logger.warn("Failed to enqueue notification for serial processing: {}", result); + } } else { logger.warn("Received unknown message type: {}", message); @@ -352,6 +371,8 @@ public Mono sendNotification(String method, Object params) { public Mono closeGracefully() { return Mono.fromRunnable(() -> { dismissPendingResponses(); + notificationSink.tryEmitComplete(); + notificationSubscription.dispose(); timeoutScheduler.dispose(); }); } @@ -362,6 +383,8 @@ public Mono closeGracefully() { @Override public void close() { dismissPendingResponses(); + notificationSink.tryEmitComplete(); + notificationSubscription.dispose(); timeoutScheduler.dispose(); } diff --git a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java index 58a3799..59b9690 100644 --- a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java +++ b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpClientSessionTest.java @@ -5,7 +5,11 @@ package com.agentclientprotocol.sdk.spec; import java.time.Duration; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.agentclientprotocol.sdk.MockAcpClientTransport; @@ -298,6 +302,37 @@ void testGracefulShutdown() { StepVerifier.create(session.closeGracefully()).verifyComplete(); } + @Test + void testNotificationOrderPreservedWithAsyncHandler() throws InterruptedException { + // Notification i gets a delay of (5 - i) * 30ms so that without serialization + // later notifications would complete first, reversing the observed order. + int count = 5; + List processedOrder = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(count); + + var transport = new MockAcpClientTransport(); + var session = new AcpClientSession(TIMEOUT, transport, Map.of(), + Map.of(TEST_NOTIFICATION, params -> { + int index = (int) ((Map) params).get("index"); + long delayMs = (count - index) * 30L; + return Mono.delay(Duration.ofMillis(delayMs)).then(Mono.fromRunnable(() -> { + processedOrder.add(index); + latch.countDown(); + })); + }), + Function.identity()); + + for (int i = 0; i < count; i++) { + transport.simulateIncomingMessage(new AcpSchema.JSONRPCNotification( + AcpSchema.JSONRPC_VERSION, TEST_NOTIFICATION, Map.of("index", i))); + } + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(processedOrder).containsExactly(0, 1, 2, 3, 4); + + session.close(); + } + @Test void testConcurrentRequests() { var transport = new MockAcpClientTransport();