zoukankan      html  css  js  c++  java
  • 2、Dubbo源码解析--服务发布原理(Netty服务暴露)

    一、服务发布 - 原理:

    首先看Dubbo日志,截取重要部分:

      1)暴露本地服务

        

    Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 10.165.2.47

      2)暴露远程服务

        

    Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.165.2.47&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&qos.port=22222&side=provider&timestamp=1520303067433, dubbo version: 2.0.0, current host: 10.165.2.47
    
    
    Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.165.2.47&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&qos.port=22222&side=provider&timestamp=1520303067433 to registry registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=3880&qos.port=22222&registry=multicast&timestamp=1520303067418, dubbo version: 2.0.0, current host: 10.165.2.47

      3)启动Netty

         

    Start NettyServer bind /0.0.0.0:20880, export /10.165.2.47:20880, dubbo version: 2.0.0, current host: 10.165.2.47

      4)打开Zookeeper

        

    INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.48.117:2181

      5)注册到Zookeeper

        

    Register: dubbo://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&side=provider&timestamp=1520303067433, dubbo version: 2.0.0, current host: 10.165.2.47

      6)监听Zookeeper

        

    Subscribe: provider://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&side=provider&timestamp=1520303067433, dubbo version: 2.0.0, current host: 10.165.2.47

      7)频繁发送广播包,注册中心利用广播包监听provider健康状况

    [DUBBO] Send broadcast message: subscribe provider://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&side=provider&timestamp=1520303067433 to /224.5.6.7:1234, dubbo version: 2.0.0, current host: 10.165.2.47
    
    [DUBBO] Receive multicast message: subscribe provider://10.165.2.47:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3880&side=provider&timestamp=1520303067433 from /10.165.2.47:1234, dubbo version: 2.0.0, current host: 10.165.2.47

    二、根据原理分析源码

      1)首先看Provider注册文件:

        

    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
    
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

        可以看到,是通过dubbo的schema service进行注入的,那我们找到DubboNameSpaceHandler,Dubbo命名空间处理器,找到<dubbo:service>标签解析行:

     public void init() {
            registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
            registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
            registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
            registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
            registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
            registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
            registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
            //这里就是<dubbo:service>标签的解析注入入口
            registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser(
    "reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); }

      

      2)上面new DubboBeanDefinitionParser是<dubbo:service>解析注入入口,那ServiceBean.class就是<dubbo:service>标签的发布订阅入口,进入ServiceBean可以看到一个核心方法onApplicationEvent(),其中export()就是发布方法:

       

    public void onApplicationEvent(ContextRefreshedEvent event) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
            //这里是发布方法  
            export();
            }
        }    

      3)发布代码处理路径

      

    ServiceBean.onApplicationEvent
    -->export()
        -->ServiceConfig.export()
            -->doExport()
                -->doExportUrls() //里面有个for循环,代表一个服务可以有多个通信协议,例如tcp、http、dubbo等协议,默认是tcp协议
                    -->loadRegistries(true)   //加载注册信息,组装注册中心url信息,如源码中config.properties中读取dubbo.registry.address = zookeeper://192.168.99.100:32770链接信息,组装Provider注册链接串
                        -->doExportUrlsFor1Protocol(protocolConfig, registryURLs)
                  // export to local if the config is not remote (export to remote only when config is remote)远程暴露
                  -->exportLocal(url) 
                    -->proxyFactory.getInvoker(ref, (Class) interfaceClass, local))
                      -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
                      -->extension.getInvoker(arg0, arg1, arg2)
                        -->StubProxyFactoryWrapper.getInvoker(T proxy, Class<T> type, URL url)
                          -->proxyFactory.getInvoker(proxy, type, url)
                            -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
                              -->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
                                -->makeWrapper(Class<?> c)
                              -->return new AbstractProxyInvoker<T>(proxy, type, url)
                  -->protocal.export //本地暴露
                    -->Protocal$Adaptive.export
                      -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocal.class).getExtension("injvm");
                      -->extension.export(arg0)
                        -->ProtocalFilterWrapper.export
                           -->buildInvokerChain //创建8个Filter
                            -->ProtocalListenerWrapper.export
                              -->InjvmProtocal.export
                                -->return new InjvmExporter
                                   -->InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);
                                          this.key = key;
                                      this.exporterMap = exporterMap;
                                      exporterMap.put(key, this); //key=com.alibaba.dubbo.demo.DemoService this=InjvmExporter
                                     }

                  
    // export to local if the config is not remote (export to remote only when config is remote)远程暴露
                  -->proxyFactory.getInvoker()原理和本地暴露一样都是为了获取一个Invoker对象
                    -->protocal.export
                      -->Protocal$Adaptive.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocal.class).getExtension("register");
                      -->extension.export(arg0)
                        -->ProtocalFilterWrappter.export
                          -->ProtocalListenerWrapper.export
                            -->RegistryProtocal.export
                              -->doLocalExport(originInvoker)
                                -->getCacheKey(originInvoker) //读取dubbo://192.168.100.51:20880/
                                -->protocal.export
                                  -->Protocol$Adaptive.export
                                    -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocl.class).getExtension("dubbo");
                                    -->extension.export(arg0)
                                      -->ProtocolFilterWrapper.export
                                        -->buildInvokerChain//创建8个Filter
                                         -->ProtocalListenerWrapper.export
                    -------1、netty服务暴露的开始-------- -->DubboProtocal.export
                                            -->serviceKey(url) //组装key=com.alibaba.dubbo.demo.DemoService:20880
                                            -->目的:exporterMap.put(key, this); //key=com.alibaba.dubbo.demo.DemoService:20880 this=
                                            -->openServer(url)
                                              -->createServer(url)
                    --------2、信息交换层exchange 开始--------     -->Exchanges.bind(url, requestHandler) //exchanger是一个信息交换层
                                                  -->getExchanger(url)
                                                    -->getExchange(type)
                                                      -->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
                                                        -->HeaderExchanger.bind
                                                          -->Transporters.bind(url, new DecoderHandler(new HeaderExchangeHandler(handler)))
                                                            -->new HeaderExchangeHandler(handler) //this.handler = handler
                                                              -->
    new DecoderHandler
                                                                -->new AbstractChannelHandlerDelegate //this.handler = handler;
                    ------------>3、网络传输层 transporter ------------          -->Transporters.bind
                                                                -->getTransporter
                                                                  -->
    ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
                                                                -->Transporter$Adaptive.bind
                                                                    //netty服务暴露
                                                                  -->ExtensionLoader.getExtensionLoader(com.alibaba.remoting.Transporter.class).getExtension("netty")
                                                                  -->extension.bind(arg0, arg1)
                                                                    -->NettyTransporter.bind
                                                                      -->new NettyServer(url, listener)
                                                                        -->AbstractPeer //this.url = url; this.handler = handler;
                                                                        -->AbstractEndpoint //codec timeout=1000 connectTimeout=3000
                                                                        -->AbstractServer //bindAddress accepts=0 idleTimeout=600000
                    ----------->4、打开端口,暴露netty服务 --------------                      -->doOpen()
                                                                          -->设置NioServerSocketChannelFactory boss worker的线程池 线程个数为3
                                                                          -->设置编码handler
                                                                          -->bootstrap.bind(getBindAddress())
                                                                -->new HeaderExchangeServer()
                                                                  -->this.server = NettyServer
                                                                  -->this.heartbeat=600000
                                                                  -->heartbeatTimeout=180000
                                                                  -->startHeatbeatTimer() //这是一个心跳定时器,采用线程池ScheduledExecutePool,如果断开了就心跳重连
                                                                    -->

                                                                    
                                                                   
                       
     
     
      


    说说上述这些类及方法的概念作用:”
      1、proxyFactory:就是为了获取一个接口的代理类,例如获取一个远程接口的代理。
      它有2个方法,代表2个作用
        a、getInvoker:针对server端,将服务对象,例如DemoServiceImpl包装成一个Wrapper对象。
        b、getProxy:针对client端,创建接口的代理对象,例如DemoService的接口。

      2、makeWrapper:它类似spring的BeanWrapper,它就是包装了一个接口或一个类,可以通过Wrapper对实例对象进行赋值取值以及指定方法的调用。

      3、Invoker:它是一个可执行的对象,能够根据方法名称、参数得到相应的执行结果。
        它里面有一个很重要的方法Result invoke(Invocation invocation),Invocation是包含了需要执行的方法和参数的重要信息,目前它只有2个实现类RpcInvocation MockInvocation
        它有3种类型的Invoker
          1、本地执行类的Invoker
          2、远程通信类的Invoker
          3、多个远程通信执行类的Invoker聚合成集群版的Invoker
      4、Protocol:
        1)export暴露远程服务(用语服务端),就是将proxyFactory.getInvoker创建的代理类invoker对象,通过协议暴露给外部。
        2)refer:引用远程服务(用于客户端)
      
      5、Exporter:维护invoker的生命周期

      6、exchanger:信息交换层,封装请求响应模式同步转异步

      7、transporter:网络传输层,用来抽象Netty(dubbo默认)或者Mina的统一接口

       4)暴露本地服务与暴露远程服务的区别是什么?

        a、暴露本地服务:指暴露在同一个JVM里面,不用通过zk来进行远程通信,例如在同一个服务里面(tomcat),自己调用自己的接口,就无需进行网络IP连接通信。

        b、暴露远程服务:指暴露给远程客户端的IP和端口号,通过网络来实现通信。

  • 相关阅读:
    [置顶] Extjs4 异步刷新书的情况下 保持树的展开状态
    控制系统中常用的名词术语
    开环控制系统与闭环控制系统
    反馈控制
    Tensorflow把label转化成one_hot格式
    Tensorflow——tf.nn.max_pool池化操作
    Tensorflow——tf.nn.conv2d卷积操作
    Tensoflow简单神经网络实现非线性拟合
    Tensorflow 中的 placeholder
    Tensorflow 中的constant、Session、placeholde和 Variable简介
  • 原文地址:https://www.cnblogs.com/dbaxyx/p/8513599.html
Copyright © 2011-2022 走看看