  • Learning Dubbo (4): Service Export

    Overview

    Configuring Dubbo

    To use Dubbo, we first create dubbo-consumer.xml and dubbo-provider.xml under resources. For example, here is dubbo-provider.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xmlns="http://www.springframework.org/schema/beans"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
    	http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    	<dubbo:provider protocol="dubbo" port="-1"/>
    	<!--<dubbo:service interface="cn.healthmall.order.service.AppMsgPushUpdateService" ref="appMsgPushUpdateService" version="1.0.0" retries="0"/>-->
    
    
    </beans>
    

    The provider's service interface is defined as follows:

    import java.util.concurrent.CompletableFuture;

    public interface DemoService {
    
        String sayHello(String name);
    
        default CompletableFuture<String> sayHelloAsync(String name) {
            return CompletableFuture.completedFuture(sayHello(name));
        }
    
    }
    
    

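    In this section we look at how Dubbo exports a bean.

    Source Code Analysis

    How is DubboBootstrap hooked into Spring Boot startup?

    DubboBootstrap is the startup class; as its name suggests, it contains Dubbo's startup logic. So how does Dubbo get started inside Spring Boot? From the Spring Boot startup flow we know that users can plug in their own Listener to react to events published during startup. Take a look at this class:

    [Figure: the DubboBootstrapApplicationListener class]

    This is the key class. When we covered the Spring Boot startup flow, we saw that events such as the context refresh are broadcast to listeners. Dubbo registers its own listener for that event and uses it to kick off Dubbo's startup logic. The code: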

    public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
            implements Ordered {
    
        private void onContextRefreshedEvent(ContextRefreshedEvent event) {
            dubboBootstrap.start();
        }     
    
    }
    
    

    Next comes the start method in DubboBootstrap:

       /**
         * Start the bootstrap
         */
        public DubboBootstrap start() {
            if (started.compareAndSet(false, true)) {
                initialize();
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is starting...");
                }
                // 1. export Dubbo Services
                exportServices();
    
                // Not only provider register
                if (!isOnlyRegisterProvider() || hasExportedServices()) {
                    // 2. export MetadataService 
                    exportMetadataService();
                    //3. Register the local ServiceInstance if required
                    registerServiceInstance();
                }
                // refer (subscribe to) the services this application consumes
                referServices();
    
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " has started.");
                }
            }
            return this;
        }
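
    The started.compareAndSet(false, true) check is what makes start() safe to call more than once. Here is a minimal sketch of that guard in isolation; OnceStarter and its counter are hypothetical stand-ins, not Dubbo classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DubboBootstrap; only the start-once guard is modeled.
class OnceStarter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    // Returns this so calls can be chained, mirroring the signature of start() above.
    OnceStarter start() {
        // compareAndSet flips false -> true atomically, so only the first caller enters.
        if (started.compareAndSet(false, true)) {
            startCount.incrementAndGet(); // stands in for exportServices() and the other steps
        }
        return this;
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) {
        OnceStarter starter = new OnceStarter();
        starter.start().start().start();
        System.out.println("start body ran " + starter.startCount() + " time(s)");
    }
}
```

    Because the flag is flipped atomically, two threads calling start() at the same time cannot both run the export steps.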
    
    

    Let's look at the exportServices method. It first fetches the ServiceBean instances held by configManager; how they got into configManager is not covered here.

        private void exportServices() {
            configManager.getServices().forEach(sc -> {
                // TODO, compatible with ServiceConfig.export()
                ServiceConfig serviceConfig = (ServiceConfig) sc;
                serviceConfig.setBootstrap(this);
    
                if (exportAsync) {
                    ExecutorService executor = executorRepository.getServiceExporterExecutor();
                    Future<?> future = executor.submit(() -> {
                        sc.export();
                    });
                    asyncExportingFutures.add(future);
                } else {
                    
                    sc.export();
                    exportedServices.add(sc);
                }
            });
        }
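
    The two branches above differ only in where export() runs: inline on the calling thread, or on an executor whose Future is kept for later. A rough sketch of that choice, using plain Runnable tasks as hypothetical stand-ins for ServiceConfig instances:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExportModes {
    // Runs every export task inline or on the pool; returns the futures of the async ones.
    static List<Future<?>> exportAll(List<Runnable> services, boolean exportAsync, ExecutorService executor) {
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable service : services) {
            if (exportAsync) {
                futures.add(executor.submit(service)); // caller can wait on these later
            } else {
                service.run(); // blocks until this service is exported
            }
        }
        return futures;
    }

    // Exports two hypothetical services and returns the names that were exported.
    static List<String> demo(boolean exportAsync) {
        List<String> exported = new CopyOnWriteArrayList<>();
        List<Runnable> services = new ArrayList<>();
        services.add(() -> exported.add("DemoService"));
        services.add(() -> exported.add("OrderService"));
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (Future<?> future : exportAll(services, exportAsync, executor)) {
                future.get(5, TimeUnit.SECONDS); // wait for async exports to finish
            }
        } catch (Exception e) {
            throw new IllegalStateException("export failed", e);
        } finally {
            executor.shutdown();
        }
        return exported;
    }
}
```

    In the synchronous case the returned list of futures is empty, so the waiting loop does nothing.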
    

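    ServiceBean is the abstraction of a service bean. Its class hierarchy:

    [Figure: ServiceBean class hierarchy diagram]

    ServiceBean extends ServiceConfig. ServiceConfig holds a ScheduledExecutorService, DELAY_EXPORT_EXECUTOR, which is used to export a bean after a delay. At first glance this looks expensive, since an application usually has many service beans, but the constant-style name reflects a static field: one scheduler is shared by all ServiceConfig instances rather than allocated per bean. The implementation shows up in the sc.export() call: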

        // method of ServiceConfig
        public synchronized void export() {
            if (!shouldExport()) {
                return;
            }
    
            if (bootstrap == null) {
                bootstrap = DubboBootstrap.getInstance();
                bootstrap.init();
            }
    
            checkAndUpdateSubConfigs();
    
            //init serviceMetadata
            serviceMetadata.setVersion(version);
            serviceMetadata.setGroup(group);
            serviceMetadata.setDefaultGroup(group);
            serviceMetadata.setServiceType(getInterfaceClass());
            serviceMetadata.setServiceInterfaceName(getInterface());
            serviceMetadata.setTarget(getRef());
    
            // delayed export
            if (shouldDelay()) {
                DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                // either way, doExport() is what eventually runs
                doExport();
            }
        }
    
    
        // method of ServiceConfig
        protected synchronized void doExport() {
            if (unexported) {
                throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
            }
            if (exported) {
                return;
            }
            exported = true;
    
            if (StringUtils.isEmpty(path)) {
                path = interfaceName;
            }
            // export one URL per configured protocol
            doExportUrls();
    
            // dispatch a ServiceConfigExportedEvent since 2.7.4
            dispatch(new ServiceConfigExportedEvent(this));
        } 
    
        private void doExportUrls() {
            ServiceRepository repository = ApplicationModel.getServiceRepository();
            ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
            repository.registerProvider(
                    getUniqueServiceName(),
                    ref,
                    serviceDescriptor,
                    this,
                    serviceMetadata
            );
    
            List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
    
            for (ProtocolConfig protocolConfig : protocols) {
                String pathKey = URL.buildKey(getContextPath(protocolConfig)
                        .map(p -> p + "/" + path)
                        .orElse(path), group, version);
                // In case user specified path, register service one more time to map it to path.
                repository.registerService(pathKey, interfaceClass);
                // TODO, uncomment this line once service key is unified
                serviceMetadata.setServiceKey(pathKey);
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
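
    The export()/doExport() pair combines two ideas: an optional delay through a scheduler, and an exported flag that turns repeated calls into no-ops. A minimal sketch of both, assuming a single static scheduler shared by all instances; DelayedExportSketch and its counter are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedExportSketch {
    // static: one scheduler for every instance, not one per service
    private static final ScheduledExecutorService DELAY_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-exporter");
                t.setDaemon(true); // do not keep the JVM alive just for delayed exports
                return t;
            });

    private final long delayMillis;
    private boolean exported;
    private int exportCount;

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Exports now, or schedules the export when a delay is configured.
    synchronized Future<?> export() {
        if (delayMillis > 0) {
            return DELAY_EXECUTOR.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        }
        doExport();
        return CompletableFuture.completedFuture(null);
    }

    private synchronized void doExport() {
        if (exported) {
            return; // a second call, delayed or not, does nothing
        }
        exported = true;
        exportCount++; // stands in for doExportUrls()
    }

    synchronized int exportCount() {
        return exportCount;
    }

    // Calls export() twice, waits for both, and reports how often the body ran.
    static int exportTwiceAndWait(long delayMillis) {
        DelayedExportSketch service = new DelayedExportSketch(delayMillis);
        try {
            Future<?> first = service.export();
            Future<?> second = service.export();
            first.get(5, TimeUnit.SECONDS);
            second.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("export did not complete", e);
        }
        return service.exportCount();
    }
}
```

    With or without a delay, the export body runs once per instance.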
    
    
    

    While debugging, registryURLs looked like this, for example:

    registryURLs : registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=5300&qos.port=22222&registry=zookeeper&timestamp=1586333795545
    

    So the preceding logic collects the protocol information, that is, which protocol each service is exposed over, and then enters this method:

        private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            String name = protocolConfig.getName();
            if (StringUtils.isEmpty(name)) {
                name = DUBBO;
            }
    
            Map<String, String> map = new HashMap<String, String>();
      
            // this elided section puts a large set of parameters into the map
            .....
    
            //init serviceMetadata attachments
            serviceMetadata.getAttachments().putAll(map);
    
            // export service: the core logic starts here
            String host = findConfigedHosts(protocolConfig, registryURLs, map);
            Integer port = findConfigedPorts(protocolConfig, name, map);
            URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    
            // You can customize Configurator to append extra parameters
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    
            String scope = url.getParameter(SCOPE_KEY);
            // don't export when none is configured
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                // export to remote if the config is not local (export to local only when config is local)
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
    
                            ....
    
                            // Note: ProxyFactory creates an Invoker here (comparable to how Spring AOP wraps a target)
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    
                            // wrap the invoker together with this ServiceConfig
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            // hand over to the protocol's export method
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                    } else {
                        if (logger.isInfoEnabled()) {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                    /**
                     * @since 2.7.0
                     * ServiceData Store
                     */
                    WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                    if (metadataService != null) {
                        metadataService.publishServiceDefinition(url);
                    }
                }
            }
            this.urls.add(url);
        }
    
    

    The method is long, but the core logic boils down to:

    • Collect the configuration into the map
    • Have PROXY_FACTORY create an invoker
    • Wrap the invoker
    • Call the protocol's export method
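
    These four steps can be sketched with toy types. Everything below (Invoker, Exporter, InMemoryProtocol, the URL format) is a simplified, hypothetical stand-in for the Dubbo classes of similar names, and the toy protocol only records what was exported instead of opening a port:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class ExportFlowSketch {
    interface Invoker { String url(); String invoke(String arg); }
    interface Exporter { Invoker invoker(); }

    // Toy protocol: "exporting" only records the invoker under its URL.
    static class InMemoryProtocol {
        final Map<String, Exporter> exported = new LinkedHashMap<>();

        Exporter export(Invoker invoker) {
            Exporter exporter = () -> invoker;
            exported.put(invoker.url(), exporter);
            return exporter;
        }
    }

    // Step 1: collect parameters into a map and render them as a URL string.
    static String buildUrl(String protocol, String host, int port, String path, Map<String, String> params) {
        StringBuilder sb = new StringBuilder(protocol)
                .append("://").append(host).append(':').append(port).append('/').append(path);
        char separator = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    // Step 2: turn the service implementation (ref) into an Invoker bound to the URL.
    static Invoker getInvoker(UnaryOperator<String> ref, String url) {
        return new Invoker() {
            public String url() { return url; }
            public String invoke(String arg) { return ref.apply(arg); }
        };
    }

    // Step 3: wrap the invoker; this toy wrapper only delegates.
    static Invoker wrap(Invoker delegate) {
        return new Invoker() {
            public String url() { return delegate.url(); }
            public String invoke(String arg) { return delegate.invoke(arg); }
        };
    }

    static String demo() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("interface", "DemoService");
        params.put("version", "1.0.0");
        String url = buildUrl("dubbo", "127.0.0.1", 20880, "DemoService", params);

        InMemoryProtocol protocol = new InMemoryProtocol();
        // Step 4: hand the wrapped invoker to the protocol.
        protocol.export(wrap(getInvoker(name -> "Hello, " + name, url)));
        // A later request is routed by URL to the exported invoker.
        return protocol.exported.get(url).invoker().invoke("dubbo");
    }
}
```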

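    OK, now we are at the protocol's export method. Protocol is an interface. The figure below shows that the call eventually registers the service in ZooKeeper; the protocol object is in fact wrapped in several layers, involving the following classes:

    [Figure: debugger screenshot showing the wrapped Protocol classes]

    From top to bottom: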

    • ProtocolListenerWrapper
    • ProtocolFilterWrapper
    • QosProtocolWrapper
    • RegistryProtocol
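
    Layered wrapping of this kind is the decorator pattern: each wrapper implements the same interface, does its own work, then delegates to the object it wraps. A small sketch, assuming the outermost wrapper is called first and using a hypothetical one-method Protocol interface that only records the call order:

```java
import java.util.ArrayList;
import java.util.List;

class WrapperChainSketch {
    // Hypothetical, heavily reduced version of a protocol interface.
    interface Protocol { void export(String invoker); }

    // Builds a wrapper that logs its own name, then delegates to the wrapped protocol.
    static Protocol wrap(String name, Protocol inner, List<String> trace) {
        return invoker -> {
            trace.add(name);       // a real wrapper would attach listeners, filters, etc.
            inner.export(invoker); // then pass the call down the chain
        };
    }

    // Exports through the nested chain and returns the order in which layers were hit.
    static List<String> exportThroughChain() {
        List<String> trace = new ArrayList<>();
        Protocol registry = invoker -> trace.add("RegistryProtocol");
        Protocol chain = wrap("ProtocolListenerWrapper",
                wrap("ProtocolFilterWrapper",
                        wrap("QosProtocolWrapper", registry, trace), trace), trace);
        chain.export("demo-invoker");
        return trace;
    }
}
```

    The caller only ever sees a Protocol, so layers can be added or removed without touching the export code.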

    The names give a rough idea of what each class does. Let's look at the export method of RegistryProtocol; as its name suggests, this class registers the provider's service with the registry.

    
        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
            // decide if we need to delay publish
            boolean register = providerUrl.getParameter(REGISTER_KEY, true);
            if (register) {
                // the registration logic is here
                register(registryUrl, registeredProviderUrl);
            }
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }
    
        public void register(URL registryUrl, URL registeredProviderUrl) {
            Registry registry = registryFactory.getRegistry(registryUrl);
            registry.register(registeredProviderUrl);
    
            ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
            model.addStatedUrl(new ProviderModel.RegisterStatedURL(
                    registeredProviderUrl,
                    registryUrl,
                    true
            ));
        }
    
    

    Registry is an interface and, like Protocol, it is wrapped in several layers, as the debugging screenshot above also shows. In the register method of the FailbackRegistry class:

        @Override
        public void register(URL url) {
            if (!acceptable(url)) {
                logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
                return;
            }
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                // Sending a registration request to the server side
                // the actual registration call
                doRegister(url);
            } catch (Exception e) {
                ...
            }      
            ...
        }
    
    

    Finally we reach the registration method of ZookeeperRegistry, which is easy to follow: it uses zkClient to create the node.

        @Override
        public void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
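
    To make the node creation more concrete, here is a sketch of how a node path could be derived from a provider URL. It assumes the commonly seen layout of root group, interface name, category, then the URL-encoded provider URL, with "dubbo" and "providers" as the defaults; the helper below is hypothetical and is not the real toUrlPath:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {
    // Assumed layout: /<root>/<interface>/<category>/<url-encoded provider URL>
    static String toUrlPath(String root, String serviceInterface, String category, String providerUrl) {
        try {
            return "/" + root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo", "org.demo.DemoService", "providers",
                "dubbo://10.0.0.1:20880/org.demo.DemoService"));
    }
}
```

    The second argument of zkClient.create above comes from the dynamic parameter. Assuming it selects an ephemeral node, the entry disappears when the provider's ZooKeeper session ends, which is how consumers notice that a provider has gone away.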
    
    

    At this point we have walked through the whole service export flow.

    Supplement

    What is ProxyFactory?

    As an aside, let's see what ProxyFactory actually is.

    /**
     * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
     */
    @SPI("javassist")
    public interface ProxyFactory {
    
    
        @Adaptive({PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker) throws RpcException;
    
    
        @Adaptive({PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
    
    
        @Adaptive({PROXY_KEY})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    
    }
    
    
    // One implementation as an example: it builds on JDK dynamic proxies and reflection
    public class JdkProxyFactory extends AbstractProxyFactory {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
        }
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                    return method.invoke(proxy, arguments);
                }
            };
        }
    
    }
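
    The two directions of JdkProxyFactory can be tried out without Dubbo. In the sketch below, getInvoker turns a real object into a reflective invoker (the provider side), and getProxy turns an invoker back into an object that implements the interface (the consumer side). Greeter and the reduced Invoker interface are hypothetical. Unlike the listing above, the method is looked up on the interface type rather than on proxy.getClass(), which avoids access problems when the target is a lambda or a non-public class:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class JdkProxySketch {
    public interface Greeter { String sayHello(String name); }

    // Hypothetical, reduced invoker: method name + parameter types + arguments in, result out.
    interface Invoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
    }

    // Provider side: wrap a real object so it can be called by method name via reflection.
    static <T> Invoker getInvoker(T target, Class<T> type) {
        return (methodName, parameterTypes, arguments) -> {
            Method method = type.getMethod(methodName, parameterTypes);
            return method.invoke(target, arguments);
        };
    }

    // Consumer side: build an object implementing the interface that forwards to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Invoker invoker, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.invoke(method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    // Calls the real implementation through proxy -> invoker -> reflection.
    static String roundTrip(String name) {
        Greeter real = n -> "Hello, " + n;
        Greeter proxy = getProxy(getInvoker(real, Greeter.class), Greeter.class);
        return proxy.sayHello(name);
    }
}
```

    In this sketch the invoker and the proxy live in the same JVM; in Dubbo the network sits between them.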
    
    
    

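    How does the Protocol interface get wrapped?

    Above, the protocol object on which export is called is actually an instance of the Protocol$Adaptive wrapper class. How is this wrapper generated? That was covered in the previous post on the SPI mechanism.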
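
    As a reminder of the idea, an adaptive class picks the concrete extension at call time from the URL. The sketch below imitates that by hand, selecting an extension by the URL's scheme; the map-based lookup and all names are hypothetical, and it is not the code Dubbo generates:

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveProtocolSketch {
    // Hypothetical, reduced protocol interface.
    interface Protocol { String export(String url); }

    // Chooses the real extension per call, based on the scheme of the URL.
    static class AdaptiveProtocol implements Protocol {
        private final Map<String, Protocol> extensions;
        private final String defaultName;

        AdaptiveProtocol(Map<String, Protocol> extensions, String defaultName) {
            this.extensions = extensions;
            this.defaultName = defaultName;
        }

        @Override
        public String export(String url) {
            int idx = url.indexOf("://");
            String name = idx > 0 ? url.substring(0, idx) : defaultName;
            Protocol extension = extensions.get(name);
            if (extension == null) {
                throw new IllegalStateException("No Protocol extension named " + name);
            }
            return extension.export(url);
        }
    }

    static Protocol newAdaptive() {
        Map<String, Protocol> extensions = new HashMap<>();
        extensions.put("registry", url -> "registered " + url);
        extensions.put("dubbo", url -> "opened server for " + url);
        return new AdaptiveProtocol(extensions, "dubbo");
    }
}
```

    This matches what the walkthrough showed: a registry:// URL ends up in RegistryProtocol, even though the caller only holds a generic protocol reference.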

    References

    • http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html
  • Original article: https://www.cnblogs.com/Benjious/p/12666826.html