zoukankan      html  css  js  c++  java
  • alibaba远程调用框架dubbo原理

    alibaba有好几个分布式框架,主要有:进行远程调用(类似于RMI的这种远程调用)的(dubbo、hsf),jms消息服务(napoli、notify),KV数据库(tair)等。这个框架/工具/产品在实现的时候,都考虑到了容灾,扩展,负载均衡,于是出现一个配置中心(ConfigServer)的东西来解决这些问题。

    基本原理如图:
    在我们的系统中,经常会有一些跨系统的调用,如在A系统中要调用B系统的一个服务,我们可能会使用RMI直接来进行,B系统发布一个RMI接口服务,然后A 系统就来通过RMI调用这个接口,为了解决容灾,扩展,负载均衡的问题,我们可能会想很多办法,alibaba的这个办法感觉不错。
    本文只说dubbo,原理如下:
    • ConfigServer
    配置中心,和每个Server/Client之间会作一个实时的心跳检测(因为它们都是建立的Socket长连接),比如几秒钟检测一次。收集每个Server提供的服务的信息,每个Client的信息,整理出一个服务列表,如:
     serviceName serverAddressList clientAddressList
     UserService 192.168.0.1,192.168.0.2,192.168.0.3,192.168.0.4  172.16.0.1,172.16.0.2
     ProductService 192.168.0.3,192.168.0.4,192.168.0.5,192.168.0.6 172.16.0.2,172.16.0.3
     OrderService 192.168.0.10,192.168.0.12,192.168.0.5,192.168.0.6  172.16.0.3,172.16.0.4
    当某个Server不可用,那么就更新受影响的服务对应的serverAddressList,即把这个Server从serverAddressList中踢出去(从地址列表中删除),同时将推送serverAddressList给这些受影响的服务的clientAddressList里面的所有Client。如:192.168.0.3挂了,那么UserService和ProductService的serverAddressList都要把192.168.0.3删除掉,同时把新的列表告诉对应的Client 172.16.0.1,172.16.0.2,172.16.0.3;
    当某个Client挂了,那么更新受影响的服务对应的clientAddressList
    ConfigServer根据服务列表,就能提供一个web管理界面,来查看管理服务的提供者和使用者。
    新加一个Server时,由于它会主动与ConfigServer取得联系,而ConfigServer又会将这个信息主动发送给Client,所以新加一个Server时,只需要启动Server,然后几秒钟内,Client就会使用上它提供的服务
    • Client
    调用服务的机器,每个Client启动时,主动与ConfigServer建立Socket长连接,并将自己的IP等相应信息发送给ConfigServer。
    Client在使用服务的时候根据服务名称去ConfigServer中获取服务提供者信息(这样ConfigServer就知道某个服务是当前哪几个Client在使用),Client拿到这些服务提供者信息后,与它们都建立连接,后面就可以直接调用服务了,当有多个服务提供者的时候,Client根据一定的规则来进行负载均衡,如轮询,随机,按权重等。
    一旦Client使用的服务它对应的服务提供者有变化(服务提供者有新增,删除的情况),ConfigServer就会把最新的服务提供者列表推送给Client,Client就会依据最新的服务提供者列表重新建立连接,新增的提供者建立连接,删除的提供者丢弃连接
    • Server
    真正提供服务的机器,每个Server启动时,主动与ConfigServer建立Scoket长连接,并将自己的IP,提供的服务名称,端口等信息直接发送给ConfigServer,ConfigServer就会收集到每个Server提供的服务的信息。
     
    优点:
    1,只要在Client和Server启动的时候,ConfigServer是好的,服务就可调用了,如果后面ConfigServer挂了,那只影响ConfigServer挂了以后服务提供者有变化,而Client还无法感知这一变化。
    2,Client每次调用服务是不经过ConfigServer的,Client只是与它建立联系,从它那里获取提供服务者列表而已
    3,调用服务-负载均衡:Client调用服务时,可以根据规则在多个服务提供者之间轮流调用服务。
    4,服务提供者-容灾:某一个Server挂了,Client依然是可以正确的调用服务的,当前提是这个服务有至少2个服务提供者,Client能很快的感知到服务提供者的变化,并作出相应反应。
    5,服务提供者-扩展:添加一个服务提供者很容易,而且Client会很快的感知到它的存在并使用它。


    顺便说一下,hadoop里面的中心节点跟这里的configServer作用类似,在维护节点列表方面,不过它的相关计算都需要通过中心节节点,让它来分配任务。
     

    由于Dubbo底层采用Socket进行通信,自己对通信理理论也不是很清楚,所以顺便把通信的知识也学习一下。

    n  通信理论

    计算机与外界的信息交换称为通信。基本的通信方法有并行通信和串行通信两种。

    1.一组信息(通常是字节)的各位数据被同时传送的通信方法称为并行通信。并行通信依靠并行I/O接口实现。并行通信速度快,但传输线根数多,只适用于近距离(相距数公尺)的通信。

    2.一组信息的各位数据被逐位顺序传送的通信方式称为串行通信。串行通信可通过串行接口来实现。串行通信速度慢,但传输线少,适宜长距离通信。

    串行通信按信息传送方向分为以下3种:

    1)   单工

    只能一个方向传输数据

     

    2)   半双工

    信息能双向传输,但不能同时双向传输

     

    3)   全双工

    能双向传输并且可以同时双向传输

     

    n  Socket

    Socket 是一种应用接口, TCP/IP 是网络传输协议,虽然接口相同, 但是不同的协议会有不同的服务性质。创建Socket 连接时,可以指定使用的传输层协议,Socket 可以支持不同的传输层协议(TCP 或UDP ),当使用TCP 协议进行连接时,该Socket 连接就是一个TCP 连接。Soket 跟TCP/IP 并没有必然的联系。Socket 编程接口在设计的时候,就希望也能适应其他的网络协议。所以,socket 的出现只是可以更方便的使用TCP/IP 协议栈而已。

    引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html

    上一个通信理论其实是想说Socket(TCP)通信是全双工的方式

    n  Dubbo远程同步调用原理分析

    从Dubbo开源文档上了解到一个调用过程如下图

    http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference

    另外文档里有说明:Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

    Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。

    • 连接个数:单连接
    • 连接方式:长连接
    • 传输协议:TCP
    • 传输方式:NIO异步传输
    • 序列化:Hessian二进制序列化
    • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
    • 适用场景:常规远程服务方法调用
     通常,一个典型的同步远程调用应该是这样的:
     
    1, 客户端线程调用远程接口,向服务端发送请求,同时当前线程应该处于“暂停“状态,即线程不能向后执行了,必需要拿到服务端给自己的结果后才能向后执行
    2, 服务端接到客户端请求后,处理请求,将结果给客户端
    3, 客户端收到结果,然后当前线程继续往后执行
     
    Dubbo里使用到了Socket(采用apache mina框架做底层调用)来建立长连接,发送、接收数据,底层使用apache mina框架的IoSession进行发送消息。
     
    查看Dubbo文档及源代码可知,Dubbo底层使用Socket发送消息的形式进行数据传递,结合了mina框架,使用IoSession.write()方法,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说,将请求发送出来,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现了2个问题:
    • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
    • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

    分析源代码,基本原理如下:
    1. client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的
    2. 将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
    3. 向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)
    4. 将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去
    5. 当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
    6. 服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
    7. 监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
    这里还需要画一个大图来描述,后面再补了
    需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。

    现在,前面两个问题已经有答案了,
    • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
         答:先 生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前 线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用 obj.notifyAll()唤醒前面处于等待状态的线程。
    • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
         答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。
     
    这种做法不是第一次见了,10年在上一公司里,也是远程接口调用,不过走的消息中间件rabbitmq,同步调用的原理跟这类似,详见:rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
     

    看一下Dubbo的相关代码


    关键代码:

    com.taobao.remoting.impl.DefaultClient.java

    //同步调用远程接口

    public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

            byte protocol = getProtocol(control);

            if (!TRConstants.isValidProtocol(protocol)) {

                throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

            }

            ResponseFuture future = invokeWithFuture(appRequest, control);

            return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback

    }

    public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {

             byte protocol = getProtocol(control);

             long timeout = getTimeout(control);

             ConnectionRequest request = new ConnectionRequest(appRequest);

             request.setSerializeProtocol(protocol);

             Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

             connection.sendRequestWithCallback(request, adapter, timeout);

             return adapter;

    }

    Callback2FutureAdapter implements ResponseFuture

    public Object get() throws RemotingException, InterruptedException {

    synchronized (this) {  // 旋锁

       while (!isDone) {  // 是否有结果了

    wait(); //没结果是释放锁,让当前线程处于等待状态

       }

    }

    if (errorCode == TRConstants.RESULT_TIMEOUT) {

       throw new TimeoutException("Wait response timeout, request["

       + connectionRequest.getAppRequest() + "].");

    }

    else if (errorCode > 0) {

       throw new RemotingException(errorMsg);

    }

    else {

       return appResp;

    }

    }

    客户端收到服务端结果后,回调时相关方法,即设置isDone = truenotifyAll()

    public void handleResponse(Object _appResponse) {

             appResp = _appResponse; //将远程调用结果设置到callback中来

             setDone();

    }

    public void onRemotingException(int _errorType, String _errorMsg) {

             errorCode = _errorType;

             errorMsg = _errorMsg;

             setDone();

    }

    private void setDone() {

             isDone = true;

             synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了

                 notifyAll(); // 唤醒处于等待的线程

             }

    }

    com.taobao.remoting.impl.DefaultConnection.java

     

    // 用来存放请求和回调的MAP

    private final ConcurrentHashMap<Long, Object[]> requestResidents;

     

    //发送消息出去

    void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

             long requestId = connRequest.getId();

             long waitBegin = System.currentTimeMillis();

             long waitEnd = waitBegin + timeoutMs;

             Object[] queue = new Object[4];

             int idx = 0;

             queue[idx++] = waitEnd;

             queue[idx++] = waitBegin;   //用于记录日志

             queue[idx++] = connRequest; //用于记录日志

             queue[idx++] = callback;

             requestResidents.put(requestId, queue); // 记录响应队列

             write(connRequest);

             // 埋点记录等待响应的Map的大小

             StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

                       1L);

    }

    public void write(final Object connectionMsg) {

    //mina里的IoSession.write()发送消息

             WriteFuture writeFuture = ioSession.write(connectionMsg);

             // 注册FutureListener,当请求发送失败后,能够立即做出响应

             writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

    }

    /**

    * 在得到响应后,删除对应的请求队列,并执行回调

    * 调用者:MINA线程

    */

    public void putResponse(final ConnectionResponse connResp) {

             final long requestId = connResp.getRequestId();

             Object[] queue = requestResidents.remove(requestId);

             if (null == queue) {

                 Object appResp = connResp.getAppResponse();

                 String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

                 StringBuilder sb = new StringBuilder();

                 sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

                 sb.append("from [").append(connResp.getHost()).append("],");

                 sb.append("response type [").append(appRespClazz).append("].");

                 LOGGER.warn(sb.toString());

                 return;

             }

             int idx = 0;

             idx++;

             long waitBegin = (Long) queue[idx++];

             ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

             ResponseCallback callback = (ResponseCallback) queue[idx++];

             // ** 把回调任务交给业务提供的线程池执行 **

             Executor callbackExecutor = callback.getExecutor();

             callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));

             long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间

             logIfResponseError(connResp, duration, connRequest.getAppRequest());

    }

    CallbackExecutorTask

    static private class CallbackExecutorTask implements Runnable {

             final ConnectionResponse resp;

             final ResponseCallback callback;

             final Thread createThread;

             CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

                 resp = _resp;

                 callback = _cb;

                 createThread = Thread.currentThread();

             }

             public void run() {

                 // 预防这种情况:业务提供的Executor,让调用者线程来执行任务

                 if (createThread == Thread.currentThread()

                           && callback.getExecutor() != DIYExecutor.getInstance()) {

                       StringBuilder sb = new StringBuilder();

                       sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

                       sb.append("Can not callback task on the network io thhread.");

                       LOGGER.warn(sb.toString());

                       return;

                 }

                 if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

                       callback.handleResponse(resp.getAppResponse()); //设置调用结果

                 }

                 else {

                       callback.onRemotingException(resp.getResult(), resp

                                .getErrorMsg());  //处理调用异常

                 }

             }

    }

    另外:

    1,服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理

    同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调

    原文链接:http://www.cnblogs.com/xmzzp/p/4178153.html
  • 相关阅读:
    CodeForces 19D Points (线段树+set)
    FZU 2105 Digits Count
    HDU 5618 Jam's problem again(三维偏序,CDQ分治,树状数组,线段树)
    HDU 5634 Rikka with Phi (线段树)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
  • 原文地址:https://www.cnblogs.com/gotodsp/p/6532767.html
Copyright © 2011-2022 走看看