zoukankan      html  css  js  c++  java
  • dubbo系列四、dubbo服务暴露过程源码解析

    一、代码准备

    1、示例代码

    参考dubbo系列二、dubbo+zookeeper+dubboadmin分布式服务框架搭建(windows平台)

    2、简单了解下spring自定义标签

    https://www.jianshu.com/p/16b72c10fca8

    Spring自定义标签总共可以分为以下几个步骤
    定义Bean 标签解析生成接收配置的POJO。
    定义schema文件,定义自定义标签的attr属性
    定义解析类parser,遇到自定义标签如何解析。
    定义命名空间处理类namespaceSupport,遇到自定义的命名标签,能够路由到对应的解析类。
    声明schema,写入spring.schema文件中
    声明自定义标签的命名处理类namespaceHandler,写入spring.handlers文件中

    例如dubbo标签:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://code.alibabatech.com/schema/dubbo
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <!--dubbo应用程序命名-->
    <dubbo:application name="dubbo-demo-provider"/>
    <!--dubbo注册地址-->
    <dubbo:registry address="zookeeper://192.168.1.100:2181"/>
    <!--dubbo协议地址-->
    <dubbo:protocol name="dubbo" port="20880"/>
    <!--接口声明-->
    <dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/>
        <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/>
    </beans>

    3、官网说明

    官网:https://dubbo.incubator.apache.org/zh-cn/docs/dev/implementation.html

    初始化过程细节

    解析服务

    基于 dubbo.jar 内的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名称空间时,会回调 DubboNamespaceHandler

    所有 dubbo 的标签,都统一用 DubboBeanDefinitionParser 进行解析,基于一对一属性映射,将 XML 标签解析为 Bean 对象。

    在 ServiceConfig.export() 或 ReferenceConfig.get() 初始化时,将 Bean 对象转换 URL 格式,所有 Bean 属性转成 URL 的参数。

    然后将 URL 传给 协议扩展点,基于扩展点的 扩展点自适应机制,根据 URL 的协议头,进行不同协议的服务暴露或引用。

    二、dubbo标签解析

    1、启动服务,断点跟踪

    public static void main(String[] args) throws IOException {
            ClassPathXmlApplicationContext context
                    = new ClassPathXmlApplicationContext("classpath:dubbo-provider.xml");
            context.start();
            // 阻塞当前进程,否则程序会直接停止
            System.in.read();
        }

    spring启动过程中,随着Spring在初始化过程中,碰到dubbo命名的标签,如(<dubbo:service>,<dubbo:registry>)等标签,会由DubboNamespaceHandler类处理,具体原理见链接Spring自定义标签

    DubboNamespaceHandler类源码:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package com.alibaba.dubbo.config.spring.schema;
    。。。import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
    
    public class DubboNamespaceHandler extends NamespaceHandlerSupport {
        public DubboNamespaceHandler() {
        }
    
        public void init() {
         // application标签解析 <dubbo:application name="dubbo-demo-provider"/>
    this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // module标签解析
    this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));      // module标签解析
         this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));

         this
    .registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
         // <dubbo:protocol name="dubbo" port="20880"/>
    this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
         // service标签
         // <dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/>
        // <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/>
    // </beans>

    this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); this.registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } static { Version.checkDuplicate(DubboNamespaceHandler.class); } }

    遇到不同的标签,会由不同的Parser处理,这里重点看服务发布,这行代码:

    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

    也就是说,当Spring容器处理完<dubbo:service>标签后,会在Spring容器中生成一个ServiceBean ,服务的发布也会在ServiceBean中完成。不妨看一下ServiceBean的定义:

    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
    }

    2、启动入口类

    ServiceBean 同时也是service标签解析之后的bean之一,继承ServiceConfig

    该Bean实现了很多接口,关于InitializingBeanDisposableBeanApplicationContextAwareBeanNameAware,这些接口的使用介绍如下链接: 而在Spring初始化完成Bean的组装,会调用InitializingBeanafterPropertiesSet方法,在Spring容器加载完成,会接收到事件ContextRefreshedEvent,调用ApplicationListeneronApplicationEvent方法。
    afterPropertiesSet中,和onApplicationEvent中,会调用export(),在export()中,会暴露dubbo服务,具体区别在于是否配置了delay属性,是否延迟暴露,如果delay不为null,或者不为-1时,会在afterPropertiesSet中调用export()暴露dubbo服务,如果为null,或者为-1时,会在Spring容器初始化完成,接收到ContextRefreshedEvent事件,调用onApplicationEvent,暴露dubbo服务。
    部分ServiceBean的代码如下:
    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
     //Spring容器初始化完成,调用
     public void onApplicationEvent(ContextRefreshedEvent event) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                //暴露服务
                export();
            }
        }
    }

    export(),暴露服务过程中,如果发现有delay属性,则延迟delay时间,暴露服务,如果没有,则直接暴露服务。

    public synchronized void export() {
            //忽略若干行代码
            if (delay != null && delay > 0) {
                //当delay不为null,且大于0时,延迟delay时间,暴露服务
                delayExportExecutor.schedule(new Runnable() {
                    public void run() {
                        //暴露服务
                        doExport();
                    }
                }, delay, TimeUnit.MILLISECONDS);
            } else {
                //直接暴露服务
                doExport();
            }
        }

    而在doExport()中,验证参数,按照不同的Protocol,比如(dubbo,injvm)暴露服务,在不同的zookeeper集群节点上注册自己的服务。

    protected synchronized void doExport() {
             //忽略10000行代码
            doExportUrls();
            //忽略10000行代码
        }
    
     private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                //按照不同的Protocal暴露服务
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }

        // 获取注册中心地址
        protected List<URL> loadRegistries(boolean provider) {
            checkRegistry();
            List<URL> registryList = new ArrayList<URL>();
            // protected List<RegistryConfig> registries;解析后的RegistryConfig中获取地址列表
            if (registries != null && !registries.isEmpty()) {
                for (RegistryConfig config : registries) {
                    String address = config.getAddress();
                    if (address == null || address.length() == 0) {
                        address = Constants.ANYHOST_VALUE;
                    }
                    // 如果地址为空,再次从配置文件中取
                    String sysaddress = System.getProperty("dubbo.registry.address");
                    if (sysaddress != null && sysaddress.length() > 0) {
                        address = sysaddress;
                    }
                    // 如果地址不为空,拼接协议类型、版本信息
                    if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                        Map<String, String> map = new HashMap<String, String>();
                        appendParameters(map, application);
                        appendParameters(map, config);
                        map.put("path", RegistryService.class.getName());
                        map.put("dubbo", Version.getProtocolVersion());
                        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                        if (ConfigUtils.getPid() > 0) {
                            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                        }
                        // 默认dubbo协议
                        if (!map.containsKey("protocol")) {
                            if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                                map.put("protocol", "remote");
                            } else {
                                map.put("protocol", "dubbo");
                            }
                        }
                        // 如果同一个标签配置多个地址,则拆分
                        List<URL> urls = UrlUtils.parseURLs(address, map);
                        for (URL url : urls) {
                            url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                            url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                            if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                    || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                                registryList.add(url);
                            }
                        }
                    }
                }
            }
            // 返回格式化后的注册地址
            return registryList;
        }

     3、服务暴露过程

    这里以dubbo协议为例,看一下发布的过程,在发布过程中,会用一个变量map保存URL的所有变量和value值,然后调用代理工程proxyFactory,获取代理类,然后将invoker转换成exporter,暴露服务,具体如下:

     protocol://host:port/path?key=value&key=value
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            //如果协议类型为null,则默认为dubbo协议
            String name = protocolConfig.getName();
            if (name == null || name.length() == 0) {
                name = "dubbo";
            }
            //map是保存url中key-Value的值
            Map<String, String> map = new HashMap<String, String>();
            //URL中的side属性,有两个值,一个provider,一个consumer,暴露服务的时候为provider
            map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
            //dubbo的版本号  url中的dubbo
            map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
           //url中的timestamp
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            //url中的pid
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            //从其他参数中获取参数
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, provider, Constants.DEFAULT_KEY);
            appendParameters(map, protocolConfig);
            appendParameters(map, this);
            //忽略若干代码
            
            if (ProtocolUtils.isGeneric(generic)) {
                map.put("generic", generic);
                map.put("methods", Constants.ANY_VALUE);
            } else {
                //url中的revesion字段
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
                //拼接URL中的methods
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
                    map.put("methods", Constants.ANY_VALUE);
                } else {
                    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            //token 临牌校验
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put("token", UUID.randomUUID().toString());
                } else {
                    map.put("token", token);
                }
            }
            //injvm协议
            if ("injvm".equals(protocolConfig.getName())) {
                protocolConfig.setRegister(false);
                map.put("notify", "false");
            }
            //获取上下文路径
            String contextPath = protocolConfig.getContextpath();
            if ((contextPath == null || contextPath.length() == 0) && provider != null) {
                contextPath = provider.getContextpath();
            }
            //获取主机名
            String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
            //获取端口
            Integer port = this.findConfigedPorts(protocolConfig, name, map);
            //组装URL,将map中的协议,版本号信息等
            URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
             //如果url使用的协议存在扩展,调用对应的扩展来修改原url
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
           
            String scope = url.getParameter(Constants.SCOPE_KEY);
             //配置为none不暴露
            if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    //如果不是remote,则暴露本地服务
                    exportLocal(url);
                }
                 //如果配置不是local则暴露为远程服务
                if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                   //忽略日志 如果注册中心地址不为null
                    if (registryURLs != null && registryURLs.size() > 0) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                          //忽略不相干的代码  
                          // 通过代理工厂将ref对象转化成invoker对象
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            //代理invoker对象
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            // 暴露服务
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                             //一个服务可能有多个提供者,保存在一起
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
            }
            this.urls.add(url);
        }

    doExportUrlsFor1Protocol代码再简化一下,如下:

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
         Map map=builderUrl();
         // 通过代理工厂将ref对象转化成invoker对象
         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
         //代理invoker对象
         DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
         // 暴露服务
         Exporter<?> exporter = protocol.export(wrapperInvoker);
         //一个服务可能有多个提供者,保存在一起
         exporters.add(exporter);
    }
    

    拼接后的url:

    dubbo://192.168.1.100:20880/com.dubbo.demo.api.DemoRpcService?anyhost=true&application=dubbo-demo-provider&bind.ip=192.168.1.100&bi

    nd.port=20880&dubbo=2.6.0&generic=false&interface=com.dubbo.demo.api.DemoRpcService&methods=getUserName&pid=18740&side=provider&timestamp=1538311737815

     

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

    而在上面proxyFactory.getInvoker中,很显然是获取到的是接口的代理类。

    而在 protocol.export(wrapperInvoker)中,将服务暴露出去。

    代码如下:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            //忽略若干代码
            //打开服务
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    private void openServer(URL url) {
           
            String key = url.getAddress();
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
             //是否server端
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    //如果服务不存在,创建服务
                    serverMap.put(key, createServer(url));
                } else {
                    server.reset(url);
                }
            }
        }
    private ExchangeServer createServer(URL url) {
            //忽略若干代码
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            return server;
        }

    而在headerExchangerbind中,调用了Transporters.bind(),一直调用到NettyServer,绑定了端口和链接。

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    //Transporters.bind
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            //忽略很多代码
            return getTransporter().bind(url, handler);
        }
    //上段代码的getTransporter()
     public static Transporter getTransporter() {
            return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
        }

    而在Transporter的定义中看到下面代码:

    @SPI("netty")
    public interface Transporter {
        @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
        Server bind(URL url, ChannelHandler handler) throws RemotingException;
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        Client connect(URL url, ChannelHandler handler) throws RemotingException;
    }
    

    所以这里调用的是NettyTransporter,这里启动了一个新的NettyServer

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty4";
    
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    }

    在NettyServer的构造方法中,调用了父类的构造方法,调用了doOpen()方法指定了端口

    public class NettyServer extends AbstractServer implements Server {
      public NettyServer(URL url, ChannelHandler handler) throws   RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    }
    
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
           super(url, handler);
           //忽略很多代码  
          doOpen();
           //忽略很多代码
    }
    
    
     @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
    
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }

    到这里dubbo服务就启动了,但是有一点还是有疑惑,那么,dubbo服务什么时候注册到注册中心的?带着疑惑看了一下官方文档。

    也就是说,在调用DubboProtocol暴露服务之前,回去调用拦截器,当发现是regiester,则去注册中心注册服务。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //如果是registerProtocol,则调用RegisterProtocol.export方法
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return new ListenerExporterWrapper<T>(protocol.export(invoker),
                    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                            .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
        }

    而在RegisterProtocol.export

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
            URL registryUrl = getRegistryUrl(originInvoker);
    
            //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            boolean register = registedProviderUrl.getParameter("register", true);
            if (register) {
                //在这里注册服务
                register(registryUrl, registedProviderUrl);
               //忽略很多代码
            }
            //忽略很多代码
        }
    
        public void register(URL registryUrl, URL registedProviderUrl) {
            Registry registry = registryFactory.getRegistry(registryUrl);
            registry.register(registedProviderUrl);
        }

    ZookeeperRegistry继承父类FailbackRegistry,在父类的register方法中,调用了 doRegister,doRegister中,创建了ZK节点,这样就将自己的服务暴露到注册中心zk上:

    @Override
        public void register(URL url) {
                 //忽略很多代码
                doRegister(url);
                //忽略很多代码
        }
    
     protected void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

    这样,整个dubbo服务就启动了。再回头看官方文档上的说明,就很清楚了。

    三、类调用关系

    1、provider提供方

    ClassPathXmlApplicationContext <init>(构造方法)
    -> ClassPathXmlApplicationContext refresh()
    -> ClassPathXmlApplicationContext finishRefresh()
    -> AbstractApplicationContext publishEvent()
    -> ServiceBean onApplicationEvent()
    -> ServiceConfig doExport()
    #构造dubbo对象 application provider module protocol registry service reference consume等
     
    -> ServiceConfig doExportUrls #导出URL,获取注册中心RegistryConfig
    #注册中心:registry://10.199.101.228:2181/com.alibaba.dubbo.registry.RegistryService?application=demo&backup=10.199.101.227:2181,10.199.101.229:2181&dubbo=2.4.9&pid=8045&registry=zookeeper&timestamp=1491546077803
     
    -> ServiceConfig doExportUrlsFor1Protocol()
    #需要暴露 dubbo://10.199.66.242:20880/com.unj.dubbotest.provider.DemoService?anyhost=true&application=dubbo_demo_provider&dubbo=2.4.9&interface=com.unj.dubbotest.provider.DemoService&methods=sayHello,getUsers&pid=8045&revision=0.0.1&side=provider&timestamp=1491546674441&version=0.0.1
     
    -> ServiceConfig exportLocal()
    -> Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
    #暴露Invoker<XxxService>调用服务代理类
     
    -> proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
    #返回 AbstractProxyInvoker代理ProxyInvoker<XxxService>
    public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    private final T proxy; //代理目标实例 XxxServiceImpl
    private final Class<T> type;
    private final URL url;
    }
    -> InvokerInvocationHandler.invoke()
    #invoker.invoke(new RpcInvocation(method, args)).recreate();
     
    -> DubboProtocol export(Invoker<T> invoker)
    # 返回暴露Exporter<T>
    public class DubboExporter<T> extends AbstractExporter<T> {
    private final String key; //com.unj.dubbotest.provider.DemoService:0.0.1:20880
    private final Map<String, Exporter<?>> exporterMap;
    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
    }
     
    -> DubboProtocol openServer(url)
    #url dubbo://10.199.66.242:20880/com.unj.dubbotest.provider.DemoService?anyhost=true&application=dubbo_demo&dubbo=2.4.9&interface=com.unj.dubbotest.provider.DemoService&methods=sayHello,getUsers&pid=8045&revision=0.0.1&side=provider&timestamp=1491546674441&version=0.0.
    #serverMap.put(key, createServer(url)); key:10.199.66.242:20880 value:ExchangeServer
     
    -> DubboProtocol createServer(URL url)
    #返回HeaderExchangeServer,添加参数列表 如心跳,心跳时间
    -> Exchangers.bind(url, requestHandler);
    #返回HeaderExchangeServer,getTransporter()获取的实例来源于配置,默认返回一个NettyTransporter
    -> HeaderExchangeServer.bind(URL url, ExchangeHandler handler);
     
    -> HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    #HeaderExchangeServer包装实例NettyServer
     
    -> NettyTransporter.bind(URL url, ChannelHandler listener)
    #return new NettyServer(url, listener)
     
    -> NettyServer.doOpen();
    #打开socket监听端口准备接收消息
    #ServerBootstrap bind(getBindAddress())绑定地址端口
    #RpcInvocation 具体类名、方法名、调用参数
    #DubboInvoker – 执行具体的远程调用,包含初始化信息如client
    #Protocol – 服务地址的发布和订阅
    #Exporter – 暴露服务的引用,或取消暴露

    2、consume(消费方)

    ->ReferenceConfig.init
    #consume端启动初始化
    ->DubboProtocol.refer
    #根据参数url,接口等构建Invoker
    ->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)
    #构建代理对象Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
     
    ->DemoService.say(String hello);#真正调用时候
    ->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
    #invoker.invoke(new RpcInvocation(method, args)).recreate();RpcInvocation包装参数方法名
    ->DubboInvoker.doInovke(final Invocation invocation)
    ->MockClusterInvoker是否Mock
    ->FailfastClusterInvoker.invoke
    ->RegistryDirectory的methodInvokerMap 获取invock列表
    ->Loadbalance RegistryDirectory.doList(Invocation)负载均衡
     
    #统一代理调用
    ->ExchangeClient.send(invocation, isSent);
    ->HeaderExchangeChannel.request(Object request, int timeout)
    ->NettyChannel.send(Object message, boolean sent) 
     
     

    3、dubbo 底层通讯

    NettyClient <-- 异步NIO传输 socket监听-> NettyServer
     

    4、consume --> provider 调用过程

     
    -> NettyServer->NettyHandler.messageReceived #接收消息处理器
    -> MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$requestHandler
    #NettyServer启动时候绑定MultiMessageHandler
    #DubboProtocol.getServers() 检索serverMap获取Exporter<?>
    #DubboProtocol.getServers() 检索serverMap获取ExchangeServer
    -> ExchangeHandlerAdapter.reply
    #真正获取Invoker,将传入message 转换 invocation
    -> invoker.invoke(invocation)
    -> JavassistProxyFactory$AbstractProxyInvoker.doInvoke
    #服务端Invoker代理 AbstractProxyInvoker调用目标引用service,客户端DubboInvoker
  • 相关阅读:
    Springboot2.0之HikariCP 连接池
    Spring Kafka中关于Kafka的配置参数
    Spring @Async异步线程池 导致OOM报错的原因
    JDK 8 函数式编程入门
    Spring自定义argumentResolver参数解析器
    Kafka消费异常处理
    Java项目生成可执行jar包、exe文件以及在Windows下的安装文件
    Mysql索引研究总结
    windows安装zookeeper
    JVM中的堆和栈
  • 原文地址:https://www.cnblogs.com/wangzhuxing/p/9733339.html
Copyright © 2011-2022 走看看