zoukankan      html  css  js  c++  java
  • 9.4 dubbo异步调用原理

    9.1 客户端发起请求源码9.2 服务端接收请求消息并发送响应消息源码9.3 客户端接收响应信息(异步转同步的实现) 分析了dubbo同步调用的源码,现在来看一下dubbo异步调用。

    一、使用方式

    服务提供方不变,调用方代码如下:

    1     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    2         <dubbo:method name="sayHello" async="true" timeout="60000"/>
    3         <dubbo:method name="sayBye" async="true" timeout="60000"/>
    4     </dubbo:reference>

    配置里添加<dubbo:method name="xxx" async="true"/>,表示单个方法xxx使用异步方式;如果demoService下的所有方法都使用异步,直接配置为<dubbo:reference async="true"/>。

     1     public static void main(String[] args) throws Exception {
     2         //Prevent to get IPV6 address,this way only work in debug mode
     3         //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
     4         System.setProperty("java.net.preferIPv4Stack", "true");
     5 
     6         asyncFuture2();
     7     }
     8 
     9     public static void asyncFuture1() throws ExecutionException, InterruptedException {
    10         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
    11         context.start();
    12         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    13 
    14         long start = System.currentTimeMillis();
    15 
    16         demoService.sayHello("zhangsan");
    17         Future<String> helloFuture = RpcContext.getContext().getFuture();
    18 
    19         demoService.sayBye("lisi");
    20         Future<String> byeFuture = RpcContext.getContext().getFuture();
    21 
    22         final String helloStr = helloFuture.get();//消耗5s
    23         final String byeStr = byeFuture.get();//消耗8s
    24 
    25         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
    26     }
    27 
    28     public static void asyncFuture2() throws ExecutionException, InterruptedException {
    29         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
    30         context.start();
    31         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    32 
    33         long start = System.currentTimeMillis();
    34 
    35         Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
    36         Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi"));
    37 
    38         final String helloStr = helloFuture.get();//消耗5s
    39         final String byeStr = byeFuture.get();//消耗8s
    40 
    41         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
    42     }

    Consumer启动主类。其中asyncFuture2()方法是推荐用法,注意Callable(asyncCall方法的入参)只是一个任务task,不会新建线程;所以asyncFuture2()和asyncFuture1()相似,资源占用相同,都是用一根线程进行异步操作的。

    二、asyncFuture1()源码解析

    先来看asyncFuture1(),总体步骤:

    • demoService.sayHello("zhangsan"); 创建一个Future对象,存入当前线程的上下文中
    • Future<String> helloFuture = RpcContext.getContext().getFuture(); 从当前线程的上下文中获取第一步存入的Future对象
    • final String helloStr = helloFuture.get(); 阻塞等待,从Future中获取结果

    代码主要执行流(代码详细执行流看文章开头的三篇博客):

    1、demoService.sayHello("zhangsan"); 

    -->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
       -->DubboInvoker.doInvoke(final Invocation invocation)

    FutureFilter:

     1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     3 
     4         fireInvokeCallback(invoker, invocation);
     5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
     6         // necessary to return future.
     7         Result result = invoker.invoke(invocation);
     8         if (isAsync) {
     9             asyncCallback(invoker, invocation);
    10         } else {
    11             syncCallback(invoker, invocation, result);
    12         }
    13         return result;
    14     }

    对于如上异步操作(asyncFuture1()和asyncFuture2()),FutureFilter没起任何作用,该Filter主要会用在事件通知中,后续再说。

    DubboInvoker.doInvoke(final Invocation invocation):

     1     protected Result doInvoke(final Invocation invocation) throws Throwable {
     2         RpcInvocation inv = (RpcInvocation) invocation;
     3         final String methodName = RpcUtils.getMethodName(invocation);
     4         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
     5         inv.setAttachment(Constants.VERSION_KEY, version);
     6 
     7         ExchangeClient currentClient;
     8         if (clients.length == 1) {
     9             currentClient = clients[0];
    10         } else {
    11             currentClient = clients[index.getAndIncrement() % clients.length];
    12         }
    13         try {
    14             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    15             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    16             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    17             if (isOneway) { //无返回值
    18                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    19                 currentClient.send(inv, isSent);
    20                 RpcContext.getContext().setFuture(null);
    21                 return new RpcResult();
    22             } else if (isAsync) { //异步有返回值
    23                 ResponseFuture future = currentClient.request(inv, timeout);
    24                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    25                 return new RpcResult();
    26             } else { //同步有返回值
    27                 RpcContext.getContext().setFuture(null);
    28                 return (Result) currentClient.request(inv, timeout).get();
    29             }
    30         } catch (TimeoutException e) {
    31             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    32         } catch (RemotingException e) {
    33             throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    34         }
    35     }

    模式:

    • 如果是isOneway(不需要返回值),不管同步还是异步,请求直接发出,不会创建Future,直接返回RpcResult空对象。
    • 如果是isAsync(异步),则
      • 先创建ResponseFuture对象,之后使用FutureAdapter包装该ResponseFuture对象;(创建ResponseFuture对象与同步的代码相同,最后得到的是一个DefaultFuture对象)
      • 然后将该FutureAdapter对象设入当前线程的上下文中RpcContext.getContext();
      • 最后返回空的RpcResult
    • 如果是同步,则先创建ResponseFuture对象,之后直接调用其get()方法进行阻塞调用(见文章开头的三篇文章)

    简单来看一下FutureAdapter:

     1 public class FutureAdapter<V> implements Future<V> {
     2 
     3     private final ResponseFuture future;
     4 
     5     public FutureAdapter(ResponseFuture future) {
     6         this.future = future;
     7     }
     8 
     9     public ResponseFuture getFuture() {
    10         return future;
    11     }
    12 
    13     public boolean cancel(boolean mayInterruptIfRunning) {
    14         return false;
    15     }
    16 
    17     public boolean isCancelled() {
    18         return false;
    19     }
    20 
    21     public boolean isDone() {
    22         return future.isDone();
    23     }
    24 
    25     @SuppressWarnings("unchecked")
    26     public V get() throws InterruptedException, ExecutionException {
    27         try {
    28             return (V) (((Result) future.get()).recreate());
    29         } catch (RemotingException e) {
    30             throw new ExecutionException(e.getMessage(), e);
    31         } catch (Throwable e) {
    32             throw new RpcException(e);
    33         }
    34     }
    35 
    36     @SuppressWarnings("unchecked")
    37     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    38         int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
    39         try {
    40             return (V) (((Result) future.get(timeoutInMillis)).recreate());
    41         } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
    42             throw new TimeoutException(StringUtils.toString(e));
    43         } catch (RemotingException e) {
    44             throw new ExecutionException(e.getMessage(), e);
    45         } catch (Throwable e) {
    46             throw new RpcException(e);
    47         }
    48     }
    49 }

    最后,回头看一下FutureFilter:

     1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     3 
     4         fireInvokeCallback(invoker, invocation);
     5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
     6         // necessary to return future.
     7         Result result = invoker.invoke(invocation);
     8         if (isAsync) {
     9             asyncCallback(invoker, invocation);
    10         } else {
    11             syncCallback(invoker, invocation, result);
    12         }
    13         return result;
    14     }
     1     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
     2         Future<?> f = RpcContext.getContext().getFuture();
     3         if (f instanceof FutureAdapter) {
     4             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
     5             future.setCallback(new ResponseCallback() {
     6                 public void done(Object rpcResult) {
     7                     if (rpcResult == null) {
     8                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
     9                         return;
    10                     }
    11                     ///must be rpcResult
    12                     if (!(rpcResult instanceof Result)) {
    13                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
    14                         return;
    15                     }
    16                     Result result = (Result) rpcResult;
    17                     if (result.hasException()) {
    18                         fireThrowCallback(invoker, invocation, result.getException());
    19                     } else {
    20                         fireReturnCallback(invoker, invocation, result.getValue());
    21                     }
    22                 }
    23 
    24                 public void caught(Throwable exception) {
    25                     fireThrowCallback(invoker, invocation, exception);
    26                 }
    27             });
    28         }
    29     }

    这里的future对象时之前创建好的DefaultFuture对象。

     1     private volatile Response response;
     2     private volatile ResponseCallback callback;
     3 
     4     public boolean isDone() {
     5         return response != null;
     6     }
     7 
     8     public void setCallback(ResponseCallback callback) {
     9         if (isDone()) {
    10             invokeCallback(callback);
    11         } else {
    12             boolean isdone = false;
    13             lock.lock();
    14             try {
    15                 if (!isDone()) {
    16                     this.callback = callback;
    17                 } else {
    18                     isdone = true;
    19                 }
    20             } finally {
    21                 lock.unlock();
    22             }
    23             if (isdone) {
    24                 invokeCallback(callback);
    25             }
    26         }
    27     }

    这里判断响应是否已经返回了,如果返回了,直接执行invokeCallback(callback),否则将传入的ResponseCallback对象赋值给callback对象。

    2、Future<String> helloFuture = RpcContext.getContext().getFuture(); 

    RpcContext:

     1     private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
     2         @Override
     3         protected RpcContext initialValue() {
     4             return new RpcContext();
     5         }
     6     };
     7 
     8     private Future<?> future;
     9 
    10     public static RpcContext getContext() {
    11         return LOCAL.get();
    12     }
    13 
    14     public <T> Future<T> getFuture() {
    15         return (Future<T>) future;
    16     }

    从当前线程上下文中获取之前存进去的FutureAdapter对象。

    3、final String helloStr = helloFuture.get(); 

    helloFuture是上述的FutureAdapter对象,其get()调用的是内部的DefaultFuture的get(),该方法与同步调用时相同,源码分析见文章开头的三篇文章。

    1     public V get() throws InterruptedException, ExecutionException {
    2         try {
    3             return (V) (((Result) future.get()).recreate());
    4         } catch (RemotingException e) {
    5             throw new ExecutionException(e.getMessage(), e);
    6         } catch (Throwable e) {
    7             throw new RpcException(e);
    8         }
    9     }

    get方法的超时设置除了直接在xml中配置之外,还可以在代码中手动执行(优先级高) 

    1 final String helloStr2 = helloFuture.get(7000, TimeUnit.MILLISECONDS);

    三、asyncFuture2()源码解析

    下面来看一下asyncFuture2()源码:

    1、Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));

     1     public <T> Future<T> asyncCall(Callable<T> callable) {
     2         try {
     3             try {
     4                 setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
     5                 // 1 执行传入的任务(此处创建FutureAdapter对象,并且设置到当前线程的RpcContext的future对象中)
     6                 final T o = callable.call();
     7                 //local invoke will return directly
     8                 if (o != null) {
     9                     FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
    10                         public T call() throws Exception {
    11                             return o;
    12                         }
    13                     });
    14                     f.run();
    15                     return f;
    16                 } else {
    17 
    18                 }
    19             } catch (Exception e) {
    20                 throw new RpcException(e);
    21             } finally {
    22                 removeAttachment(Constants.ASYNC_KEY);
    23             }
    24         } catch (final RpcException e) {
    25             return new Future<T>() {
    26                 public boolean cancel(boolean mayInterruptIfRunning) {
    27                     return false;
    28                 }
    29 
    30                 public boolean isCancelled() {
    31                     return false;
    32                 }
    33 
    34                 public boolean isDone() {
    35                     return true;
    36                 }
    37 
    38                 public T get() throws InterruptedException, ExecutionException {
    39                     throw new ExecutionException(e.getCause());
    40                 }
    41 
    42                 public T get(long timeout, TimeUnit unit)
    43                         throws InterruptedException, ExecutionException,
    44                         TimeoutException {
    45                     return get();
    46                 }
    47             };
    48         }
    49         // 2 从当前线程的RpcContext中获取future对象
    50         return ((Future<T>) getContext().getFuture());
    51     }

    这里外层的catch的作用是什么?没搞清楚 https://github.com/alibaba/dubbo/issues/1346

    2、final String helloStr = helloFuture.get();

    与同步相同。

    总结:dubbo异步与同步的差别:

    • 同步:创建DefaultFuture之后,直接get阻塞等待;
    • 异步:创建DefaultFuture之后,使用FutureAdapter进行包装,之后设置到当前线程的RpcContext中;后续用户在合适的时候自己从RpcContext获取future,之后get。
  • 相关阅读:
    软件的结构
    SpringMVC返回值类型及响应数据类型
    mybatis学习日记3
    面试八
    面试七
    微信小程序实例源码大全下载
    dot.js模板引擎,避免大量拼接字符串
    前端重构方案了解一下
    微信小程序开发基础教程
    如何让签到成为提升用户活跃度的利器
  • 原文地址:https://www.cnblogs.com/java-zhao/p/8424019.html
Copyright © 2011-2022 走看看