zoukankan      html  css  js  c++  java
  • dubbo源码—service reply

    dubbo通过netty将请求发送到provider的时候,provider之前已经启动好的NettyServer监听指定端口的时候会收到来自consumer的请求,将通过网络发送来的二进制编码成Request交给上层处理。dubbo从Request中取出调用信息,找到之前的Invoker,然后经过filter,最后通过代理调用到提供服务的方法。

    provider处理请求的调用堆栈如下:

    sayHe110:18, TestDubb0Servicelmpl (com.test.service.impl) 
    invokeMethod:-1, Wrapper1 (com. alibabadubbo. common.bytecode) 
    dolnvoke:46, JavassistProxyFactory$1 (com.alibaba.dubbo.rpc.proxy.javassist) 
    invoke:72, AbstractProxylnvoker (com.alibaba.dubbo.rpc.proxy) 
    invoke:53, InvokerWrapper (com.alibaba.dubbo.rpc.protocol) 
    invoke:64, ExceptionFilter .com alibaba.dubbo.rpc filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:64, MonitorFilter .com alibaba.dubbo. monitor.support) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:42, TimeoutFilter .com alibaba.dubbo. rpc.filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:49, TokenFilter .com alibaba.dubbo. roc. filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:78, TraceFilter .com alibaba dubbo. roc. protocol.dubbo.filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:60, ContextFilter .com alibaba.dubbo. roc. filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:132, GenericFilter .com alibaba.dubbo. roc. filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:38, ClassLoaderFilter .com alibaba dubbo.rpc.filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    invoke:38, EchoFilter .com alibaba dubbo. rpc filter) 
    invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) 
    reply:108, DubboProtocol$1 .com alibaba dubbo.rpcprotocol.dubbo) 
    handleRequest:86, HeaderExchangeHandler (com.alibaba.dubbo.remoting.exchange.support.header) 
    received:172, HeaderExchangeHandler (com.alibaba dubbo. remoting. exchange.support.header) 
    received:52, DecodeHandler (com.alibaba dubbo.remoting. transport) 
    run:82, ChannelEventRunnable (com.alibaba.dubbo.remoting.transport.dispatcher) 
    runWorker:1142, ThreadPoolExecutor (java.util.concurrent) 
    run:617, ThreadPoolExecutor$Worker (java.util.concurrent) 
    run:745, Thread (java.lang) 
    

    从调用堆栈基本可以看出provider整个处理请求的过程,比较简单,但是需要知道为什么调用过程是这样的?其中关键类是什么时候在初始化的?怎么初始化的?

    接下来解决一下问题:

    1. 为什么是从ChannelEventRunnable开始的?谁初始化的ChannelEventRunnable?ChannelEventRunnable作用是什么?
    2. 为什么会调用到上面堆栈中的几个handler(也就是handler是怎么初始化的)?
    3. filter链怎么初始化的?

    本来这些问题在export的时候如果仔细查看源码已经可以解决了,但是真正用到的时候是处理请求的时候,所以这里算是补上之前export过程的一些关键步骤。

    ChannelEventRunnable初始化

    上面的调用堆栈中,是在线程池中一个单独的线程来处理请求,所以先从线程池中调用的线程开始,ChannelEventRunnable的构造过程。

    接着前面provider export的时候会启动NettyServer,所以ChannelEventRunnable的创建也从NettyServer的启动说起,ChannelEventRunnable被初始化的过程会涉及到netty的部分内容:

    1. NettyServer#doOpen,NettyServer启动的时候会创建NioServerSocketChannelFactory,该factory负责创建netty放入所有channel
    2. 在NioServerSocketChannelFactory构造方法中会初始化NioWorkerPool,在该类的构造方法中创建NioWorker
    3. 在创建NioWorker的过程中,调用超类AbstractNioSelector的构造方法
    // NioWorker构造方法中会调用超类AbstractNioSelector的构造方法
    AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
      this.executor = executor;
      openSelector(determiner);
    }
    
    // org.jboss.netty.channel.socket.nio.AbstractNioSelector#openSelector
    private void openSelector(ThreadNameDeterminer determiner) {
      try {
        // open selector
        selector = SelectorUtil.open();
      } catch (Throwable t) {
        throw new ChannelException("Failed to create a selector.", t);
      }
    
      // Start the worker thread with the new Selector.
      boolean success = false;
      try {
        // new一个thread,将当前初始化的NioWorker作为入参,也就是说最终要运行的是NioWorker.run
        // 这个start方法里面会将新建的这个线程放到线程池中运行
        // 这里的executor就是new NioServerSocketChannelFactory时候的入参worker,也就是worker线程池
        DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
        success = true;
      } finally {
        // 省略中间代码...
      }
      assert selector != null && selector.isOpen();
    }
    
    // org.jboss.netty.channel.socket.nio.AbstractNioWorker#newThreadRenamingRunnable
    @Override
    protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
      // 这里的this就是初始化的NioWorker
      return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
    }
    
    // org.jboss.netty.channel.socket.nio.NioWorker#run
    @Override
    public void run() {
      // 上面DeadLockProofWorker.start里面启动的线程会调用这个run方法
      // 这里调用了超类的run方法,最终会调用到org.jboss.netty.channel.socket.nio.AbstractNioSelector#run
      // AbstractNioSelector#run
      super.run();
      recvBufferPool.releaseExternalResources();
    }
    
    // AbstractNioSelector#run
    // 这个方法是NioWorker真正处理逻辑的地方,死循环调用select接受IO事件,然后处理
    public void run() {
      thread = Thread.currentThread();
    
      int selectReturnsImmediately = 0;
      Selector selector = this.selector;
    
      if (selector == null) {
        return;
      }
      // use 80% of the timeout for measure
      final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
      boolean wakenupFromLoop = false;
      for (;;) {
        wakenUp.set(false);
    
        try {
          long beforeSelect = System.nanoTime();
          // 监听I/O事件发生
          int selected = select(selector);
       
          // 省略中间代码...
          
          if (shutdown) {
    		// 省略中间代码...
          } else {
            // 处理I/O事件
            process(selector);
          }
        } catch (Throwable t) {
          // 省略中间代码...
        }
      }
    }
    

    接下来到初始化ChannelEventRunnable的调用堆栈

    终于到了ChannelEventRunnable开始初始化的地方,所有的ChannelEventRunnable都是在AllChannelHandler中完成初始化,并加入到线程池中执行,下面以收到connect事件为例

    public void connected(Channel channel) throws RemotingException {
      ExecutorService cexecutor = getExecutorService(); 
      try{
        // 初始化ChannelEventRunnable并将其加入线程池
        // 这里的线程池是com.alibaba.dubbo.common.threadpool.ThreadPool这个扩展,默认配置的是"fixed",也就是FixedThreadPool
        cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
      }catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
      }
    }
    

    处理请求

    上面最终启动了ChannelEventRunnable线程,在这个线程中会最终调用到我们的SayHello方法中,这个类负责分类处理各种接收到的I/O事件

    // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
    public void run() {
      switch (state) {
        case CONNECTED:
          try{
            // 接收到连接
            handler.connected(channel);
          }catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
          }
          break;
        case DISCONNECTED:
          try{
            // 连接断开
            handler.disconnected(channel);
          }catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
          }
          break;
        case SENT:
          try{
            // 发送数据
            handler.sent(channel,message);
          }catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is "+ message,e);
          }
          break;
        case RECEIVED:
          try{
            // 收到数据
            handler.received(channel, message);
          }catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is "+ message,e);
          }
          break;
        case CAUGHT:
          try{
            // 处理异常
            handler.caught(channel, exception);
          }catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel
                        + ", message is: " + message + ", exception is " + exception,e);
          }
          break;
        default:
          logger.warn("unknown state: " + state + ", message is " + message);
      }
    }
    

    上面通过调用handler的相关方法来处理的,接下来看看handler是什么?

    handler初始化

    从最上面的调用堆栈里面有这些handler

    com.alibaba.dubbo.remoting.transport.DecodeHandler#DecodeHandler
    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler
    // 最上面调用堆栈中com alibaba dubbo.rpcprotocol.dubbo.DubboProtocol$1.reply其实就是线面这个接口的实现类
    com.alibaba.dubbo.remoting.exchange.ExchangeHandler
    

    之前在dubbo export中说过启动NettyServer的调用堆栈,但是并没有详细看每一个调用方法,这里把相关重要的方法拿出来

    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
      // 这些请求received、connected、disconnected最终都会调用下面这个方法处理
      public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        // 省略中间代码...
      }
      // 省略中间代码...
    }
    
    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
    private ExchangeServer createServer(URL url) {
      // 省略中间代码...
      // 这里的handler就是上面初始化的,是一个匿名内部类,也就是com.alibaba.dubbo.remoting.exchange.ExchangeHandler的实现类
      server = Exchangers.bind(url, requestHandler);
      // 省略中间代码...
      return server;
    }
    
    // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
      // 这里的handler就是上面bind方法传入的requestHandler
      // 所以这里就是初始化DecodeHandler和HeaderExchangeHandler的地方,也就说传入Transporters.bind方法的是DecodeHandler类型
      return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    

    ChannelEventRunnable中的handler是什么类型?

    从最上面的堆栈已经知道这个handler其实就是DecodeHandler,也就是初始化ChannelEventRunnable的时候传入的handler,接下来需要弄清楚的是为什么是DecodeHandler。

    上面刚说过ChannelEventRunnable的初始化是由AllChannelHandler中的某一个方法初始化的,那么作为构造参数传入ChannelEventRunnable的handler也就是WrappedChannelHandler#handler(这个类是AllChannelHandler的超类),现在要找到AllChannelHandler是怎么初始化的。

    // com.alibaba.dubbo.remoting.transport.netty.NettyServer#NettyServer
    // 上面说handler的初始化的时候,Transporters.bind方法会最终调用NettyServer的构造方法
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
      // 这里的handler就是DecodeHandler
      super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    
    // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrap
    public static ChannelHandler wrap(ChannelHandler handler, URL url){
      // 这里的handler是DecodeHandler
      return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    
    // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrapInternal
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
      // 这里的handler是DecodeHandler
      // 先获取Dispatcher的扩展类,默认是com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
      // 然后调用AllDispatcher.dispatch方法
      return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                          .getAdaptiveExtension().dispatch(handler, url)));
    }
    
    // com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher#dispatch
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
      // 这里的handler是DecodeHandler,所以AllChannelHandler的超类WrappedChannelHandler#handler就是DecodeHandler
      return new AllChannelHandler(handler, url);
    }
    

    也就是ChannelEventRunnable中的handler就是HeaderExchanger#bind方法中new出来的DecodeHandler类型的对象

    filter链构造

    filter链的构造本来也是在provider export服务的时候完成的,同理consumer端是在refer服务的时候完成filter链的构造。

    consumer和provider的filter链都是在下面的类中构造的,查看前面的service_export和service_reference的调用堆栈就可以看到对该类的调用。

    // com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    public class ProtocolFilterWrapper implements Protocol {
    
        private final Protocol protocol;
    
        public ProtocolFilterWrapper(Protocol protocol){
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        public int getDefaultPort() {
            return protocol.getDefaultPort();
        }
    
        // service export的时候调用 
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            // 先构造filter链再继续后面的export
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    
        // consumer refer的还是调用
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            // 这里是先refer调用创建DubboInvoker,然后才构造filter链,因为consumer是先经过filter链,再经过DubboInvoker处理,而provider是先经过DubboProtocol处理,然后调用filter链
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
    
        public void destroy() {
            protocol.destroy();
        }
    
      	// 
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
          	// 获取所有符合条件的filter扩展,条件包括
          	// 1. filter扩展类上面group对应的值和要求的group(入参)一致
          	// 2. url中也可以指定加载的filter或者剔除的filter,url配置的key就是入参的key
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                  	// 每个filter使用一个Invoker包裹
                    last = new Invoker<T>() {
    
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        public Result invoke(Invocation invocation) throws RpcException {
                          	// 将next传入,在filter负责调用,由此构成链
                            return filter.invoke(next, invocation);
                        }
    
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
    }
    

    所以现在返回看最前面的调用堆栈一切应该是顺理成章了,netty接收到I/O请求后,通知到NioWorker,在NioWorker线程中经过pipeline的处理后启动了ChannelEventRunnable线程;在ChannelEventRunnable线程线程中根据接收到的不同事件调用handler的不同方法来处理,经过多个handler处理之后,经过的是filter链,最后会调用到我们编写的service方法。执行完我们的方法之后,dubo会将结果通过netty发送给consumer。

    总结

    上面通过提问题的方式,解读了一些阅读源码中的关键代码,现在将service export和service reply结合起来,再去阅读源代码就就本能读懂所有主流程了,就能明白源代码为什么这么写。

  • 相关阅读:
    GIT基础详解
    JS进阶解析
    JS基础解析
    CSS布局模型解析
    02.CentOS Linux 7.7 系统配置文档
    docker 创建bridge网络和修改默认网段
    selenium浏览器自动化测试工具 进阶使用
    前端导出Excel和打印介绍
    stm32使用gmtime()转换timestamp为日期,出的结果是乱的,不符合预期。改为localtime正常输出
    .net core api action 不能用作 httpget注释的参数名
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/8379902.html
Copyright © 2011-2022 走看看