  • Dubbo source code: service discovery (client startup)

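      Earlier I looked at how OpenFeign makes remote calls: an interface is declared with FeignClient, and EnableFeignClients imports FeignClientsRegistrar, which registers a FeignClientFactoryBean object factory. Its getObject method uses JDK dynamic proxies to create a proxy object, and the proxy's invoke method looks up the called method in an internal Map<Method, MethodHandler> and dispatches to the matching handler.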
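As a refresher on that dispatch idea, here is a minimal sketch of a JDK dynamic proxy whose invoke method looks handlers up in a Map keyed by Method. GreetingClient, SimpleMethodHandler, and DispatchProxyDemo are hypothetical names made up for this illustration; they are not Feign classes, and real Feign does considerably more.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client interface, standing in for an interface declared with FeignClient.
interface GreetingClient {
    String greet(String name);
}

// Hypothetical per-method handler; only loosely modelled on the MethodHandler idea.
interface SimpleMethodHandler {
    Object handle(Object[] args) throws Throwable;
}

class DispatchProxyDemo {

    // Builds a proxy whose invoke() dispatches through a Map<Method, handler>.
    static GreetingClient create() {
        Map<Method, SimpleMethodHandler> dispatch = new HashMap<>();
        try {
            dispatch.put(GreetingClient.class.getMethod("greet", String.class),
                    args -> "hello " + args[0]);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }

        InvocationHandler handler = (proxy, method, args) -> {
            SimpleMethodHandler target = dispatch.get(method);
            if (target == null) {
                // Methods without a registered handler are rejected in this sketch.
                throw new UnsupportedOperationException(method.getName());
            }
            return target.handle(args);
        };

        return (GreetingClient) Proxy.newProxyInstance(
                GreetingClient.class.getClassLoader(),
                new Class<?>[]{GreetingClient.class},
                handler);
    }

    public static void main(String[] args) {
        System.out.println(create().greet("dubbo"));
    }
}
```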

      Below we look at how Dubbo generates its proxy object and how a call goes through it.

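      A remote Dubbo service is used by injecting a bean through an annotation. What gets injected is an interface, so a reasonable guess is that Dubbo creates a proxy with JDK dynamic proxies and hands it to the container, which then injects it into the target bean during the property-injection phase of the bean lifecycle.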

    For example:

    package cn.qlq.dubbo.controller;
    
    import cn.qz.dubbo.service.UserService;
    import cn.qz.dubbo.vo.UserVO;
    import org.apache.dubbo.config.annotation.Reference;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.List;
    
    @RestController
    public class UserController {
    
        @Reference(version = "1.0.0")
        private UserService userService;
    
        @GetMapping("/listUser")
        public List<UserVO> listUser() {
            return userService.listUser();
        }
    
        @GetMapping("/addUser")
        public void addUser() {
            UserVO user = new UserVO();
            user.setUsername("addUsername");
            user.setFullname("addFullname");
            user.setAge(25);
            userService.addUser(user);
        }
    }

      The following sections trace how that proxy is created and how calls are made through it.

    1. The DubboAutoConfiguration auto-configuration registers the following bean:

        @ConditionalOnMissingBean
        @Bean(
            name = {"referenceAnnotationBeanPostProcessor"}
        )
        public ReferenceAnnotationBeanPostProcessor referenceAnnotationBeanPostProcessor() {
            return new ReferenceAnnotationBeanPostProcessor();
        }

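    ReferenceAnnotationBeanPostProcessor is an InstantiationAwareBeanPostProcessor, so its postProcessProperties method is invoked while the container populates a bean's properties (populateBean).

    [Figure: class hierarchy of ReferenceAnnotationBeanPostProcessor]

    Its constructor is: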

        public ReferenceAnnotationBeanPostProcessor() {
            super(Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
        }

    org.apache.dubbo.config.spring.beans.factory.annotation.AnnotationInjectedBeanPostProcessor#AnnotationInjectedBeanPostProcessor:

        private final Class<? extends Annotation>[] annotationTypes;
    
        public AnnotationInjectedBeanPostProcessor(Class<? extends Annotation>... annotationTypes) {
            Assert.notEmpty(annotationTypes, "The argument of annotations' types must not empty");
            this.annotationTypes = annotationTypes;
        }

    2. The key code in org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#populateBean during property injection is:

            PropertyDescriptor[] filteredPds = null;
            if (hasInstAwareBpps) {
                if (pvs == null) {
                    pvs = mbd.getPropertyValues();
                }
                for (BeanPostProcessor bp : getBeanPostProcessors()) {
                    if (bp instanceof InstantiationAwareBeanPostProcessor) {
                        InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
                        PropertyValues pvsToUse = ibp.postProcessProperties(pvs, bw.getWrappedInstance(), beanName);
                        if (pvsToUse == null) {
                            if (filteredPds == null) {
                                filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
                            }
                            pvsToUse = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(), beanName);
                            if (pvsToUse == null) {
                                return;
                            }
                        }
                        pvs = pvsToUse;
                    }
                }
            }

    3. The code above ends up in org.apache.dubbo.config.spring.beans.factory.annotation.AnnotationInjectedBeanPostProcessor#postProcessPropertyValues (the fallback taken when postProcessProperties returns null):

        @Override
        public PropertyValues postProcessPropertyValues(
                PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
    
            InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
            try {
                metadata.inject(bean, beanName, pvs);
            } catch (BeanCreationException ex) {
                throw ex;
            } catch (Throwable ex) {
                throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
                        + " dependencies is failed", ex);
            }
            return pvs;
        }

    [Figure: debugger view of the metadata obtained here]

    4. This calls org.apache.dubbo.config.spring.beans.factory.annotation.AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement#inject

            @Override
            protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
    
                Class<?> injectedType = field.getType();
    
                Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
    
                ReflectionUtils.makeAccessible(field);
    
                field.set(bean, injectedObject);
    
            }

    The method first obtains the object to inject and then sets the field via reflection, so the core logic is in obtaining the injected object.
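The "find the annotated field, make it accessible, set it" pattern can be sketched in isolation as follows. The InjectMe annotation, Holder, and FieldInjectionDemo are hypothetical names for this illustration only; Dubbo resolves its injection points through InjectionMetadata rather than a plain loop like this one.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical marker annotation, standing in for @Reference.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface InjectMe {
}

// Hypothetical bean with one private injection point.
class Holder {
    @InjectMe
    private String dependency;

    String getDependency() {
        return dependency;
    }
}

class FieldInjectionDemo {

    // Sets every @InjectMe field of the bean to the given value via reflection.
    static void inject(Object bean, Object value) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(InjectMe.class)) {
                field.setAccessible(true); // private fields need this before field.set
                try {
                    field.set(bean, value);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        inject(holder, "injected");
        System.out.println(holder.getDependency());
    }
}
```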

    5. org.apache.dubbo.config.spring.beans.factory.annotation.AnnotationInjectedBeanPostProcessor#getInjectedObject

       protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                           InjectionMetadata.InjectedElement injectedElement) throws Exception {
    
            String cacheKey = buildInjectedObjectCacheKey(attributes, bean, beanName, injectedType, injectedElement);
    
            Object injectedObject = injectedObjectsCache.get(cacheKey);
    
            if (injectedObject == null) {
                injectedObject = doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
                // Customized inject-object if necessary
                injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
            }
    
            return injectedObject;
    
        }

      The object is looked up in a cache first; on a miss, doGetInjectedBean loads it and the result is added to the cache.

    The cacheKey looks like this:

    ServiceBean:cn.qz.dubbo.service.UserService:1.0.0#source=private cn.qz.dubbo.service.UserService cn.qlq.dubbo.controller.UserController.userService#attributes={version=1.0.0}
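The cache-then-load pattern in getInjectedObject can be sketched on its own like this. InjectedObjectCacheDemo and its loader parameter are hypothetical; the sketch only mirrors the get / load / putIfAbsent sequence shown above and, like that code, does not guard against two threads loading the same key concurrently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

class InjectedObjectCacheDemo {

    private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

    // Returns the cached object for the key, loading and caching it on a miss.
    Object getOrCreate(String cacheKey, Supplier<Object> loader) {
        Object value = cache.get(cacheKey);
        if (value == null) {
            value = loader.get();
            cache.putIfAbsent(cacheKey, value);
        }
        return value;
    }

    public static void main(String[] args) {
        InjectedObjectCacheDemo demo = new InjectedObjectCacheDemo();
        Object first = demo.getOrCreate("UserService:1.0.0", Object::new);
        Object second = demo.getOrCreate("UserService:1.0.0", Object::new);
        System.out.println(first == second);
    }
}
```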

    6. org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean

        protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                           InjectionMetadata.InjectedElement injectedElement) throws Exception {
    
            String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
    
            ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referencedBeanName, attributes, injectedType);
    
            registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
    
            cacheInjectedReferenceBean(referenceBean, injectedElement);
    
            return buildProxy(referencedBeanName, referenceBean, injectedType);
        }

    1》Obtain the referencedBeanName:

    ServiceBean:cn.qz.dubbo.service.UserService:1.0.0

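    2》Call buildReferenceBeanIfAbsent to obtain the ReferenceBean, which can be thought of as a configuration bean. As the code shows, a cache ensures there is only one instance per referencedBeanName: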

        private ReferenceBean buildReferenceBeanIfAbsent(String referencedBeanName, AnnotationAttributes attributes,
                                                         Class<?> referencedType)
                throws Exception {
    
            ReferenceBean<?> referenceBean = referenceBeanCache.get(referencedBeanName);
    
            if (referenceBean == null) {
                ReferenceBeanBuilder beanBuilder = ReferenceBeanBuilder
                        .create(attributes, applicationContext)
                        .interfaceClass(referencedType);
                referenceBean = beanBuilder.build();
                referenceBeanCache.put(referencedBeanName, referenceBean);
            }
    
            return referenceBean;
        }

    3》registerReferenceBean registers the referenceBean built above with the container:

        private void registerReferenceBean(String referencedBeanName, ReferenceBean referenceBean,
                                           AnnotationAttributes attributes,
                                           Class<?> interfaceClass) {
    
            ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    
            String beanName = getReferenceBeanName(attributes, interfaceClass);
    
            if (beanFactory.containsBean(referencedBeanName)) { // If @Service bean is local one
                /**
                 * Get  the @Service's BeanDefinition from {@link BeanFactory}
                 * Refer to {@link ServiceAnnotationBeanPostProcessor#buildServiceBeanDefinition}
                 */
                AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) beanFactory.getBeanDefinition(referencedBeanName);
                RuntimeBeanReference runtimeBeanReference = (RuntimeBeanReference) beanDefinition.getPropertyValues().get("ref");
                // The name of bean annotated @Service
                String serviceBeanName = runtimeBeanReference.getBeanName();
                // register Alias rather than a new bean name, in order to reduce duplicated beans
                beanFactory.registerAlias(serviceBeanName, beanName);
            } else { // Remote @Service Bean
                if (!beanFactory.containsBean(beanName)) {
                    beanFactory.registerSingleton(beanName, referenceBean);
                }
            }
        }

    4》cacheInjectedReferenceBean caches the ReferenceBean for the injected element:

        private void cacheInjectedReferenceBean(ReferenceBean referenceBean,
                                                InjectionMetadata.InjectedElement injectedElement) {
            if (injectedElement.getMember() instanceof Field) {
                injectedFieldReferenceBeanCache.put(injectedElement, referenceBean);
            } else if (injectedElement.getMember() instanceof Method) {
                injectedMethodReferenceBeanCache.put(injectedElement, referenceBean);
            }
        }

    5》buildProxy generates the proxy object:

        private Object buildProxy(String referencedBeanName, ReferenceBean referenceBean, Class<?> injectedType) {
            InvocationHandler handler = buildInvocationHandler(referencedBeanName, referenceBean);
            return Proxy.newProxyInstance(getClassLoader(), new Class[]{injectedType}, handler);
        }

    The InvocationHandler is obtained as follows:

        private InvocationHandler buildInvocationHandler(String referencedBeanName, ReferenceBean referenceBean) {
    
            ReferenceBeanInvocationHandler handler = localReferenceBeanInvocationHandlerCache.get(referencedBeanName);
    
            if (handler == null) {
                handler = new ReferenceBeanInvocationHandler(referenceBean);
            }
    
            if (applicationContext.containsBean(referencedBeanName)) { // Is local @Service Bean or not ?
                // ReferenceBeanInvocationHandler's initialization has to wait for current local @Service Bean has been exported.
                localReferenceBeanInvocationHandlerCache.put(referencedBeanName, handler);
            } else {
                // Remote Reference Bean should initialize immediately
                handler.init();
            }
    
            return handler;
        }
    
        private static class ReferenceBeanInvocationHandler implements InvocationHandler {
    
            private final ReferenceBean referenceBean;
    
            private Object bean;
    
            private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
                this.referenceBean = referenceBean;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Object result;
                try {
                    if (bean == null) { // If the bean is not initialized, invoke init()
                        // issue: https://github.com/apache/dubbo/issues/3429
                        init();
                    }
                    result = method.invoke(bean, args);
                } catch (InvocationTargetException e) {
                    // re-throws the actual Exception.
                    throw e.getTargetException();
                }
                return result;
            }
    
            private void init() {
                this.bean = referenceBean.get();
            }
        }

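      At this point a proxy object has been generated for each annotated injection point, and a ReferenceBean has been registered with the container. When a service method is called, the call first reaches org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.ReferenceBeanInvocationHandler#invoke, which then uses reflection to forward the method to the object returned by referenceBean.get().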
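The two behaviours of ReferenceBeanInvocationHandler that matter here, lazy initialization of the target and unwrapping of InvocationTargetException, can be reproduced in a small standalone sketch. EchoService, LazyTargetHandler, and LazyProxyDemo are hypothetical names; a Supplier stands in for referenceBean.get(), and initCount exists only to make the laziness observable.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

// Hypothetical service interface.
interface EchoService {
    String echo(String text);
}

class LazyTargetHandler implements InvocationHandler {

    private final Supplier<?> targetSupplier; // stands in for referenceBean.get()
    private Object target;
    int initCount; // number of times the target was created

    LazyTargetHandler(Supplier<?> targetSupplier) {
        this.targetSupplier = targetSupplier;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (target == null) {
            // Create the real target only on first use.
            target = targetSupplier.get();
            initCount++;
        }
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // Re-throw the exception raised by the target, not the reflective wrapper.
            throw e.getTargetException();
        }
    }
}

class LazyProxyDemo {

    static EchoService lazyProxy(LazyTargetHandler handler) {
        return (EchoService) Proxy.newProxyInstance(
                EchoService.class.getClassLoader(),
                new Class<?>[]{EchoService.class},
                handler);
    }

    public static void main(String[] args) {
        EchoService impl = text -> text.toUpperCase();
        EchoService proxy = lazyProxy(new LazyTargetHandler(() -> impl));
        System.out.println(proxy.echo("hi"));
    }
}
```

Without the unwrapping step, a caller of the proxy would see the reflective wrapper (surfaced as an UndeclaredThrowableException for checked wrappers) instead of the exception the target actually threw.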

    7. org.apache.dubbo.config.spring.ReferenceBean implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean and other interfaces, so its getObject method produces the actual bean. In addition, buildInvocationHandler calls handler.init() while creating the InvocationHandler, which in turn calls referenceBean.get() to obtain the object.

        @Override
        public Object getObject() {
            return get();
        }
    
        public synchronized T get() {
            checkAndUpdateSubConfigs();
    
            if (destroyed) {
                throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
            }
            if (ref == null) {
                init();
            }
            return ref;
        }

    (1) get() calls the org.apache.dubbo.config.ReferenceConfig#init method:

        private void init() {
            if (initialized) {
                return;
            }
            checkStubAndLocal(interfaceClass);
            checkMock(interfaceClass);
            Map<String, String> map = new HashMap<String, String>();
    
            map.put(SIDE_KEY, CONSUMER_SIDE);
    
            appendRuntimeParameters(map);
            if (!isGeneric()) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
                }
            }
            map.put(INTERFACE_KEY, interfaceName);
            appendParameters(map, metrics);
            appendParameters(map, application);
            appendParameters(map, module);
            // remove 'default.' prefix for configs from ConsumerConfig
            // appendParameters(map, consumer, Constants.DEFAULT_KEY);
            appendParameters(map, consumer);
            appendParameters(map, this);
            Map<String, Object> attributes = null;
            if (CollectionUtils.isNotEmpty(methods)) {
                attributes = new HashMap<String, Object>();
                for (MethodConfig methodConfig : methods) {
                    appendParameters(map, methodConfig, methodConfig.getName());
                    String retryKey = methodConfig.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(methodConfig.getName() + ".retries", "0");
                        }
                    }
                    attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
                }
            }
    
            String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
            if (StringUtils.isEmpty(hostToRegistry)) {
                hostToRegistry = NetUtils.getLocalHost();
            } else if (isInvalidLocalHost(hostToRegistry)) {
                throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
            }
            map.put(REGISTER_IP_KEY, hostToRegistry);
    
            ref = createProxy(map);
    
            String serviceKey = URL.buildKey(interfaceName, group, version);
            ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
            initialized = true;
        }
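Much of init() is about collecting consumer-side settings into a flat Map<String, String> that later becomes URL parameters. The sketch below isolates two pieces of that: the rewrite of "<method>.retry=false" into "<method>.retries=0", and joining the map into a query string. ReferenceParamsDemo is a hypothetical class, and its toQueryString is a simplified stand-in that makes no claim about how Dubbo's StringUtils.toQueryString orders or encodes entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class ReferenceParamsDemo {

    // Mirrors the retry handling in init(): "<method>.retry=false" becomes "<method>.retries=0".
    static void normalizeRetry(Map<String, String> map, String methodName) {
        String retryKey = methodName + ".retry";
        if (map.containsKey(retryKey)) {
            String retryValue = map.remove(retryKey);
            if ("false".equals(retryValue)) {
                map.put(methodName + ".retries", "0");
            }
        }
    }

    // Simplified join into key=value&key=value, in the map's iteration order, without encoding.
    static String toQueryString(Map<String, String> map) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("side", "consumer");
        map.put("interface", "cn.qz.dubbo.service.UserService");
        map.put("listUser.retry", "false");
        normalizeRetry(map, "listUser");
        System.out.println(toQueryString(map));
    }
}
```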

    (2) org.apache.dubbo.config.ReferenceConfig#createProxy generates a second proxy object:

        private T createProxy(Map<String, String> map) {
            if (shouldJvmRefer(map)) {
                URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                invoker = REF_PROTOCOL.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                urls.clear(); // reference retry init will add url to urls, lead to OOM
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                url = url.setPath(interfaceName);
                            }
                            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    // if protocols not injvm checkRegistry
                    if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                        checkRegistry();
                        List<URL> us = loadRegistries(false);
                        if (CollectionUtils.isNotEmpty(us)) {
                            for (URL u : us) {
                                URL monitorUrl = loadMonitor(u);
                                if (monitorUrl != null) {
                                    map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        if (urls.isEmpty()) {
                            throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                        }
                    }
                }
    
                if (urls.size() == 1) {
                    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use RegistryAwareCluster only when register's CLUSTER is available
                        URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                        // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                        invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url, must be direct invoke.
                        invoker = CLUSTER.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            if (shouldCheck() && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
                metadataReportService.publishConsumer(consumerURL);
            }
            // create service proxy
            return (T) PROXY_FACTORY.getProxy(invoker);
        }

      This method is where the core configuration is gathered.
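One branch of createProxy handles a user-specified url: the string is split on semicolons, and any entry without a path gets the interface name as its path. A rough standalone sketch of that branch follows. It uses java.net.URI instead of Dubbo's URL class, and both the separator pattern and the DirectUrlDemo class are assumptions made for this illustration, not copies of Dubbo code. Note that URI paths carry a leading slash.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class DirectUrlDemo {

    // Assumed separator pattern for this sketch: one or more semicolons with optional whitespace.
    private static final Pattern SEMICOLON = Pattern.compile("\\s*;+\\s*");

    // Splits the configured string and defaults an empty path to the interface name.
    static List<URI> parse(String urlConfig, String interfaceName) {
        List<URI> result = new ArrayList<>();
        for (String u : SEMICOLON.split(urlConfig.trim())) {
            URI uri = URI.create(u);
            String path = uri.getPath();
            if (path == null || path.isEmpty()) {
                uri = URI.create(u + "/" + interfaceName);
            }
            result.add(uri);
        }
        return result;
    }

    public static void main(String[] args) {
        List<URI> uris = parse(
                "dubbo://127.0.0.1:20880; dubbo://127.0.0.1:20881/custom.Path",
                "cn.qz.dubbo.service.UserService");
        for (URI uri : uris) {
            System.out.println(uri);
        }
    }
}
```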

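    1》org.apache.dubbo.config.AbstractInterfaceConfig#checkRegistry checks the registry configuration.

    2》loadRegistries returns the registry URLs (us). [Figure: debugger view of us]

    3》invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); builds the invoker. [Figure: debugger view of the resulting invoker]

    4》PROXY_FACTORY.getProxy(invoker); creates the proxy object and returns it.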

    org.apache.dubbo.rpc.proxy.AbstractProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>):

        @Override
        public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            return getProxy(invoker, false);
        }

    This continues into org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy:

        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }

    So the core handling is delegated to org.apache.dubbo.rpc.proxy.InvokerInvocationHandler:

    package org.apache.dubbo.rpc.proxy;
    
    import org.apache.dubbo.common.logger.Logger;
    import org.apache.dubbo.common.logger.LoggerFactory;
    import org.apache.dubbo.rpc.Invoker;
    import org.apache.dubbo.rpc.RpcInvocation;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    
    /**
     * InvokerHandler
     */
    public class InvokerInvocationHandler implements InvocationHandler {
        private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
    
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }

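      The Invoker here is the MockClusterInvoker constructed above. The invoke method first checks for methods such as toString, hashCode, and equals and answers them with the invoker's own methods; every other method is passed to invoker.invoke.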
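The split between "methods of Object are answered locally" and "everything else becomes an invocation" can be sketched with a plain JDK proxy. SimpleInvoker, UserApi, InvokerBackedHandler, and InvokerProxyDemo are hypothetical; SimpleInvoker is a much-reduced stand-in for Dubbo's Invoker and takes a method name plus arguments instead of an RpcInvocation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical, heavily simplified stand-in for an Invoker.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical service interface.
interface UserApi {
    String findName(int id);
}

class InvokerBackedHandler implements InvocationHandler {

    private final SimpleInvoker invoker;

    InvokerBackedHandler(SimpleInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // toString, hashCode and equals are declared on Object: answer them with the invoker itself.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // Every interface method is turned into a call on the invoker.
        return invoker.invoke(method.getName(), args);
    }
}

class InvokerProxyDemo {

    static UserApi proxyFor(SimpleInvoker invoker) {
        return (UserApi) Proxy.newProxyInstance(
                UserApi.class.getClassLoader(),
                new Class<?>[]{UserApi.class},
                new InvokerBackedHandler(invoker));
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = (name, callArgs) -> name + ":" + callArgs[0];
        System.out.println(proxyFor(invoker).findName(7));
    }
}
```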

    Supplement: more on org.apache.dubbo.config.ReferenceConfig#createProxy

    An important step in generating the proxy is creating an Invoker for each URL, that is, the shared Netty client used to reach the provider.

    0》org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers

        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<>();
            String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                // If protocol is configured at the reference side, only the matching protocol is selected
                if (queryProtocols != null && queryProtocols.length() > 0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
                if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                            " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                            " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                            ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                URL url = mergeUrl(providerUrl);
    
                String key = url.toFullString(); // The parameter urls are sorted
                if (keys.contains(key)) { // Repeated url
                    continue;
                }
                keys.add(key);
                // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(DISABLED_KEY)) {
                            enabled = !url.getParameter(DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(ENABLED_KEY, true);
                        }
                        if (enabled) {
                            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }

      The method iterates over the urls collection and creates an Invoker for each URL, as in: invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
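Besides creating invokers, toInvokers also reuses the ones it already has: an invoker is only created when its URL key is missing from the previous map, and duplicate URLs in the notification are skipped. A minimal generic sketch of that refresh step, with a hypothetical InvokerCacheDemo class and a factory function standing in for protocol.refer:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class InvokerCacheDemo {

    // Builds the new url -> invoker map, reusing entries from the old map where the url is unchanged.
    static <T> Map<String, T> refresh(Map<String, T> oldMap, List<String> urls, Function<String, T> factory) {
        Map<String, T> fresh = new HashMap<>();
        Set<String> seen = new HashSet<>();
        for (String url : urls) {
            if (!seen.add(url)) {
                continue; // repeated url in the same notification
            }
            T existing = oldMap == null ? null : oldMap.get(url);
            fresh.put(url, existing != null ? existing : factory.apply(url));
        }
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, String> old = new HashMap<>();
        old.put("dubbo://10.0.0.1:20880", "existing-invoker");
        Map<String, String> fresh = refresh(old,
                Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"),
                url -> "new-invoker-for-" + url);
        System.out.println(fresh);
    }
}
```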

    1》org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer

        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
    
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
    
            return invoker;
        }

    2》org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients

        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
    
            boolean useShareConnect = false;
    
            int connections = url.getParameter(CONNECTIONS_KEY, 0);
            List<ReferenceCountExchangeClient> shareClients = null;
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                useShareConnect = true;
    
                /**
                 * The xml configuration should have a higher priority than properties.
                 */
                String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
                connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                        DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                shareClients = getSharedClient(url, connections);
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (useShareConnect) {
                    clients[i] = shareClients.get(i);
    
                } else {
                    clients[i] = initClient(url);
                }
            }
    
            return clients;
        }
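When connections is not configured, getClients falls back to shared clients, which are wrapped in ReferenceCountExchangeClient so that several invokers can hold the same connection. The idea of a reference-counted client keyed by address can be sketched as follows. CountedClient and SharedClientRegistry are hypothetical simplifications: one client per address, a single coarse lock, and "closed" modelled as a flag rather than a real connection.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted client; "closing" just flips a flag in this sketch.
class CountedClient {

    private final AtomicInteger refCount = new AtomicInteger();
    private volatile boolean closed;
    final String address;

    CountedClient(String address) {
        this.address = address;
    }

    void retain() {
        refCount.incrementAndGet();
    }

    // Closes the client once the last holder releases it.
    void release() {
        if (refCount.decrementAndGet() <= 0) {
            closed = true;
        }
    }

    boolean isClosed() {
        return closed;
    }

    int refs() {
        return refCount.get();
    }
}

class SharedClientRegistry {

    private final Map<String, CountedClient> clients = new HashMap<>();

    // Returns the shared client for the address, creating or replacing it if missing or closed.
    synchronized CountedClient acquire(String address) {
        CountedClient client = clients.get(address);
        if (client == null || client.isClosed()) {
            client = new CountedClient(address);
            clients.put(address, client);
        }
        client.retain();
        return client;
    }

    public static void main(String[] args) {
        SharedClientRegistry registry = new SharedClientRegistry();
        CountedClient first = registry.acquire("127.0.0.1:20880");
        CountedClient second = registry.acquire("127.0.0.1:20880");
        System.out.println(first == second);
        System.out.println(first.refs());
    }
}
```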

    3》org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient obtains the shared clients

        private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
            String key = url.getAddress();
            List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }
    
            locks.putIfAbsent(key, new Object());
            synchronized (locks.get(key)) {
                clients = referenceClientMap.get(key);
                // dubbo check
                if (checkClientCanUse(clients)) {
                    batchClientRefIncr(clients);
                    return clients;
                }
    
                // connectNum must be greater than or equal to 1
                connectNum = Math.max(connectNum, 1);
    
                // If the clients is empty, then the first initialization is
                if (CollectionUtils.isEmpty(clients)) {
                    clients = buildReferenceCountExchangeClientList(url, connectNum);
                    referenceClientMap.put(key, clients);
    
                } else {
                    for (int i = 0; i < clients.size(); i++) {
                        ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                        // If there is a client in the list that is no longer available, create a new one to replace him.
                        if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                            clients.set(i, buildReferenceCountExchangeClient(url));
                            continue;
                        }
    
                        referenceCountExchangeClient.incrementAndGetCount();
                    }
                }
    
                /**
                 * I understand that the purpose of the remove operation here is to avoid the expired url key
                 * always occupying this memory space.
                 */
                locks.remove(key);
    
                return clients;
            }
        }
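
    The lookup above combines three ideas: a lock-free fast path, a per-key lock with a second check, and reference counting. Below is a minimal sketch of that pattern using only the JDK. `SharedClientCache`, `CountedClient` and the `connectionsOpened` counter are hypothetical names made up for illustration and are not Dubbo classes, and the sketch keeps one client per address instead of a list.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a reference-counted client; not a Dubbo class. */
class CountedClient {
    final String address;
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean closed;

    CountedClient(String address) { this.address = address; }

    void retain() { refCount.incrementAndGet(); }

    /** Drops one reference and marks the client closed once nobody uses it. */
    void release() {
        if (refCount.decrementAndGet() <= 0) {
            closed = true;
        }
    }

    int references() { return refCount.get(); }
    boolean isClosed() { return closed; }
}

class SharedClientCache {
    private final Map<String, CountedClient> clients = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();
    final AtomicInteger connectionsOpened = new AtomicInteger();

    CountedClient get(String address) {
        // Fast path: reuse a live client without taking the lock.
        CountedClient client = clients.get(address);
        if (client != null && !client.isClosed()) {
            client.retain();
            return client;
        }
        Object lock = locks.computeIfAbsent(address, k -> new Object());
        synchronized (lock) {
            // Second check: another thread may have created the client meanwhile.
            client = clients.get(address);
            if (client != null && !client.isClosed()) {
                client.retain();
                return client;
            }
            connectionsOpened.incrementAndGet(); // stands in for opening a real connection
            client = new CountedClient(address);
            clients.put(address, client);
            return client;
        }
    }
}

class SharedClientDemo {
    public static void main(String[] args) {
        SharedClientCache cache = new SharedClientCache();
        CountedClient first = cache.get("192.168.99.1:20990");
        CountedClient second = cache.get("192.168.99.1:20990");
        System.out.println("same instance: " + (first == second));
        System.out.println("references: " + first.references());
    }
}
```

    Keying the cache by address is what lets several references to the same provider end up on one connection, and the reference count is what tells the last user that it is safe to close it.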

    4》 It then calls org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClientList

        private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
            List<ReferenceCountExchangeClient> clients = new ArrayList<>();
    
            for (int i = 0; i < connectNum; i++) {
                clients.add(buildReferenceCountExchangeClient(url));
            }
    
            return clients;
        }
    
        private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
            ExchangeClient exchangeClient = initClient(url);
    
            return new ReferenceCountExchangeClient(exchangeClient);
        }
    
        private ExchangeClient initClient(URL url) {
    
            // client type setting.
            String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    
            url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
    
                } else {
                    client = Exchangers.connect(url, requestHandler);
                }
    
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
    
            return client;
        }

    The url is:

    dubbo://192.168.99.1:20990/cn.qz.dubbo.service.UserService?anyhost=true&application=dubbp-service-consumer&bean.name=ServiceBean:cn.qz.dubbo.service.UserService:1.0.0&check=false&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=cn.qz.dubbo.service.UserService&lazy=false&methods=addUser,listUser&pid=12376&qos.enable=false&register=true&register.ip=192.168.99.1&release=2.7.3&remote.application=dubbp-service-impl&revision=1.0.0&side=consumer&sticky=false&timeout=8000&timestamp=1629642596527&version=1.0.0
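
    Several parameters in this url drive the code shown above: `lazy=false` selects the eager `Exchangers.connect` branch, `codec` and `heartbeat` are the keys that `initClient` sets, and `check=false` is read later by the `AbstractClient` constructor. As a rough illustration, the sketch below pulls such parameters out of a shortened copy of the url with plain JDK classes. `DubboUrlParams` is a hypothetical helper, not Dubbo's own `URL` class, and it skips percent-decoding for brevity.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class DubboUrlParams {
    /** Splits the query string of a url into key/value pairs (no percent-decoding). */
    static Map<String, String> parse(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(url).getRawQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // Shortened excerpt of the url printed above.
        String url = "dubbo://192.168.99.1:20990/cn.qz.dubbo.service.UserService"
                + "?check=false&codec=dubbo&heartbeat=60000&lazy=false&timeout=8000&version=1.0.0";
        Map<String, String> p = parse(url);
        System.out.println("lazy=" + p.get("lazy")
                + ", heartbeat=" + p.get("heartbeat")
                + ", check=" + p.get("check"));
    }
}
```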

    5》 Execution then reaches org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#connect

        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
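
    The single line in `connect` nests three handlers: `DecodeHandler` wraps `HeaderExchangeHandler`, which wraps the handler passed in. The sketch below shows the same decorator idea in isolation, so the order in which an incoming message travels through the layers is visible. `MessageHandler`, `DecodeStep` and `ExchangeStep` are hypothetical names for illustration only; they just record their position and do no real decoding.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler contract; each handler records its name in the trace. */
interface MessageHandler {
    void received(String message, List<String> trace);
}

/** Outermost layer, standing in for the decoding step. */
class DecodeStep implements MessageHandler {
    private final MessageHandler next;

    DecodeStep(MessageHandler next) { this.next = next; }

    @Override
    public void received(String message, List<String> trace) {
        trace.add("decode");
        next.received(message, trace);
    }
}

/** Middle layer, standing in for request/response bookkeeping. */
class ExchangeStep implements MessageHandler {
    private final MessageHandler next;

    ExchangeStep(MessageHandler next) { this.next = next; }

    @Override
    public void received(String message, List<String> trace) {
        trace.add("exchange");
        next.received(message, trace);
    }
}

class HandlerChainDemo {
    static List<String> run(String message) {
        List<String> trace = new ArrayList<>();
        // Innermost handler: the business logic.
        MessageHandler business = (msg, t) -> t.add("business:" + msg);
        // Same nesting shape as new DecodeHandler(new HeaderExchangeHandler(handler)).
        MessageHandler chain = new DecodeStep(new ExchangeStep(business));
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("ping"));
    }
}
```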

    5.1》 Then org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect

        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }

    5.11》 Next comes the NettyClient constructor:

        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
            // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
            super(url, wrapChannelHandler(url, handler));
        }

    5.12》org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient

        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            try {
                doOpen();
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
            try {
                // connect.
                connect();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
                }
            } catch (RemotingException t) {
                if (url.getParameter(Constants.CHECK_KEY, true)) {
                    close();
                    throw t;
                } else {
                    logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
                }
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
    
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
            ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
        }
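
    The second try block is where `check` matters: with `check=true` a failed connect closes the client and rethrows, while with `check=false` (as in the url above) the failure is only logged so that a later reconnect can retry. A minimal sketch of that decision follows. `ConnectCheckSketch` and its `Connector` interface are hypothetical, and unchecked exceptions are used instead of `RemotingException` to keep the example short.

```java
class ConnectCheckSketch {
    /** Hypothetical connection attempt that may fail with an unchecked exception. */
    interface Connector {
        void connect();
    }

    /**
     * Returns true when the connection was established. With check == true a
     * failure is rethrown; with check == false it is only reported, leaving
     * the retry to a later reconnect.
     */
    static boolean open(Connector connector, boolean check) {
        try {
            connector.connect();
            return true;
        } catch (RuntimeException e) {
            if (check) {
                throw e;
            }
            System.err.println("connect failed (check == false, retry later): " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Connector failing = () -> { throw new IllegalStateException("connection refused"); };
        System.out.println("connected: " + open(failing, false));
    }
}
```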

    Next, the NettyClient is initialized.

    The source of org.apache.dubbo.remoting.transport.netty4.NettyClient is shown below. It is an important client-side class, responsible for initializing the netty client:

    package org.apache.dubbo.remoting.transport.netty4;
    
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.common.Version;
    import org.apache.dubbo.common.logger.Logger;
    import org.apache.dubbo.common.logger.LoggerFactory;
    import org.apache.dubbo.common.utils.ConfigUtils;
    import org.apache.dubbo.common.utils.NetUtils;
    import org.apache.dubbo.remoting.ChannelHandler;
    import org.apache.dubbo.remoting.Constants;
    import org.apache.dubbo.remoting.RemotingException;
    import org.apache.dubbo.remoting.transport.AbstractClient;
    import org.apache.dubbo.remoting.utils.UrlUtils;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.proxy.Socks5ProxyHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.concurrent.DefaultThreadFactory;
    
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    
    import java.net.InetSocketAddress;
    
    /**
     * NettyClient.
     */
    public class NettyClient extends AbstractClient {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
        /**
         * netty client bootstrap
         */
        private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
    
        private static final String SOCKS_PROXY_HOST = "socksProxyHost";
    
        private static final String SOCKS_PROXY_PORT = "socksProxyPort";
    
        private static final String DEFAULT_SOCKS_PROXY_PORT = "1080";
    
        private Bootstrap bootstrap;
    
        /**
         * current channel. Each successful invocation of {@link NettyClient#doConnect()} will
         * replace this with new channel and close old channel.
         * <b>volatile, please copy reference to use.</b>
         */
        private volatile Channel channel;
    
        /**
         * The constructor of NettyClient.
         * It will init and start netty.
         */
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
            // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
            super(url, wrapChannelHandler(url, handler));
        }
    
        /**
         * Init bootstrap
         *
         * @throws Throwable
         */
        @Override
        protected void doOpen() throws Throwable {
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                    .channel(NioSocketChannel.class);
    
            if (getConnectTimeout() < 3000) {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
            } else {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
            }
    
            bootstrap.handler(new ChannelInitializer() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                            .addLast("handler", nettyClientHandler);
                    String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                    if(socksProxyHost != null) {
                        int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                        Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                        ch.pipeline().addFirst(socks5ProxyHandler);
                    }
                }
            });
        }
    
        @Override
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
    
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.channel();
                    try {
                        // Close old channel
                        // copy reference
                        Channel oldChannel = NettyClient.this.channel;
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.cause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            } finally {
                // just add new valid channel to NettyChannel's cache
                if (!isConnected()) {
                    //future.cancel(true);
                }
            }
        }
    
        @Override
        protected void doDisConnect() throws Throwable {
            try {
                NettyChannel.removeChannelIfDisconnected(channel);
            } catch (Throwable t) {
                logger.warn(t.getMessage());
            }
        }
    
        @Override
        protected void doClose() throws Throwable {
            // can't shutdown nioEventLoopGroup because the method will be invoked when closing one channel but not a client,
            // but when and how to close the nioEventLoopGroup ?
            // nioEventLoopGroup.shutdownGracefully();
        }
    
        @Override
        protected org.apache.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isActive()) {
                return null;
            }
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    
        @Override
        public boolean canHandleIdle() {
            return true;
        }
    }

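    From org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen we can see how many threads the Netty client starts (the event loop group is sized by Constants.DEFAULT_IO_THREADS) and which handlers it adds. The important handler is org.apache.dubbo.remoting.transport.netty4.NettyClientHandler. By default a single global thread group is used (the static nioEventLoopGroup field), so multiple clients share one thread pool.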
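
    Two details of `doOpen` are easy to miss: the worker group is a static field, so every client instance reuses it, and the connect timeout is never allowed to drop below 3000 ms. The sketch below reproduces just those two points with JDK classes. `SketchClient` is a hypothetical class, and a plain `ExecutorService` stands in for Netty's `NioEventLoopGroup`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SketchClient {
    // One pool for all client instances, built from daemon threads.
    // This plays the role of the static nioEventLoopGroup field.
    private static final ExecutorService SHARED_IO_POOL =
            Executors.newFixedThreadPool(2, runnable -> {
                Thread thread = new Thread(runnable, "SketchClientWorker");
                thread.setDaemon(true);
                return thread;
            });

    private final int connectTimeoutMillis;

    SketchClient(int requestedTimeoutMillis) {
        // Same floor as in doOpen: values below 3000 ms are raised to 3000 ms.
        this.connectTimeoutMillis = Math.max(requestedTimeoutMillis, 3000);
    }

    ExecutorService ioPool() { return SHARED_IO_POOL; }

    int connectTimeoutMillis() { return connectTimeoutMillis; }

    public static void main(String[] args) {
        SketchClient a = new SketchClient(1000);
        SketchClient b = new SketchClient(5000);
        System.out.println("shared pool: " + (a.ioPool() == b.ioPool()));
        System.out.println("timeouts: " + a.connectTimeoutMillis() + ", " + b.connectTimeoutMillis());
    }
}
```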

    5.2》org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#HeaderExchangeClient performs some initialization and then starts the scheduled tasks

        public HeaderExchangeClient(Client client, boolean startTimer) {
            Assert.notNull(client, "Client can't be null");
            this.client = client;
            this.channel = new HeaderExchangeChannel(client);
    
            if (startTimer) {
                URL url = client.getUrl();
                startReconnectTask(url);
                startHeartBeatTask(url);
            }
        }
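
    The bodies of `startReconnectTask` and `startHeartBeatTask` are not shown in this post, so the following is only a sketch of what a periodic idle check of this kind typically decides. Everything in it is an assumption for illustration: the class `IdleCheckSketch`, the `Action` enum, and the thresholds (the heartbeat of 60000 ms comes from the url above, the idle timeout of three heartbeats is assumed).

```java
class IdleCheckSketch {
    enum Action { NONE, SEND_HEARTBEAT, RECONNECT }

    /**
     * Decides what a periodic check should do for one connection.
     * All parameters are in milliseconds; the thresholds are assumptions.
     */
    static Action decide(boolean connected, long lastActivityMillis, long nowMillis,
                         long heartbeatMillis, long idleTimeoutMillis) {
        if (!connected) {
            return Action.RECONNECT;
        }
        long idle = nowMillis - lastActivityMillis;
        if (idle > idleTimeoutMillis) {
            // Nothing was seen for too long: treat the connection as dead.
            return Action.RECONNECT;
        }
        if (idle > heartbeatMillis) {
            // Quiet, but not yet dead: probe the peer.
            return Action.SEND_HEARTBEAT;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        long heartbeat = 60_000;           // heartbeat=60000 from the url
        long idleTimeout = 3 * heartbeat;  // assumed for this sketch
        System.out.println(decide(true, 0, 70_000, heartbeat, idleTimeout));
    }
}
```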

    6》 At the end of this chain the DubboInvoker has been created.

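      So when a request is issued, it first reaches org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.ReferenceBeanInvocationHandler#invoke, which then uses reflection to pass the call on to the proxy object created above. The proxy object hands it to org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke. If the method being called is not an RPC method (for example a method inherited from Object such as toString), the corresponding method is called directly on the invoker. Otherwise the call is handed to the invoker for the remote RPC invocation, and the result is collected and returned.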
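
    The last hop of that flow can be sketched with a plain JDK dynamic proxy: methods declared on `Object` are answered locally, everything else is forwarded to an invoker. `UserServiceSketch`, `RemoteInvoker` and `InvokerHandlerSketch` are hypothetical names for illustration. The invoker here is a stub that does no networking, and this is not Dubbo's actual handler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

/** Hypothetical service interface, reduced to one method. */
interface UserServiceSketch {
    List<String> listUser();
}

/** Hypothetical stand-in for an invoker that would perform the remote call. */
interface RemoteInvoker {
    Object invoke(String methodName, Object[] arguments);
}

class InvokerHandlerSketch implements InvocationHandler {
    private final RemoteInvoker invoker;

    InvokerHandlerSketch(RemoteInvoker invoker) { this.invoker = invoker; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
        // toString, hashCode and equals arrive with Object as declaring class;
        // they are answered locally and never go over the wire.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, arguments);
        }
        // Everything else is treated as a remote call.
        return invoker.invoke(method.getName(), arguments);
    }

    static UserServiceSketch createProxy(RemoteInvoker invoker) {
        return (UserServiceSketch) Proxy.newProxyInstance(
                UserServiceSketch.class.getClassLoader(),
                new Class<?>[]{UserServiceSketch.class},
                new InvokerHandlerSketch(invoker));
    }
}

class ProxyDemo {
    public static void main(String[] args) {
        // Stub invoker: pretends the remote side answered.
        RemoteInvoker stub = (name, params) -> Arrays.asList("remote result of " + name);
        UserServiceSketch userService = InvokerHandlerSketch.createProxy(stub);
        System.out.println(userService.listUser());
    }
}
```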

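      While the proxy object is being generated, a connection to the NettyServer is established, and all clients then share one long-lived connection. Messages on that shared connection can be matched up correctly because every request and its response carry an id that identifies them. That mechanism is examined in the next post.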
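
    Until then, the general idea of matching responses to requests by id can be sketched as follows. This is only an illustration of the technique under assumed names (`PendingRequests`, `Sent`), not Dubbo's implementation: each request gets a fresh id and a future, and when a response arrives its id is used to find and complete the right future, even if responses come back out of order.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    /** A request id together with the future that will receive the matching response. */
    static final class Sent {
        final long id;
        final CompletableFuture<String> response;

        Sent(long id, CompletableFuture<String> response) {
            this.id = id;
            this.response = response;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request; a real client would now write (id, payload) to the shared connection. */
    Sent send(String payload) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        return new Sent(id, future);
    }

    /** Called when a response arrives; returns false for unknown or already answered ids. */
    boolean onResponse(long id, String body) {
        CompletableFuture<String> future = pending.remove(id);
        return future != null && future.complete(body);
    }

    int inFlight() { return pending.size(); }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        Sent first = requests.send("listUser");
        Sent second = requests.send("addUser");
        // Responses arrive in reverse order on the same connection.
        requests.onResponse(second.id, "reply to addUser");
        requests.onResponse(first.id, "reply to listUser");
        System.out.println(first.response.join());
        System.out.println(second.response.join());
    }
}
```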

  • Original post: https://www.cnblogs.com/qlqwjy/p/15159157.html