Skip to content

[AI-8th] feat: support server-side async capability (CompletableFuture and AsyncContext)#1558

Open
EvenLjj wants to merge 2 commits intomasterfrom
feature/server-async-support
Open

[AI-8th] feat: support server-side async capability (CompletableFuture and AsyncContext)#1558
EvenLjj wants to merge 2 commits intomasterfrom
feature/server-async-support

Conversation

@EvenLjj
Copy link
Copy Markdown
Collaborator

@EvenLjj EvenLjj commented Apr 1, 2026

Summary

  • Add AsyncContext interface and implementation for server-side async programming
  • Support CompletableFuture<T> return type in service methods
  • Add RpcInvokeContext.startAsync() method for manual async control
  • Add protocol-specific implementations for Bolt and Triple protocols

Changes

New Files

  • core/api/src/main/java/com/alipay/sofa/rpc/context/AsyncContext.java - AsyncContext interface
  • core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java - Default implementation
  • core/api/src/main/java/com/alipay/sofa/rpc/context/ServerAsyncResponseSender.java - Protocol adapter interface
  • remoting/remoting-bolt/src/main/java/.../BoltServerAsyncResponseSender.java - Bolt implementation
  • remoting/remoting-triple/src/main/java/.../TripleServerAsyncResponseSender.java - Triple implementation
  • core/api/src/test/java/.../AsyncContextTest.java - Unit tests
  • core/api/src/test/java/.../ProviderInvokerTest.java - Unit tests

Modified Files

  • core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java - Add startAsync() method
  • core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java - Add CompletableFuture support
  • remoting/remoting-bolt/src/main/java/.../BoltServerProcessor.java - Add async mode support
  • remoting/remoting-triple/src/main/java/.../GenericServiceImpl.java - Add async mode support

Usage Examples

CompletableFuture return type:

public interface HelloService {
    CompletableFuture<String> sayHelloAsync(String name);
}

public class HelloServiceImpl implements HelloService {
    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.supplyAsync(() -> {
            // async business logic
            return "Hello, " + name;
        });
    }
}

AsyncContext:

public String sayHello(String name) {
    AsyncContext asyncContext = RpcInvokeContext.startAsync();
    executorService.submit(() -> {
        try {
            Thread.sleep(1000);
            asyncContext.write("Hello, " + name);
        } catch (Exception e) {
            asyncContext.writeError(e);
        }
    });
    return null;
}

Test plan

  • All unit tests pass (231 tests in core-api)
  • Code compiles successfully
  • New unit tests added for AsyncContext and ProviderInvoker

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Server-side manual async response control so handlers can defer and send responses later.
    • Automatic support for CompletableFuture return types across proxies and transports, enabling non-blocking async callers.
    • Transport adapters updated to deliver server async responses for Bolt and gRPC/triple.
  • Tests

    • Added unit and integration tests covering async context, CompletableFuture flows, callback delivery, and error paths.
  • Behavior

    • Response futures now support listener callbacks for async completion notification.

…ncContext)

This commit adds support for server-side async programming model in SOFARPC:

1. AsyncContext support:
   - Add AsyncContext interface with write() and writeError() methods
   - Add DefaultAsyncContext implementation
   - Add startAsync() method in RpcInvokeContext

2. CompletableFuture support:
   - Modify ProviderInvoker to detect CompletableFuture return type
   - Auto-handle async response when method returns CompletableFuture

3. Protocol adaptation:
   - Bolt protocol: BoltServerAsyncResponseSender
   - Triple protocol: TripleServerAsyncResponseSender

4. Unit tests:
   - AsyncContextTest for AsyncContext and DefaultAsyncContext
   - ProviderInvokerTest for CompletableFuture support

Related issue: #1550

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 1, 2026 11:34
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 1, 2026

📝 Walkthrough

Walkthrough

Adds server-side asynchronous response control: new AsyncContext and ServerAsyncResponseSender contracts and implementations, CompletableFuture return handling across proxy/provider layers, protocol adapters for Bolt/Triple, per-request async state in RpcInvokeContext, and unit/integration tests for async flows.

Changes

Cohort / File(s) Summary
Core Async API
core/api/src/main/java/com/alipay/sofa/rpc/context/AsyncContext.java, core/api/src/main/java/com/alipay/sofa/rpc/context/ServerAsyncResponseSender.java
New public interfaces: AsyncContext (write/writeError/isSent) and ServerAsyncResponseSender (sendResponse/isSent).
Default AsyncContext
core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java
Implementation enforcing single-send, attaches baggage conditionally, sends SofaResponse via sender, posts ServerSendEvent and ServerEndHandleEvent.
RpcInvokeContext state
core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java, core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java
Adds startAsync(), asyncStarted flag, accessors, and new hidden key HIDDEN_KEY_ASYNC_RESPONSE_SENDER for wiring async senders.
Provider invocation / Futures
core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java
Detects CompletableFuture returns, registers completion handlers to send SofaResponse asynchronously and returns null for async path.
Proxy CompletableFuture support
core-impl/proxy/.../BytebuddyInvocationHandler.java, .../JDKInvocationHandler.java, .../JavassistProxy.java
Proxy handlers detect CompletableFuture return types, set invokeType to FUTURE, and return CompletableFutures backed by ResponseFuture callbacks.
Client context preservation for FUTURE
core/api/src/main/java/com/alipay/sofa/rpc/client/ClientProxyInvoker.java
Avoids clearing RpcInternalContext for FUTURE invokeType to preserve ResponseFuture for async completion.
Response future listeners
remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java
Adds listener registration, immediate notification if already completed, and per-listener dispatch helpers.
Bolt protocol adapters
remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerAsyncResponseSender.java, .../BoltServerProcessor.java
Bolt async sender adapter; processor attaches async sender/context and gates response/event paths when async-started.
Triple/gRPC adapters
remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServerAsyncResponseSender.java, .../GenericServiceImpl.java
Triple async sender serializes with hessian2 and GenericServiceImpl skips synchronous response emission when async started.
Tests & integration samples
core/api/src/test/.../AsyncContextTest.java, core/api/src/test/.../ProviderInvokerTest.java, test/test-integration/src/test/.../ServerAsync*
Unit and integration tests covering AsyncContext, DefaultAsyncContext, ProviderInvoker CompletableFuture paths, protocol integration, and sample async service implementations.

Sequence Diagram

sequenceDiagram
    participant Client
    participant ProviderInvoker as ProviderInvoker
    participant Service as BusinessService
    participant Executor as Executor
    participant RpcInternal as RpcInternalContext
    participant AsyncCtx as AsyncContext
    participant RespSender as ServerAsyncResponseSender

    Client->>ProviderInvoker: invoke(request)
    ProviderInvoker->>Service: call method
    Service-->>ProviderInvoker: returns CompletableFuture
    ProviderInvoker->>RpcInternal: attach ServerAsyncResponseSender
    ProviderInvoker->>ProviderInvoker: register future completion callback
    ProviderInvoker-->>Client: return null (async)

    Note over Executor,Service: future completes asynchronously
    Executor->>AsyncCtx: RpcInvokeContext.startAsync() (via stored sender)
    Executor->>AsyncCtx: write(result) / writeError(exception)
    AsyncCtx->>AsyncCtx: checkState() and attach baggage
    AsyncCtx->>RespSender: sendResponse(SofaResponse)
    RespSender->>Client: deliver response
    AsyncCtx->>RpcInternal: post ServerSendEvent & ServerEndHandleEvent
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related issues

  • Issue #1550: Implements server-side async response control, CompletableFuture and protocol adapters matching the issue objective.

Possibly related PRs

  • PR #1503: Also modifies RpcInvokeContext-related code (deadline fields); relevant due to overlapping context/state changes.

Suggested labels

size/XL

Suggested reviewers

  • chuailiwu
  • Lo1nt
  • sunhailin-Leo

Poem

🐰 I hopped through threads and futures bright,
Sent responses soft by moonlit byte,
AsyncContext held the key,
Futures freed the server—whee! 🥕
Now Bolt and Triple sing at night.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 49.40% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The pull request title clearly and specifically describes the main change: adding server-side async capability with two key mechanisms (CompletableFuture and AsyncContext).

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/server-async-support

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds server-side asynchronous invocation support to SOFA RPC by introducing an AsyncContext abstraction and enabling provider methods to return CompletableFuture, with protocol adapters for Bolt and Triple.

Changes:

  • Introduce AsyncContext / DefaultAsyncContext plus ServerAsyncResponseSender to decouple protocol response sending.
  • Add RpcInvokeContext.startAsync() / async-start flag to enable manual async response control.
  • Extend provider invocation and remoting processors (Bolt/Triple) to support async execution paths and add unit tests.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
core/api/src/main/java/com/alipay/sofa/rpc/context/AsyncContext.java New server-side async response control interface.
core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java Default async context implementation that sends responses + posts events.
core/api/src/main/java/com/alipay/sofa/rpc/context/ServerAsyncResponseSender.java Protocol adapter interface for sending async responses.
core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java Adds startAsync() and async-start tracking.
core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java Adds CompletableFuture return handling for provider methods.
remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java Wires async sender into Bolt server request lifecycle.
remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerAsyncResponseSender.java Bolt implementation of ServerAsyncResponseSender.
remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java Wires async sender into Triple unary invocation.
remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServerAsyncResponseSender.java Triple implementation of ServerAsyncResponseSender.
core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java Tests for async context/sender interfaces and default implementation.
core/api/src/test/java/com/alipay/sofa/rpc/filter/ProviderInvokerTest.java Tests for ProviderInvoker sync + CompletableFuture paths.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

if (result instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<Object> completableFuture = (CompletableFuture<Object>) result;
handleCompletableFuture(completableFuture, request);
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompletableFuture handling registers a callback and immediately returns null, but the callback calls RpcInvokeContext.startAsync() later. That callback will often run on a different thread after RpcInternalContext/RpcInvokeContext have been cleared (see BoltServerProcessor finally removing contexts), so startAsync will throw and the async response will never be sent. Start async while still on the request thread (e.g., call startAsync before returning, capture the AsyncContext/responseSender, and mark asyncStarted) so downstream processors don’t attempt to send a null response or fire end events prematurely.

Suggested change
handleCompletableFuture(completableFuture, request);
// Start async processing on the request thread to ensure RpcInvokeContext is valid
final AsyncContext asyncContext = RpcInvokeContext.startAsync();
completableFuture.whenComplete((value, throwable) -> {
SofaResponse asyncResponse = new SofaResponse();
try {
if (throwable != null) {
Throwable bizThrowable = throwable;
if (throwable instanceof InvocationTargetException && throwable.getCause() != null) {
bizThrowable = throwable.getCause();
}
cutCause(bizThrowable);
asyncResponse.setAppResponse(bizThrowable);
} else {
asyncResponse.setAppResponse(value);
}
asyncContext.sendResponse(asyncResponse);
} catch (Throwable sendEx) {
// Avoid throwing from async callback; just log the failure to send response
LOGGER.errorWithApp(null, "Failed to send async provider response.", sendEx);
}
});

Copilot uses AI. Check for mistakes.
Comment on lines +118 to 121
// Store both the original Bolt AsyncContext and the ServerAsyncResponseSender for compatibility
ServerAsyncResponseSender asyncResponseSender = new BoltServerAsyncResponseSender(asyncCtx);
context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncResponseSender); // 远程返回的通道

Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BoltServerProcessor now stores a ServerAsyncResponseSender under RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, but existing code (e.g., BoltSendableResponseCallback) reads this attachment and casts it to com.alipay.remoting.AsyncContext. This will cause a ClassCastException at runtime. Consider keeping the original AsyncContext under the existing key and storing the new sender under a new key (or make RpcInvokeContext.startAsync adapt based on attachment type).

Copilot uses AI. Check for mistakes.
Comment on lines +522 to +525
ServerAsyncResponseSender responseSender = (ServerAsyncResponseSender) internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT);
if (responseSender == null) {
throw new IllegalStateException("Async context is not available. Please ensure you are calling this method in a server-side invocation context with a supported protocol (Bolt or Triple).");
}
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startAsync() unconditionally casts the HIDDEN_KEY_ASYNC_CONTEXT attachment to ServerAsyncResponseSender. That same key is used by other protocols/modules to store different types (e.g., Bolt stores com.alipay.remoting.AsyncContext; HTTP stores a Netty Channel), so this can throw ClassCastException instead of the intended IllegalStateException. Use a dedicated attachment key for ServerAsyncResponseSender and/or guard with instanceof (adapting known types) to avoid breaking other protocols.

Suggested change
ServerAsyncResponseSender responseSender = (ServerAsyncResponseSender) internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT);
if (responseSender == null) {
throw new IllegalStateException("Async context is not available. Please ensure you are calling this method in a server-side invocation context with a supported protocol (Bolt or Triple).");
}
Object asyncAttachment = internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT);
if (!(asyncAttachment instanceof ServerAsyncResponseSender)) {
throw new IllegalStateException("Async context is not available or not supported by the current protocol. Please ensure you are calling this method in a server-side invocation context with a supported protocol (Bolt or Triple).");
}
ServerAsyncResponseSender responseSender = (ServerAsyncResponseSender) asyncAttachment;

Copilot uses AI. Check for mistakes.
Comment on lines +114 to +119
if (EventBus.isEnable(ServerSendEvent.class)) {
EventBus.post(new ServerSendEvent(null, response, sofaException));
}
if (EventBus.isEnable(ServerEndHandleEvent.class)) {
EventBus.post(new ServerEndHandleEvent());
}
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DefaultAsyncContext posts ServerSendEvent with a null SofaRequest. Downstream subscribers (e.g., metrics-micrometer InvokeMeta) dereference event.getRequest() and will throw NullPointerException when metrics/tracing is enabled. Capture the current SofaRequest at startAsync() time (or pass it into DefaultAsyncContext) and use it when posting ServerSendEvent.

Copilot uses AI. Check for mistakes.
Comment on lines +94 to +99
private void checkState() {
if (sent) {
throw new IllegalStateException("Async response has already been sent");
}
sent = true;
}
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkState() uses a volatile boolean but the check-then-set is not atomic; two threads can pass the sent check concurrently and both send a response. Use an AtomicBoolean with compareAndSet(false, true) (or synchronize) to make the single-send guarantee thread-safe.

Copilot uses AI. Check for mistakes.
Comment on lines +209 to +217
// Set async started flag through reflection
RpcInvokeContext context = RpcInvokeContext.getContext();
try {
java.lang.reflect.Field asyncStartedField = RpcInvokeContext.class.getDeclaredField("asyncStarted");
asyncStartedField.setAccessible(true);
asyncStartedField.set(context, true);
} catch (Exception e) {
Assert.fail("Failed to set asyncStarted field: " + e.getMessage());
}
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses reflection to set the private asyncStarted field, but RpcInvokeContext now provides setAsyncStarted(boolean). Prefer the public setter to avoid brittle reflection-based tests that can break with refactors/security manager restrictions.

Suggested change
// Set async started flag through reflection
RpcInvokeContext context = RpcInvokeContext.getContext();
try {
java.lang.reflect.Field asyncStartedField = RpcInvokeContext.class.getDeclaredField("asyncStarted");
asyncStartedField.setAccessible(true);
asyncStartedField.set(context, true);
} catch (Exception e) {
Assert.fail("Failed to set asyncStarted field: " + e.getMessage());
}
// Set async started flag via public API
RpcInvokeContext context = RpcInvokeContext.getContext();
context.setAsyncStarted(true);

Copilot uses AI. Check for mistakes.
Comment on lines +562 to +566
public boolean isAsyncStartedFlag() {
return asyncStarted;
}

/**
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance accessor is named isAsyncStartedFlag(), while the static convenience is isAsyncStarted(). This asymmetry makes it easy to call the static method via an instance reference (as BoltServerProcessor currently does) and can be confusing for API users. Consider renaming the instance method to isAsyncStarted() (or removing it in favor of the static + setter) for consistency.

Suggested change
public boolean isAsyncStartedFlag() {
return asyncStarted;
}
/**
public boolean isAsyncStarted() {
return asyncStarted;
}
/**
* Get async started flag (backward-compatible instance method)
*
* @return true if async has been started
* @deprecated Use {@link #isAsyncStarted()} instead.
*/
@Deprecated
public boolean isAsyncStartedFlag() {
return isAsyncStarted();
}
/**

Copilot uses AI. Check for mistakes.
Comment on lines +39 to +57
* The original request
*/
private final SofaResponse sofaResponse;

/**
* Whether response has been sent
*/
private volatile boolean sent;

/**
* Constructor
*
* @param responseSender the server async response sender
* @param sofaResponse the response object to be populated
*/
public DefaultAsyncContext(ServerAsyncResponseSender responseSender, SofaResponse sofaResponse) {
this.responseSender = responseSender;
this.sofaResponse = sofaResponse;
}
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DefaultAsyncContext stores a SofaResponse in the sofaResponse field but buildResponse() always creates a new SofaResponse and never uses the stored instance. Either populate/reuse the provided sofaResponse (so callers can preconfigure responseProps/serializeType) or remove the unused field/constructor parameter to avoid confusion.

Copilot uses AI. Check for mistakes.

context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址
context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道
// Store both the original Bolt AsyncContext and the ServerAsyncResponseSender for compatibility
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says both the original Bolt AsyncContext and ServerAsyncResponseSender are stored for compatibility, but only the ServerAsyncResponseSender is attached. Either attach both (under different keys) or update the comment to match behavior.

Suggested change
// Store both the original Bolt AsyncContext and the ServerAsyncResponseSender for compatibility
// Store a ServerAsyncResponseSender (wrapping the original Bolt AsyncContext) for compatibility

Copilot uses AI. Check for mistakes.
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.context.ServerAsyncResponseSender;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteArrayWrapperByteBuf is imported but never used in this class. Removing the unused import will reduce noise and avoid potential style/checkstyle failures if unused imports are enforced.

Suggested change
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (2)
core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java (1)

67-71: sofaException path is currently unused; wire it from writeError or remove it.

At Line [70], sendResponse(resp, null) means Line [115] always emits ServerSendEvent with null exception for error writes.

Also applies to: 104-116

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java`
around lines 67 - 71, The writeError path is passing null as the exception to
sendResponse causing the send-side event to always report a null exception;
update DefaultAsyncContext.writeError to forward the thrown Throwable into
sendResponse (i.e., call sendResponse(resp, throwable)) so the sofaException
path is wired through (alternatively remove the unused sofaException parameter
from sendResponse and related ServerSendEvent emission if you prefer to simplify
the API), and ensure buildResponse/ sendResponse/ServerSendEvent use the
forwarded exception consistently.
core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java (1)

49-79: Prefer behavior-focused tests over reflection-based API shape checks.

These checks are brittle and low-signal compared with invoking methods and asserting outcomes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java`
around lines 49 - 79, Replace the brittle reflection-only tests with
behavior-driven assertions: instead of checking AsyncContext.class and
ServerAsyncResponseSender.class via getMethod, instantiate or mock concrete
implementations of AsyncContext and ServerAsyncResponseSender (or create
lightweight test stubs) and call AsyncContext.write(Object),
AsyncContext.writeError(Throwable), AsyncContext.isSent(), and
ServerAsyncResponseSender.sendResponse(SofaResponse)/isSent() to assert expected
state changes and outcomes (e.g., that write marks sent and stores the value,
writeError records the throwable and marks sent, sendResponse forwards the
SofaResponse and flips isSent). Locate tests referencing AsyncContext,
ServerAsyncResponseSender, write, writeError, isSent, sendResponse, and
SofaResponse and replace reflection checks with these direct behavioral
assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java`:
- Around line 54-57: The constructor
DefaultAsyncContext(ServerAsyncResponseSender responseSender, SofaResponse
sofaResponse) currently stores the passed sofaResponse but later code (in the
send path around the method that invokes responseSender, e.g., sendResponse or
similar) creates and sends a new SofaResponse, losing any state on the original
instance; change the send logic to populate and send the existing
this.sofaResponse (not instantiate a new SofaResponse) and ensure
responseSender.send(this.sofaResponse) is used so any headers, attachments or
custom fields on the provided sofaResponse are preserved.
- Around line 46-47: Replace the non-atomic volatile sent flag in
DefaultAsyncContext with an AtomicBoolean (e.g., private final AtomicBoolean
sent) and update the send-guard logic in the write() and writeError() methods to
use sent.compareAndSet(false, true) so only the thread that successfully flips
the flag calls sendResponse(); retain sendResponse() as the single send path and
remove the existing manual check-then-set that relied on the volatile boolean.

In `@core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java`:
- Around line 124-130: The invoke() branch that handles CompletableFuture must
capture the request-thread AsyncContext and call RpcInvokeContext.startAsync()
on the request thread before returning; modify the code around the
CompletableFuture handling in ProviderInvoker.invoke() so that before calling
handleCompletableFuture(completableFuture, request) and returning null you
obtain and pass the current AsyncContext (or the RpcInvokeContext state) to
handleCompletableFuture; then update handleCompletableFuture(CompletableFuture,
Request) to use the passed-in AsyncContext (or restore the RpcInvokeContext on
the callback thread) and call RpcInvokeContext.startAsync() from the original
request thread context rather than inside the whenComplete callback, ensuring
RpcInternalContext/RpcInvokeContext ThreadLocals are present when startAsync()
is invoked.

In `@core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java`:
- Around line 100-110: Add identity assertions to ensure DefaultAsyncContext
reuses the supplied SofaResponse instance: after creating SofaResponse and
DefaultAsyncContext(mockSender, sofaResponse) and calling
asyncContext.write(...), add Assert.assertSame(sofaResponse,
capturedResponse.get()) (and similarly in the other test block around lines
131-142) so the test verifies the response object identity rather than only
payload; reference DefaultAsyncContext, SofaResponse, capturedResponse and write
to locate where to add the assertion.

In `@core/api/src/test/java/com/alipay/sofa/rpc/filter/ProviderInvokerTest.java`:
- Around line 122-149: The test
ProviderInvokerTest.testProviderInvokerCompletableFuture currently only asserts
invoke() returns null and fails to attach an async sender, causing
RpcInvokeContext.startAsync() in ProviderInvoker.handleCompletableFuture to
throw; update the test to create and attach a mocked ServerAsyncResponseSender
into RpcInternalContext using the HIDDEN_KEY_ASYNC_CONTEXT key (or set up
RpcInvokeContext's async context) before calling
providerInvoker.invoke(request), use a CountDownLatch or similar to wait for the
CompletableFuture whenComplete callback to run, and assert the mock
ServerAsyncResponseSender was invoked with the expected SofaResponse to verify
the asynchronous response delivery.

In
`@remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java`:
- Around line 118-120: The code stores a BoltServerAsyncResponseSender under
RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT which causes a ClassCastException in
BoltSendableResponseCallback.init(); change BoltServerProcessor to keep the
original com.alipay.remoting.AsyncContext under
RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT and store the new
BoltServerAsyncResponseSender under a new constant (e.g.,
RpcConstants.HIDDEN_KEY_ASYNC_RESPONSE_SENDER). Add that new hidden key
constant, update places that need the sender (RpcInvokeContext.startAsync() and
any protocol writers) to retrieve the ServerAsyncResponseSender from the new
key, and ensure BoltSendableResponseCallback.init() still reads the AsyncContext
from HIDDEN_KEY_ASYNC_CONTEXT.

In
`@remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServerAsyncResponseSender.java`:
- Around line 55-78: In TripleServerAsyncResponseSender.sendResponse, change the
async reply to mirror the synchronous GenericServiceImpl: select the serializer
from the incoming request (do not hardcode "hessian2"—use whatever
SerializerFactory.getSerializer(...) or the request/Invocation metadata
provides), set the Response.type/serializeType fields the same way the sync path
does (include the declared return type), and when appResponse is a Throwable
call responseObserver.onError with the original throwable (or rethrow it)
instead of wrapping it in a new RuntimeException(appResponse.toString()); ensure
you still encode the appResponse with the chosen serializer and build
Response.Builder exactly as the sync path does before calling
responseObserver.onNext/onCompleted.

---

Nitpick comments:
In `@core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java`:
- Around line 67-71: The writeError path is passing null as the exception to
sendResponse causing the send-side event to always report a null exception;
update DefaultAsyncContext.writeError to forward the thrown Throwable into
sendResponse (i.e., call sendResponse(resp, throwable)) so the sofaException
path is wired through (alternatively remove the unused sofaException parameter
from sendResponse and related ServerSendEvent emission if you prefer to simplify
the API), and ensure buildResponse/ sendResponse/ServerSendEvent use the
forwarded exception consistently.

In `@core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java`:
- Around line 49-79: Replace the brittle reflection-only tests with
behavior-driven assertions: instead of checking AsyncContext.class and
ServerAsyncResponseSender.class via getMethod, instantiate or mock concrete
implementations of AsyncContext and ServerAsyncResponseSender (or create
lightweight test stubs) and call AsyncContext.write(Object),
AsyncContext.writeError(Throwable), AsyncContext.isSent(), and
ServerAsyncResponseSender.sendResponse(SofaResponse)/isSent() to assert expected
state changes and outcomes (e.g., that write marks sent and stores the value,
writeError records the throwable and marks sent, sendResponse forwards the
SofaResponse and flips isSent). Locate tests referencing AsyncContext,
ServerAsyncResponseSender, write, writeError, isSent, sendResponse, and
SofaResponse and replace reflection checks with these direct behavioral
assertions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c2de4e45-9a8b-4be2-bfc1-dc93c25d2cb8

📥 Commits

Reviewing files that changed from the base of the PR and between 178cbd5 and 6ef2cc1.

📒 Files selected for processing (11)
  • core/api/src/main/java/com/alipay/sofa/rpc/context/AsyncContext.java
  • core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java
  • core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java
  • core/api/src/main/java/com/alipay/sofa/rpc/context/ServerAsyncResponseSender.java
  • core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java
  • core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java
  • core/api/src/test/java/com/alipay/sofa/rpc/filter/ProviderInvokerTest.java
  • remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerAsyncResponseSender.java
  • remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java
  • remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java
  • remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServerAsyncResponseSender.java

Comment on lines +46 to +47
private volatile boolean sent;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the current non-atomic pattern in DefaultAsyncContext
rg -n "volatile boolean\\s+sent|if \\(sent\\)|sent = true|checkState\\(" core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java

Repository: sofastack/sofa-rpc

Length of output: 246


🏁 Script executed:

cat -n core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java

Repository: sofastack/sofa-rpc

Length of output: 4833


Use atomic single-send guard to prevent concurrent double writes.

The volatile keyword with manual check-then-set at lines 95–98 is not atomic. Two threads calling write() or writeError() concurrently can both pass the sent check and call sendResponse(), violating the single-send contract.

Replace with AtomicBoolean and compareAndSet():

Proposed fix
+import java.util.concurrent.atomic.AtomicBoolean;
 ...
-    private volatile boolean                sent;
+    private final AtomicBoolean             sent = new AtomicBoolean(false);
 
     `@Override`
     public boolean isSent() {
-        return sent;
+        return sent.get();
     }
 
     private void checkState() {
-        if (sent) {
+        if (!sent.compareAndSet(false, true)) {
             throw new IllegalStateException("Async response has already been sent");
         }
-        sent = true;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java`
around lines 46 - 47, Replace the non-atomic volatile sent flag in
DefaultAsyncContext with an AtomicBoolean (e.g., private final AtomicBoolean
sent) and update the send-guard logic in the write() and writeError() methods to
use sent.compareAndSet(false, true) so only the thread that successfully flips
the flag calls sendResponse(); retain sendResponse() as the single send path and
remove the existing manual check-then-set that relied on the volatile boolean.

Comment on lines +54 to +57
public DefaultAsyncContext(ServerAsyncResponseSender responseSender, SofaResponse sofaResponse) {
this.responseSender = responseSender;
this.sofaResponse = sofaResponse;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Populate and send the provided SofaResponse instance, not a new one.

Line [54]-[57] says the passed sofaResponse is “to be populated”, but Line [82] creates a new object. This can lose response state carried on the original instance.

🔧 Proposed fix
     private SofaResponse buildResponse(Object response, Throwable throwable) {
-        SofaResponse resp = new SofaResponse();
+        SofaResponse resp = this.sofaResponse;
         if (throwable != null) {
             resp.setAppResponse(throwable);
         } else {
             resp.setAppResponse(response);
         }
         return resp;
     }

Also applies to: 81-89

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/DefaultAsyncContext.java`
around lines 54 - 57, The constructor
DefaultAsyncContext(ServerAsyncResponseSender responseSender, SofaResponse
sofaResponse) currently stores the passed sofaResponse but later code (in the
send path around the method that invokes responseSender, e.g., sendResponse or
similar) creates and sends a new SofaResponse, losing any state on the original
instance; change the send logic to populate and send the existing
this.sofaResponse (not instantiate a new SofaResponse) and ensure
responseSender.send(this.sofaResponse) is used so any headers, attachments or
custom fields on the provided sofaResponse are preserved.

Comment on lines +124 to +130
// Check if the result is a CompletableFuture
if (result instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<Object> completableFuture = (CompletableFuture<Object>) result;
handleCompletableFuture(completableFuture, request);
// Return null to indicate that response will be sent asynchronously
return null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

In Java CompletableFuture, on which thread can whenComplete run, and are ThreadLocal values from the caller automatically available there?

💡 Result:

In Java's CompletableFuture, the whenComplete method (a non-async completion stage method) executes its BiConsumer on the thread that completes the current CompletableFuture (e.g., via complete, completeExceptionally, or obtrusivelyComplete), or on the thread that calls a completion method like whenComplete if the future is already completed at that time. It does not spawn a new thread or use an Executor by default. ThreadLocal values set by the caller thread are NOT automatically available in whenComplete. ThreadLocal values are thread-specific and do not propagate across threads. If whenComplete runs on a different thread than the caller (e.g., the completing thread from a thread pool), the caller's ThreadLocal values will not be visible there. Explicit propagation (e.g., via wrappers capturing and restoring values) is required for cross-thread access.

Citations:


🏁 Script executed:

# Find the ProviderInvoker.java file
find . -name "ProviderInvoker.java" -type f

Repository: sofastack/sofa-rpc

Length of output: 134


🏁 Script executed:

# Read the relevant lines from ProviderInvoker.java to confirm the issue
# First, let's get the file and check lines around 124-130 and 194-208
fd -e java "ProviderInvoker.java" | head -1 | xargs wc -l

Repository: sofastack/sofa-rpc

Length of output: 136


🏁 Script executed:

# Get the exact implementation of the CompletableFuture handling
fd -e java "ProviderInvoker.java" | head -1 | xargs cat -n | sed -n '110,145p'

Repository: sofastack/sofa-rpc

Length of output: 2076


🏁 Script executed:

# Check lines 194-208
fd -e java "ProviderInvoker.java" | head -1 | xargs cat -n | sed -n '185,215p'

Repository: sofastack/sofa-rpc

Length of output: 1312


🏁 Script executed:

# Inspect RpcInvokeContext.startAsync() method signature and behavior
fd -e java "RpcInvokeContext.java" | head -1 | xargs cat -n | grep -A 20 "startAsync"

Repository: sofastack/sofa-rpc

Length of output: 2105


🏁 Script executed:

# Search for AsyncContext class definition
fd -e java "AsyncContext.java" | head -1 | xargs cat -n | sed -n '1,50p'

Repository: sofastack/sofa-rpc

Length of output: 2196


Call RpcInvokeContext.startAsync() on the request thread before returning from invoke().

The handleCompletableFuture() method at lines 194–208 calls RpcInvokeContext.startAsync() inside the whenComplete() callback. When a CompletableFuture completes on a thread pool (the common case), whenComplete() runs on that thread where RpcInternalContext and RpcInvokeContext ThreadLocals are absent. This causes startAsync() to throw IllegalStateException, which is caught and logged at line 206, silently dropping the response. Capture AsyncContext on the request thread before returning, then pass it to the callback.

🔧 Proposed fix
-            if (result instanceof CompletableFuture) {
+            if (result instanceof CompletableFuture) {
                 `@SuppressWarnings`("unchecked")
                 CompletableFuture<Object> completableFuture = (CompletableFuture<Object>) result;
-                handleCompletableFuture(completableFuture, request);
+                AsyncContext asyncContext = RpcInvokeContext.startAsync();
+                handleCompletableFuture(completableFuture, asyncContext);
                 // Return null to indicate that response will be sent asynchronously
                 return null;
             }
@@
-    private void handleCompletableFuture(CompletableFuture<Object> completableFuture, SofaRequest request) {
+    private void handleCompletableFuture(CompletableFuture<Object> completableFuture, AsyncContext asyncContext) {
         // Register callback to handle the result when future completes
         completableFuture.whenComplete((result, throwable) -> {
             try {
-                // Get the async context
-                AsyncContext asyncContext = RpcInvokeContext.startAsync();
                 if (throwable != null) {
                     asyncContext.writeError(throwable);
                 } else {

Also applies to: 194–208

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java`
around lines 124 - 130, The invoke() branch that handles CompletableFuture must
capture the request-thread AsyncContext and call RpcInvokeContext.startAsync()
on the request thread before returning; modify the code around the
CompletableFuture handling in ProviderInvoker.invoke() so that before calling
handleCompletableFuture(completableFuture, request) and returning null you
obtain and pass the current AsyncContext (or the RpcInvokeContext state) to
handleCompletableFuture; then update handleCompletableFuture(CompletableFuture,
Request) to use the passed-in AsyncContext (or restore the RpcInvokeContext on
the callback thread) and call RpcInvokeContext.startAsync() from the original
request thread context rather than inside the whenComplete callback, ensuring
RpcInternalContext/RpcInvokeContext ThreadLocals are present when startAsync()
is invoked.

Comment on lines +100 to +110
// Create DefaultAsyncContext
SofaResponse sofaResponse = new SofaResponse();
DefaultAsyncContext asyncContext = new DefaultAsyncContext(mockSender, sofaResponse);

// Test write method
asyncContext.write("test response");

// Verify response was sent
Assert.assertNotNull(capturedResponse.get());
Assert.assertEquals("test response", capturedResponse.get().getAppResponse());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add identity assertions to validate DefaultAsyncContext uses the passed SofaResponse.

These tests verify payload but not instance reuse. Add assertSame so regressions in response object handling are caught.

🔧 Proposed test additions
         // Verify response was sent
         Assert.assertNotNull(capturedResponse.get());
+        Assert.assertSame(sofaResponse, capturedResponse.get());
         Assert.assertEquals("test response", capturedResponse.get().getAppResponse());
...
         // Verify error was sent
         Assert.assertNotNull(capturedResponse.get());
+        Assert.assertSame(sofaResponse, capturedResponse.get());
         Assert.assertEquals(testException, capturedResponse.get().getAppResponse());

Also applies to: 131-142

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/test/java/com/alipay/sofa/rpc/context/AsyncContextTest.java`
around lines 100 - 110, Add identity assertions to ensure DefaultAsyncContext
reuses the supplied SofaResponse instance: after creating SofaResponse and
DefaultAsyncContext(mockSender, sofaResponse) and calling
asyncContext.write(...), add Assert.assertSame(sofaResponse,
capturedResponse.get()) (and similarly in the other test block around lines
131-142) so the test verifies the response object identity rather than only
payload; reference DefaultAsyncContext, SofaResponse, capturedResponse and write
to locate where to add the assertion.

Comment on lines +55 to +78
public void sendResponse(SofaResponse response) {
checkState();

Object appResponse = response.getAppResponse();
if (appResponse instanceof Throwable) {
// For errors, send through onError
responseObserver.onError(new RuntimeException(appResponse.toString()));
} else {
// For normal response, build and send Triple Response
try {
// Use hessian2 as default serializer
Serializer serializer = SerializerFactory.getSerializer("hessian2");
byte[] data = serializer.encode(appResponse, null).array();

Response.Builder builder = Response.newBuilder();
builder.setSerializeType("hessian2");
builder.setData(ByteString.copyFrom(data));
Response tripleResponse = builder.build();

responseObserver.onNext(tripleResponse);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Mirror the synchronous Triple response format here.

This async path always replies as hessian2, never sets Response.type, and turns server exceptions into RuntimeException(appResponse.toString()). The synchronous unary path in GenericServiceImpl uses the request’s serializer, includes the declared return type, and propagates the original throwable, so async unary responses will diverge from sync behavior and can break non-Hessian or generic clients.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServerAsyncResponseSender.java`
around lines 55 - 78, In TripleServerAsyncResponseSender.sendResponse, change
the async reply to mirror the synchronous GenericServiceImpl: select the
serializer from the incoming request (do not hardcode "hessian2"—use whatever
SerializerFactory.getSerializer(...) or the request/Invocation metadata
provides), set the Response.type/serializeType fields the same way the sync path
does (include the declared return type), and when appResponse is a Throwable
call responseObserver.onError with the original throwable (or rethrow it)
instead of wrapping it in a new RuntimeException(appResponse.toString()); ensure
you still encode the appResponse with the chosen serializer and build
Response.Builder exactly as the sync path does before calling
responseObserver.onNext/onCompleted.

- Fix context management in ClientProxyInvoker to preserve future for async calls
- Add @FixMethodOrder to ensure test execution order
- Clean up debug System.out.println statements

This fixes the issue where CompletableFuture return types from server-side
async methods would return null on the client side.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (3)
core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java (1)

527-529: The sofaResponse parameter passed to DefaultAsyncContext is unused.

Looking at DefaultAsyncContext (context snippet 1), the sofaResponse field is stored but buildResponse() always creates a new SofaResponse. The instance created on line 528 is effectively dead code.

Consider either:

  1. Removing the unused parameter from DefaultAsyncContext constructor
  2. Or using the passed sofaResponse in buildResponse() instead of creating a new one
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java`
around lines 527 - 529, The created SofaResponse in RpcInvokeContext
(SofaResponse sofaResponse = new SofaResponse()) is never used by
DefaultAsyncContext because DefaultAsyncContext.buildResponse() always
instantiates a new SofaResponse; either remove the unused SofaResponse parameter
from DefaultAsyncContext's constructor and stop creating it in RpcInvokeContext,
or update DefaultAsyncContext.buildResponse() to return or populate the stored
sofaResponse field instead of creating a new instance; locate the constructor
and the buildResponse() method in DefaultAsyncContext and the creation site in
RpcInvokeContext to implement the chosen fix (adjust constructor
signature/usages if removing the param, or replace the new SofaResponse() call
inside buildResponse() with the stored sofaResponse if reusing it).
test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java (2)

135-136: Consider removing or reducing Thread.sleep() delays between tests.

Using Thread.sleep() for test isolation (lines 135, 158, 179) is brittle and slows down the test suite. With @FixMethodOrder(MethodSorters.NAME_ASCENDING), tests run in predictable order, but the sleeps add 2.4+ seconds of wait time.

If these are needed to avoid resource contention, consider using proper synchronization, or document why these specific delays are necessary.

Also applies to: 157-158, 178-179

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java`
around lines 135 - 136, The Thread.sleep(...) calls in ServerAsyncTest are
brittle and slow the suite; replace them with deterministic synchronization
instead: remove the Thread.sleep invocations in ServerAsyncTest and wait for
test-specific completion signals (e.g., use
CountDownLatch/Future.join/CompletableFuture, or Awaitility.await() checking the
expected condition) in the test methods that currently rely on Thread.sleep;
alternatively, shorten to a small timeout only as a fallback and add a comment.
Locate the sleeps by the Thread.sleep calls in the ServerAsyncTest class and
replace each with a wait on the actual async operation or a latch tied to the
callback/completion.

133-150: Replace manual null check and debug prints with proper assertions.

Lines 140-145 use System.out.println for debugging and a manual null check that throws NullPointerException. Use JUnit assertions instead:

Proposed fix
     `@Test`
     public void testCompletableFutureMethod() throws Exception {
         // Add delay to ensure previous async operations are completed
         Thread.sleep(1000);

-        // Test async method returning CompletableFuture
-        // Server returns null immediately, then sends response via AsyncContext
-        // Client receives CompletableFuture and needs to get the result
-        System.out.println("Calling sayHelloAsync...");
         CompletableFuture<String> future = service.sayHelloAsync("World");
-        System.out.println("Got future: " + future);
-        if (future == null) {
-            throw new NullPointerException("Future is null!");
-        }
+        Assert.assertNotNull("Future should not be null", future);
         String result = future.get(5000, TimeUnit.MILLISECONDS);
         Assert.assertEquals("Hello async, World", result);
-
-        System.out.println("testCompletableFutureMethod passed");
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java`
around lines 133 - 150, In testCompletableFutureMethod replace the debug prints
and manual null check with JUnit assertions: remove System.out.println calls and
the explicit "if (future == null) throw ..." and instead assert the future is
not null using Assert.assertNotNull on the result of
service.sayHelloAsync("World"), then call future.get(5000,
TimeUnit.MILLISECONDS) and assert the returned string equals "Hello async,
World" with Assert.assertEquals; keep the existing sleep and exception signature
but use assertions for validation in the testCompletableFutureMethod method.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@core-impl/proxy/src/main/java/com/alipay/sofa/rpc/proxy/jdk/JDKInvocationHandler.java`:
- Around line 77-116: The CompletableFuture branch in JDKInvocationHandler
returns before finishing the FUTURE handoff and leaves the pushed
RpcInternalContext on the thread; after obtaining ResponseFuture via
RpcInternalContext.getContext().getFuture() and wiring the CompletableFuture
listener (and also on the null-future fallback), ensure you remove/pop the
internal context before returning by calling the appropriate cleanup (e.g.,
RpcInternalContext.removeContext() or
RpcInternalContext.getContext().popContext()) so the pushed frame is always
cleaned up after proxyInvoker.invoke(...) and before returning the
CompletableFuture (or proceeding to the sync return path).

In `@core/api/src/main/java/com/alipay/sofa/rpc/client/ClientProxyInvoker.java`:
- Around line 100-105: Centralize the special-case FUTURE handoff/cleanup by
extracting the logic that captures the ResponseFuture and unwinds
RpcInternalContext into a single helper (e.g., a new static method like
FutureHandoff.captureAndCleanup or a utility on ClientProxyInvoker) and update
the call sites: remove the duplicated if-checks and direct
RpcInternalContext.removeContext()/popContext() from ClientProxyInvoker and the
three proxy implementations and instead call the new helper to obtain the
ResponseFuture and perform context cleanup; ensure the helper accepts the
ResponseFuture (or the Request/InvokeType) and performs the conditional
pop/remove only when invoke type is not RpcConstants.INVOKER_TYPE_FUTURE so the
FUTURE contract is honored consistently across JDK/ByteBuddy/Javassist proxies.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java`:
- Around line 515-536: The clone() implementation in RpcInvokeContext must copy
the asyncStarted flag so async state isn't lost when setContext() or
resetContext() invoke cloning; update RpcInvokeContext.clone() to assign
child.asyncStarted = this.asyncStarted (preserving the asyncStarted boolean from
the parent into the cloned instance) so callbacks and interceptors retain async
mode across context swaps.

In
`@remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java`:
- Around line 103-119: In notifySingleListener (class BoltResponseFuture) remove
the redundant RuntimeException branch and simply call
listener.onAppException(cause, request.getMethodName(), request) when cause !=
null; also stop swallowing exceptions in the catch block—log the thrown
exception (using the class logger, e.g., LOGGER.warn/error) including context
(methodName/request) so callback failures are visible; reference:
notifySingleListener, SofaResponseCallback, cause, result, request.
- Around line 68-98: BoltResponseFuture has a race between
addListener/addListeners and notifyListeners causing duplicate notifications; to
fix, ensure adding the listener and checking/completing notification are atomic:
in addListener and addListeners, perform listeners.add(...) and the isDone()
check under the same lock used by notifyListeners (or use a single AtomicBoolean
per-listener registration to mark "notified") so that if notifyListeners() is
running it either sees the listener before isDone() check or the add sees
completion and directly notifies exactly once; update addListener, addListeners,
and notifyListeners to synchronize on the same monitor (or use the new atomic
flag) and rely on notifySingleListener to be called only once per registered
listener.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceImpl.java`:
- Line 38: The static ExecutorService executor in ServerAsyncServiceImpl is
never shut down which can leak threads across tests; add a cleanup method that
shuts it down (e.g., call executor.shutdownNow() or shutdown()+awaitTermination)
and annotate it so tests run it after all tests (e.g., an `@AfterClass` static
teardown in ServerAsyncServiceImpl's test lifecycle or register a JVM shutdown
hook) to ensure the cached thread pool is terminated; reference the static field
name executor and the ServerAsyncServiceImpl class when adding the shutdown
logic.
- Around line 77-99: Remove the orphaned method sayHelloWithAsyncContext from
the ServerAsyncServiceImpl class: delete the entire public String
sayHelloWithAsyncContext(String name) { ... } implementation (including its
AsyncContext usage and executor submission) since ServerAsyncService does not
declare it and the correct implementation resides in
ServerAsyncServiceWithContextImpl; ensure no leftover references to
sayHelloWithAsyncContext remain in ServerAsyncServiceImpl (remove unused imports
if necessary).

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java`:
- Around line 100-114: The teardown in destroy() checks the proxy variables
service and serviceWithContext before calling unRefer(), which can NPE if
refer() failed; change the null checks to the consumer config objects instead:
check consumerConfig before calling consumerConfig.unRefer() and check
consumerConfigWithContext before calling consumerConfigWithContext.unRefer();
keep the existing null checks for providerConfig and providerConfigWithContext
before calling unExport() to ensure safe cleanup of providerConfig.unExport()
and providerConfigWithContext.unExport().

---

Nitpick comments:
In `@core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java`:
- Around line 527-529: The created SofaResponse in RpcInvokeContext
(SofaResponse sofaResponse = new SofaResponse()) is never used by
DefaultAsyncContext because DefaultAsyncContext.buildResponse() always
instantiates a new SofaResponse; either remove the unused SofaResponse parameter
from DefaultAsyncContext's constructor and stop creating it in RpcInvokeContext,
or update DefaultAsyncContext.buildResponse() to return or populate the stored
sofaResponse field instead of creating a new instance; locate the constructor
and the buildResponse() method in DefaultAsyncContext and the creation site in
RpcInvokeContext to implement the chosen fix (adjust constructor
signature/usages if removing the param, or replace the new SofaResponse() call
inside buildResponse() with the stored sofaResponse if reusing it).

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java`:
- Around line 135-136: The Thread.sleep(...) calls in ServerAsyncTest are
brittle and slow the suite; replace them with deterministic synchronization
instead: remove the Thread.sleep invocations in ServerAsyncTest and wait for
test-specific completion signals (e.g., use
CountDownLatch/Future.join/CompletableFuture, or Awaitility.await() checking the
expected condition) in the test methods that currently rely on Thread.sleep;
alternatively, shorten to a small timeout only as a fallback and add a comment.
Locate the sleeps by the Thread.sleep calls in the ServerAsyncTest class and
replace each with a wait on the actual async operation or a latch tied to the
callback/completion.
- Around line 133-150: In testCompletableFutureMethod replace the debug prints
and manual null check with JUnit assertions: remove System.out.println calls and
the explicit "if (future == null) throw ..." and instead assert the future is
not null using Assert.assertNotNull on the result of
service.sayHelloAsync("World"), then call future.get(5000,
TimeUnit.MILLISECONDS) and assert the returned string equals "Hello async,
World" with Assert.assertEquals; keep the existing sleep and exception signature
but use assertions for validation in the testCompletableFutureMethod method.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6d1f350c-cc9b-46e1-8c91-85d902ac7ca2

📥 Commits

Reviewing files that changed from the base of the PR and between 6ef2cc1 and f2929e5.

📒 Files selected for processing (15)
  • bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java
  • core-impl/proxy/src/main/java/com/alipay/sofa/rpc/proxy/bytebuddy/BytebuddyInvocationHandler.java
  • core-impl/proxy/src/main/java/com/alipay/sofa/rpc/proxy/javassist/JavassistProxy.java
  • core-impl/proxy/src/main/java/com/alipay/sofa/rpc/proxy/jdk/JDKInvocationHandler.java
  • core/api/src/main/java/com/alipay/sofa/rpc/client/ClientProxyInvoker.java
  • core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java
  • core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java
  • core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java
  • remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java
  • remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncService.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceImpl.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceWithContext.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceWithContextImpl.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java
✅ Files skipped from review due to trivial changes (3)
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceWithContext.java
  • core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java
  • test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncService.java

Comment on lines +77 to +116
// Check if return type is CompletableFuture
boolean isCompletableFutureReturn = CompletableFuture.class.isAssignableFrom(method.getReturnType());

SofaRequest sofaRequest = MessageBuilder.buildSofaRequest(method.getDeclaringClass(),
method, paramTypes, paramValues);

// If return type is CompletableFuture, use future invoke type
if (isCompletableFutureReturn) {
sofaRequest.setInvokeType(RpcConstants.INVOKER_TYPE_FUTURE);
}

SofaResponse response = proxyInvoker.invoke(sofaRequest);

// Handle CompletableFuture return type
// IMPORTANT: Get the future immediately after invoke to avoid race condition
if (isCompletableFutureReturn) {
ResponseFuture<?> responseFuture = RpcInternalContext.getContext().getFuture();
if (responseFuture != null) {
// Create a CompletableFuture that wraps the ResponseFuture
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
responseFuture.addListener(new com.alipay.sofa.rpc.core.invoke.SofaResponseCallback() {
@Override
public void onAppResponse(Object appResponse, String methodName,
com.alipay.sofa.rpc.core.request.RequestBase request) {
completableFuture.complete(appResponse);
}

@Override
public void onAppException(Throwable throwable, String methodName,
com.alipay.sofa.rpc.core.request.RequestBase request) {
completableFuture.completeExceptionally(throwable);
}

@Override
public void onSofaException(SofaRpcException sofaException, String methodName,
com.alipay.sofa.rpc.core.request.RequestBase request) {
completableFuture.completeExceptionally(sofaException);
}
});
return completableFuture;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Finish the FUTURE handoff before returning from this branch.

ClientProxyInvoker now leaves the pushed RpcInternalContext in place for INVOKER_TYPE_FUTURE, but this path returns without removeContext()/popContext(). On pooled caller threads each CompletableFuture call leaks one context frame, and if getFuture() is ever null the code falls back to the synchronous return path for a CompletableFuture signature.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@core-impl/proxy/src/main/java/com/alipay/sofa/rpc/proxy/jdk/JDKInvocationHandler.java`
around lines 77 - 116, The CompletableFuture branch in JDKInvocationHandler
returns before finishing the FUTURE handoff and leaves the pushed
RpcInternalContext on the thread; after obtaining ResponseFuture via
RpcInternalContext.getContext().getFuture() and wiring the CompletableFuture
listener (and also on the null-future fallback), ensure you remove/pop the
internal context before returning by calling the appropriate cleanup (e.g.,
RpcInternalContext.removeContext() or
RpcInternalContext.getContext().popContext()) so the pushed frame is always
cleaned up after proxyInvoker.invoke(...) and before returning the
CompletableFuture (or proceeding to the sync return path).

Comment on lines +100 to +105
// For future invoke type, don't remove context or pop context as the proxy needs to get the future
// The context will be removed after the proxy gets the future
if (!RpcConstants.INVOKER_TYPE_FUTURE.equals(request.getInvokeType())) {
RpcInternalContext.removeContext();
RpcInternalContext.popContext();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Please centralize the FUTURE handoff/cleanup contract.

This special case is now split across ClientProxyInvoker and three proxy implementations. A shared helper that captures the ResponseFuture and unwinds RpcInternalContext would prevent the current JDK/ByteBuddy/Javassist drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/client/ClientProxyInvoker.java`
around lines 100 - 105, Centralize the special-case FUTURE handoff/cleanup by
extracting the logic that captures the ResponseFuture and unwinds
RpcInternalContext into a single helper (e.g., a new static method like
FutureHandoff.captureAndCleanup or a utility on ClientProxyInvoker) and update
the call sites: remove the duplicated if-checks and direct
RpcInternalContext.removeContext()/popContext() from ClientProxyInvoker and the
three proxy implementations and instead call the new helper to obtain the
ResponseFuture and perform context cleanup; ensure the helper accepts the
ResponseFuture (or the Request/InvokeType) and performs the conditional
pop/remove only when invoke type is not RpcConstants.INVOKER_TYPE_FUTURE so the
FUTURE contract is honored consistently across JDK/ByteBuddy/Javassist proxies.

Comment on lines +515 to +536
public static AsyncContext startAsync() {
RpcInternalContext internalContext = RpcInternalContext.peekContext();
if (internalContext == null) {
throw new IllegalStateException("RpcInternalContext is not available, startAsync must be called in server side");
}

// Get the server async response sender from protocol layer
ServerAsyncResponseSender responseSender = (ServerAsyncResponseSender) internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_RESPONSE_SENDER);
if (responseSender == null) {
throw new IllegalStateException("Async context is not available. Please ensure you are calling this method in a server-side invocation context with a supported protocol (Bolt or Triple).");
}

// Create and return the async context
SofaResponse sofaResponse = new SofaResponse();
DefaultAsyncContext asyncContext = new DefaultAsyncContext(responseSender, sofaResponse);

// Set flag to indicate async mode
RpcInvokeContext invokeContext = RpcInvokeContext.getContext();
invokeContext.asyncStarted = true;

return asyncContext;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if setContext is called after startAsync in any usage patterns
rg -n 'startAsync|setContext' --type java -C2

Repository: sofastack/sofa-rpc

Length of output: 50374


🏁 Script executed:

#!/bin/bash
# First, locate the RpcInvokeContext.java file
find . -name "RpcInvokeContext.java" -type f

Repository: sofastack/sofa-rpc

Length of output: 136


🏁 Script executed:

#!/bin/bash
# Read the RpcInvokeContext.java file to examine clone() and startAsync() methods
# First check file size
wc -l "core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java"

Repository: sofastack/sofa-rpc

Length of output: 138


🏁 Script executed:

#!/bin/bash
# Read the clone() method (around lines 120-148)
sed -n '100,180p' "core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java"

Repository: sofastack/sofa-rpc

Length of output: 1909


🏁 Script executed:

#!/bin/bash
# Read the startAsync() method (around lines 515-536) and asyncStarted field declaration
sed -n '500,580p' "core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java"

Repository: sofastack/sofa-rpc

Length of output: 2922


🏁 Script executed:

#!/bin/bash
# Search for all calls to clone() method in the codebase
rg -n '\.clone\(\)' --type java | grep -i 'invoke' | head -20

Repository: sofastack/sofa-rpc

Length of output: 44


🏁 Script executed:

#!/bin/bash
# Search for where asyncStarted is used/accessed
rg -n 'asyncStarted' --type java

Repository: sofastack/sofa-rpc

Length of output: 1659


Preserve the asyncStarted flag when cloning RpcInvokeContext.

The clone() method does not copy the asyncStarted flag. Since setContext() internally calls clone(), any subsequent context operations could lose async state if they modify or restore the context. Add the following to the clone() method:

child.asyncStarted = parent.asyncStarted;

This ensures async state is properly preserved across context operations, particularly in callback handlers and interceptors that call setContext() or resetContext().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java`
around lines 515 - 536, The clone() implementation in RpcInvokeContext must copy
the asyncStarted flag so async state isn't lost when setContext() or
resetContext() invoke cloning; update RpcInvokeContext.clone() to assign
child.asyncStarted = this.asyncStarted (preserving the asyncStarted boolean from
the parent into the cloned instance) so callbacks and interceptors retain async
mode across context swaps.

Comment on lines 68 to +98
public BoltResponseFuture addListeners(List<SofaResponseCallback> list) {
throw new UnsupportedOperationException("Not supported, Please use callback function");
if (list != null) {
listeners.addAll(list);
// If already done, notify the new listeners
if (isDone()) {
notifyListeners();
}
}
return this;
}

@Override
public BoltResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
throw new UnsupportedOperationException("Not supported, Please use callback function");
if (sofaResponseCallback != null) {
listeners.add(sofaResponseCallback);
// If already done, notify the new listener immediately
if (isDone()) {
notifySingleListener(sofaResponseCallback);
}
}
return this;
}

@Override
public void notifyListeners() {
for (SofaResponseCallback listener : listeners) {
notifySingleListener(listener);
}
// Clear listeners after notifying
listeners.clear();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how AbstractResponseFuture.notifyListeners is invoked and if synchronization exists
ast-grep --pattern $'class AbstractResponseFuture {
  $$$
  notifyListeners() {
    $$$
  }
  $$$
}'

Repository: sofastack/sofa-rpc

Length of output: 44


🏁 Script executed:

#!/bin/bash
# Find the AbstractResponseFuture class and related files
fd -e java | xargs grep -l "class AbstractResponseFuture"

Repository: sofastack/sofa-rpc

Length of output: 140


🏁 Script executed:

#!/bin/bash
# Find BoltResponseFuture to examine full implementation
cat -n remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java | head -150

Repository: sofastack/sofa-rpc

Length of output: 4722


🏁 Script executed:

#!/bin/bash
# Search for setSuccess and setFailure in BoltResponseFuture or parent classes
rg "setSuccess|setFailure" remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/ -A 5

Repository: sofastack/sofa-rpc

Length of output: 2472


🏁 Script executed:

#!/bin/bash
# Read AbstractResponseFuture to understand parent implementation
cat -n core/api/src/main/java/com/alipay/sofa/rpc/message/AbstractResponseFuture.java | head -200

Repository: sofastack/sofa-rpc

Length of output: 7365


🏁 Script executed:

#!/bin/bash
# Continue reading AbstractResponseFuture for setSuccess/setFailure
cat -n core/api/src/main/java/com/alipay/sofa/rpc/message/AbstractResponseFuture.java | tail -200

Repository: sofastack/sofa-rpc

Length of output: 6292


Double notification race condition exists: listener may be notified twice.

When setSuccess() or setFailure() completes between a listener being added and the notification check, the listener can be notified twice:

  1. Once by the parent's notifyListeners() call (which iterates through the list and calls notifySingleListener() for each listener)
  2. Once by addListener()'s immediate notifySingleListener() call when isDone() returns true

This occurs because addListener() and notifyListeners() are not coordinated under a lock. A listener added between listeners.add() and the isDone() check will be included in notifyListeners()'s iteration snapshot if the parent's notification starts before addListener() completes, resulting in duplicate notifications.

The concern about lost notifications is mitigated by the isDone() check in addListener(), which ensures listeners added after completion are still notified immediately.

Wrap the listener addition and completion check in synchronization, or use an atomic flag to ensure each listener is notified exactly once.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java`
around lines 68 - 98, BoltResponseFuture has a race between
addListener/addListeners and notifyListeners causing duplicate notifications; to
fix, ensure adding the listener and checking/completing notification are atomic:
in addListener and addListeners, perform listeners.add(...) and the isDone()
check under the same lock used by notifyListeners (or use a single AtomicBoolean
per-listener registration to mark "notified") so that if notifyListeners() is
running it either sees the listener before isDone() check or the add sees
completion and directly notifies exactly once; update addListener, addListeners,
and notifyListeners to synchronize on the same monitor (or use the new atomic
flag) and rely on notifySingleListener to be called only once per registered
listener.

Comment on lines +103 to 119
protected void notifySingleListener(SofaResponseCallback listener) {
try {
if (cause != null) {
if (cause instanceof RuntimeException) {
listener.onAppException(cause, request.getMethodName(), request);
} else {
listener.onAppException(cause, request.getMethodName(), request);
}
} else if (result instanceof SofaRpcException) {
listener.onSofaException((SofaRpcException) result, request.getMethodName(), request);
} else {
listener.onAppResponse(result, request.getMethodName(), request);
}
} catch (Exception e) {
// Log but don't propagate
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Dead code in notifySingleListener: both branches of the RuntimeException check are identical.

Lines 106-110 have an if/else where both branches call onAppException with the same arguments. This appears to be copy-paste error or incomplete logic.

Additionally, exceptions at line 116-118 are silently swallowed without logging, which will make debugging callback failures difficult.

Proposed fix
     protected void notifySingleListener(SofaResponseCallback listener) {
         try {
             if (cause != null) {
-                if (cause instanceof RuntimeException) {
-                    listener.onAppException(cause, request.getMethodName(), request);
-                } else {
-                    listener.onAppException(cause, request.getMethodName(), request);
-                }
+                listener.onAppException(cause, request.getMethodName(), request);
             } else if (result instanceof SofaRpcException) {
                 listener.onSofaException((SofaRpcException) result, request.getMethodName(), request);
             } else {
                 listener.onAppResponse(result, request.getMethodName(), request);
             }
         } catch (Exception e) {
-            // Log but don't propagate
+            // Log but don't propagate - consider adding proper logging
+            // LOGGER.warn("Failed to notify listener", e);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltResponseFuture.java`
around lines 103 - 119, In notifySingleListener (class BoltResponseFuture)
remove the redundant RuntimeException branch and simply call
listener.onAppException(cause, request.getMethodName(), request) when cause !=
null; also stop swallowing exceptions in the catch block—log the thrown
exception (using the class logger, e.g., LOGGER.warn/error) including context
(methodName/request) so callback failures are visible; reference:
notifySingleListener, SofaResponseCallback, cause, result, request.


private static final Logger LOGGER = LoggerFactory.getLogger(ServerAsyncServiceImpl.class);

private static final ExecutorService executor = Executors.newCachedThreadPool();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Static ExecutorService is never shut down, causing potential resource leak.

The executor is created as a static CachedThreadPool but there's no shutdown hook or cleanup method. In tests this may cause thread leaks between test runs. Consider adding an @AfterClass cleanup or using a test-scoped executor.

Proposed fix: Add shutdown method
 public class ServerAsyncServiceImpl implements ServerAsyncService {

     private static final Logger          LOGGER   = LoggerFactory.getLogger(ServerAsyncServiceImpl.class);

     private static final ExecutorService executor = Executors.newCachedThreadPool();

+    /**
+     * Shutdown the executor service (for test cleanup)
+     */
+    public static void shutdown() {
+        executor.shutdown();
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private static final ExecutorService executor = Executors.newCachedThreadPool();
private static final ExecutorService executor = Executors.newCachedThreadPool();
/**
* Shutdown the executor service (for test cleanup)
*/
public static void shutdown() {
executor.shutdown();
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceImpl.java`
at line 38, The static ExecutorService executor in ServerAsyncServiceImpl is
never shut down which can leak threads across tests; add a cleanup method that
shuts it down (e.g., call executor.shutdownNow() or shutdown()+awaitTermination)
and annotate it so tests run it after all tests (e.g., an `@AfterClass` static
teardown in ServerAsyncServiceImpl's test lifecycle or register a JVM shutdown
hook) to ensure the cached thread pool is terminated; reference the static field
name executor and the ServerAsyncServiceImpl class when adding the shutdown
logic.

Comment on lines +77 to +99
/**
* Test method using AsyncContext for manual async control
*/
public String sayHelloWithAsyncContext(String name) {
LOGGER.info("sayHelloWithAsyncContext called with name: {}", name);
// Start async context
AsyncContext asyncContext = RpcInvokeContext.startAsync();

// Submit async task
executor.submit(() -> {
try {
// Simulate async business processing
Thread.sleep(300);
asyncContext.write("Hello async context, " + name);
} catch (Exception e) {
LOGGER.error("Error in async context", e);
asyncContext.writeError(e);
}
});

// Return null to indicate async handling
return null;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if sayHelloWithAsyncContext is declared in ServerAsyncService or if ServerAsyncServiceWithContext exists
rg -n 'sayHelloWithAsyncContext' --type java -C2

Repository: sofastack/sofa-rpc

Length of output: 3253


🏁 Script executed:

#!/bin/bash
# Find the class declaration for ServerAsyncServiceImpl to see what interface it implements
rg -n 'class ServerAsyncServiceImpl' --type java -A 5

Repository: sofastack/sofa-rpc

Length of output: 912


Remove duplicate method sayHelloWithAsyncContext from ServerAsyncServiceImpl.

ServerAsyncServiceImpl implements ServerAsyncService, which does not declare this method. The implementation is orphaned and won't be accessible via RPC. The method is properly declared in ServerAsyncServiceWithContext and implemented in ServerAsyncServiceWithContextImpl, where it belongs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncServiceImpl.java`
around lines 77 - 99, Remove the orphaned method sayHelloWithAsyncContext from
the ServerAsyncServiceImpl class: delete the entire public String
sayHelloWithAsyncContext(String name) { ... } implementation (including its
AsyncContext usage and executor submission) since ServerAsyncService does not
declare it and the correct implementation resides in
ServerAsyncServiceWithContextImpl; ensure no leftover references to
sayHelloWithAsyncContext remain in ServerAsyncServiceImpl (remove unused imports
if necessary).

Comment on lines +100 to +114
@AfterClass
public static void destroy() {
if (service != null) {
consumerConfig.unRefer();
}
if (providerConfig != null) {
providerConfig.unExport();
}
if (serviceWithContext != null) {
consumerConfigWithContext.unRefer();
}
if (providerConfigWithContext != null) {
providerConfigWithContext.unExport();
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Teardown null checks are on wrong variables.

Lines 102 and 108 check service and serviceWithContext (the proxies) but should check consumerConfig and consumerConfigWithContext to avoid NPE if refer() failed but config was created:

Proposed fix
     `@AfterClass`
     public static void destroy() {
-        if (service != null) {
+        if (consumerConfig != null) {
             consumerConfig.unRefer();
         }
         if (providerConfig != null) {
             providerConfig.unExport();
         }
-        if (serviceWithContext != null) {
+        if (consumerConfigWithContext != null) {
             consumerConfigWithContext.unRefer();
         }
         if (providerConfigWithContext != null) {
             providerConfigWithContext.unExport();
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@AfterClass
public static void destroy() {
if (service != null) {
consumerConfig.unRefer();
}
if (providerConfig != null) {
providerConfig.unExport();
}
if (serviceWithContext != null) {
consumerConfigWithContext.unRefer();
}
if (providerConfigWithContext != null) {
providerConfigWithContext.unExport();
}
}
`@AfterClass`
public static void destroy() {
if (consumerConfig != null) {
consumerConfig.unRefer();
}
if (providerConfig != null) {
providerConfig.unExport();
}
if (consumerConfigWithContext != null) {
consumerConfigWithContext.unRefer();
}
if (providerConfigWithContext != null) {
providerConfigWithContext.unExport();
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/test-integration/src/test/java/com/alipay/sofa/rpc/test/async/ServerAsyncTest.java`
around lines 100 - 114, The teardown in destroy() checks the proxy variables
service and serviceWithContext before calling unRefer(), which can NPE if
refer() failed; change the null checks to the consumer config objects instead:
check consumerConfig before calling consumerConfig.unRefer() and check
consumerConfigWithContext before calling consumerConfigWithContext.unRefer();
keep the existing null checks for providerConfig and providerConfigWithContext
before calling unExport() to ensure safe cleanup of providerConfig.unExport()
and providerConfigWithContext.unExport().

@nobodyiam
Copy link
Copy Markdown
Member

感谢提交此 PR。这为 SOFA RPC 带来了重要的服务端异步能力,整体设计非常清晰且涵盖了主流协议。

在合并之前,有几个技术细节建议优化:

  1. 并发安全BoltResponseFutureaddListener 存在竞态风险,可能导致重复通知,建议增加同步保护。
  2. 上下文清理JDKInvocationHandlerFUTURE 模式下存在 RpcInternalContext 未被正确 pop 的风险,可能导致 ThreadLocal 泄漏。
  3. 代码清理DefaultAsyncContext 构造函数接收的 sofaResponse 未被有效利用,且 ServerAsyncServiceImpl 中存在未定义的冗余方法建议移除。
  4. 测试优化:集成测试中的 Thread.sleep 较为脆弱,建议改为显式的同步等待。

这些优化完成后,此 PR 将更加稳健。

注:本回复由 AI 自动生成并自动发送,用于初步分诊;如需进一步确认,维护者会继续跟进。

@sunhailin-Leo sunhailin-Leo changed the title feat: support server-side async capability (CompletableFuture and AsyncContext) [AI-8th] feat: support server-side async capability (CompletableFuture and AsyncContext) Apr 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants