zoukankan      html  css  js  c++  java
  • dubbo源码-服务注册(服务端启动源码)

      在一个微服务的过程中,我们知道一般会有一个注册中心。服务提供者启动之后向注册中心注册自身,服务消费者消费的时候到注册中心拿到注册的服务信息,然后根据服务信息,将请求转发到对应的服务里面,最终通过代理、反射各种方式实现服务的调用。下面简单研究下服务的注册过程以及消费者从服务中心获取到服务之后调用过程。

    1. 首先查看服务注册的信息包括:

    2. 服务注册过程 

    1.  dubbo-springboot-autoconfigure 包中的spring.factories 配置文件引入了一些自动配置的类

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=
    org.apache.dubbo.spring.boot.autoconfigure.DubboAutoConfiguration,
    org.apache.dubbo.spring.boot.autoconfigure.DubboRelaxedBindingAutoConfiguration
    org.springframework.context.ApplicationListener=
    org.apache.dubbo.spring.boot.context.event.OverrideDubboConfigApplicationListener,
    org.apache.dubbo.spring.boot.context.event.WelcomeLogoApplicationListener,
    org.apache.dubbo.spring.boot.context.event.AwaitingNonWebApplicationListener
    org.springframework.boot.env.EnvironmentPostProcessor=
    org.apache.dubbo.spring.boot.env.DubboDefaultPropertiesEnvironmentPostProcessor
    org.springframework.context.ApplicationContextInitializer=
    org.apache.dubbo.spring.boot.context.DubboApplicationContextInitializer

    2. 其中有个自动配置的类是DubboAutoConfiguration, 里面注入了一个重要的bean

    package org.apache.dubbo.spring.boot.autoconfigure;
    
    import java.util.Collections;
    import java.util.Set;
    import org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor;
    import org.apache.dubbo.config.spring.beans.factory.annotation.ServiceAnnotationBeanPostProcessor;
    import org.apache.dubbo.config.spring.context.annotation.DubboConfigConfiguration.Multiple;
    import org.apache.dubbo.config.spring.context.annotation.DubboConfigConfiguration.Single;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Import;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.env.Environment;
    import org.springframework.core.env.PropertyResolver;
    
    @ConditionalOnProperty(
        prefix = "dubbo",
        name = {"enabled"},
        matchIfMissing = true
    )
    @Configuration
    @AutoConfigureAfter({DubboRelaxedBindingAutoConfiguration.class})
    @EnableConfigurationProperties({DubboConfigurationProperties.class})
    public class DubboAutoConfiguration {
        public DubboAutoConfiguration() {
        }
    
        @ConditionalOnProperty(
            prefix = "dubbo.scan.",
            name = {"base-packages"}
        )
        @ConditionalOnBean(
            name = {"dubboScanBasePackagesPropertyResolver"}
        )
        @Bean
        public ServiceAnnotationBeanPostProcessor serviceAnnotationBeanPostProcessor(@Qualifier("dubboScanBasePackagesPropertyResolver") PropertyResolver propertyResolver) {
            Set<String> packagesToScan = (Set)propertyResolver.getProperty("base-packages", Set.class, Collections.emptySet());
            return new ServiceAnnotationBeanPostProcessor(packagesToScan);
        }
    
        @ConditionalOnMissingBean
        @Bean(
            name = {"referenceAnnotationBeanPostProcessor"}
        )
        public ReferenceAnnotationBeanPostProcessor referenceAnnotationBeanPostProcessor() {
            return new ReferenceAnnotationBeanPostProcessor();
        }
    
        @Bean
        @Primary
        public PropertyResolver primaryPropertyResolver(Environment environment) {
            return environment;
        }
    
        @ConditionalOnProperty(
            prefix = "dubbo.config.",
            name = {"multiple"},
            matchIfMissing = true
        )
        @Import({Multiple.class})
        protected static class MultipleDubboConfigConfiguration {
            protected MultipleDubboConfigConfiguration() {
            }
        }
    
        @Import({Single.class})
        protected static class SingleDubboConfigConfiguration {
            protected SingleDubboConfigConfiguration() {
            }
        }
    }

    3. ServiceAnnotationBeanPostProcessor 是重要的入口

    (1) ServiceAnnotationBeanPostProcessor 实现了BeanDefinitionRegistryPostProcessor, 是一个对象工厂后置处理器

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package org.apache.dubbo.config.spring.beans.factory.annotation;
     18 
     19 import org.apache.dubbo.common.logger.Logger;
     20 import org.apache.dubbo.common.logger.LoggerFactory;
     21 import org.apache.dubbo.common.utils.ArrayUtils;
     22 import org.apache.dubbo.config.annotation.Service;
     23 import org.apache.dubbo.config.spring.ServiceBean;
     24 import org.apache.dubbo.config.spring.context.annotation.DubboClassPathBeanDefinitionScanner;
     25 
     26 import org.springframework.beans.BeansException;
     27 import org.springframework.beans.MutablePropertyValues;
     28 import org.springframework.beans.factory.BeanClassLoaderAware;
     29 import org.springframework.beans.factory.config.BeanDefinition;
     30 import org.springframework.beans.factory.config.BeanDefinitionHolder;
     31 import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
     32 import org.springframework.beans.factory.config.RuntimeBeanReference;
     33 import org.springframework.beans.factory.config.SingletonBeanRegistry;
     34 import org.springframework.beans.factory.support.AbstractBeanDefinition;
     35 import org.springframework.beans.factory.support.BeanDefinitionBuilder;
     36 import org.springframework.beans.factory.support.BeanDefinitionRegistry;
     37 import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
     38 import org.springframework.beans.factory.support.BeanNameGenerator;
     39 import org.springframework.beans.factory.support.ManagedList;
     40 import org.springframework.context.EnvironmentAware;
     41 import org.springframework.context.ResourceLoaderAware;
     42 import org.springframework.context.annotation.AnnotationBeanNameGenerator;
     43 import org.springframework.context.annotation.AnnotationConfigUtils;
     44 import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
     45 import org.springframework.context.annotation.ConfigurationClassPostProcessor;
     46 import org.springframework.core.annotation.AnnotationAttributes;
     47 import org.springframework.core.env.Environment;
     48 import org.springframework.core.io.ResourceLoader;
     49 import org.springframework.core.type.filter.AnnotationTypeFilter;
     50 import org.springframework.util.CollectionUtils;
     51 import org.springframework.util.ObjectUtils;
     52 import org.springframework.util.StringUtils;
     53 
     54 import java.lang.annotation.Annotation;
     55 import java.util.Arrays;
     56 import java.util.Collection;
     57 import java.util.HashMap;
     58 import java.util.LinkedHashSet;
     59 import java.util.List;
     60 import java.util.Map;
     61 import java.util.Set;
     62 
     63 import static org.apache.dubbo.config.spring.beans.factory.annotation.ServiceBeanNameBuilder.create;
     64 import static org.apache.dubbo.config.spring.util.AnnotationUtils.resolveServiceInterfaceClass;
     65 import static org.apache.dubbo.config.spring.util.ObjectUtils.of;
     66 import static org.springframework.beans.factory.support.BeanDefinitionBuilder.rootBeanDefinition;
     67 import static org.springframework.context.annotation.AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR;
     68 import static org.springframework.core.annotation.AnnotatedElementUtils.findMergedAnnotation;
     69 import static org.springframework.core.annotation.AnnotationUtils.getAnnotationAttributes;
     70 import static org.springframework.util.ClassUtils.resolveClassName;
     71 
     72 /**
     73  * {@link Service} Annotation
     74  * {@link BeanDefinitionRegistryPostProcessor Bean Definition Registry Post Processor}
     75  *
     76  * @since 2.5.8
     77  */
     78 public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
     79         ResourceLoaderAware, BeanClassLoaderAware {
     80 
     81 
     82     private final Logger logger = LoggerFactory.getLogger(getClass());
     83 
     84     private final Set<String> packagesToScan;
     85 
     86     private Environment environment;
     87 
     88     private ResourceLoader resourceLoader;
     89 
     90     private ClassLoader classLoader;
     91 
     92     public ServiceAnnotationBeanPostProcessor(String... packagesToScan) {
     93         this(Arrays.asList(packagesToScan));
     94     }
     95 
     96     public ServiceAnnotationBeanPostProcessor(Collection<String> packagesToScan) {
     97         this(new LinkedHashSet<>(packagesToScan));
     98     }
     99 
    100     public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
    101         this.packagesToScan = packagesToScan;
    102     }
    103 
    104     @Override
    105     public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    106 
    107         Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
    108 
    109         if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
    110             registerServiceBeans(resolvedPackagesToScan, registry);
    111         } else {
    112             if (logger.isWarnEnabled()) {
    113                 logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
    114             }
    115         }
    116 
    117     }
    118 
    119 
    120     /**
    121      * Registers Beans whose classes was annotated {@link Service}
    122      *
    123      * @param packagesToScan The base packages to scan
    124      * @param registry       {@link BeanDefinitionRegistry}
    125      */
    126     private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
    127 
    128         DubboClassPathBeanDefinitionScanner scanner =
    129                 new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
    130 
    131         BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
    132 
    133         scanner.setBeanNameGenerator(beanNameGenerator);
    134 
    135         scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
    136 
    137         /**
    138          * Add the compatibility for legacy Dubbo's @Service
    139          *
    140          * The issue : https://github.com/apache/dubbo/issues/4330
    141          * @since 2.7.3
    142          */
    143         scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));
    144 
    145         for (String packageToScan : packagesToScan) {
    146 
    147             // Registers @Service Bean first
    148             scanner.scan(packageToScan);
    149 
    150             // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
    151             Set<BeanDefinitionHolder> beanDefinitionHolders =
    152                     findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
    153 
    154             if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
    155 
    156                 for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
    157                     registerServiceBean(beanDefinitionHolder, registry, scanner);
    158                 }
    159 
    160                 if (logger.isInfoEnabled()) {
    161                     logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
    162                             beanDefinitionHolders +
    163                             " } were scanned under package[" + packageToScan + "]");
    164                 }
    165 
    166             } else {
    167 
    168                 if (logger.isWarnEnabled()) {
    169                     logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
    170                             + packageToScan + "]");
    171                 }
    172 
    173             }
    174 
    175         }
    176 
    177     }
    178 
    179     /**
    180      * It'd better to use BeanNameGenerator instance that should reference
    181      * {@link ConfigurationClassPostProcessor#componentScanBeanNameGenerator},
    182      * thus it maybe a potential problem on bean name generation.
    183      *
    184      * @param registry {@link BeanDefinitionRegistry}
    185      * @return {@link BeanNameGenerator} instance
    186      * @see SingletonBeanRegistry
    187      * @see AnnotationConfigUtils#CONFIGURATION_BEAN_NAME_GENERATOR
    188      * @see ConfigurationClassPostProcessor#processConfigBeanDefinitions
    189      * @since 2.5.8
    190      */
    191     private BeanNameGenerator resolveBeanNameGenerator(BeanDefinitionRegistry registry) {
    192 
    193         BeanNameGenerator beanNameGenerator = null;
    194 
    195         if (registry instanceof SingletonBeanRegistry) {
    196             SingletonBeanRegistry singletonBeanRegistry = SingletonBeanRegistry.class.cast(registry);
    197             beanNameGenerator = (BeanNameGenerator) singletonBeanRegistry.getSingleton(CONFIGURATION_BEAN_NAME_GENERATOR);
    198         }
    199 
    200         if (beanNameGenerator == null) {
    201 
    202             if (logger.isInfoEnabled()) {
    203 
    204                 logger.info("BeanNameGenerator bean can't be found in BeanFactory with name ["
    205                         + CONFIGURATION_BEAN_NAME_GENERATOR + "]");
    206                 logger.info("BeanNameGenerator will be a instance of " +
    207                         AnnotationBeanNameGenerator.class.getName() +
    208                         " , it maybe a potential problem on bean name generation.");
    209             }
    210 
    211             beanNameGenerator = new AnnotationBeanNameGenerator();
    212 
    213         }
    214 
    215         return beanNameGenerator;
    216 
    217     }
    218 
    219     /**
    220      * Finds a {@link Set} of {@link BeanDefinitionHolder BeanDefinitionHolders} whose bean type annotated
    221      * {@link Service} Annotation.
    222      *
    223      * @param scanner       {@link ClassPathBeanDefinitionScanner}
    224      * @param packageToScan pachage to scan
    225      * @param registry      {@link BeanDefinitionRegistry}
    226      * @return non-null
    227      * @since 2.5.8
    228      */
    229     private Set<BeanDefinitionHolder> findServiceBeanDefinitionHolders(
    230             ClassPathBeanDefinitionScanner scanner, String packageToScan, BeanDefinitionRegistry registry,
    231             BeanNameGenerator beanNameGenerator) {
    232 
    233         Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents(packageToScan);
    234 
    235         Set<BeanDefinitionHolder> beanDefinitionHolders = new LinkedHashSet<>(beanDefinitions.size());
    236 
    237         for (BeanDefinition beanDefinition : beanDefinitions) {
    238 
    239             String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry);
    240             BeanDefinitionHolder beanDefinitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
    241             beanDefinitionHolders.add(beanDefinitionHolder);
    242 
    243         }
    244 
    245         return beanDefinitionHolders;
    246 
    247     }
    248 
    249     /**
    250      * Registers {@link ServiceBean} from new annotated {@link Service} {@link BeanDefinition}
    251      *
    252      * @param beanDefinitionHolder
    253      * @param registry
    254      * @param scanner
    255      * @see ServiceBean
    256      * @see BeanDefinition
    257      */
    258     private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
    259                                      DubboClassPathBeanDefinitionScanner scanner) {
    260 
    261         Class<?> beanClass = resolveClass(beanDefinitionHolder);
    262 
    263         Annotation service = findServiceAnnotation(beanClass);
    264 
    265         /**
    266          * The {@link AnnotationAttributes} of @Service annotation
    267          */
    268         AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
    269 
    270         Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
    271 
    272         String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
    273 
    274         AbstractBeanDefinition serviceBeanDefinition =
    275                 buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
    276 
    277         // ServiceBean Bean name
    278         String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
    279 
    280         if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
    281             registry.registerBeanDefinition(beanName, serviceBeanDefinition);
    282 
    283             if (logger.isInfoEnabled()) {
    284                 logger.info("The BeanDefinition[" + serviceBeanDefinition +
    285                         "] of ServiceBean has been registered with name : " + beanName);
    286             }
    287 
    288         } else {
    289 
    290             if (logger.isWarnEnabled()) {
    291                 logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
    292                         "] of ServiceBean[ bean name : " + beanName +
    293                         "] was be found , Did @DubboComponentScan scan to same package in many times?");
    294             }
    295 
    296         }
    297 
    298     }
    299 
    300 
    301     /**
    302      * Find the {@link Annotation annotation} of @Service
    303      *
    304      * @param beanClass the {@link Class class} of Bean
    305      * @return <code>null</code> if not found
    306      * @since 2.7.3
    307      */
    308     private Annotation findServiceAnnotation(Class<?> beanClass) {
    309         Annotation service = findMergedAnnotation(beanClass, Service.class);
    310         if (service == null) {
    311             service = findMergedAnnotation(beanClass, com.alibaba.dubbo.config.annotation.Service.class);
    312         }
    313         return service;
    314     }
    315 
    316     /**
    317      * Generates the bean name of {@link ServiceBean}
    318      *
    319      * @param serviceAnnotationAttributes
    320      * @param interfaceClass              the class of interface annotated {@link Service}
    321      * @return ServiceBean@interfaceClassName#annotatedServiceBeanName
    322      * @since 2.7.3
    323      */
    324     private String generateServiceBeanName(AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass) {
    325         ServiceBeanNameBuilder builder = create(interfaceClass, environment)
    326                 .group(serviceAnnotationAttributes.getString("group"))
    327                 .version(serviceAnnotationAttributes.getString("version"));
    328         return builder.build();
    329     }
    330 
    331     private Class<?> resolveClass(BeanDefinitionHolder beanDefinitionHolder) {
    332 
    333         BeanDefinition beanDefinition = beanDefinitionHolder.getBeanDefinition();
    334 
    335         return resolveClass(beanDefinition);
    336 
    337     }
    338 
    339     private Class<?> resolveClass(BeanDefinition beanDefinition) {
    340 
    341         String beanClassName = beanDefinition.getBeanClassName();
    342 
    343         return resolveClassName(beanClassName, classLoader);
    344 
    345     }
    346 
    347     private Set<String> resolvePackagesToScan(Set<String> packagesToScan) {
    348         Set<String> resolvedPackagesToScan = new LinkedHashSet<String>(packagesToScan.size());
    349         for (String packageToScan : packagesToScan) {
    350             if (StringUtils.hasText(packageToScan)) {
    351                 String resolvedPackageToScan = environment.resolvePlaceholders(packageToScan.trim());
    352                 resolvedPackagesToScan.add(resolvedPackageToScan);
    353             }
    354         }
    355         return resolvedPackagesToScan;
    356     }
    357 
    358     /**
    359      * Build the {@link AbstractBeanDefinition Bean Definition}
    360      *
    361      * @param serviceAnnotation
    362      * @param serviceAnnotationAttributes
    363      * @param interfaceClass
    364      * @param annotatedServiceBeanName
    365      * @return
    366      * @since 2.7.3
    367      */
    368     private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
    369                                                               AnnotationAttributes serviceAnnotationAttributes,
    370                                                               Class<?> interfaceClass,
    371                                                               String annotatedServiceBeanName) {
    372 
    373         BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
    374 
    375         AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
    376 
    377         MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();
    378 
    379         String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol",
    380                 "interface", "interfaceName", "parameters");
    381 
    382         propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));
    383 
    384         // References "ref" property to annotated-@Service Bean
    385         addPropertyReference(builder, "ref", annotatedServiceBeanName);
    386         // Set interface
    387         builder.addPropertyValue("interface", interfaceClass.getName());
    388         // Convert parameters into map
    389         builder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters")));
    390 
    391         /**
    392          * Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference
    393          */
    394         String providerConfigBeanName = serviceAnnotationAttributes.getString("provider");
    395         if (StringUtils.hasText(providerConfigBeanName)) {
    396             addPropertyReference(builder, "provider", providerConfigBeanName);
    397         }
    398 
    399         /**
    400          * Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference
    401          */
    402         String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor");
    403         if (StringUtils.hasText(monitorConfigBeanName)) {
    404             addPropertyReference(builder, "monitor", monitorConfigBeanName);
    405         }
    406 
    407         /**
    408          * Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference
    409          */
    410         String applicationConfigBeanName = serviceAnnotationAttributes.getString("application");
    411         if (StringUtils.hasText(applicationConfigBeanName)) {
    412             addPropertyReference(builder, "application", applicationConfigBeanName);
    413         }
    414 
    415         /**
    416          * Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference
    417          */
    418         String moduleConfigBeanName = serviceAnnotationAttributes.getString("module");
    419         if (StringUtils.hasText(moduleConfigBeanName)) {
    420             addPropertyReference(builder, "module", moduleConfigBeanName);
    421         }
    422 
    423 
    424         /**
    425          * Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference
    426          */
    427         String[] registryConfigBeanNames = serviceAnnotationAttributes.getStringArray("registry");
    428 
    429         List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames);
    430 
    431         if (!registryRuntimeBeanReferences.isEmpty()) {
    432             builder.addPropertyValue("registries", registryRuntimeBeanReferences);
    433         }
    434 
    435         /**
    436          * Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference
    437          */
    438         String[] protocolConfigBeanNames = serviceAnnotationAttributes.getStringArray("protocol");
    439 
    440         List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames);
    441 
    442         if (!protocolRuntimeBeanReferences.isEmpty()) {
    443             builder.addPropertyValue("protocols", protocolRuntimeBeanReferences);
    444         }
    445 
    446         return builder.getBeanDefinition();
    447 
    448     }
    449 
    450 
    451     private ManagedList<RuntimeBeanReference> toRuntimeBeanReferences(String... beanNames) {
    452 
    453         ManagedList<RuntimeBeanReference> runtimeBeanReferences = new ManagedList<>();
    454 
    455         if (!ObjectUtils.isEmpty(beanNames)) {
    456 
    457             for (String beanName : beanNames) {
    458 
    459                 String resolvedBeanName = environment.resolvePlaceholders(beanName);
    460 
    461                 runtimeBeanReferences.add(new RuntimeBeanReference(resolvedBeanName));
    462             }
    463 
    464         }
    465 
    466         return runtimeBeanReferences;
    467 
    468     }
    469 
    470     private void addPropertyReference(BeanDefinitionBuilder builder, String propertyName, String beanName) {
    471         String resolvedBeanName = environment.resolvePlaceholders(beanName);
    472         builder.addPropertyReference(propertyName, resolvedBeanName);
    473     }
    474 
    475 
    476     private Map<String, String> convertParameters(String[] parameters) {
    477         if (ArrayUtils.isEmpty(parameters)) {
    478             return null;
    479         }
    480 
    481         if (parameters.length % 2 != 0) {
    482             throw new IllegalArgumentException("parameter attribute must be paired with key followed by value");
    483         }
    484 
    485         Map<String, String> map = new HashMap<>();
    486         for (int i = 0; i < parameters.length; i += 2) {
    487             map.put(parameters[i], parameters[i + 1]);
    488         }
    489         return map;
    490     }
    491 
    492     @Override
    493     public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    494 
    495     }
    496 
    497     @Override
    498     public void setEnvironment(Environment environment) {
    499         this.environment = environment;
    500     }
    501 
    502     @Override
    503     public void setResourceLoader(ResourceLoader resourceLoader) {
    504         this.resourceLoader = resourceLoader;
    505     }
    506 
    507     @Override
    508     public void setBeanClassLoader(ClassLoader classLoader) {
    509         this.classLoader = classLoader;
    510     }
    511 
    512 }
    View Code

    (2) org.apache.dubbo.config.spring.beans.factory.annotation.ServiceAnnotationBeanPostProcessor#postProcessBeanDefinitionRegistry 是程序入口,如果配置了dubbo 扫描的包,会扫描包进行注册到Spring 中, 调用方法 org.apache.dubbo.config.spring.beans.factory.annotation.ServiceAnnotationBeanPostProcessor#registerServiceBeans , 所以该方法是核心

    1》registerServiceBeans方法内部满足Spring 的设计思路,先根据注解拿到classpath 中带指定注解的类, 如下拿的是apache包下的Service.class 和 alibaba 包下的Service 注解

            scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
    
            scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));

    2》调用scanner.scan(packageToScan); 先注册Service 注解声明的对象到Spring 容器

    3》调用方法 org.apache.dubbo.config.spring.beans.factory.annotation.ServiceAnnotationBeanPostProcessor#findServiceBeanDefinitionHolders 生成BeanDefinition (所有带@Service 注解的类)

    该方法内部先生成beanName, 然后创建BeanDefinitionHolder

    4》registerServiceBean 方法为每个Service 类注册一个ServiceBean 对象。

    这个方法的逻辑是:获取Service上的属性-》 调用buildServiceBeanDefinition 生成一个抽象BeanDefinition 对象,包含一些基本的属性 -》 生成beanName -》 注册到Spring 中

      到这里完成了Spring beandefinition 的注入过程,接下来SpringIOC进入创建对象以及后续的启动流程。 这里注意 buildServiceBeanDefinition 有个重要的过程就是通过下面方法设置属性:addPropertyReference(builder, "ref", annotatedServiceBeanName);, 这其实就是将ref 属性设为前面解析的service的beanName。(通过org.springframework.beans.factory.support.BeanDefinitionBuilder#addPropertyReference 方式添加的属性,spring 会在创建对象的过程中属性注入过程populateBean 方法中自动注入)

    4. ServiceBean 启动过程

    (1) ServiceBean 实现了ApplicationListener<ContextRefreshedEvent> 接口,在Spring创建完单例对象之后会进行事件发布

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package org.apache.dubbo.config.spring;
     18 
     19 import org.apache.dubbo.common.utils.CollectionUtils;
     20 import org.apache.dubbo.common.utils.StringUtils;
     21 import org.apache.dubbo.config.ApplicationConfig;
     22 import org.apache.dubbo.config.ConfigCenterConfig;
     23 import org.apache.dubbo.config.MetadataReportConfig;
     24 import org.apache.dubbo.config.MetricsConfig;
     25 import org.apache.dubbo.config.ModuleConfig;
     26 import org.apache.dubbo.config.MonitorConfig;
     27 import org.apache.dubbo.config.ProtocolConfig;
     28 import org.apache.dubbo.config.ProviderConfig;
     29 import org.apache.dubbo.config.RegistryConfig;
     30 import org.apache.dubbo.config.ServiceConfig;
     31 import org.apache.dubbo.config.annotation.Service;
     32 import org.apache.dubbo.config.spring.context.event.ServiceBeanExportedEvent;
     33 import org.apache.dubbo.config.spring.extension.SpringExtensionFactory;
     34 
     35 import org.springframework.aop.support.AopUtils;
     36 import org.springframework.beans.factory.BeanFactoryUtils;
     37 import org.springframework.beans.factory.BeanNameAware;
     38 import org.springframework.beans.factory.DisposableBean;
     39 import org.springframework.beans.factory.InitializingBean;
     40 import org.springframework.context.ApplicationContext;
     41 import org.springframework.context.ApplicationContextAware;
     42 import org.springframework.context.ApplicationEventPublisher;
     43 import org.springframework.context.ApplicationEventPublisherAware;
     44 import org.springframework.context.ApplicationListener;
     45 import org.springframework.context.event.ContextRefreshedEvent;
     46 
     47 import java.util.ArrayList;
     48 import java.util.Arrays;
     49 import java.util.List;
     50 import java.util.Map;
     51 
     52 import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
     53 import static org.apache.dubbo.config.spring.util.BeanFactoryUtils.addApplicationListener;
     54 
     55 /**
     56  * ServiceFactoryBean
     57  *
     58  * @export
     59  */
     60 public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
     61         ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
     62         ApplicationEventPublisherAware {
     63 
     64 
     65     private static final long serialVersionUID = 213195494150089726L;
     66 
     67     private final transient Service service;
     68 
     69     private transient ApplicationContext applicationContext;
     70 
     71     private transient String beanName;
     72 
     73     private transient boolean supportedApplicationListener;
     74 
     75     private ApplicationEventPublisher applicationEventPublisher;
     76 
     77     public ServiceBean() {
     78         super();
     79         this.service = null;
     80     }
     81 
     82     public ServiceBean(Service service) {
     83         super(service);
     84         this.service = service;
     85     }
     86 
     87     @Override
     88     public void setApplicationContext(ApplicationContext applicationContext) {
     89         this.applicationContext = applicationContext;
     90         SpringExtensionFactory.addApplicationContext(applicationContext);
     91         supportedApplicationListener = addApplicationListener(applicationContext, this);
     92     }
     93 
     94     @Override
     95     public void setBeanName(String name) {
     96         this.beanName = name;
     97     }
     98 
     99     /**
    100      * Gets associated {@link Service}
    101      *
    102      * @return associated {@link Service}
    103      */
    104     public Service getService() {
    105         return service;
    106     }
    107 
    108     @Override
    109     public void onApplicationEvent(ContextRefreshedEvent event) {
    110         if (!isExported() && !isUnexported()) {
    111             if (logger.isInfoEnabled()) {
    112                 logger.info("The service ready on spring started. service: " + getInterface());
    113             }
    114             export();
    115         }
    116     }
    117 
    118     @Override
    119     @SuppressWarnings({"unchecked", "deprecation"})
    120     public void afterPropertiesSet() throws Exception {
    121         if (getProvider() == null) {
    122             Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
    123             if (providerConfigMap != null && providerConfigMap.size() > 0) {
    124                 Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
    125                 if (CollectionUtils.isEmptyMap(protocolConfigMap)
    126                         && providerConfigMap.size() > 1) { // backward compatibility
    127                     List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
    128                     for (ProviderConfig config : providerConfigMap.values()) {
    129                         if (config.isDefault() != null && config.isDefault()) {
    130                             providerConfigs.add(config);
    131                         }
    132                     }
    133                     if (!providerConfigs.isEmpty()) {
    134                         setProviders(providerConfigs);
    135                     }
    136                 } else {
    137                     ProviderConfig providerConfig = null;
    138                     for (ProviderConfig config : providerConfigMap.values()) {
    139                         if (config.isDefault() == null || config.isDefault()) {
    140                             if (providerConfig != null) {
    141                                 throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
    142                             }
    143                             providerConfig = config;
    144                         }
    145                     }
    146                     if (providerConfig != null) {
    147                         setProvider(providerConfig);
    148                     }
    149                 }
    150             }
    151         }
    152         if (getApplication() == null
    153                 && (getProvider() == null || getProvider().getApplication() == null)) {
    154             Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
    155             if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
    156                 ApplicationConfig applicationConfig = null;
    157                 for (ApplicationConfig config : applicationConfigMap.values()) {
    158                     if (applicationConfig != null) {
    159                         throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
    160                     }
    161                     applicationConfig = config;
    162                 }
    163                 if (applicationConfig != null) {
    164                     setApplication(applicationConfig);
    165                 }
    166             }
    167         }
    168         if (getModule() == null
    169                 && (getProvider() == null || getProvider().getModule() == null)) {
    170             Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
    171             if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
    172                 ModuleConfig moduleConfig = null;
    173                 for (ModuleConfig config : moduleConfigMap.values()) {
    174                     if (config.isDefault() == null || config.isDefault()) {
    175                         if (moduleConfig != null) {
    176                             throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
    177                         }
    178                         moduleConfig = config;
    179                     }
    180                 }
    181                 if (moduleConfig != null) {
    182                     setModule(moduleConfig);
    183                 }
    184             }
    185         }
    186 
    187         if (StringUtils.isEmpty(getRegistryIds())) {
    188             if (getApplication() != null && StringUtils.isNotEmpty(getApplication().getRegistryIds())) {
    189                 setRegistryIds(getApplication().getRegistryIds());
    190             }
    191             if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getRegistryIds())) {
    192                 setRegistryIds(getProvider().getRegistryIds());
    193             }
    194         }
    195 
    196         if ((CollectionUtils.isEmpty(getRegistries()))
    197                 && (getProvider() == null || CollectionUtils.isEmpty(getProvider().getRegistries()))
    198                 && (getApplication() == null || CollectionUtils.isEmpty(getApplication().getRegistries()))) {
    199             Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
    200             if (CollectionUtils.isNotEmptyMap(registryConfigMap)) {
    201                 List<RegistryConfig> registryConfigs = new ArrayList<>();
    202                 if (StringUtils.isNotEmpty(registryIds)) {
    203                     Arrays.stream(COMMA_SPLIT_PATTERN.split(registryIds)).forEach(id -> {
    204                         if (registryConfigMap.containsKey(id)) {
    205                             registryConfigs.add(registryConfigMap.get(id));
    206                         }
    207                     });
    208                 }
    209 
    210                 if (registryConfigs.isEmpty()) {
    211                     for (RegistryConfig config : registryConfigMap.values()) {
    212                         if (StringUtils.isEmpty(registryIds)) {
    213                             registryConfigs.add(config);
    214                         }
    215                     }
    216                 }
    217                 if (!registryConfigs.isEmpty()) {
    218                     super.setRegistries(registryConfigs);
    219                 }
    220             }
    221         }
    222         if (getMetadataReportConfig() == null) {
    223             Map<String, MetadataReportConfig> metadataReportConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class, false, false);
    224             if (metadataReportConfigMap != null && metadataReportConfigMap.size() == 1) {
    225                 super.setMetadataReportConfig(metadataReportConfigMap.values().iterator().next());
    226             } else if (metadataReportConfigMap != null && metadataReportConfigMap.size() > 1) {
    227                 throw new IllegalStateException("Multiple MetadataReport configs: " + metadataReportConfigMap);
    228             }
    229         }
    230 
    231         if (getConfigCenter() == null) {
    232             Map<String, ConfigCenterConfig> configenterMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterConfig.class, false, false);
    233             if (configenterMap != null && configenterMap.size() == 1) {
    234                 super.setConfigCenter(configenterMap.values().iterator().next());
    235             } else if (configenterMap != null && configenterMap.size() > 1) {
    236                 throw new IllegalStateException("Multiple ConfigCenter found:" + configenterMap);
    237             }
    238         }
    239 
    240         if (getMonitor() == null
    241                 && (getProvider() == null || getProvider().getMonitor() == null)
    242                 && (getApplication() == null || getApplication().getMonitor() == null)) {
    243             Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
    244             if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
    245                 MonitorConfig monitorConfig = null;
    246                 for (MonitorConfig config : monitorConfigMap.values()) {
    247                     if (config.isDefault() == null || config.isDefault()) {
    248                         if (monitorConfig != null) {
    249                             throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
    250                         }
    251                         monitorConfig = config;
    252                     }
    253                 }
    254                 if (monitorConfig != null) {
    255                     setMonitor(monitorConfig);
    256                 }
    257             }
    258         }
    259 
    260         if (getMetrics() == null) {
    261             Map<String, MetricsConfig> metricsConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class, false, false);
    262             if (metricsConfigMap != null && metricsConfigMap.size() > 0) {
    263                 MetricsConfig metricsConfig = null;
    264                 for (MetricsConfig config : metricsConfigMap.values()) {
    265                     if (metricsConfig != null) {
    266                         throw new IllegalStateException("Duplicate metrics configs: " + metricsConfig + " and " + config);
    267                     }
    268                     metricsConfig = config;
    269                 }
    270                 if (metricsConfig != null) {
    271                     setMetrics(metricsConfig);
    272                 }
    273             }
    274         }
    275 
    276         if (StringUtils.isEmpty(getProtocolIds())) {
    277             if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getProtocolIds())) {
    278                 setProtocolIds(getProvider().getProtocolIds());
    279             }
    280         }
    281 
    282         if (CollectionUtils.isEmpty(getProtocols())
    283                 && (getProvider() == null || CollectionUtils.isEmpty(getProvider().getProtocols()))) {
    284             Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
    285             if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
    286                 List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
    287                 if (StringUtils.isNotEmpty(getProtocolIds())) {
    288                     Arrays.stream(COMMA_SPLIT_PATTERN.split(getProtocolIds()))
    289                             .forEach(id -> {
    290                                 if (protocolConfigMap.containsKey(id)) {
    291                                     protocolConfigs.add(protocolConfigMap.get(id));
    292                                 }
    293                             });
    294                 }
    295 
    296                 if (protocolConfigs.isEmpty()) {
    297                     for (ProtocolConfig config : protocolConfigMap.values()) {
    298                         if (StringUtils.isEmpty(protocolIds)) {
    299                             protocolConfigs.add(config);
    300                         }
    301                     }
    302                 }
    303 
    304                 if (!protocolConfigs.isEmpty()) {
    305                     super.setProtocols(protocolConfigs);
    306                 }
    307             }
    308         }
    309         if (StringUtils.isEmpty(getPath())) {
    310             if (StringUtils.isNotEmpty(beanName)
    311                     && StringUtils.isNotEmpty(getInterface())
    312                     && beanName.startsWith(getInterface())) {
    313                 setPath(beanName);
    314             }
    315         }
    316         if (!supportedApplicationListener) {
    317             export();
    318         }
    319     }
    320 
    321     /**
    322      * Get the name of {@link ServiceBean}
    323      *
    324      * @return {@link ServiceBean}'s name
    325      * @since 2.6.5
    326      */
    327     public String getBeanName() {
    328         return this.beanName;
    329     }
    330 
    331     /**
    332      * @since 2.6.5
    333      */
    334     @Override
    335     public void export() {
    336         super.export();
    337         // Publish ServiceBeanExportedEvent
    338         publishExportEvent();
    339     }
    340 
    341     /**
    342      * @since 2.6.5
    343      */
    344     private void publishExportEvent() {
    345         ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
    346         applicationEventPublisher.publishEvent(exportEvent);
    347     }
    348 
    349     @Override
    350     public void destroy() throws Exception {
    351         // no need to call unexport() here, see
    352         // org.apache.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener
    353     }
    354 
    355     // merged from dubbox
    356     @Override
    357     protected Class getServiceClass(T ref) {
    358         if (AopUtils.isAopProxy(ref)) {
    359             return AopUtils.getTargetClass(ref);
    360         }
    361         return super.getServiceClass(ref);
    362     }
    363 
    364     /**
    365      * @param applicationEventPublisher
    366      * @since 2.6.5
    367      */
    368     @Override
    369     public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    370         this.applicationEventPublisher = applicationEventPublisher;
    371     }
    372 }
    View Code

      事件调用过程如下:org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法内部的publishEvent(new ContextRefreshedEvent(this)); -》org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent    这里的核心逻辑其实也是获取到容器中所有的ApplicationListener 然调用其onApplicationEvent 方法。调用链如下:

    (2) org.apache.dubbo.config.spring.ServiceBean#export 方法进行发布服务到注册中心

    (3) 调用到org.apache.dubbo.config.ServiceConfig#export

      1     private void doExportUrls() {
      2         List<URL> registryURLs = loadRegistries(true);
      3         for (ProtocolConfig protocolConfig : protocols) {
      4             String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
      5             ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
      6             ApplicationModel.initProviderModel(pathKey, providerModel);
      7             doExportUrlsFor1Protocol(protocolConfig, registryURLs);
      8         }
      9     }
     10 
     11     private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
     12         String name = protocolConfig.getName();
     13         if (StringUtils.isEmpty(name)) {
     14             name = DUBBO;
     15         }
     16 
     17         Map<String, String> map = new HashMap<String, String>();
     18         map.put(SIDE_KEY, PROVIDER_SIDE);
     19 
     20         appendRuntimeParameters(map);
     21         appendParameters(map, metrics);
     22         appendParameters(map, application);
     23         appendParameters(map, module);
     24         // remove 'default.' prefix for configs from ProviderConfig
     25         // appendParameters(map, provider, Constants.DEFAULT_KEY);
     26         appendParameters(map, provider);
     27         appendParameters(map, protocolConfig);
     28         appendParameters(map, this);
     29         if (CollectionUtils.isNotEmpty(methods)) {
     30             for (MethodConfig method : methods) {
     31                 appendParameters(map, method, method.getName());
     32                 String retryKey = method.getName() + ".retry";
     33                 if (map.containsKey(retryKey)) {
     34                     String retryValue = map.remove(retryKey);
     35                     if ("false".equals(retryValue)) {
     36                         map.put(method.getName() + ".retries", "0");
     37                     }
     38                 }
     39                 List<ArgumentConfig> arguments = method.getArguments();
     40                 if (CollectionUtils.isNotEmpty(arguments)) {
     41                     for (ArgumentConfig argument : arguments) {
     42                         // convert argument type
     43                         if (argument.getType() != null && argument.getType().length() > 0) {
     44                             Method[] methods = interfaceClass.getMethods();
     45                             // visit all methods
     46                             if (methods != null && methods.length > 0) {
     47                                 for (int i = 0; i < methods.length; i++) {
     48                                     String methodName = methods[i].getName();
     49                                     // target the method, and get its signature
     50                                     if (methodName.equals(method.getName())) {
     51                                         Class<?>[] argtypes = methods[i].getParameterTypes();
     52                                         // one callback in the method
     53                                         if (argument.getIndex() != -1) {
     54                                             if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
     55                                                 appendParameters(map, argument, method.getName() + "." + argument.getIndex());
     56                                             } else {
     57                                                 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
     58                                             }
     59                                         } else {
     60                                             // multiple callbacks in the method
     61                                             for (int j = 0; j < argtypes.length; j++) {
     62                                                 Class<?> argclazz = argtypes[j];
     63                                                 if (argclazz.getName().equals(argument.getType())) {
     64                                                     appendParameters(map, argument, method.getName() + "." + j);
     65                                                     if (argument.getIndex() != -1 && argument.getIndex() != j) {
     66                                                         throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
     67                                                     }
     68                                                 }
     69                                             }
     70                                         }
     71                                     }
     72                                 }
     73                             }
     74                         } else if (argument.getIndex() != -1) {
     75                             appendParameters(map, argument, method.getName() + "." + argument.getIndex());
     76                         } else {
     77                             throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
     78                         }
     79 
     80                     }
     81                 }
     82             } // end of methods for
     83         }
     84 
     85         if (ProtocolUtils.isGeneric(generic)) {
     86             map.put(GENERIC_KEY, generic);
     87             map.put(METHODS_KEY, ANY_VALUE);
     88         } else {
     89             String revision = Version.getVersion(interfaceClass, version);
     90             if (revision != null && revision.length() > 0) {
     91                 map.put(REVISION_KEY, revision);
     92             }
     93 
     94             String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
     95             if (methods.length == 0) {
     96                 logger.warn("No method found in service interface " + interfaceClass.getName());
     97                 map.put(METHODS_KEY, ANY_VALUE);
     98             } else {
     99                 map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    100             }
    101         }
    102         if (!ConfigUtils.isEmpty(token)) {
    103             if (ConfigUtils.isDefault(token)) {
    104                 map.put(TOKEN_KEY, UUID.randomUUID().toString());
    105             } else {
    106                 map.put(TOKEN_KEY, token);
    107             }
    108         }
    109         // export service
    110         String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    111         Integer port = this.findConfigedPorts(protocolConfig, name, map);
    112         URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    113 
    114         if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    115                 .hasExtension(url.getProtocol())) {
    116             url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    117                     .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    118         }
    119 
    120         String scope = url.getParameter(SCOPE_KEY);
    121         // don't export when none is configured
    122         if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    123 
    124             // export to local if the config is not remote (export to remote only when config is remote)
    125             if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
    126                 exportLocal(url);
    127             }
    128             // export to remote if the config is not local (export to local only when config is local)
    129             if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
    130                 if (!isOnlyInJvm() && logger.isInfoEnabled()) {
    131                     logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    132                 }
    133                 if (CollectionUtils.isNotEmpty(registryURLs)) {
    134                     for (URL registryURL : registryURLs) {
    135                         //if protocol is only injvm ,not register
    136                         if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
    137                             continue;
    138                         }
    139                         url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
    140                         URL monitorUrl = loadMonitor(registryURL);
    141                         if (monitorUrl != null) {
    142                             url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
    143                         }
    144                         if (logger.isInfoEnabled()) {
    145                             logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
    146                         }
    147 
    148                         // For providers, this is used to enable custom proxy to generate invoker
    149                         String proxy = url.getParameter(PROXY_KEY);
    150                         if (StringUtils.isNotEmpty(proxy)) {
    151                             registryURL = registryURL.addParameter(PROXY_KEY, proxy);
    152                         }
    153 
    154                         Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    155                         DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    156 
    157                         Exporter<?> exporter = protocol.export(wrapperInvoker);
    158                         exporters.add(exporter);
    159                     }
    160                 } else {
    161                     Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
    162                     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    163 
    164                     Exporter<?> exporter = protocol.export(wrapperInvoker);
    165                     exporters.add(exporter);
    166                 }
    167                 /**
    168                  * @since 2.7.0
    169                  * ServiceData Store
    170                  */
    171                 MetadataReportService metadataReportService = null;
    172                 if ((metadataReportService = getMetadataReportService()) != null) {
    173                     metadataReportService.publishProvider(url);
    174                 }
    175             }
    176         }
    177         this.urls.add(url);
    178     }

     所以核心注册服务的过程在doExportUrlsFor1Protocol里面进行发布:可以看到这个过程中会多次出现URL,registery 协议以及dubbo 协议,这是dubbo 根据URL选择对应的策略进行操作的设计,可以说URL用于传参以及选择对应的实现类

    1》 方法调用的时候获取到的registryURLs 如下:

     2》12-109 行是解析参数往map 里面塞,然后构造URL对象

    3》110 - 112 行解析主机和端口,然后构造发布的url 对象用于发布,构造的URL对象如下:可以看到走得是 dubbo 协议

     协议如下:

    dubbo://192.168.99.1:20880/cn.qz.dubbo.service.GroupService?anyhost=true&application=dubbp-service-impl&bean.name=ServiceBean:cn.qz.dubbo.service.GroupService:1.0.0&bind.ip=192.168.99.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=cn.qz.dubbo.service.GroupService&methods=add,listAll&pid=14292&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1629273507675&version=1.0.0

    4》120行获取的scope 属性为null, 所以会走122行的代码块, 然后进行服务的发布,服务的发布有两个,一个是发布到JVM内存,一个是发布到注册中心

    • exportLocal(url);  暴露服务到本地
        /**
         * always export injvm
         */
        private void exportLocal(URL url) {
            URL local = URLBuilder.from(url)
                    .setProtocol(LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            Exporter<?> exporter = protocol.export(
                    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
        }

    首先修改协议为injvm 协议:

     然后调用org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol#export:

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
        }

    然后调用org.apache.dubbo.rpc.protocol.injvm.InjvmExporter#InjvmExporter:

        InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;
            this.exporterMap = exporterMap;
            exporterMap.put(key, this);
        }

      获取到的invoker 信息如下:

       invoker对象如下:

        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
            if (!filters.isEmpty()) {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            Result asyncResult;
                            try {
                                asyncResult = filter.invoke(next, invocation);
                            } catch (Exception e) {
                                // onError callback
                                if (filter instanceof ListenableFilter) {
                                    Filter.Listener listener = ((ListenableFilter) filter).listener();
                                    if (listener != null) {
                                        listener.onError(e, invoker, invocation);
                                    }
                                }
                                throw e;
                            }
                            return asyncResult;
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
    
            return new CallbackRegistrationInvoker<>(last, filters);
        }
    • 远程暴露服务

    下面代码进行远程暴露

                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);

      在进行远程暴露前进行了一系列的判断,如果只暴露本地服务就直接退出暴露过程;如果有监控就添加监控信息;然后走上面贴出的暴露代码

    构造的invoker 对象如下:

      url信息如下:

    registry://192.168.99.100:2181/org.apache.dubbo.registry.RegistryService?application=dubbp-service-impl&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.99.1%3A20880%2Fcn.qz.dubbo.service.GroupService%3Fanyhost%3Dtrue%26application%3Ddubbp-service-impl%26bean.name%3DServiceBean%3Acn.qz.dubbo.service.GroupService%3A1.0.0%26bind.ip%3D192.168.99.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcn.qz.dubbo.service.GroupService%26methods%3Dadd%2ClistAll%26pid%3D27024%26qos.enable%3Dfalse%26register%3Dtrue%26release%3D2.7.3%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1629276323291%26version%3D1.0.0&pid=27024&qos.enable=false&registry=zookeeper&release=2.7.3&timestamp=1629276323288

     接下来进行注册,可以看到协议使用的是registery,因此走:org.apache.dubbo.registry.integration.RegistryProtocol#export

        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
            ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                    registryUrl, registeredProviderUrl);
            //to judge if we need to delay publish
            boolean register = registeredProviderUrl.getParameter("register", true);
            if (register) {
                register(registryUrl, registeredProviderUrl);
                providerInvokerWrapper.setReg(true);
            }
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }

    然后调用org.apache.dubbo.registry.integration.RegistryProtocol#register 进行注册:

        public void register(URL registryUrl, URL registeredProviderUrl) {
            Registry registry = registryFactory.getRegistry(registryUrl);
            registry.register(registeredProviderUrl);
        }

      这里传递的registryUrl 和 registeredProviderUrl 如下

    zookeeper://192.168.99.100:2181/org.apache.dubbo.registry.RegistryService?application=dubbp-service-impl&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.99.1%3A20880%2Fcn.qz.dubbo.service.GroupService%3Fanyhost%3Dtrue%26application%3Ddubbp-service-impl%26bean.name%3DServiceBean%3Acn.qz.dubbo.service.GroupService%3A1.0.0%26bind.ip%3D192.168.99.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcn.qz.dubbo.service.GroupService%26methods%3Dadd%2ClistAll%26pid%3D27024%26qos.enable%3Dfalse%26register%3Dtrue%26release%3D2.7.3%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1629276323291%26version%3D1.0.0&pid=27024&qos.enable=false&release=2.7.3&timestamp=1629276323288
    
    dubbo://192.168.99.1:20880/cn.qz.dubbo.service.GroupService?anyhost=true&application=dubbp-service-impl&bean.name=ServiceBean:cn.qz.dubbo.service.GroupService:1.0.0&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=cn.qz.dubbo.service.GroupService&methods=add,listAll&pid=27024&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1629276323291&version=1.0.0

    最后会调用到:org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

        @Override
        public void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

      可以看到就是调用zkClient 创建一个临时节点,节点的路径是一开始贴出来的。

    补充: 服务发布过程中开启NettyServer

    上面的调用过程中 org.apache.dubbo.registry.integration.RegistryProtocol#export 代码块中调用org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport, 然后经过一系列调用调用到:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
    
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url);
            optimizeSerialization(url);
    
            return exporter;
        }
    
        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    synchronized (this) {
                        server = serverMap.get(key);
                        if (server == null) {
                            serverMap.put(key, createServer(url));
                        }
                    }
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }
    
        private ExchangeServer createServer(URL url) {
            url = URLBuilder.from(url)
                    // send readonly event when server closes, it's enabled by default
                    .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                    // enable heartbeat by default
                    .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                    .addParameter(CODEC_KEY, DubboCodec.NAME)
                    .build();
            String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
            }
    
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
    
            str = url.getParameter(CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
    
            return server;
        }

    (1) 可以看到openServer 用双重检查判断 相关的Server 是否已经开启,也就是从缓存map 中拿,如果没拿到就创建一个Server 开启后存入Map, 拿到的key 如下:(可以看到是配置文件配置的dubbo 协议的端口,也就是最后服务监听的端口)

    createServer 代码中String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);  拿到的默认是netty, 也就是默认使用netty 建立server

    接下来调用到:org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer

        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
            // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }

    org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer:

        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
    
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = ANYHOST_VALUE;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
            try {
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            //fixme replace this with better method
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
        }

    会调用org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen 开启nettyServer (这就是Netty 做Server 的标准流程,建立Worker、BossGroup, 然后bind 端口)

        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // FIXME: should we use getTimeout()?
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }

    1》getUrl 返回的信息如下:

    2》getBindAddress() 返回的端口和地址如下

     3》可以看到添加的handler 主要有编码解码handler、心跳定时、和核心的NettyServerHandler 处理器。

    org.apache.dubbo.remoting.transport.netty4.NettyServerHandler 源码如下:

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package org.apache.dubbo.remoting.transport.netty4;
     18 
     19 import org.apache.dubbo.common.URL;
     20 import org.apache.dubbo.common.logger.Logger;
     21 import org.apache.dubbo.common.logger.LoggerFactory;
     22 import org.apache.dubbo.common.utils.NetUtils;
     23 import org.apache.dubbo.remoting.Channel;
     24 import org.apache.dubbo.remoting.ChannelHandler;
     25 
     26 import io.netty.channel.ChannelDuplexHandler;
     27 import io.netty.channel.ChannelHandlerContext;
     28 import io.netty.channel.ChannelPromise;
     29 import io.netty.handler.timeout.IdleStateEvent;
     30 
     31 import java.net.InetSocketAddress;
     32 import java.util.Map;
     33 import java.util.concurrent.ConcurrentHashMap;
     34 
     35 /**
     36  * NettyServerHandler.
     37  */
     38 @io.netty.channel.ChannelHandler.Sharable
     39 public class NettyServerHandler extends ChannelDuplexHandler {
     40     private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
     41     /**
     42      * the cache for alive worker channel.
     43      * <ip:port, dubbo channel>
     44      */
     45     private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
     46 
     47     private final URL url;
     48 
     49     private final ChannelHandler handler;
     50 
     51     public NettyServerHandler(URL url, ChannelHandler handler) {
     52         if (url == null) {
     53             throw new IllegalArgumentException("url == null");
     54         }
     55         if (handler == null) {
     56             throw new IllegalArgumentException("handler == null");
     57         }
     58         this.url = url;
     59         this.handler = handler;
     60     }
     61 
     62     public Map<String, Channel> getChannels() {
     63         return channels;
     64     }
     65 
     66     @Override
     67     public void channelActive(ChannelHandlerContext ctx) throws Exception {
     68         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
     69         try {
     70             if (channel != null) {
     71                 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
     72             }
     73             handler.connected(channel);
     74         } finally {
     75             NettyChannel.removeChannelIfDisconnected(ctx.channel());
     76         }
     77     }
     78 
     79     @Override
     80     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     81         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
     82         try {
     83             channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
     84             handler.disconnected(channel);
     85         } finally {
     86             NettyChannel.removeChannelIfDisconnected(ctx.channel());
     87         }
     88     }
     89 
     90     @Override
     91     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     92         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
     93         try {
     94             handler.received(channel, msg);
     95         } finally {
     96             NettyChannel.removeChannelIfDisconnected(ctx.channel());
     97         }
     98     }
     99 
    100 
    101     @Override
    102     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    103         super.write(ctx, msg, promise);
    104         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    105         try {
    106             handler.sent(channel, msg);
    107         } finally {
    108             NettyChannel.removeChannelIfDisconnected(ctx.channel());
    109         }
    110     }
    111 
    112     @Override
    113     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    114         // server will close channel when server don't receive any heartbeat from client util timeout.
    115         if (evt instanceof IdleStateEvent) {
    116             NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    117             try {
    118                 logger.info("IdleStateEvent triggered, close channel " + channel);
    119                 channel.close();
    120             } finally {
    121                 NettyChannel.removeChannelIfDisconnected(ctx.channel());
    122             }
    123         }
    124         super.userEventTriggered(ctx, evt);
    125     }
    126 
    127     @Override
    128     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    129             throws Exception {
    130         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    131         try {
    132             handler.caught(channel, cause);
    133         } finally {
    134             NettyChannel.removeChannelIfDisconnected(ctx.channel());
    135         }
    136     }
    137 }
    View Code

     可以看到核心的处理方法应该在org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead;

       心跳检测超时的处理是在org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#userEventTriggered 方法中。

    (2) 如果已经开启NettyServer,也就是缓存map 根据key 能拿到服务的信息,就调用下面代码:

    server.reset(url);

      这个代码没做什么实质性的操作。

      也就是注册多个服务,只会开启一个NettyServer, 监听dubbo 协议指定的端口。

     总结:

    1. 服务注册过程可以大致简单总结为如下:

    (1) 引入dubbo 自动配置,会注册一个ServiceAnnotationBeanPostProcessor 对象到Spring容器, 该类实现了接口BeanDefinitionRegistryPostProcessor, 是一个对象工厂后置处理器

    (2) SpringIOC 启动过程中会调用ServiceAnnotationBeanPostProcessor#postProcessBeanDefinitionRegistry, 该类主要的逻辑如下:

    1》 扫描所有带@Service(dubbo 包下) 注解的类,注册到Spring 中,然后每个类都生成一个 ServiceBean 对象,同时也注册到Spring 中。测试可以用下面代码进行测试:

    @RestController
    public class IndexController {
    
        @Autowired
        private ApplicationContext applicationContext;
    
        @GetMapping("/index")
        public void index() {
            String[] beanNamesForType = applicationContext.getBeanNamesForType(ServiceBean.class);
            System.out.println(Arrays.toString(beanNamesForType));
            Arrays.stream(beanNamesForType).forEach(name -> {
                Object bean = applicationContext.getBean(name);
                System.out.println(bean);
            });
        }
    }

    2》 ServiceBean 实现的接口如下:InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAwar, 重要的接口是ApplicationListener, 在Spring IOC 对象创建完成之后的finishRefresh 会进行事件的发布,也就是会调用到ServiceBean.onApplicationEvent 方法,开始服务的暴露、启动NettyServer等操作。服务注册的过程可以从org.apache.dubbo.config.ServiceConfig#doExportUrls 开始

      可以看到处理过程中用了大量的模板模式与策略模式。根据不同的URL解析选择对应的策略处理类进行处理。netty在整个体系中的作用就是启动一个NettyServer,端口是dubbo协议指定的端口;核心的NettyHandler 用于处理接收到请求然后调用具体的ServiceImpl 进行处理,这个在下一篇进行研究。

     补充:切换注册中心为redis

    1. pom 引入如下依赖

            <!--redis 做注册中心-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.6.2</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.6.2</version>
            </dependency>

    2. yml 修改配置为redis 注册中心

    dubbo:
      application:
        name: dubbp-service-impl
      registry:
        protocol: redis
        address: redis://localhost:6379

    3. debug 查看代码

    (1) org.apache.dubbo.registry.integration.RegistryProtocol#register 方法获取到的registry 是 org.apache.dubbo.registry.redis.RedisRegistry, 所以暴露远程服务走的是:org.apache.dubbo.registry.redis.RedisRegistry#doRegister

        public void doRegister(URL url) {
            String key = toCategoryPath(url);
            String value = url.toFullString();
            String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
            boolean success = false;
            RpcException exception = null;
            for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
                JedisPool jedisPool = entry.getValue();
                try {
                    try (Jedis jedis = jedisPool.getResource()) {
                        jedis.hset(key, value, expire);
                        jedis.publish(key, REGISTER);
                        success = true;
                        if (!replicate) {
                            break; //  If the server side has synchronized data, just write a single machine
                        }
                    }
                } catch (Throwable t) {
                    exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
                }
            }
            if (exception != null) {
                if (success) {
                    logger.warn(exception.getMessage(), exception);
                } else {
                    throw exception;
                }
            }
        }

    (2) 其它启动NettyServer 与之前的一致

    补充:为什么要封装一个 invoker对象?

    统一暴露出一个可执行体,这样调用者简单的使用它,向它发起 invoke 调用,它内部通过反射或者其他方式进行调用,并对响应结果进行处理,查看invoker 接口如下

    public interface Invoker<T> extends Node {
    
        /**
         * get service interface.
         *
         * @return service interface.
         */
        Class<T> getInterface();
    
        /**
         * invoke.
         *
         * @param invocation
         * @return result
         * @throws RpcException
         */
        Result invoke(Invocation invocation) throws RpcException;
    
    }

    涉及到的Invocation 和 Result 对象如下:(可以看到Result 实现了异步获取任务结果的接口)

      1 package org.apache.dubbo.rpc;
      2 
      3 import java.util.Map;
      4 
      5 /**
      6  * Invocation. (API, Prototype, NonThreadSafe)
      7  *
      8  * @serial Don't change the class name and package name.
      9  * @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
     10  * @see org.apache.dubbo.rpc.RpcInvocation
     11  */
     12 public interface Invocation {
     13 
     14     /**
     15      * get method name.
     16      *
     17      * @return method name.
     18      * @serial
     19      */
     20     String getMethodName();
     21 
     22     /**
     23      * get parameter types.
     24      *
     25      * @return parameter types.
     26      * @serial
     27      */
     28     Class<?>[] getParameterTypes();
     29 
     30     /**
     31      * get arguments.
     32      *
     33      * @return arguments.
     34      * @serial
     35      */
     36     Object[] getArguments();
     37 
     38     /**
     39      * get attachments.
     40      *
     41      * @return attachments.
     42      * @serial
     43      */
     44     Map<String, String> getAttachments();
     45 
     46     void setAttachment(String key, String value);
     47 
     48     void setAttachmentIfAbsent(String key, String value);
     49 
     50     /**
     51      * get attachment by key.
     52      *
     53      * @return attachment value.
     54      * @serial
     55      */
     56     String getAttachment(String key);
     57 
     58     /**
     59      * get attachment by key with default value.
     60      *
     61      * @return attachment value.
     62      * @serial
     63      */
     64     String getAttachment(String key, String defaultValue);
     65 
     66     /**
     67      * get the invoker in current context.
     68      *
     69      * @return invoker.
     70      * @transient
     71      */
     72     Invoker<?> getInvoker();
     73 
     74 }
     75 
     76 public interface Result extends CompletionStage<Result>, Future<Result>, Serializable {
     77 
     78     /**
     79      * Get invoke result.
     80      *
     81      * @return result. if no result return null.
     82      */
     83     Object getValue();
     84 
     85     void setValue(Object value);
     86 
     87     /**
     88      * Get exception.
     89      *
     90      * @return exception. if no exception return null.
     91      */
     92     Throwable getException();
     93 
     94     void setException(Throwable t);
     95 
     96     /**
     97      * Has exception.
     98      *
     99      * @return has exception.
    100      */
    101     boolean hasException();
    102 
    103     /**
    104      * Recreate.
    105      * <p>
    106      * <code>
    107      * if (hasException()) {
    108      * throw getException();
    109      * } else {
    110      * return getValue();
    111      * }
    112      * </code>
    113      *
    114      * @return result.
    115      * @throws if has exception throw it.
    116      */
    117     Object recreate() throws Throwable;
    118 
    119     /**
    120      * get attachments.
    121      *
    122      * @return attachments.
    123      */
    124     Map<String, String> getAttachments();
    125 
    126     /**
    127      * Add the specified map to existing attachments in this instance.
    128      *
    129      * @param map
    130      */
    131     void addAttachments(Map<String, String> map);
    132 
    133     /**
    134      * Replace the existing attachments with the specified param.
    135      *
    136      * @param map
    137      */
    138     void setAttachments(Map<String, String> map);
    139 
    140     /**
    141      * get attachment by key.
    142      *
    143      * @return attachment value.
    144      */
    145     String getAttachment(String key);
    146 
    147     /**
    148      * get attachment by key with default value.
    149      *
    150      * @return attachment value.
    151      */
    152     String getAttachment(String key, String defaultValue);
    153 
    154     void setAttachment(String key, String value);
    155 
    156     /**
    157      * Returns the specified {@code valueIfAbsent} when not complete, or
    158      * returns the result value or throws an exception when complete.
    159      *
    160      * @see CompletableFuture#getNow(Object)
    161      */
    162     Result getNow(Result valueIfAbsent);
    163 
    164     /**
    165      * Add a callback which can be triggered when the RPC call finishes.
    166      * <p>
    167      * Just as the method name implies, this method will guarantee the callback being triggered under the same context as when the call was started,
    168      * see implementation in {@link Result#whenCompleteWithContext(BiConsumer)}
    169      *
    170      * @param fn
    171      * @return
    172      */
    173     Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
    174 
    175     default CompletableFuture<Result> completionFuture() {
    176         return toCompletableFuture();
    177     }
    178 }
    View Code

    补充:每个暴露的service都对应的有一个Invoker,用于接收客户端之后处理后续逻辑

    会调用到org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export:

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
    
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url);
            optimizeSerialization(url);
    
            return exporter;
        }

      这里会把exporter 缓存到 org.apache.dubbo.rpc.protocol.AbstractProtocol#exporterMap 用于后续处理客户端请求的时候调用。 key 可以理解为每个接口的唯一标识。

    参考: https://dubbo.apache.org/zh/blog/2019/10/17/dubbo-%E4%B8%AD%E7%9A%84-url-%E7%BB%9F%E4%B8%80%E6%A8%A1%E5%9E%8B/

    dubbo SPI(Service Provider Interface)相关: https://dubbo.apache.org/zh/docsv2.7/dev/source/dubbo-spi/

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    【Python基础】13_Python中的PASS
    【Python基础】12_Python中的容器类型公共方法
    【Python基础】11_Python中的字符串
    【Python基础】10_Python中的字典
    【Python基础】09_Python中的元组
    【Python基础】08_Python中的列表
    【Python基础】07_Python中的模块
    Json2Html
    数字转换成美元和人民币
    单击行变色
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15143413.html
Copyright © 2011-2022 走看看