zoukankan      html  css  js  c++  java
  • dubbo异步调用原理 (1)

    此文已由作者赵计刚授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    一、使用方式

    服务提供方不变,调用方代码如下:

    1     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    2         <dubbo:method name="sayHello" async="true" timeout="60000"/>
    3         <dubbo:method name="sayBye" async="true" timeout="60000"/>
    4     </dubbo:reference>

    配置里添加<dubbo:method name="xxx" async="true"/>,表示单个方法xxx使用异步方式;如果demoService下的所有方法都使用异步,直接配置为<dubbo:reference async="true"/>。


    1     public static void main(String[] args) throws Exception {
     2         //Prevent to get IPV6 address,this way only work in debug mode
     3         //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
     4         System.setProperty("java.net.preferIPv4Stack", "true");
     5 
     6         asyncFuture2();
     7     }
     8 
     9     public static void asyncFuture1() throws ExecutionException, InterruptedException {
    10         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
    11         context.start();
    12         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    13 
    14         long start = System.currentTimeMillis();
    15 
    16         demoService.sayHello("zhangsan");
    17         Future<String> helloFuture = RpcContext.getContext().getFuture();
    18 
    19         demoService.sayBye("lisi");
    20         Future<String> byeFuture = RpcContext.getContext().getFuture();
    21 
    22         final String helloStr = helloFuture.get();//消耗5s
    23         final String byeStr = byeFuture.get();//消耗8s
    24 
    25         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
    26     }
    27 
    28     public static void asyncFuture2() throws ExecutionException, InterruptedException {
    29         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
    30         context.start();
    31         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    32 
    33         long start = System.currentTimeMillis();
    34 
    35         Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
    36         Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi"));
    37 
    38         final String helloStr = helloFuture.get();//消耗5s
    39         final String byeStr = byeFuture.get();//消耗8s
    40 
    41         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
    42     }


    Consumer启动主类。其中asyncFuture2()方法是推荐用法,注意Callable(asyncCall方法的入参)只是一个任务task,不会新建线程;所以asyncFuture2()和asyncFuture1()相似,资源占用相同,都是用一根线程进行异步操作的。

    二、asyncFuture1()源码解析

    先来看asyncFuture1(),总体步骤:

    • demoService.sayHello("zhangsan"); 创建一个Future对象,存入当前线程的上下文中

    • Future<String> helloFuture = RpcContext.getContext().getFuture(); 从当前线程的上下文中获取第一步存入的Future对象

    • final String helloStr = helloFuture.get(); 阻塞等待,从Future中获取结果

    代码主要执行流(代码详细执行流看文章开头的三篇博客):

    1、demoService.sayHello("zhangsan"); 

    -->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
       -->DubboInvoker.doInvoke(final Invocation invocation)

    FutureFilter:


     1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     3 
     4         fireInvokeCallback(invoker, invocation);
     5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
     6         // necessary to return future.
     7         Result result = invoker.invoke(invocation);
     8         if (isAsync) {
     9             asyncCallback(invoker, invocation);
    10         } else {
    11             syncCallback(invoker, invocation, result);
    12         }
    13         return result;
    14     }


    对于如上异步操作(asyncFuture1()和asyncFuture2()),FutureFilter没起任何作用,该Filter主要会用在事件通知中,后续再说。

    DubboInvoker.doInvoke(final Invocation invocation):


     1     protected Result doInvoke(final Invocation invocation) throws Throwable {
     2         RpcInvocation inv = (RpcInvocation) invocation; 3         final String methodName = RpcUtils.getMethodName(invocation);
     4         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
     5         inv.setAttachment(Constants.VERSION_KEY, version);
     6 
     7         ExchangeClient currentClient;
     8         if (clients.length == 1) {
     9             currentClient = clients[0];
    10         } else {
    11             currentClient = clients[index.getAndIncrement() % clients.length];
    12         }
    13         try {
    14             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    15             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    16             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    17             if (isOneway) { //无返回值
    18                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    19                 currentClient.send(inv, isSent);
    20                 RpcContext.getContext().setFuture(null);
    21                 return new RpcResult();
    22             } else if (isAsync) { //异步有返回值
    23                 ResponseFuture future = currentClient.request(inv, timeout);
    24                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    25                 return new RpcResult();
    26             } else { //同步有返回值
    27                 RpcContext.getContext().setFuture(null);
    28                 return (Result) currentClient.request(inv, timeout).get();
    29             }
    30         } catch (TimeoutException e) {
    31             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    32         } catch (RemotingException e) {
    33             throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    34         }
    35     }


    模式:

    • 如果是isOneway(不需要返回值),不管同步还是异步,请求直接发出,不会创建Future,直接返回RpcResult空对象。

    • 如果是isAsync(异步),则

      • 先创建ResponseFuture对象,之后使用FutureAdapter包装该ResponseFuture对象;(创建ResponseFuture对象与同步的代码相同,最后得到的是一个DefaultFuture对象)

      • 然后将该FutureAdapter对象设入当前线程的上下文中RpcContext.getContext();

      • 最后返回空的RpcResult

    • 如果是同步,则先创建ResponseFuture对象,之后直接调用其get()方法进行阻塞调用(见文章开头的三篇文章)

    简单来看一下FutureAdapter:


     1 public class FutureAdapter<V> implements Future<V> {
     2 
     3     private final ResponseFuture future;
     4 
     5     public FutureAdapter(ResponseFuture future) {
     6         this.future = future;
     7     }
     8 
     9     public ResponseFuture getFuture() {
    10         return future;
    11     }
    12 
    13     public boolean cancel(boolean mayInterruptIfRunning) {
    14         return false;
    15     }
    16 
    17     public boolean isCancelled() {
    18         return false;
    19     }
    20 
    21     public boolean isDone() {
    22         return future.isDone();
    23     }
    24 
    25     @SuppressWarnings("unchecked")
    26     public V get() throws InterruptedException, ExecutionException {
    27         try {
    28             return (V) (((Result) future.get()).recreate());
    29         } catch (RemotingException e) {
    30             throw new ExecutionException(e.getMessage(), e);
    31         } catch (Throwable e) {
    32             throw new RpcException(e);
    33         }
    34     }
    35 
    36     @SuppressWarnings("unchecked")
    37     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    38         int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
    39         try {
    40             return (V) (((Result) future.get(timeoutInMillis)).recreate());
    41         } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
    42             throw new TimeoutException(StringUtils.toString(e));
    43         } catch (RemotingException e) {
    44             throw new ExecutionException(e.getMessage(), e);
    45         } catch (Throwable e) {
    46             throw new RpcException(e);
    47         }
    48     }
    49 }


    最后,回头看一下FutureFilter:

     1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     3 
     4         fireInvokeCallback(invoker, invocation);
     5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
     6         // necessary to return future.
     7         Result result = invoker.invoke(invocation);
     8         if (isAsync) {
     9             asyncCallback(invoker, invocation);
    10         } else {
    11             syncCallback(invoker, invocation, result);
    12         }
    13         return result;
    14     }




    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 使用QUIC
    【推荐】 数据库路由中间件MyCat - 源代码篇(5)

  • 相关阅读:
    第13组_16通信3班_045_OSPFv3作业
    RIPng配置(第十三组)
    基于IPV6的数据包分析(更新拓扑加入了linux主机和抓取133icmp包)(第十三组)
    vmware vsphere powercli 因为在此系统中禁止执行脚本
    vmware virtual machine must be running in order to be migrated
    flashback transaction闪回事务查询
    oracle 闪回功能详解
    linux下修改/dev/shm tmpfs文件系统大小
    vmware虚拟机guest系统重启后获得169.254.X.X的ip解决方法
    一键部署 PPTP server
  • 原文地址:https://www.cnblogs.com/163yun/p/9970855.html
Copyright © 2011-2022 走看看