zoukankan      html  css  js  c++  java
  • dubbo-源码分析Provider

    Dubbo provider启动原理:

    当我们的dubbo启动我们的spring容器时spring 初始化容器的时候会查找META-INF/spring.handles文件查找对应的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,该类下有以下配置项:

    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
    registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());

    意思就是当spring 在解析容器的时候遇到指定配置会使用对应的Parser去解析配置项。

    provider

    我们提供者主要会配置对应的application、registry、protocol、service所以我们一个一个来看,我们先来看service 根据上面所说,我们配置了<dubbo:service>这个配置项的话就会生成ServiceBean对象,并注册到容器里,那我们来看下serviceBean这个类:

    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
            ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
            ApplicationEventPublisherAware

    该类主要实现了上面几个接口,我们来看其中最主要的InitializingBean,该类会在类实例化后调用其中的afterPropertiesSet方法 ,所以我们来看下:

    public void afterPropertiesSet() throws Exception {
       ...
        //上面一大堆代码都是判空去重新赋值的代码,我们不关注他们,最主要是下面这个export方法
        if (!supportedApplicationListener) {
            export();
        }
    }
    public synchronized void export() {
        checkAndUpdateSubConfigs();
    ​
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }
    ​
        //当设置了延时发布时用定时器延时发布
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            //否则的话直接发布
            doExport();
        }
    }

    export方法最主要是判断是有配置了延时发布,是的话就用schedule去延时发布,否的话doExport发布,在spring中真正干活的都是do开头的方法,我们再继续查看doExport方法

     1 protected synchronized void doExport() {
     2     if (unexported) {
     3         throw new IllegalStateException("Already unexported!");
     4     }
     5     if (exported) {
     6         return;
     7     }
     8     exported = true;
     9 10     if (path == null || path.length() == 0) {
    11         path = interfaceName;
    12     }
    13     //生成唯一serviceName group/interfaceClass 如group/com.xx.xxx
    14     //ref 接口实现bean 服务的真正提供者
    15     //interfaceClass 需要发布的接口服务
    16     ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
    17     ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    18     //发布url
    19     doExportUrls();
    20 }

    这里主要查看doExportUrls()方法,上面的是把服务信息存到本地map里:

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        //这里会获取到注册中心的列表如有配置多个的话,
        // 格式:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service&dubbo=2.0.2&pid=16232&registry=zookeeper&release=2.7.0&timestamp=1553883173387
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    所以这里说明dubbo是支持多协议的多注册中心的,提前预告,下面这个doExportUrlsFor1Protocol方法会很长很复杂:

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //获取协议的名称如dubbo
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            //默认是dubbo
            name = Constants.DUBBO;
        }
    ​
        Map<String, String> map = new HashMap<String, String>();
        //组装map 再填充到url上
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        appendRuntimeParameters(map);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        //这里省略了一堆设置url的代码,主要是把接口的配置方法加到参数列表里如果method 重试次数
        ....
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
    ​
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
        //是否injvm也就是本地发布不上注册中心
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // export service
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
    ​
        //获取绑定的有效IP地址
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        //获得一个绑定端口
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
    ​
        //创建一个url
        //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service
        // &bean.name=com.lin.service.UserService&bind.ip=192.168.1.2&bind.port=20882&dubbo=2.0.2
        // &generic=false&group=userGroup&interface=com.lin.service.UserService&methods=add,findUserByName,findUserById
        // &pid=16232&release=2.7.0&side=provider&timestamp=1553884198155
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    ​
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
    ​
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        //当scope为none的时候不发布
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
    ​
            // export to local if the config is not remote (export to remote only when config is remote)
            //当scope不是remote的时候发布本地服务
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            //当scope不是local的时候发布远程服务
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    //当注册中心有多个的时候会发布到多个注册中心
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        //监控url
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
    ​
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }
    ​
                        //proxyFactory是一个自适应的扩展点,所以是一个proxyFactory$Adaptive
                        //默认会有这几个
                        //stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
                        //jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
                        //javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
                        //当url里有proxy=xxx的时候就取xxxProxyFactory,如果没有的话默认就是JavassistProxyFactory
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //创建一个Invoker的包装类DelegateProviderMetaDataInvoker
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        //protocol是一个自动扩展点,所以会返回一个ivoker.getURL().protocol/Protocol的一个对象 如RegistryProtocol、DubboProtocol
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    ​
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    //上报元数据中心
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }

    这里涉及到一个自适应扩展点的概念,具体什么是自适应扩展点可以到dubbo官网上看,那里介绍的很详细

    因为这里要注册的URL是registry://192.168.xxxx这样格式的,又因为protocol又是一个Protocol自适应扩展点

      private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    ​
    ​
    Exporter<?> exporter = protocol.export(wrapperInvoker);

    所以我们能得到protocol.export这里里面是调用的RegistryProtocol里面的export方法,所以我们再来看下这个方法:

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //获得注册地址
        //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service...
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        //获取提供者发布url
        //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service&bean.name=...
        URL providerUrl = getProviderUrl(originInvoker);
    ​
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    ​
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        //本地发布服务 也就是服务发布到netty容器里
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    ​
        // url to registry
        //获得注册中心地址
        final Registry registry = getRegistry(originInvoker);
        //获得要注册的提供者URL
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            //注册到注册中心
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }
    ​
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        //订阅url
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    ​
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

    我们一步一步来分析上面的代码,首先我这边的注册中心用的是zookeeper所以拿到的注册地址是zookeeper协议的,服务提供者也是用的默认的dubbo协议,那我们下一步来看下服务真正发布的方法doLocalExport方法:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
    ​
                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    //这里protocol又是一个自适应扩展点,所以里面会调用invoker.getUrl.getProtocol+"Protocol"的export()方法
                    //如 DubboProtocol.export();
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    一开始就是一个双重检查锁,我们不管他,直接关注我们的protocol.export方法,跟上面的一样,protocol也是一个自适应扩展点,所以里面实际用的是我们invoker.getUrl.getProtocol+"Protocol"的export()方法,我们invoker的url呢又是我们上面包装进去的providerUrl也就是dubbo://xxxx这个url,所以最终调用的就是DubboProtocol的export方法:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    ​
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    ​
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
    ​
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

    上面那些代码我们不关心先不看了,我们直接可以看到有一个openServer(url)的方法,根据名字我们就可以猜到这里就是开启我们服务的地方了,我们来看下:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        //创建服务
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {
        
       ...
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        ...
        return server;
    }

    我们可以看到这里Exchangers.bind(url,requestHandler),这里呢最终会调到:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //这里getTransporter也是获得一个自适应的扩展点,如果没有配置的话默认就是用的NettyTransporter
        return getTransporter().bind(url, handler);
    }

    再里面就是发布到Netty容器了,有兴趣的可以自己去看下,现在这里我们的服务就已经发布了,下面还有注册到注册中心,我们再看下我们上面的RegistryProtocol的export方法里面的注册服务的代码:

    // url to registry
    //获得注册中心地址
    final Registry registry = getRegistry(originInvoker);
    //获得要注册的提供者URL
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        //注册到注册中心
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    主要还是register这个方法:

    public void register(URL registryUrl, URL registeredProviderUrl) {
        //因为registryFacotry是一个自适应的扩展点,所以会返回一个zookeeperRegistry,如果是redis://的话就返回一个RedisRegistry
        Registry registry = registryFactory.getRegistry(registryUrl);
        //注册到注册中心
        registry.register(registeredProviderUrl);
    }

    根据环境我们会获得一个ZookeeperRegistry所以我们再看下zookeeperRegistry的register方法:

    因为register这个方法zookeeperRegistry并没有去实现它,所以一定是在父类的register我们继续看他父类FailbackRegistry的register方法:

    public void register(URL url) {
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            //注册服务 是模版模式,所以会在子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
    ​
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
    ​
            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }

    这方法主要注册的就是doRegister方法了,因为是模版模式,所以这个方法zookeeperRegistry自己实现了这个方法:

    @Override
    public void doRegister(URL url) {
        try {
            //创建一个临时节点
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    所以这里就是最终注册的地方了,这里会根据url去创建一个服务的临时节点,到这服务的发布和注册就已经完成了,其他地方有兴趣的可以自己去看下源码,dubbo里很多地方都用到了自适应扩展点这个概念,所以如果要看源码就要先去理解什么是自适应扩展点。

  • 相关阅读:
    理解闭包Closure
    理解商集
    理解格
    理解距(数学)
    微积分英文词汇,高数名词中英文对照,高等数学术语英语翻译一览
    对Extjs中store的多种操作
    mysql中的除法取整
    【python】用asq实现count(distinct cln)
    Timer 和TimerTask的使用
    使用vim.rc配置vim
  • 原文地址:https://www.cnblogs.com/mori-luck/p/10634333.html
Copyright © 2011-2022 走看看