Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -78,6 +80,12 @@ public class AcpClientSession implements AcpSession {
/** Atomic counter for generating unique request IDs */
private final AtomicLong requestCounter = new AtomicLong(0);

/** Sink for serializing notification delivery in arrival order */
private final Sinks.Many<AcpSchema.JSONRPCNotification> notificationSink;

/** Subscription draining the notification sink via concatMap */
private final Disposable notificationSubscription;

/**
* Functional interface for handling incoming JSON-RPC requests. Implementations
* should process the request parameters and return a response.
Expand Down Expand Up @@ -148,6 +156,17 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport,
return t;
}), "acp-timeout-" + sessionPrefix);

// Serialize notification delivery: concatMap ensures each notification's Mono
// completes before the next one starts, preserving arrival order even when
// handlers do async work.
this.notificationSink = Sinks.many().unicast().onBackpressureBuffer();
this.notificationSubscription = this.notificationSink.asFlux()
.concatMap(notification -> handleIncomingNotification(notification).onErrorComplete(t -> {
logger.error("Error handling notification: {}", t.getMessage());
return true;
}))
.subscribe();

this.transport.connect(mono -> mono.doOnNext(this::handle).then(Mono.empty())).transform(connectHook).subscribe();
}

Expand Down Expand Up @@ -203,10 +222,10 @@ else if (message instanceof AcpSchema.JSONRPCRequest request) {
else if (message instanceof AcpSchema.JSONRPCNotification notification) {
logger.debug("Received notification: {}", notification);
logger.trace("Incoming notification method='{}' params={}", notification.method(), notification.params());
handleIncomingNotification(notification).onErrorComplete(t -> {
logger.error("Error handling notification: {}", t.getMessage());
return true;
}).subscribe();
Sinks.EmitResult result = notificationSink.tryEmitNext(notification);
if (result.isFailure()) {
logger.warn("Failed to enqueue notification for serial processing: {}", result);
}
}
else {
logger.warn("Received unknown message type: {}", message);
Expand Down Expand Up @@ -352,6 +371,8 @@ public Mono<Void> sendNotification(String method, Object params) {
public Mono<Void> closeGracefully() {
return Mono.fromRunnable(() -> {
dismissPendingResponses();
notificationSink.tryEmitComplete();
notificationSubscription.dispose();
timeoutScheduler.dispose();
});
}
Expand All @@ -362,6 +383,8 @@ public Mono<Void> closeGracefully() {
@Override
public void close() {
dismissPendingResponses();
notificationSink.tryEmitComplete();
notificationSubscription.dispose();
timeoutScheduler.dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
package com.agentclientprotocol.sdk.spec;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.agentclientprotocol.sdk.MockAcpClientTransport;
Expand Down Expand Up @@ -298,6 +302,37 @@ void testGracefulShutdown() {
StepVerifier.create(session.closeGracefully()).verifyComplete();
}

@Test
void testNotificationOrderPreservedWithAsyncHandler() throws InterruptedException {
// Notification i gets a delay of (5 - i) * 30ms so that without serialization
// later notifications would complete first, reversing the observed order.
int count = 5;
List<Integer> processedOrder = new CopyOnWriteArrayList<>();
CountDownLatch latch = new CountDownLatch(count);

var transport = new MockAcpClientTransport();
var session = new AcpClientSession(TIMEOUT, transport, Map.of(),
Map.of(TEST_NOTIFICATION, params -> {
int index = (int) ((Map<?, ?>) params).get("index");
long delayMs = (count - index) * 30L;
return Mono.delay(Duration.ofMillis(delayMs)).then(Mono.fromRunnable(() -> {
processedOrder.add(index);
latch.countDown();
}));
}),
Function.identity());

for (int i = 0; i < count; i++) {
transport.simulateIncomingMessage(new AcpSchema.JSONRPCNotification(
AcpSchema.JSONRPC_VERSION, TEST_NOTIFICATION, Map.of("index", i)));
}

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(processedOrder).containsExactly(0, 1, 2, 3, 4);

session.close();
}

@Test
void testConcurrentRequests() {
var transport = new MockAcpClientTransport();
Expand Down