zoukankan      html  css  js  c++  java
  • Dubbo服务消费者Consumer启动流程(五)

    在看完上篇的服务提供者启动流程之后,再来看消费者的启动流程就简单很多了,其大体的设计流程是差不多的。服务消费者的启动主要调用ReferenceConfig#get(), get方法跟服务提供者的export方法类似,主要关注checkAndUpdateSubConfigs和init方法。

    ReferenceConfig#checkAndUpdateSubConfigs

    这个checkAndUpdateSubConfigs方法这边不详细深入,也不是特别复杂,读者可以自己看每个步骤细节,这边只是大体上讲解流程。

    public void checkAndUpdateSubConfigs() {
            if (StringUtils.isEmpty(interfaceName)) {
                throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
            }
            // 配置优先级 consumer > module > application
            completeCompoundConfigs();
            // 将外部配置中心的配置刷新到本地配置
            startConfigCenter();
            // get consumer's global configuration
            // 设置comsumer配置
            checkDefault();
            this.refresh();
            if (getGeneric() == null && getConsumer() != null) {
                setGeneric(getConsumer().getGeneric());
            }
            if (ProtocolUtils.isGeneric(getGeneric())) {
                interfaceClass = GenericService.class;
            } else {
                try {
                    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                            .getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                // 检查<dubbo:method> 配置的方法是否包含在interfaceClass的方法中
                checkInterfaceAndMethods(interfaceClass, methods);
            }
            // 处理dubbo服务消费端resolve机制,也就是说消息消费者只连服务提供者,绕过注册中心。
            // 这个方法主要是获取直连服务的配置信息
            resolveFile();
            // 校验ReferenceBean的application是否为空,如果为空,new 一个application
            checkApplication();
            checkMetadataReport();
        }

    接下来就是int方法了,初始化ref,即获取到的实际上是一个由proxyFactory来代理invoker对象,通过其Proxy  InvokerInvocationHandler的invoke方法,即调用invoker的invoker方法返回的对象。整个对象类型就是远程服务接口类型。

    ReferenceConfig#init

        private void init() {
            if (initialized) {
                return;
            }
            initialized = true;
            //  校验stub、mock实现类与interface的兼容性
            checkStubAndLocal(interfaceClass);
            checkMock(interfaceClass);
            Map<String, String> map = new HashMap<String, String>();
    
            map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
            /**
             * 构建Map,封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。
             */
            appendRuntimeParameters(map);
            // 如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开。
            if (!isGeneric()) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
                // 使用Wrapper 目的是为了缓存interfaceClass的类元数据
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put("methods", Constants.ANY_VALUE);
                } else {
                    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            // 用Map存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性。
            map.put(Constants.INTERFACE_KEY, interfaceName);
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, consumer, Constants.DEFAULT_KEY);
            appendParameters(map, this);
            Map<String, Object> attributes = null;
            /**
             * 获取服务键值 {group}/interface:版本,如果group为空,则为interface:版本,其值存为prifex,
             * 然后将dubbo:method的属性名称也填入map中,键前缀为dubbo.method.methodname.属性名。
             * dubbo:method的子标签dubbo:argument标签的属性也追加到attributes map中,键为 prifex + methodname.属性名。
             */
            if (CollectionUtils.isNotEmpty(methods)) {
                attributes = new HashMap<String, Object>();
                for (MethodConfig methodConfig : methods) {
                    appendParameters(map, methodConfig, methodConfig.getName());
                    String retryKey = methodConfig.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(methodConfig.getName() + ".retries", "0");
                        }
                    }
                    attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
                }
            }
             //填充register.ip属性,该属性是消息消费者连接注册中心的IP,并不是注册中心自身的IP。
            String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
            if (StringUtils.isEmpty(hostToRegistry)) {
                hostToRegistry = NetUtils.getLocalHost();
            }
            map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    
            ref = createProxy(map);
            // 获取服务键值 {group}/interface:版本
            String serviceKey = URL.buildKey(interfaceName, group, version);
            // 将消息消费者缓存在ApplicationModel中。
            ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
        }

    init的作用还是根据配置构建消费者map中的查询参数URL,关键在粗体部分,ref = createProxy(map);本章重点也是在createProxy方法上详解。

    ReferenceConfig#createProxy

    createProxy 首先判断是否是injvm协议的本地引用,还是revoke机制的直连模式,还是普通消费者,从注册中心获取订阅服务,目的就是为了初始化引用服务的urls,其中url的subscribe设置为true。然后根据url来获取对应协议的invoker,最后根据invoker来代理远程服务。

    判断是否是injvm服务?通过shouldJvmRefer方法,第一是根据配置,如果没有配置,发现是直连服务,那么久不是injvm。或者如果scope=local或injvm=true,那么isJvmRefer=true

    如何判断是否是直连模式呢?根据前面的resolveFile,如果是直连,那么就会初始化这个点对点的url。

    剩下的就是普通消费者模式了,依赖注册中心。

        private T createProxy(Map<String, String> map) {
            // 判断该消费者是否是引用本(JVM)内提供的服务。
            if (shouldJvmRefer(map)) {
                /**
                 * 如果消费者引用本地JVM中的服务,则利用InjvmProtocol创建Invoker,dubbo中的invoker主要负责服务调用的功能
                 * injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer
                 * &default.lazy=false&default.sticky=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
                 * &pid=36176&register.ip=192.168.0.102&release=&side=consumer&sticky=false&timestamp=1640070027986
                 */
                URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {  // 处于直连模式
                /**
                 * 对直连URL进行分割,多个直连URL用分号隔开,如果URL中不包含path属性,则为URL设置path属性为interfaceName。
                 * 如果直连提供者的协议为registry,则对url增加refer属性,其值为消息消费者所有的属性。(表示从注册中心发现服务提供者)
                 * 如果是其他协议提供者,则合并服务提供者与消息消费者的属性,并移除服务提供者默认属性。以default开头的属性。
                 */
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration  普通消息消费者,从注册中心订阅服务
                    /**
                     * :获取所有注册中心URL,其中参数false表示消费端,需要排除dubbo:registry subscribe=false的注册中心,其值为false表示不接受订阅。
                     * 根据注册中心URL,构建监控中心URL。
                     * 如果监控中心不为空,在注册中心URL后增加属性monitor。
                     * 在注册中心URL中,追加属性refer,其值为消费端的所有配置组成的URL。
                     */
                    checkRegistry();
                    List<URL> us = loadRegistries(false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
                    // 根据URL获取对应协议的Invoker。
                    //如果只有一个服务提供者URL,则直接根据协议构建Invoker,具体有如下协议:
                    // Protocal$Adaptive.refer  ->  RegisterProtocal.refer
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use RegistryAwareCluster only when register's cluster is available
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                        // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url, must be direct invoke.
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
            // 如果dubbo:referecnce的check=true或默认为空,则需要判断服务提供者是否存在。
            if (shouldCheck() && !invoker.isAvailable()) {
                // make it possible for consumer to retry later if provider is temporarily unavailable
                initialized = false;
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
                metadataReportService.publishConsumer(consumerURL);
            }
            // create service proxy
            /**
             * 根据invoker获取代理类,其实现逻辑如下:
             * 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口。
             * 增加默认接口EchoService接口。
             * 根据需要实现的接口,使用jdk或Javassist创建代理类。
             */
            return (T) proxyFactory.getProxy(invoker);
        }

     这里面的checkRegister跟服务者类似。  这里边就不展开了。urls都是register://开头,其中参数register对应具体的注册类型,比如向zookeeper或者redis等。

    根据urls的size,我们就可以初始化invoker了

    invoker = refprotocol.refer(interfaceClass, url);

    如果多个url,则会产生多个invoker,最后通过cluser的join来返回一个invoker对象    invoker = cluster.join(new StaticDirectory(invokers));

    最后通过Javassist来代理。具体细节将会在下一章消费者调用流程中介绍。

  • 相关阅读:
    UVa 10088 (Pick定理) Trees on My Island
    LA 3295 (计数 容斥原理) Counting Triangles
    LA 5846 (计数) Neon Sign
    java是什么?软帝学院告诉你学Java能做什么?Java有什么特性?
    【软帝学院】一套好的java基础教学视频需要哪些有哪些内容
    推荐五个java基础学习网站,小白必备
    学习java设计模式有用吗?懂这六个原则,编程更轻松
    Java是什么?只需5分钟,了解java必须要知道的知识点
    软帝学院:自学java到底难不难?做好这几步,少走3年弯路
    软帝学院:java开发程序很难吗?学会这十步,5分钟搞定一个程序
  • 原文地址:https://www.cnblogs.com/gaojy/p/15709137.html
Copyright © 2011-2022 走看看