zoukankan      html  css  js  c++  java
  • RPC-非阻塞通信下的同步API实现原理,以Dubbo为例

      Netty在Java NIO领域基本算是独占鳌头,涉及到高性能网络通信,基本都会以Netty为底层通信框架,Dubbo 也不例外。以下将以Dubbo实现为例介绍其是如何在NIO非阻塞通信基础上实现同步通信的。

        Dubbo为一种RPC通信框架,提供进程间的通信,在使用dubbo协议+Netty作为传输层时,提供三种API调用方式:

    1. 同步接口
    2. 异步带回调接口
    3. 异步不带回调接口

        同步接口适用在大部分环境,通信方式简单、可靠,客户端发起调用,等待服务端处理,调用结果同步返回。这种方式下,在高吞吐、高性能(响应时间很快)的服务接口场景中最为适用,可以减少异步带来的额外的消耗,也方便客户端做一致性保证。

     

        异步带回调接口,用在任务处理时间较长,客户端应用线程不愿阻塞等待,而是为了提高自身处理能力希望服务端处理完成后可以异步通知应用线程。这种方式可以大大提升客户端的吞吐量,避免因为服务端的耗时问题拖死客户端。

        异步不带回调接口,一些场景为了进一步提升客户端的吞吐能力,只需发起一次服务端调用,不需关系调用结果,可以使用此种通信方式。一般在不需要严格保证数据一致性或者有其他补偿措施的情况下,选用这种,可以最小化远程调用带来的性能损耗。

        

        来看一下Dubbo是如何实现这三种API的。核心代码在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,如下图对应的位置,属于协议层的实现部分。为方便大家可以准确定位代码所在位置,使用截图的方式,而不是直接贴代码了。

        上文描述的是三种API方式,Dubbo里面通过参数isOneway、isAsync来控制,isOneway=true表示异步不带回调,isAsync=true表示异步带回调,否则是同步API。具体是如何控制,看以下代码:

        isOneway==true时,客户端send完请求后,直接return一个空结果的RpcResult;isAsync==true时,客户端发起请求,设置一个ResponseFuture,直接return一个空结果的RpcResult,接下来当服务端处理完成,客户端Netty层在收到响应后会通过Future通知应用线程;最后是同步情况下,客户端发起请求,并通过get()方法阻塞等待服务端的响应结果。

        异步API情况下,结合NIO模型比较好理解是如何实现的(当然需要先了解NIO的reactor模型),接下来重点理解下,这个get()阻塞方法是如何做到基于非阻塞NIO实现同步阻塞效果。

        直接进入get()方法内部。

        可以看到是利用Java的锁机制实现,循环判断是否收到响应,如果收到或者等待超时则返回。done的实例对象如下:

    private final Lock                            lock = new ReentrantLock();
    private final Condition                       done = lock.newCondition();

        使用可重入锁ReentrantLock,获取一个Condition对象在其上做await操作。这里有await操作,何时被唤醒呢,有两个条件,第一个是等待timeout超时,默认dubbo是1s,第二个就是被其他线程唤醒,即收到了服务端的响应。

        signal信号一发出,上文循环检测内的await操作会立即返回,下一次isDone判断会变成true,直接跳出循环。

        仔细看代码会发现,被唤醒的地方还有一个是在DefaultFuture内部有一个超时轮询检测的线程,这个线程主要是处理响应超时后触发资源回收、记录异常日志等操作。    

    private static class RemotingInvocationTimeoutScan implements Runnable {
    
            public void run() {
                while (true) {
                    try {
                        for (DefaultFuture future : FUTURES.values()) {
                            if (future == null || future.isDone()) {
                                continue;
                            }
                            if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                                // create exception response.
                                Response timeoutResponse = new Response(future.getId());
                                // set timeout status.
                                timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                                timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                                // handle response.
                                DefaultFuture.received(future.getChannel(), timeoutResponse);
                            }
                        }
                        Thread.sleep(30);
                    } catch (Throwable e) {
                        logger.error("Exception when scan the timeout invocation of remoting.", e);
                    }
                }
            }
        }
    
        static {
            Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
            th.setDaemon(true);
            th.start();
        }

          可能会有疑问,这个触发操作为何不直接在get()方法内部检测到超时直接调用DefaultFuture.received(Channel channel, Response response)来清理,而是要额外开启一个后台线程。

        单独启动一个超时线程有两个好处:

    1.  提高超时精度

          get()方法内部的轮询有一个timeout,每次超时唤醒的时间间隔至少是timeout时长,最差的情况可能会等待2*timeout作出超时反应。在超时轮询线程中,每隔30ms遍历检测一次,可以很大程度的提升超时精度。

           2.  提升性能,降低响应时间

          剥离超时处理逻辑到一个单独线程,可以减少对业务线程的时间占用,这个超时后的处理对应用来说并无直接作用,完全可以放到后台异步去处理。另外单独在一个线程中,实际上有批量处理的表现。

        以上是就NIO通信基础上实现三种API调用的实现原理,或许有更多优于Dubbo的处理方式,可以拿出来讨论。

  • 相关阅读:
    UOJ #455 [UER #8]雪灾与外卖 (贪心、模拟费用流)
    Codeforces 482E ELCA (LCT)
    Codeforces 798D Mike and distribution (构造)
    AtCoder AGC017C Snuke and Spells
    HDU 6089 Rikka with Terrorist (线段树)
    HDU 6136 Death Podracing (堆)
    AtCoder AGC032D Rotation Sort (DP)
    jenkins+python+kubectl实现批量更新k8s镜像
    Linux 下载最新kubectl版本的命令:
    jenkins X 和k8s CI/CD
  • 原文地址:https://www.cnblogs.com/yaohonv/p/rpc_sycn_nio.html
Copyright © 2011-2022 走看看