zoukankan      html  css  js  c++  java
  • dubbo消费端源码分析

    ​ 这篇文章主要是讲述dubbo的消费者初始化发生的事情,比如:怎么注入和注入什么对象到带有@DubboReference和@Reference注解的成员变量里,怎么去建立dubbo的连接,怎么获取注册中心里提供者的地址等等

    消费端的入口-DubboAutoConfiguration

    ​ 因为根据spring-boot的自动装载策略,至于为什么会触发自动转载,这里我就不去说了。只要知道他会注入DubboAutoConfiguration这个对象就行了

    @ConditionalOnProperty(
        prefix = "dubbo",
        name = {"enabled"},
        matchIfMissing = true
    )
    @Configuration
    @AutoConfigureAfter({DubboRelaxedBindingAutoConfiguration.class})
    @EnableConfigurationProperties({DubboConfigurationProperties.class})
    public class DubboAutoConfiguration {
        public DubboAutoConfiguration() {
        }
    		...
        	//注入ReferenceAnnotationBeanPostProcessor对象
        @ConditionalOnMissingBean
        @Bean(
            name = {"referenceAnnotationBeanPostProcessor"}
        )
        public ReferenceAnnotationBeanPostProcessor referenceAnnotationBeanPostProcessor() {
            return new ReferenceAnnotationBeanPostProcessor();
        }
        ...
        
     }
    
    public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements ApplicationContextAware {
        public static final String BEAN_NAME = "referenceAnnotationBeanPostProcessor";
        private static final int CACHE_SIZE = Integer.getInteger("referenceAnnotationBeanPostProcessor.cache.size", 32);
        private final ConcurrentMap<String, ReferenceBean<?>> referenceBeanCache;
        private final ConcurrentMap<InjectedElement, ReferenceBean<?>> injectedFieldReferenceBeanCache;
        private final ConcurrentMap<InjectedElement, ReferenceBean<?>> injectedMethodReferenceBeanCache;
        private ApplicationContext applicationContext;
    
        public ReferenceAnnotationBeanPostProcessor() {
            super(new Class[]{DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class});
            this.referenceBeanCache = new ConcurrentHashMap(CACHE_SIZE);
            this.injectedFieldReferenceBeanCache = new ConcurrentHashMap(CACHE_SIZE);
            this.injectedMethodReferenceBeanCache = new ConcurrentHashMap(CACHE_SIZE);
        }   
     }
    

    ​ DubboAutoConfiguration作用就是注入了ReferenceAnnotationBeanPostProcessor对象,那ReferenceAnnotationBeanPostProcessor有什么特别之处呢?不妨看看它的结构图

    image-20201108171554724

    ​ 可以看到ReferenceAnnotationBeanPostProcessor是InstantiationAwareBeanPostProcessorAdapter的子类还有是MergedBeanDefinitionPostProcessor的实现类。

    ​ 所以初始化的时候会触发postProcessMergedBeanDefinition的方法和postProcessPropertyValues的方法。

    • postProcessPropertyValues:从名字上来看,应该是给对象做依赖注入吧。
    • postProcessMergedBeanDefinition: 这个应该是对象合并触发的。这个会比postProcessPropertyValues执行时机要靠前

    ​ 因为ReferenceAnnotationBeanPostProcessorAbstractAnnotationBeanPostProcessor的子类,postProcessPropertyValuespostProcessMergedBeanDefinition的实现都在AbstractAnnotationBeanPostProcessor(注意这个类是:阿里巴巴包下的类)这里面的实现。接下来我们看一下:

    public abstract class AbstractAnnotationBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
    
       public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
            if (beanType != null) {
                //发现需要依赖注入的对象,即带有:@Reference和@DubboReference
                InjectionMetadata metadata = this.findInjectionMetadata(beanName, beanType, (PropertyValues)null);
                metadata.checkConfigMembers(beanDefinition);
            }
    
        }
        
        public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
                //发现需要依赖注入的对象,即带有:@Reference和@DubboReference
            InjectionMetadata metadata = this.findInjectionMetadata(beanName, bean.getClass(), pvs);
            try {
                //依赖注入
                metadata.inject(bean, beanName, pvs);
                return pvs;
            } catch (Exception var7) {
                throw var7;
            } 
        }
        
    }
    

    postProcessPropertyValuespostProcessMergedBeanDefinition都使用findInjectionMetadata,功能是:发现需要依赖注入的对象

    下面分析一下AbstractAnnotationBeanPostProcessor.findInjectionMetadata方法

    findInjectionMetadata发现需要依赖注入的对象

    public abstract class AbstractAnnotationBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
    
        private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
            //下面这一部分都是是否需要读换存的不用理,主要看buildAnnotatedMetadata,这个方法去
                String cacheKey = StringUtils.hasLength(beanName) ? beanName : clazz.getName();
                AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata metadata = (AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata)this.injectionMetadataCache.get(cacheKey);
                if (InjectionMetadata.needsRefresh(metadata, clazz)) {
                    ConcurrentMap var6 = this.injectionMetadataCache;
                    synchronized(this.injectionMetadataCache) {
                        metadata = (AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata)this.injectionMetadataCache.get(cacheKey);
                        if (InjectionMetadata.needsRefresh(metadata, clazz)) {
                            if (metadata != null) {
                                metadata.clear(pvs);
                            }
    
                            try {
                                metadata = this.buildAnnotatedMetadata(clazz);
                                this.injectionMetadataCache.put(cacheKey, metadata);
                            } catch (NoClassDefFoundError var9) {
                                throw new IllegalStateException("Failed to introspect object class [" + clazz.getName() + "] for annotation metadata: could not find class that it depends on", var9);
                            }
                        }
                    }
                }
    
                return metadata;
            }
        
        
         private AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata buildAnnotatedMetadata(Class<?> beanClass) {
            	//找到类下的成员变量
                Collection<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> fieldElements = 
                this.findFieldAnnotationMetadata(beanClass);
                //找到类下的方法--方法就不贴出来讨论了
            Collection<AbstractAnnotationBeanPostProcessor.AnnotatedMethodElement> methodElements = 
                this.findAnnotatedMethodMetadata(beanClass);
            return new AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
        }
        
        private List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> findFieldAnnotationMetadata(Class<?> beanClass) {
            final List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> elements = new LinkedList();
            ReflectionUtils.doWithFields(beanClass, new FieldCallback() {
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
               //获取符合条件的注解,即:DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class
                    Class[] var2 = AbstractAnnotationBeanPostProcessor.this.getAnnotationTypes();
                    int var3 = var2.length;
    
                    for(int var4 = 0; var4 < var3; ++var4) {
                        Class<? extends Annotation> annotationType = var2[var4];
                        AnnotationAttributes attributes = AnnotationUtils.getAnnotationAttributes(field, annotationType, AbstractAnnotationBeanPostProcessor.this.getEnvironment(), true, true, new String[0]);
                        if (attributes != null) {
    						//is not supported on static fields:
                            if (Modifier.isStatic(field.getModifiers())) {
                                return;
                            }
                            elements.add(AbstractAnnotationBeanPostProcessor.this.new AnnotatedFieldElement(field, attributes));
                        }
                    }
    
                }
            });
            return elements;
        }
        
        	//构造方法
         public AbstractAnnotationBeanPostProcessor(Class<? extends Annotation>... annotationTypes) {
            this.injectionMetadataCache = new ConcurrentHashMap(CACHE_SIZE);
            this.injectedObjectsCache = new ConcurrentHashMap(CACHE_SIZE);
            this.order = 2147483644;
            this.annotationTypes = annotationTypes;
        }
        
        
        
    }
    
    
    //buildAnnotatedMetadata构造返回的结果类
    private class AnnotatedInjectionMetadata extends InjectionMetadata {
        private final Collection<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> fieldElements;
        private final Collection<AbstractAnnotationBeanPostProcessor.AnnotatedMethodElement> methodElements;
        
        public AnnotatedInjectionMetadata(Class<?> targetClass, Collection<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> fieldElements, Collection<AbstractAnnotationBeanPostProcessor.AnnotatedMethodElement> methodElements) {
            super(targetClass, AbstractAnnotationBeanPostProcessor.combine(fieldElements, methodElements));
            this.fieldElements = fieldElements;
            this.methodElements = methodElements;
        }
    
    }
    
    public class InjectionMetadata {
    	    public InjectionMetadata(Class<?> targetClass, Collection<InjectionMetadata.InjectedElement> elements) {
            this.targetClass = targetClass;
            this.injectedElements = elements;
        }
    }
    
    public class  ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
            ApplicationContextAware {
            public ReferenceAnnotationBeanPostProcessor() {
                //注入注解的位置
            super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
        }
    }
    
    • findInjectionMetadata: 首先读取缓存有没有相应的信息(通过beanName为key),没有的话就调用buildAnnotatedMetadata来进行创建

    • buildAnnotatedMetadata: 调用findFieldAnnotationMetadata去查找带有相应注解的成员变量,调用findAnnotatedMethodMetadata应该也是查找带有相应注解的方法。 这里我们只研究findFieldAnnotationMetadata,最后构造AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata返回,里面带有的信息有,beanClass,符合条件成员变量的集合,符合条件方法的集合

    • findFieldAnnotationMetadata: ReflectionUtils.doWithFields里面实现其实就是遍历类下的所有的成员变量,得到成员变量后通过 new FieldCallback() 来做一个回调,看看回调里的信息: AbstractAnnotationBeanPostProcessor.this.getAnnotationTypes()得到的是以下这些注解:DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class。然后再通过AnnotationUtils.getAnnotationAttributes方法去获取带有这些注解的成员变量的属性,如果带有这些注解的话,就加入到集合中返回。

      问题:DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class这些注解的引入位置?

      因为在构造ReferenceAnnotationBeanPostProcessor这个类时调用了super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class); 这个方法。然后在看到AbstractAnnotationBeanPostProcessor的构造方法可以看到 this.annotationTypes = annotationTypes;

    AbstractAnnotationBeanPostProcessor.postProcessMergedBeanDefinition

        public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
            if (beanType != null) {
                InjectionMetadata metadata = this.findInjectionMetadata(beanName, beanType, (PropertyValues)null);
                metadata.checkConfigMembers(beanDefinition);
            }
    
        }
    
    
    
        public class InjectionMetadata {
            
            public void checkConfigMembers(RootBeanDefinition beanDefinition) {
                Set<InjectionMetadata.InjectedElement> checkedElements = new LinkedHashSet(this.injectedElements.size());
                Iterator var3 = this.injectedElements.iterator();
    
                while(var3.hasNext()) {
                    //这里得到的是findInjectionMetadata方法执行得到的成员变量信息和方法信息
                    InjectionMetadata.InjectedElement element = (InjectionMetadata.InjectedElement)var3.next();
                    Member member = element.getMember();
                    if (!beanDefinition.isExternallyManagedConfigMember(member)) {
                        beanDefinition.registerExternallyManagedConfigMember(member);
                        checkedElements.add(element);
                        if (logger.isTraceEnabled()) {
                            logger.trace("Registered injected element on class [" + this.targetClass.getName() + "]: " + element);
                        }
                    }
                }
    			//通过检查后放入到checkedElements
                this.checkedElements = checkedElements;
            }
        }
    

    ​ findInjectionMetadata上面已经分析了。checkConfigMembers其实就是检查一下通过findInjectionMetadata方法执行得到的成员变量信息和方法信息,符合条件就加入到 InjectionMetadata.checkedElements中

    AbstractAnnotationBeanPostProcessor.postProcessPropertyValues

        public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
            InjectionMetadata metadata = this.findInjectionMetadata(beanName, bean.getClass(), pvs);
    
            try {
                metadata.inject(bean, beanName, pvs);
                return pvs;
            } catch (Exception var7) {
                throw var7;
            } 
            
        }
    
    
        public class InjectionMetadata {
    
            public void inject(Object target, @Nullable String beanName, @Nullable PropertyValues pvs) throws Throwable {
                //引入的位置postProcessMergedBeanDefinition
                Collection<InjectionMetadata.InjectedElement> checkedElements = this.checkedElements;
         		//如果没有检查过的属性,就用没有检查过的
                Collection<InjectionMetadata.InjectedElement> elementsToIterate = checkedElements != null ? checkedElements : this.injectedElements;
                
                //elementsToIterate就是带有@DubboReference等等的成员变量或方法
                InjectionMetadata.InjectedElement element;
                if (!((Collection)elementsToIterate).isEmpty()) {
                    for(Iterator var6 = ((Collection)elementsToIterate).iterator(); var6.hasNext(); element.inject(target, beanName, pvs)) {
                        element = (InjectionMetadata.InjectedElement)var6.next();
                    }
                }
    
            }
        }
    
    public abstract class AbstractAnnotationBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
        
            protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectedElement injectedElement) throws Exception {
            String cacheKey = this.buildInjectedObjectCacheKey(attributes, bean, beanName, injectedType, injectedElement);
            Object injectedObject = this.injectedObjectsCache.get(cacheKey);
            if (injectedObject == null) {
                injectedObject = this.doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
                this.injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
            }
    
            return injectedObject;
        }
        
        
        
    	public class AnnotatedFieldElement extends InjectedElement {
            private final Field field;
            private final AnnotationAttributes attributes;
            private volatile Object bean;
    
            protected AnnotatedFieldElement(Field field, AnnotationAttributes attributes) {
                super(field, (PropertyDescriptor)null);
                this.field = field;
                this.attributes = attributes;
            }
    
            protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
                Class<?> injectedType = this.field.getType();
                Object injectedObject = AbstractAnnotationBeanPostProcessor.this.getInjectedObject(this.attributes, bean, beanName, injectedType, this);
                ReflectionUtils.makeAccessible(this.field);
                this.field.set(bean, injectedObject);
            }
        }
    }
    
    public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
            ApplicationContextAware {    
    	@Override
        protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                           InjectionMetadata.InjectedElement injectedElement) throws Exception {
            /**
             * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
             */
            String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
    
            /**
             * The name of bean that is declared by {@link Reference @Reference} annotation injection
             */
            String referenceBeanName = getReferenceBeanName(attributes, injectedType);
    
            ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
    
            boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);
    
            prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean);
    
            registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);
    
            cacheInjectedReferenceBean(referenceBean, injectedElement);
    
            return referenceBean.get();
        }
    
    }
    

    AbstractAnnotationBeanPostProcessor.postProcessPropertyValues:

    • this.findInjectionMetadata: 上面已经分析了,就是获取需要注入的成员变量和方法
    • metadata.inject(bean, beanName, pvs): 依赖注入

    InjectionMetadata.inject

    • this.checkedElements != null ? this.checkedElements : this.injectedElements: 如果有检查过的checkedElements 就用有检查过的,没有就使用没有检查过的injectedElements。
    • 遍历需要注入的成员变量和方法嗲用: element.inject

    AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement.inject

    • AbstractAnnotationBeanPostProcessor.this.getInjectedObject: 构建一个对象
    • this.field.set(bean, injectedObject): 把构建的对象,设置到成员变量里面

    AbstractAnnotationBeanPostProcessor.getInjectedObject

    • 调用this.doGetInjectedBean , 因为AbstractAnnotationBeanPostProcessor.doGetInjectedBean 是一个抽象的方法,最后回调用字类的方法。即:ReferenceAnnotationBeanPostProcessor.doGetInjectedBean

    ReferenceAnnotationBeanPostProcessor.doGetInjectedBean: 这里直接看referenceBean.get(),只需要着重看返回的对象是什么就好了。

    ReferenceConfig解析

    ReferenceConfig.init()初始化

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
        
        private transient volatile T ref;
    
        
    	public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
            }
            if (ref == null) {
                init();
            }
            return ref;
        }
        
         public synchronized void init() {
            if (initialized) {
                return;
            }
    
            if (bootstrap == null) {
                bootstrap = DubboBootstrap.getInstance();
                bootstrap.init();
            }
    
            checkAndUpdateSubConfigs();
    
            checkStubAndLocal(interfaceClass);
            ConfigValidationUtils.checkMock(interfaceClass, this);
    
             //构建map,这个跟提供者差不多
            Map<String, String> map = new HashMap<String, String>();
            map.put(SIDE_KEY, CONSUMER_SIDE);
             
            ReferenceConfigBase.appendRuntimeParameters(map);
            if (!ProtocolUtils.isGeneric(generic)) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
                }
            }
            map.put(INTERFACE_KEY, interfaceName);
            AbstractConfig.appendParameters(map, getMetrics());
            AbstractConfig.appendParameters(map, getApplication());
            AbstractConfig.appendParameters(map, getModule());
            // remove 'default.' prefix for configs from ConsumerConfig
            // appendParameters(map, consumer, Constants.DEFAULT_KEY);
            AbstractConfig.appendParameters(map, consumer);
            AbstractConfig.appendParameters(map, this);
            MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
            if (metadataReportConfig != null && metadataReportConfig.isValid()) {
                map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
            }
            Map<String, AsyncMethodInfo> attributes = null;
            if (CollectionUtils.isNotEmpty(getMethods())) {
                attributes = new HashMap<>();
                for (MethodConfig methodConfig : getMethods()) {
                    AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                    String retryKey = methodConfig.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(methodConfig.getName() + ".retries", "0");
                        }
                    }
                    AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                    if (asyncMethodInfo != null) {
    //                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                        attributes.put(methodConfig.getName(), asyncMethodInfo);
                    }
                }
            }
    
            String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
            if (StringUtils.isEmpty(hostToRegistry)) {
                hostToRegistry = NetUtils.getLocalHost();
            } else if (isInvalidLocalHost(hostToRegistry)) {
                throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
            }
            map.put(REGISTER_IP_KEY, hostToRegistry);
    
            serviceMetadata.getAttachments().putAll(map);
    
            ref = createProxy(map);
    
            serviceMetadata.setTarget(ref);
            serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
            ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
            consumerModel.setProxyObject(ref);
            consumerModel.init(attributes);
    
            initialized = true;
    
            checkInvokerAvailable();
    
            // dispatch a ReferenceConfigInitializedEvent since 2.7.4
            dispatch(new ReferenceConfigInitializedEvent(this, invoker));
        }
    }
    

    ReferenceConfig.get(): 判断this.ref是否是空,不是的话,进行init()初始化。

    ReferenceConfig.init()初始化:

    • 构建map,这个跟提供者差不多,构建出来的map如下:

    image-20201109110527018

    • createProxy(map): 创建代理类

    createProxy(map)创建代理对象

     public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
         private T createProxy(Map<String, String> map) {
             	//判断是否是本地的refer,本地绑定,这里就不看了。只看远程的
                if (shouldJvmRefer(map)) {
                    URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                    invoker = REF_PROTOCOL.refer(interfaceClass, url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Using injvm service " + interfaceClass.getName());
                    }
                } else {
                    urls.clear();
                    //这种情况是类似于:    <dubbo:reference id="orderService" interface="com.onion.service.IOrderService" url="dubbo://127.0.0.1:20881/com.onion.service.IOrderService" /> 这种指定url就会进入if语句里面。,这里我们也是只是看注册中心获取的。
    
                    if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                        String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                        if (us != null && us.length > 0) {
                            for (String u : us) {
                                URL url = URL.valueOf(u);
                                if (StringUtils.isEmpty(url.getPath())) {
                                    url = url.setPath(interfaceName);
                                }
                                if (UrlUtils.isRegistry(url)) {
                                    urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                                } else {
                                    urls.add(ClusterUtils.mergeUrl(url, map));
                                }
                            }
                        }
                    } else { // assemble URL from register center's configuration
                        // if protocols not injvm checkRegistry
                        if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                            checkRegistry();
                            List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                            
                            ...遍历us,判断有没有monitorUrl,需要的话加入到url的参数中: monitor=monitorUrl...
    
                            if (urls.isEmpty()) {
    							....抛错.....
                            }
                        }
                    }
    
                    //urls就是注册中心地址,如果只有一个注册中心,就直接获取invoker
                    if (urls.size() == 1) {
                        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                    } else {
                        List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                        URL registryURL = null;
                        for (URL url : urls) {
                            invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                            if (UrlUtils.isRegistry(url)) {
                                registryURL = url; // use last registry url
                            }
                        }
                        
                        //选择容错的策略
                        if (registryURL != null) { // registry url is available
                            // for multi-subscription scenario, use 'zone-aware' policy by default
                            String cluster = registryURL.getParameter("cluster", "zone-aware");
                            // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                            invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                        } else { // not a registry url, must be direct invoke.
                            String cluster = CollectionUtils.isNotEmpty(invokers)
                                    ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter("cluster", "zone-aware") : "failover")
                                    :"failover";
                            invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                        }
                    }
                }
    
                if (logger.isInfoEnabled()) {
                    logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                String metadata = map.get(METADATA_KEY);
                WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
                if (metadataService != null) {
                    URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
                    metadataService.publishServiceDefinition(consumerURL);
                }
                // create service proxy
                return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
            }
        }
    

    createProxy做的事情:

    • shouldJvmRefer(map): 判断是否连接本地信息,这里我们只考虑远程的。

    • 判断接口是否指定了url地址,如果是的话,直接利用url上面的地址直接构建。 比如是:

      <dubbo:reference id="orderService" interface="com.onion.service.IOrderService" url="dubbo://127.0.0.1:20881/com.onion.service.IOrderService" />这种就会直接利用url上面的地址直接构建了

    • 利用ConfigValidationUtils.loadRegistries获取注册中心地址,这个会得到的注册中心地址为: registry://127.0.0.1:8848?registry=nacos,这个跟提供者里的loadRegistries是一样的,如果想知道为什么会变成 registry://127.0.0.1:8848?registry=nacos,可以去看我上一个文章。

    • 如果是多个注册中心,就会进行容错的处理,这里我们只看一个的把。REF_PROTOCOL.refer(interfaceClass, urls.get(0))直接构建Invoker。

    • PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)): 把得到invoker作为参数构造一个代理类返回

    REF_PROTOCOLExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(),因为根据自适应的扩展点的功能得到的是Protocol$Adaptive,又因为url协议为registry,所以最后会调用RegistryProtocol.ref

    RegistryProtocol.refer 构造invoker

        public class RegistryProtocol implements Protocol {
            
                private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
    
    
            @Override
            @SuppressWarnings("unchecked")
            public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
                url = getRegistryUrl(url);
                Registry registry = registryFactory.getRegistry(url);
                if (RegistryService.class.equals(type)) {
                    return proxyFactory.getInvoker((T) registry, type, url);
                }
                // group="a,b" or group="*"
                Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
                String group = qs.get(GROUP_KEY);
                if (group != null && group.length() > 0) {
                    if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                        return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
                    }
                }
                Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
                return doRefer(cluster, registry, type, url);
            }
            
            private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
                RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
                directory.setRegistry(registry);
                directory.setProtocol(protocol);
                // all attributes of REFER_KEY
                Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
                URL subscribeUrl = new URL("consumer", parameters.remove("register.ip"), 0, type.getName(), parameters);
                if (directory.isShouldRegister()) {
                    directory.setRegisteredConsumerUrl(subscribeUrl);
                    registry.register(directory.getRegisteredConsumerUrl());
                }
                directory.buildRouterChain(subscribeUrl);
                directory.subscribe(toSubscribeUrl(subscribeUrl));
    
                Invoker<T> invoker = cluster.join(directory);
                List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
                if (CollectionUtils.isEmpty(listeners)) {
                    return invoker;
                }
    
                RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
                for (RegistryProtocolListener listener : listeners) {
                    listener.onRefer(this, registryInvokerWrapper);
                }
                return registryInvokerWrapper;
        	}
            
            protected URL getRegistryUrl(URL url) {
                return URLBuilder.from(url)
                        .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                        .removeParameter(REGISTRY_KEY)
                        .build();
        	}
            
            
         public void subscribe(URL url) {
            setConsumerUrl(url);
             //把当前RegistryDirectory作为listener,去监听节点的变化
            CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
            serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
            registry.subscribe(url, this);
        }
        
      }
    

    RegistryProtocol.refer:

    • getRegistryUrl: 获取注册中心的地址传入

      传入url:registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos

      返回:nacos://127.0.0.1:8848?export=dubbo://127.0.0.1:20880

    • registryFactory.getRegistry(url): 通过url的协议。得到相应的registry,这里我是nacos,所以得到的就是nacosRegistry

    • Cluster.getCluster(qs.get("cluster")): 获取相应的Cluster,默认使用的是:"failover"

    • 最后调用doRefer(cluster, registry, type, url)

    doRefer

    • ​ 构造一个RegistryDirectory

      ​ RegistryDirectory是Dubbo中的服务目录,从名字上来看,也比较容易理解,服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用

    • new URL("consumer", parameters.remove("register.ip"), 0, type.getName(), parameters): 构建一个发布地址,地址格式如下:

      ​ consumer://192.168.56.1/com.onion.service.IUserService?...参数...

    • registry.register(directory.getRegisteredConsumerUrl()):发布该地址去注册中心。

    • directory.subscribe(toSubscribeUrl(subscribeUrl));

      • toSubscribeUrl: 往url中添加 category=providers,configurators,routers 这个参数。providers,configurators,routers这个是需要监听的属性
      • directory.subscribe:把当前RegistryDirectory作为listener,去监听节点的变化,然后调用 registry.subscribe(url, this),这里的registry是NacosRegistry,又因为它是FailbackRegistry的子类,所以会调用FailbackRegistry.subscribe

    注意:FailbackRegistry是全类名称是org.apache.dubbo.registry.support.FailbackRegistry

    registry.register 注册到注册中心

    ​ 因为我是使用nacos为注册中心,所以reigstry是nacosRegistry,又因为FailbackRegistry是nacosRegistry的父类,所以会调用FailbackRegistry.register

    public abstract class FailbackRegistry extends AbstractRegistry {
        
            @Override
        public void register(URL url) {
            if (!acceptable(url)) {
                return;
            }
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                // Sending a registration request to the server side
                //调用真正的NacosRegistry
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    			....
            }
        }
        
    }
    
    public class NacosRegistry extends FailbackRegistry {
    	
    	@Override
        public void doRegister(URL url) {
            execute(namingService -> namingService.registerInstance(serviceName,
                    getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
        }
    }
    

    FailbackRegistry.register方法里面会调用NacosRegistry.doRegister。 这里就是注册的步骤了

    FailbackRegistry.subscribe 获取节点信息和订阅节点信息

    public abstract class FailbackRegistry extends AbstractRegistry {
       
    	@Override
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            //移除失效的节点
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                //重新订阅节点
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                List<URL> urls = getCacheUrls(url);
                if (CollectionUtils.isNotEmpty(urls)) {
                    notify(url, listener, urls);
                } else {
                    // If the startup detection is opened, the Exception is thrown directly.
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if (skipFailback) {
                            t = t.getCause();
                        }
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                    }
                }
    
                // Record a failed registration request to a failed list, retry regularly
                addFailedSubscribed(url, listener);
            }
        }
    }
    
    public class NacosRegistry extends FailbackRegistry {
    	@Override
        public void doSubscribe(final URL url, final NotifyListener listener) {
            Set<String> serviceNames = getServiceNames(url, listener);
    
            //Set corresponding serviceNames for easy search later
            if (isServiceNamesWithCompatibleMode(url)) {
                for (String serviceName : serviceNames) {
                    NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);
                }
            }
    
            doSubscribe(url, listener, serviceNames);
        }
        
        
            private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
            execute(namingService -> {
                //是否普通 serviceInterface 订阅
                if (isServiceNamesWithCompatibleMode(url)) {
                    List<Instance> allCorrespondingInstanceList = Lists.newArrayList();
                    for (String serviceName : serviceNames) {
                        List<Instance> instances = namingService.getAllInstances(serviceName,
                                getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
                        NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                        allCorrespondingInstanceList.addAll(instances);
                    }
                    notifySubscriber(url, listener, allCorrespondingInstanceList);
                    for (String serviceName : serviceNames) {
                        //订阅。
                        subscribeEventListener(serviceName, url, listener);
                    }
                } else {
    				....
                }
    
            });
        }
    }
    

    doSubscribe(url, listener):

    getServiceNames得到的集合如下显示:(serviceNames)

    0 = "providers:com.onion.service.IUserService::"
    1 = "providers:com.onion.service.IUserService"
    

    然后调用 doSubscribe(url, listener, serviceNames); 进行订阅

    doSubscribe(url, listener, serviceNames):

    • namingService.getAllInstances: 通过serviceName作为参数得到instances,instances就是提供者的节点信息。

    • 把得到的instance节点都放到allCorrespondingInstanceList集合里面,集合数据如下:

      image-20201110152109998

    • notifySubscriber:更新缓存中提供者的节点信息。

    • subscribeEventListener: 监听serviceName的节点的变化。

    NacosRegistry.notifySubscriber更新节点信息

    public class NacosRegistry extends FailbackRegistry {
    
        private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
            List<Instance> healthyInstances = new LinkedList<>(instances);
            if (healthyInstances.size() > 0) {
                filterHealthyInstances(healthyInstances);
            }
            List<URL> urls = toUrlWithEmpty(url, healthyInstances);
            NacosRegistry.this.notify(url, listener, urls);
        }
    }
    
    
    public abstract class FailbackRegistry extends AbstractRegistry {
    
    	@Override
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            try {
                doNotify(url, listener, urls);
            } catch (Exception t) {
                // Record a failed registration request to a failed list, retry regularly
                addFailedNotified(url, listener, urls);
                logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
         protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
            super.notify(url, listener, urls);
        }
    }
    
    public abstract class AbstractRegistry implements Registry {
        
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            Map<String, List<URL>> result = new HashMap<>();
           
            for (URL u : urls) {
                //urls是从注册中心获取到的地址,u是消费者的地址
                //拿到符合匹配的地址放到categoryList
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
           //
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
               	//listener就是RegistryDirectory,进行更新
                listener.notify(categoryList);
                // We will update our cache file after each notification.
                // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
    			//把这个url保存到本地文件中
                saveProperties(url);
            }
       }
        
    
        
    }
    

    调用顺序NacosRegistry.notifySubscriber -> FailbackRegistry.doNotify -> AbstractRegistry.notify ,

    AbstractRegistry.notify

    • 把注册中心获取到的地址和消费者的地址进行一个匹配,符合规则的返回categoryList
    • 把得到的categoryList保存到notified中。这个notified在saveProperties用到
    • listener.notify: 这里的listener就是RegistryDirectory,RegistryDirectory.notify
    • saveProperties: 把注册信息保存到本地系统中.下面有介绍请看标题

    RegistryDirectory.notify

       public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
       @Override
        public synchronized void notify(List<URL> urls) {
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault("configurators", Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault("routers", Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault("providers", Collections.emptyList());
            /**
             * 3.x added for extend URL address
             */
    		...
            refreshOverrideAndInvoker(providerURLs);
        }
           
       private void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
           //逐个调用注册中心里面的配置,覆盖原来的url,组成最新的url 放入overrideDirectoryUrl 存储,
    //此时我们没有在dubbo-admin中修改任何配置,所以这里没必要去分析
            overrideDirectoryUrl();
            refreshInvoker(urls);
        }
           
     private void refreshInvoker(List<URL> invokerUrls) {
    
            if (invokerUrls.size() == 1
                    && invokerUrls.get(0) != null
                    && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
    				....
            } else {
    				...
               //通过urls生成newUrlInvokerMap
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 
                
                //赋值
                List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
                routerChain.setInvokers(newInvokers);
                this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
                this.urlInvokerMap = newUrlInvokerMap;
    
            	....
            }
        }
    }
    
        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<>();
            String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
            for (URL providerUrl : urls) {
    			....
                //构建一个新的url
                URL url = mergeUrl(providerUrl);
    
                String key = url.toFullString(); // The parameter urls are sorted
                if (keys.contains(key)) { // Repeated url
                    continue;
                }
                keys.add(key);
    			...
                if (invoker == null) { // Not in the cache, refer again
                    try {
    					...
                         //新建invoker
                         invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                        
                    } catch (Throwable t) {
    				.	...
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    
    
    //把提供者的url和我的消费者的配置做一个整合
    	private URL mergeUrl(URL providerUrl) {
            //queryMap是消费者的配置信息比如:application -> spring-boot-dubbo-consumer , 
            //methods -> loginUser,getUser
            providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
    
            providerUrl = overrideWithConfigurator(providerUrl);
    
            providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
    
            // The combination of directoryUrl and override is at the end of notify, which can't be handled here
            this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
    
            if ((providerUrl.getPath() == null || providerUrl.getPath()
                    .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
                //fix by tony.chenl DUBBO-44
                String path = directoryUrl.getParameter(INTERFACE_KEY);
                if (path != null) {
                    int i = path.indexOf('/');
                    if (i >= 0) {
                        path = path.substring(i + 1);
                    }
                    i = path.lastIndexOf(':');
                    if (i >= 0) {
                        path = path.substring(0, i);
                    }
                    providerUrl = providerUrl.setPath(path);
                }
            }
            return providerUrl;
        }
    

    RegistryDirectory.notify:

    • 获取urls中的参数为"configurators","routers","providers"的值

    • refreshOverrideAndInvoker(providerURLs): 是providerURLs是urls中的"providers"的参数值

    refreshOverrideAndInvoker

    • ​ overrideDirectoryUrl: 逐个调用注册中心里面的配置,覆盖原来的url,组成最新的url 放入overrideDirectoryUrl 存储,此时我们没有在dubbo-admin中修改任何配置,所以这里没必要去分析
    • refreshInvoker: 刷新Invoker

    refreshInvoker:

    • 通过 toInvokers(invokerUrls)方法把url生成Invoker集合Map。名字为:newUrlInvokerMap
    • 把生成的newUrlInvokerMap给this.invokersthis.urlInvokerMap赋值

    toInvokers:

    • 遍历所有的提供者的地址
    • mergeUrl(providerUrl): 把提供者的url和我的消费者的配置做一个整合。ClusterUtils.mergeUrl(providerUrl, queryMap)里面的queryMap的数据如下:image-20201110163055690
    • new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl): 新建invoker
    • 构造newUrlInvokerMap返回

    protocol.refer(serviceType, url)

    ​ url的地址协议是dubbo,这里应该是调用DubboProtocol.refer。(AbstractProtocol是DubboProtocol的父类)

    public abstract class AbstractProtocol implements Protocol {
       
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            	//把返回的invoker再包装一层返回
                return new AsyncToSyncInvoker(this.protocolBindingRefer(type, url));
        }
    }
    
    public class DubboProtocol extends AbstractProtocol {
    	public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            this.optimizeSerialization(url);
            DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
            this.invokers.add(invoker);
            return invoker;
        }
        
         private ExchangeClient[] getClients(URL url) {
            boolean useShareConnect = false;
            int connections = url.getParameter("connections", 0);
    		...一些connections判断...
            ExchangeClient[] clients = new ExchangeClient[connections];
    
            for(int i = 0; i < clients.length; ++i) {
                if (useShareConnect) {
                    clients[i] = (ExchangeClient)shareClients.get(i);
                } else {
                    //初始化客户端
                    clients[i] = this.initClient(url);
                }
            }
    
            return clients;
        }
        
        
            private ExchangeClient initClient(URL url) {
                try {
                    Object client;
                    if (url.getParameter("lazy", false)) {
                        client = new LazyConnectExchangeClient(url, this.requestHandler);
                    } else {
                        //链接提供者
                        client = Exchangers.connect(url, this.requestHandler);
                    }
    
                    return (ExchangeClient)client;
                } catch (RemotingException var5) {
                    throw new RpcException("Fail to create remoting client for service(" + url + "): " + var5.getMessage(), var5);
                }
            }
        }  
    }
    

    执行的过程: AbstractProtocol.refer -> DubboProtocol.protocolBindingRefer

    DubboProtocol.protocolBindingRefer: 会首先调用initClient 得到客户端信息的信息,然后构建DubboInvoker,最后返回DubboInvoker。

    initClient: 就是获取相应的Exchangers。去做连接 Exchangers.connect(url, this.requestHandler);

    Exchangers继续往下执行连接:

    public class Exchangers {
      
         public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
             url = url.addParameterIfAbsent("codec", "exchange");
             return getExchanger(url).connect(url, handler);
           }
     }
        
    
    public class HeaderExchanger implements Exchanger {
        public static final String NAME = "header";
    
        public HeaderExchanger() {
        }
    
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}), true);
        }
      }
    
    public class Transporters {
    
        static {
            // check duplicate jar package
            Version.checkDuplicate(Transporters.class);
            Version.checkDuplicate(RemotingException.class);
        }
    
        private Transporters() {
        }
    
        public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            return getTransporter().connect(url, handler);
        }
    
        public static Transporter getTransporter() {
            return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
        }
    
    }
    
    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        @Override
        public Client connect(URL url, ChannelHandler handler) throws RemotingException {
            return new NettyClient(url, handler);
        }
    
    }
    
    
    public class NettyClient extends AbstractClient {
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        	super(url, wrapChannelHandler(url, handler));
        }
        
    }
    
    public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
        protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
        private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
        private final Lock connectLock = new ReentrantLock();
        private final boolean needReconnect;
        protected volatile ExecutorService executor;
        private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
    
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            initExecutor(url);
    
            try {
                doOpen();
            } catch (Throwable t) {
                close();
            }
            try {
                // connect.
                connect();
                ...
    
        	}
    }
    

    Exchangers.connect -> Transporters.connect ->NettyTransporter.connect -> 新建NettyClient对象 。

    NettyClient的构建方法:doOpen() --- connect()

    AbstractRegistry.saveProperties保存提供者信息到文件系统

    public abstract class AbstractRegistry implements Registry {
        
            private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    
        
      public AbstractRegistry(URL url) {
            setUrl(url);
            if (url.getParameter("file.cache", true)) {
                // Start file save timer
                syncSaveFile = url.getParameter("file.cache", false);
               //默认为的文件名字
                String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter("application") + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
                String filename = url.getParameter("file", defaultFilename);
                File file = null;
                if (ConfigUtils.isNotEmpty(filename)) {
                    file = new File(filename);
                 	//判断文件是否有效。。
                }
                this.file = file;
                // When starting the subscription center,
                // we need to read the local cache file for future Registry fault tolerance processing.
                loadProperties();
                notify(url.getBackupUrls());
            }
        }
        
        private void saveProperties(URL url) {
            if (file == null) {
                return;
            }
    
            try {
                StringBuilder buf = new StringBuilder();
                //AbstractRegistry.notify引用位置,获取提供者的map列表
                Map<String, List<URL>> categoryNotified = notified.get(url);
                if (categoryNotified != null) {
                    for (List<URL> us : categoryNotified.values()) {
                        for (URL u : us) {
                            if (buf.length() > 0) {
                                buf.append(URL_SEPARATOR);
                            }
                            buf.append(u.toFullString());
                        }
                    }
                }
                properties.setProperty(url.getServiceKey(), buf.toString());
                long version = lastCacheChanged.incrementAndGet();
                //是否要同步保存到文件里
                if (syncSaveFile) {
                    doSaveProperties(version);
                } else {
                    //异步保存到文件,registryCacheExecutor线程池,开一个线程来执行doSaveProperties(version)
                    registryCacheExecutor.execute(new SaveProperties(version));
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
        
        private class SaveProperties implements Runnable {
            private long version;
    
            private SaveProperties(long version) {
                this.version = version;
            }
    
            @Override
            public void run() {
                doSaveProperties(version);
            }
        }
    }
    

    AbstractRegistry构造的时候:会从url参数中获取"file.cache"的值来判断是否要把提供者的信息保存到文件中,需要的话默认路径为:

    默认路径:
    System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter("application") + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
    为了更好理解,我用debug把路径打印出来如下:
    C:Usersp.dubbodubbo-registry-spring-boot-dubbo-consumer-127.0.0.1-8848.cache
    

    也可以自己指定文件的路径,只需要在url中加入file参数就行,比如:file=/tmp/spring-boot-dubbo-consumer.cache

    saveProperties:notified.get(url)获取提供者的信息,然后拼接成字符串,最后执行doSaveProperties(version);把拼接的字符串保存到文件里面。

    ​ notified.get(url)引用的位置:AbstractRegistry.notify

    保存起来的文件如下:

    #Dubbo Registry Cache
    #Tue Nov 10 17:19:26 CST 2020
    com.onion.service.IUserService=dubbo://192.168.56.1:20880/com.onion.service.IUserService?anyhost=true&application=spring-boot-dubbo-provider&category=providers&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&getUser.retries=2&getUser.return=true&interface=com.onion.service.IUserService&metadata-type=remote&methods=loginUser,getUser&path=com.onion.service.IUserService&pid=23732&protocol=dubbo&release=2.7.8&side=provider&timestamp=1604998712794
    

    subscribeEventListener订阅节点信息

       public class NacosRegistry extends FailbackRegistry {
    
           private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
                    throws NacosException {
                EventListener eventListener = event -> {
                    if (event instanceof NamingEvent) {
                        NamingEvent e = (NamingEvent) event;
                        List<Instance> instances = e.getInstances();
    
    
                        if (isServiceNamesWithCompatibleMode(url)) {
                            NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                            instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
                        }
    
                        notifySubscriber(url, listener, instances);
                    }
                };
                namingService.subscribe(serviceName,
                        getUrl().getParameter("group", "DEFAULT_GROUP"),
                        eventListener);
            }
           
           
        }
    

    NacosNamingService是com.alibaba.nacos.client.naming包下的,象征着终于使用了nacos-client的相关类了,到这里就是nacos订阅的内容,我就不再往下走了

    public class NacosNamingService implements NamingService {
    	public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
            this.subscribe(serviceName, groupName, new ArrayList(), listener);
        }
        
    

    NacosRegistry.subscribeEventListener:

    • namingService.subscribe: 订阅nacos中的节点的变化,如果节点发生变化eventListener监听器
    • eventListener:回调的监听器的时候,调用notifySubscriber(url, listener, instances),instances就是变化的节点信息,这个方法上面已经说过了,功能就是更新节点的信息

    结尾:

    还有PROXY_FACTORY.getProxy构建代理类没有讲,下一篇主要是讲PROXY_FACTORY.getProxy的构建,和依赖注入对象的调用过程

    附上一个消费端的高清图:

    dubbo服务消费

    图片下载地址:https://gitee.com/gzgyc/blogimage/raw/master/dubbo服务消费.png

  • 相关阅读:
    DML-DDL-DCL
    FastDFS常见场景模拟
    如何定义软件版本
    鸟哥的linux私房菜学习-(七)改变文件属性与权限
    鸟哥的linux私房菜学习-(六)Linux 文件权限介绍
    二、基本语法
    一、JavaSE语言概述
    鸟哥的linux私房菜学习-(五)补充:重点回顾
    鸟哥的linux私房菜学习-(五)Linux系统的在线求助man page与info page
    鸟哥的linux私房菜学习-(四)linux命令的基本概念
  • 原文地址:https://www.cnblogs.com/dabenxiang/p/14028475.html
Copyright © 2011-2022 走看看