zoukankan      html  css  js  c++  java
  • dubbo的exchange层

    介绍
    先来看下Dubbo的整体架构图。Exchange在倒数第三层。我用黄色线框框出来的区域。

     

     Exchange层,属于信息交换层,是对Request和Response的抽象。

    为什么要单独抽象出一个Exchange层,而不是在Protocol层直接对Netty或者Mina引用?这个问题其实不难理解,Netty或者Mina对外接口和调用方式都不一样,如果在Protocol层直接对Mina做引用,对于Protocol层来讲,就依赖了具体而不是抽象,过几天想要换成Netty,就需要对Protocol层做大量的修改。这样不符合开闭原则。

    Dubbo使用的是TCP长连接,与我们开发常见到的HTTP协议(HTTP本身与TCP也不在同一层)不同 。TCP本身没有Request和Response的概念。只有发送和接收。HTTP协议中的Request和Response是由Http服务器或者Servlet容器来实现的。

    Dubbo要使用TCP长连接,就得自己实现Request和Response的抽象概念,这样客户端与服务端之间的交互才能有去有回。

     

     说下交互流程:A服务首先向B服务发送【TCP消息】,B服务收到请求后,做业务处理,然后向A服务发送【TCP消息】

    看代码
    先看Exchanger接口

    public interface Exchanger {
    
    // 抽象出了bind行为,这个行为要完成服务端口暴露的动作,并且返回ExchangeServer抽象
    // Protocol层只需要给URL和handler就可以完成端口暴露的动作
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
    // 抽象出了connect行为,这个行为要完成客户端与服务端的连接动作,并且返回ExchangeClient抽象
    // Protocol层只需要给URL和handler就可以完成端口暴露的动作
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    }

     Exchanger只有一个实现HeaderExchanger

    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
              // 与服务端建立TCP连接,并且返回HeaderExchangeClient
              // Request和Response的概念提现要详细关注HeaderExchangeHandler
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    
        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
          // 暴露服务,并且返回HeaderExchangeServer
          // Request和Response的概念提现要详细关注HeaderExchangeHandler
          return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    }
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    // ----------------------此处生路一堆代码------------------------
    @Override // 接受信息
    public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
    if (message instanceof Request) {// 看看信息是不是Request
    // handle request.
    Request request = (Request) message;
    if (request.isEvent()) {
    handlerEvent(channel, request);
    } else {
    if (request.isTwoWay()) {// twoWay代表这个消息要回复
    // 服务器端接到请求,调用handleRequest得到Response。
    // 这边就提现了Request和Response的概念
    // handleRequest,实际上是去做实际的业务动作了
    Response response = handleRequest(exchangeChannel, request);
    channel.send(response);
    } else {
    // 不需要返回
    handler.received(exchangeChannel, request.getData());
    }
    }
    } else if (message instanceof Response) {// 看看信息是不是Response
    // 如果是Response,那么消息肯定是从Privoder方发来的。
    handleResponse(channel, (Response) message);
    } else if (message instanceof String) {
    if (isClientSide(channel)) {
    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
    logger.error(e.getMessage(), e);
    } else {
    String echo = handler.telnet(channel, (String) message);
    if (echo != null && echo.length() > 0) {
    channel.send(echo);
    }
    }
    } else {
    handler.received(exchangeChannel, message);
    }
    } finally {
    HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
    }
    
    // 收到Privoder方返回的Response信息,并且做出处理
    static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
    // 调用DefaultFuture.received来通知Response消息到了。
    DefaultFuture.received(channel, response);
    }
    }
    
    // 服务器端接到请求,调用handleRequest得到Response。
    // 这边就提现了Request和Response的概念
    // handleRequest,实际上是去做实际的业务动作了
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
    Object data = req.getData();
    
    String msg;
    if (data == null) msg = null;
    else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
    else msg = data.toString();
    res.setErrorMessage("Fail to decode request due to: " + msg);
    res.setStatus(Response.BAD_REQUEST);
    
    return res;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
    // handle data.
    // 完成实际的业务动作,也就是调用DubboProtocol.requestHandler.reply类中的实现。
    // 并且返回Response信息
    Object result = handler.reply(channel, msg);
    res.setStatus(Response.OK);
    res.setResult(result);
    } catch (Throwable e) {
    res.setStatus(Response.SERVICE_ERROR);
    res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
    }
    // ----------------------此处生路一堆代码------------------------
    }

    上面的代码比较多,这边总结下。received方法用于接受信息,这个方法是Provider和Consumer共用的。Provider接收的信息必然是Request,它所处的角色就类似与服务器。Consumer接收的信息必然是Response,它所处的角色就类似于客户端。当然,对于事件Event类信息另说,这边为了思路清晰,不展开细说。

    1.角色Provider,接收Request信息。然后就是做业务动作,接着就是判断是否回复Response,要看twoWay这个标识。具体再回看下代码中的一些注释。追踪下需要回复Response的情况(因为大部分情况下,我们都是用同步请求,需要回复Response)

    找到代码Object result = handler.reply(channel, msg);,reply动作会调用到DubboProtocol.requestHandler.reply。这个地方如果自己跟代码会比较乱。对Dubbo Handler不太熟悉可以看文章dubbo的handler机制。提醒下,看HeaderExchanger.bind动作的实现,会发现DubboProtocol.requestHandler被包起来了。所以这个地方reply一定是调用DubboProtocol.requestHandler的。看下那块代码。

    @Override// 执行invoke并且返回Response信息
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
                    // ----------------------此处生路一堆代码------------------------
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
              // 执行invoke并且返回Response信息
            return invoker.invoke(inv);
        }
    }

    invoker.invoke(inv);执行的是服务在初始化的时候Protocol.export出来的。实际export出来的Exporter会被过滤器链给包住。这部分知识可以查看文档dubbo的filter

    2.角色Consumer,接收Response信息。然后就是通知DefaultFuture说Response回来了。详细看下DefaultFuture,它就像以前接收信件的传达室,员工对外发了一封信,告知传达室登记下。传达室接到回信后再把信件交给员工。这就完成的信息的一来一回。来看代码HeaderExchangeChannel.request

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // ----------------------此处生路一堆代码------------------------
      // create request.
      Request req = new Request();
      req.setVersion(Version.getProtocolVersion());
      req.setTwoWay(true);
      req.setData(request);
      // 向DefaultFuture做登记,说明下面Request马上就会发出,然后DefaultFuture就会等待这个channel回来的Response消息。
      DefaultFuture future = new DefaultFuture(channel, req, timeout);
      try {
        // 发送Request消息
        channel.send(req);
      } catch (RemotingException e) {
        future.cancel();
        throw e;
      }
      return future;
    }

    向DefaultFuture做登记,说明下面Request马上就会发出,然后DefaultFuture就会等待这个channel回来的Response消息。channel.send发送消息完成后立马会返回ResponseFuture。并不会等待Provider端的返回。那总有地方需要等待。看下DefaultFuture类的实现就会明白。DefaultFuture类比较大。挑重要的说。

    public class DefaultFuture implements ResponseFuture {
          // ----------------------此处生路一堆代码------------------------
        static {
              // 类在初始化的时候,会搞出一个Thread,还是个守护线程。用来扫描所有的请求,看是否有超时的。
              // 30毫秒一次,具体可以看RemotingInvocationTimeoutScan的实现。
            Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
            th.setDaemon(true);
            th.start();
        }
            // ----------------------此处生路一堆代码------------------------
        @Override    // get方法用来等待
        public Object get() throws RemotingException {
            return get(timeout);
        }
    
        @Override
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {// 超时时长
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                      // 循环等待
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            return returnFromResponse();
        }    
            // ----------------------此处生路一堆代码------------------------
    }

    DefaultFuture.get方法做了消息等待的动作。再来看看使用Dubbo协议的情况下,哪个地方调用了DefaultFuture.get,找到DubboInvoker.doInvoke

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        // ----------------------此处生路一堆代码------------------------
      if (isOneway) {// oneWay,只去不回。
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return new RpcResult();
      } else if (isAsync) {// 异步消息,只去不回。也不用等待
        ResponseFuture future = currentClient.request(inv, timeout);
        RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
        return new RpcResult();
      } else {
        RpcContext.getContext().setFuture(null);
        // 有去有回,有Request,要等待Response。所以调用DefaultFuture.get方法。
        return (Result) currentClient.request(inv, timeout).get();
      }
        // ----------------------此处生路一堆代码------------------------
    }

    DubboInvoker是在服务启动时,做refer依赖的时候做实例化的。当调用服务时,就是运行了这边的doInvoke方法。这个方法中依赖了ExchangeClient作为请求客户端,发送Request消息并且调用DefaultFuture.get方法来等待Response返回。

    总结
    Exchange是对Request和Reponse的抽象。由于Exchange层的类比较错综复杂,这边只能过一遍源码流程中最重要的部分。
    ————————————————
    版权声明:本文为CSDN博主「吴键」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/wjlucky262/article/details/105907565

  • 相关阅读:
    关于win10输入法问题(打不出中文)解决方法
    Docker 修改默认存储位置
    Enabling and Mounting NFS on CoreOS
    docker run mysql
    Specified key was too long; max key length is 767 bytes mysql
    C# 实现 Snowflake算法 ID生成
    无忧之道:Docker中容器的备份、恢复和迁移
    IIS Express 虚拟目录
    从零開始学android&lt;AnalogClock与DigitalClock时钟组件.三十一.&gt;
    jquery版本号升级不兼容的问题:$(&quot;input&quot;).attr(&quot;value&quot;)功能发生改变
  • 原文地址:https://www.cnblogs.com/nizuimeiabc1/p/14855857.html
Copyright © 2011-2022 走看看