zoukankan      html  css  js  c++  java
  • 第四章:(4)原理之 Dubbo 服务暴露流程

    一、服务暴露流程

      服务暴露时,是通过 <dubbo:service> 标签或者 @Service 注解来做到的,下面就解析一下服务暴露流程。

      1、ServiceBean 类,我们知道 ServiceBean 是用来解析 service 标签的,看一下这个类做了什么。

    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, 
    ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {}

        这个类实现了众多的接口,核心关注 InitializingBeanApplicationListener<ContextRefreshedEvent> 接口。

        重写了 afterPropertiesSet() 方法,把之前配置的信息都保存起来。

        @Override
        @SuppressWarnings({"unchecked", "deprecation"})
        public void afterPropertiesSet() throws Exception {
            if (getProvider() == null) {
                Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
                if (providerConfigMap != null && providerConfigMap.size() > 0) {
                    Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                    if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                            && providerConfigMap.size() > 1) { // backward compatibility
                        List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                        for (ProviderConfig config : providerConfigMap.values()) {
                            if (config.isDefault() != null && config.isDefault().booleanValue()) {
                                providerConfigs.add(config);
                            }
                        }
                        if (!providerConfigs.isEmpty()) {
                            setProviders(providerConfigs);
                        }
                    } else {
                        ProviderConfig providerConfig = null;
                        for (ProviderConfig config : providerConfigMap.values()) {
                            if (config.isDefault() == null || config.isDefault().booleanValue()) {
                                if (providerConfig != null) {
                                    throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                                }
                                providerConfig = config;
                            }
                        }
                        if (providerConfig != null) {
                            setProvider(providerConfig);
                        }
                    }
                }
            }
            if (getApplication() == null
                    && (getProvider() == null || getProvider().getApplication() == null)) {
                Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
                if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                    ApplicationConfig applicationConfig = null;
                    for (ApplicationConfig config : applicationConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (applicationConfig != null) {
                                throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                            }
                            applicationConfig = config;
                        }
                    }
                    if (applicationConfig != null) {
                        setApplication(applicationConfig);
                    }
                }
            }
            if (getModule() == null
                    && (getProvider() == null || getProvider().getModule() == null)) {
                Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
                if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                    ModuleConfig moduleConfig = null;
                    for (ModuleConfig config : moduleConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (moduleConfig != null) {
                                throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                            }
                            moduleConfig = config;
                        }
                    }
                    if (moduleConfig != null) {
                        setModule(moduleConfig);
                    }
                }
            }
            if ((getRegistries() == null || getRegistries().isEmpty())
                    && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().isEmpty())
                    && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
                Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
                if (registryConfigMap != null && registryConfigMap.size() > 0) {
                    List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                    for (RegistryConfig config : registryConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            registryConfigs.add(config);
                        }
                    }
                    if (registryConfigs != null && !registryConfigs.isEmpty()) {
                        super.setRegistries(registryConfigs);
                    }
                }
            }
            if (getMonitor() == null
                    && (getProvider() == null || getProvider().getMonitor() == null)
                    && (getApplication() == null || getApplication().getMonitor() == null)) {
                Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
                if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                    MonitorConfig monitorConfig = null;
                    for (MonitorConfig config : monitorConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (monitorConfig != null) {
                                throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                            }
                            monitorConfig = config;
                        }
                    }
                    if (monitorConfig != null) {
                        setMonitor(monitorConfig);
                    }
                }
            }
            if ((getProtocols() == null || getProtocols().isEmpty())
                    && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().isEmpty())) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
                    List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
                    for (ProtocolConfig config : protocolConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            protocolConfigs.add(config);
                        }
                    }
                    if (protocolConfigs != null && !protocolConfigs.isEmpty()) {
                        super.setProtocols(protocolConfigs);
                    }
                }
            }
            if (getPath() == null || getPath().length() == 0) {
                if (beanName != null && beanName.length() > 0
                        && getInterface() != null && getInterface().length() > 0
                        && beanName.startsWith(getInterface())) {
                    setPath(beanName);
                }
            }
            if (!isDelay()) {
                export();
            }
        }

        重写了 onApplicationEvent() 方法,在IOC容器刷新完毕后,暴露服务

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                //暴露服务
                export();
            }
        }

      2、会调用到 ServiceConfig 的 export

        @Parameter(excluded = true)
        public boolean isUnexported() {
            return unexported;
        }
    
        public synchronized void export() {
            if (provider != null) {
                if (export == null) {
                    export = provider.getExport();
                }
                if (delay == null) {
                    delay = provider.getDelay();
                }
            }
            if (export != null && !export) {
                return;
            }
    
            if (delay != null && delay > 0) {
                delayExportExecutor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        doExport();
                    }
                }, delay, TimeUnit.MILLISECONDS);
            } else {
                doExport(); //执行暴露
            }
        }

      3、执行暴露

        protected synchronized void doExport() {
            if (unexported) {
                throw new IllegalStateException("Already unexported!");
            }
            if (exported) {
                return;
            }
            exported = true;
            if (interfaceName == null || interfaceName.length() == 0) {
                throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
            }
            checkDefault();
            ...
            checkApplication();
            checkRegistry();
            checkProtocol();
            appendProperties(this);
            checkStubAndMock(interfaceClass);
            if (path == null || path.length() == 0) {
                path = interfaceName;
            }
            doExportUrls();  //执行暴露url地址
            ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
            ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
        }

      4、doExportUrls:暴露url地址

        private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);  //读取注册中心地址
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }

      5、doExportUrlsFor1Protocol() 方法

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
    
        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        ...
    
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    
            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        //通过代理工厂,获取 ref和interfaceClass 的执行者(包含了url,执行哪个对象的哪个方法)
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //又包装了执行者
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        //通过 protocol 来暴露执行者
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

      6、Protocol

        基于 Java 的 SPI 机制,得到需要的 Protocol

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

        由于使用的是 Dubbo,会使用到 DubboProtocol,也会注册到注册中心,也会使用到 RegistryProtocol。

      7、根据Debug,会来到 RegistryProtocol 的 export() 方法

        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  //暴露执行者
    
            URL registryUrl = getRegistryUrl(originInvoker);
    
            //registry provider
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    
            //to judge to delay publish whether or not
            boolean register = registedProviderUrl.getParameter("register", true);
    
            ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); //注册进提供消费者注册表
    
            if (register) {
                register(registryUrl, registedProviderUrl);
                ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
            }
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
        }

      8、doLocalExport(originInvoker):本地暴露

        ① doLocalExport

        private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        //暴露的执行者
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return exporter;
        }

        ② 会执行到 DubboProtocol 的 export 方法

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url); //打开服务器,并监听对应的端口
            optimizeSerialization(url);
            return exporter;
        }

        ③ openServer(url)

        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url)); //创建服务器
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }

        ④ createServer 创建服务器

        private ExchangeServer createServer(URL url) {
            // send readonly event when server closes, it's enabled by default
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler); //绑定服务器,并传入请求处理器
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }

        ⑤ bind 服务器

        public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            return getExchanger(url).bind(url, handler);
        }
        
        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
        
        public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handlers == null || handlers.length == 0) {
                throw new IllegalArgumentException("handlers == null");
            }
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().bind(url, handler);
        }
        
        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }

         可以看到,最中开启一个 Netty 服务器来使用。

      9、registerProvider:注册提供者

    public class ProviderConsumerRegTable {
    //保存了每一个 url 和对应的执行器的信息(执行器中有服务真正的执行的对象)
    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>(); public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>(); public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) { ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl); String serviceUniqueName = providerUrl.getServiceKey(); Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName); if (invokers == null) { providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>()); invokers = providerInvokers.get(serviceUniqueName); } invokers.add(wrapperInvoker); }

     

    二、时序图

        

     
  • 相关阅读:
    STM32F10x_ADC三通道逐次转换(单次、单通道软件触发)
    STM32F10x_RTC日历
    STM32F4_TIM输入波形捕获(脉冲频率)
    详解 C 语言开发五子棋游戏以及游戏中的重要算法与思路
    平安银行 深度解析梧州模式 或许是国内医药分开最好的模板!
    屏蔽双绞线和非屏蔽双绞线之间的区别
    如何刷新本地的DNS缓存?
    无线网络发射和接收的物理原理!
    wifi基本原理
    大润发创始人黄明端挥泪离场:我战胜了所有对手,却输给了时代!
  • 原文地址:https://www.cnblogs.com/niujifei/p/15807557.html
Copyright © 2011-2022 走看看