zoukankan      html  css  js  c++  java
  • Dubbo发布过程中,消费者的初始化过程

    服务启动的过程类似于Dubbo发布过程中,服务发布启动的过程,现在我们直接进入到服务调用的核心模块ReferenceBean进行分析:
    首先是afterPropertiesSet()方法,是IUserService在初始化之后,进行一个回调处理

    // Elided excerpt: the bean is populated first and then initialized.
    // NOTE(review): in Spring these two calls appear to live in AbstractAutowireCapableBeanFactory#doCreateBean — confirm the class name shown here.
    public abstract class AbstractBeanFactory
    		try {
    			populateBean(beanName, mbd, instanceWrapper);
                            // afterPropertiesSet() is invoked from inside initializeBean(...)
    			exposedObject = initializeBean(beanName, exposedObject, mbd);
    		}
    

    ReferenceBean的afterPropertiesSet()方法主要进行了以下行为:设置consumerConfig、applicationConfig、moduleConfig、registryConfigs(注册中心配置)、monitorConfig,并且添加了一个是否预初始化的操作。

    public class ReferenceBean<T> extends ReferenceConfig<T> implements InitializingBean
        /**
         * Spring {@code InitializingBean} callback. Fills in every Dubbo config that was not set
         * explicitly on this reference from beans found in the application context, then eagerly
         * creates the reference when {@code init} is enabled.
         *
         * <p>The resolution order (consumer, application, module, registries, monitor) is
         * significant: later steps consult the consumer/application chosen by earlier ones.
         *
         * @throws IllegalStateException if more than one default consumer, application, module or
         *         monitor config bean is found
         * @throws Exception whatever {@link #getObject()} throws during eager initialization
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            resolveDefaultConsumer();
            resolveDefaultApplication();
            resolveDefaultModule();
            resolveDefaultRegistries();
            resolveDefaultMonitor();

            // Eager initialization: this bean's own init flag wins; otherwise fall back to the consumer's.
            Boolean b = isInit();
            if (b == null && getConsumer() != null) {
                b = getConsumer().isInit();
            }
            if (b != null && b.booleanValue()) {
                getObject();
            }
        }

        /** Returns the context beans of the given type, or an empty map when no context is available. */
        private <C> Map<String, C> contextBeansOfType(Class<C> type) {
            Map<String, C> beans = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, type, false, false);
            return beans == null ? java.util.Collections.<String, C>emptyMap() : beans;
        }

        /** Adopts the single default ConsumerConfig bean when no consumer was set explicitly. */
        private void resolveDefaultConsumer() {
            if (getConsumer() != null) {
                return;
            }
            ConsumerConfig selected = null;
            for (ConsumerConfig config : contextBeansOfType(ConsumerConfig.class).values()) {
                if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (selected != null) {
                        throw new IllegalStateException("Duplicate consumer configs: " + selected + " and " + config);
                    }
                    selected = config;
                }
            }
            if (selected != null) {
                setConsumer(selected);
            }
        }

        /** Adopts the single default ApplicationConfig bean when neither this bean nor its consumer has one. */
        private void resolveDefaultApplication() {
            if (getApplication() != null
                    || (getConsumer() != null && getConsumer().getApplication() != null)) {
                return;
            }
            ApplicationConfig selected = null;
            for (ApplicationConfig config : contextBeansOfType(ApplicationConfig.class).values()) {
                if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (selected != null) {
                        throw new IllegalStateException("Duplicate application configs: " + selected + " and " + config);
                    }
                    selected = config;
                }
            }
            if (selected != null) {
                setApplication(selected);
            }
        }

        /** Adopts the single default ModuleConfig bean when neither this bean nor its consumer has one. */
        private void resolveDefaultModule() {
            if (getModule() != null
                    || (getConsumer() != null && getConsumer().getModule() != null)) {
                return;
            }
            ModuleConfig selected = null;
            for (ModuleConfig config : contextBeansOfType(ModuleConfig.class).values()) {
                if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (selected != null) {
                        throw new IllegalStateException("Duplicate module configs: " + selected + " and " + config);
                    }
                    selected = config;
                }
            }
            if (selected != null) {
                setModule(selected);
            }
        }

        /** Collects all default RegistryConfig beans when this bean, its consumer and its application have none. */
        private void resolveDefaultRegistries() {
            if ((getRegistries() == null || getRegistries().isEmpty())
                    && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())
                    && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
                List<RegistryConfig> defaults = new ArrayList<RegistryConfig>();
                for (RegistryConfig config : contextBeansOfType(RegistryConfig.class).values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        defaults.add(config);
                    }
                }
                // Several registries may be default at once, so no duplicate check here.
                if (!defaults.isEmpty()) {
                    super.setRegistries(defaults);
                }
            }
        }

        /** Adopts the single default MonitorConfig bean when this bean, its consumer and its application have none. */
        private void resolveDefaultMonitor() {
            if (getMonitor() != null
                    || (getConsumer() != null && getConsumer().getMonitor() != null)
                    || (getApplication() != null && getApplication().getMonitor() != null)) {
                return;
            }
            MonitorConfig selected = null;
            for (MonitorConfig config : contextBeansOfType(MonitorConfig.class).values()) {
                if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (selected != null) {
                        throw new IllegalStateException("Duplicate monitor configs: " + selected + " and " + config);
                    }
                    selected = config;
                }
            }
            if (selected != null) {
                setMonitor(selected);
            }
        }
    }
    

    setApplicationContext(ApplicationContext applicationContext)方法主要用来设置上下文对象,方便获取应用上下文的属性配置、同时SpringExtensionFactory添加了一个ApplicationContext,方便Dubbo在服务扩展点加载时,提供了另外一种加载的方式,从Spring上下文中获取对象。

    /**
     * Excerpt of ReferenceBean showing only its ApplicationContextAware callback.
     */
    public class ReferenceBean<T> extends ReferenceConfig<T> implements ApplicationContextAware{
        /**
         * Stores the given application context on this bean and also passes it to
         * SpringExtensionFactory.addApplicationContext(...).
         *
         * @param applicationContext the context handed in by the container
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
            SpringExtensionFactory.addApplicationContext(applicationContext);
        }
    }
    
    

    getObject()方法主要用来在获取bean对象时,提供自定义实例bean的方法,方便获取代理对象,调用的是父类ReferenceConfig中的get()方法,然后调用ReferenceConfig中的init()方法

    // Excerpt of ReferenceBean showing only its FactoryBean callback.
    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean
        /**
         * Returns the object produced by {@code get()}.
         *
         * @return the result of get()
         * @throws Exception declared by the signature; propagated from get()
         */
        @Override
        public Object getObject() throws Exception {
            // get() is inherited from the parent class ReferenceConfig; per the author it is a synchronized method.
            return get();
        }
    }
    

    在init()方法中,进行了以下操作:首先调用checkDefault()方法,设置consumerConfig默认配置、设置属性值,根据接口名从本地缓存中获取接口对应的解析值,如果不为空,则赋值给URL,然后通过consumer对成员变量设置属性。
    然后通过Wrapper获取接口的一个包装类和方法,并设置参数map.然后获取dubbo的注册地址,然后根据参数map调用createProxy方法创建一个代理对象。

    // Excerpt of ReferenceConfig; the dotted line marks code elided by the author (the surrounding text describes this as the tail of init()).
    public class ReferenceConfig<T> extends AbstractReferenceConfig {
     ......
            // Build the proxy from the collected parameter map and keep it in ref.
            ref = createProxy(map);
            // Wrap the proxy and the interface's methods in a ConsumerModel and register it under the unique service name.
            ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref,     interfaceClass.getMethods());
            ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
    

    其中map参数值如下:

    "side" -> "consumer"
    "application" -> "user-consumer"
    "register.ip" -> "10.9.233.26"
    "methods" -> "getUserById,queryList"
    "dubbo" -> "2.6.2"
    "pid" -> "7036"
    "interface" -> "com.bail.user.service.IUserService"
    "version" -> "1.0.0"
    "timestamp" -> "1638179225870"
    "revision" -> "1.0.0"
    

    createProxy方法中,调用InjvmProtocol中的isInjvmRefer判断是否是本地jvm引用,在isInjvmRefer方法中调用getExporter()方法来获取暴露者,返回为空,判断isInjvmRefer=false,成员变量url为空,调用loadRegistries(false)加载URLS,从RegistryConfig中获得注册中心地址,然后根据urls从refprotocol中获取调用者invoker,通过调用doRefer方法后,返回一个invoker对象,主要包括注册表信息、接口名称、路径监听等信息,然后继续调用proxyFactory的getProxy(invoker)返回一个代理对象

        /**
         * Builds the invoker for this reference and wraps it in a proxy.
         *
         * <p>Steps, in order: decide whether to refer in-JVM; otherwise collect target URLs either
         * from the user-specified {@code url} string or from the configured registries; refer each
         * URL through {@code refprotocol} (joining several invokers through {@code cluster});
         * optionally verify availability; finally ask {@code proxyFactory} for the proxy.
         *
         * @param map reference parameters; used to build the temporary URL and merged into or
         *            encoded onto every target URL (a monitor entry may be added to it)
         * @return the proxy returned by {@code proxyFactory.getProxy(invoker)}
         * @throws IllegalStateException if no registry URL could be assembled, or if checking is
         *         enabled and the invoker reports it is not available
         */
        private T createProxy(Map<String, String> map) {
            // Example value observed by the author:
            // tmpUrl = temp://localhost?application=user-consumer&dubbo=2.6.2
            // &interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=7036
            // &register.ip=10.9.233.26&revision=1.0.0&side=consumer&timestamp=1638179225870&version=1.0.0
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                // An explicit injvm setting overrides the automatic detection above.
                isJvmRefer = isInjvm().booleanValue();
            }
    
            if (isJvmRefer) {
                // Note: this local URL shadows the String field named url used elsewhere in this method.
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            // Default the path to the interface name when the address carries none.
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                // Registry address: carry the reference parameters as an encoded REFER_KEY value.
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // Any other address: merge the reference parameters into the URL itself.
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls == null || urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    // Several targets: refer each one, then join them behind a single cluster invoker.
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            // Availability check: this reference's setting, then the consumer's, then true.
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // create service proxy
            return (T) proxyFactory.getProxy(invoker);
        }
    

    然后在创建Protocol扩展类的时候,对应的包装类有三个:

    0 = {Class@5213} "class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper"
    1 = {Class@5226} "class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper"
    2 = {Class@5319} "class com.alibaba.dubbo.qos.protocol.QosProtocolWrapper"
    

    接下来分析refprotocol.refer(interfaceClass, urls.get(0))的过程,refprotocol本质上是一个Protocol的实现类,此处调用的是一个Protocol的扩展点实现类,扩展点实现类实际上调用的是RegistryProtocol,而在实例化RegistryProtocol的过程中,RegistryProtocol包含有多个扩展点的成员变量,如下:

        private Cluster cluster;
        private Protocol protocol;
        private RegistryFactory registryFactory;
        private ProxyFactory proxyFactory;
    
    

    在调用refer的方法中,调用了registryFactory的getRegistry(url)方法,经过扩展点的获取,实际上调用的是ZookeeperRegistryFactory的父类AbstractRegistryFactory的getRegistry方法
    此时

    url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&pid=5752&refer=application%3Duser-consumer%26dubbo%3D2.6.2%26interface%3Dcom.bail.user.service.IUserService%26methods%3DgetUserById%2CqueryList%26pid%3D5752%26register.ip%3D192.168.31.199%26revision%3D1.0.0%26side%3Dconsumer%26timestamp%3D1638194873753%26version%3D1.0.0&timestamp=1638195625410
    

    处理后的

     url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=5752&timestamp=1638195625410
    

    然后,根据处理后的url获取一个注册服务,调用方法createRegistry(url),实际上调用的是实现类的createRegistry方法,此处调用的是ZookeeperRegistryFactory的createRegistry方法,
    ZookeeperRegistryFactory类中有一个ZookeeperTransporter成员变量,也是一个扩展点。根据传入的url和ZookeeperTransporter,构建一个zkClient,同时添加一个状态监听器,如果状态是RECONNECTED,则调用recover方法:

    public class ZookeeperRegistry extends FailbackRegistry {
        /**
         * Creates a registry for the given registry URL and connects through the transporter.
         *
         * @param url registry URL; rejected when {@code url.isAnyHost()} is true
         * @param zookeeperTransporter used to open the client connection for {@code url}
         * @throws IllegalStateException if the URL is an any-host address
         */
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            // The root path is the configured group, prefixed with the path separator when missing.
            final String configuredGroup = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
            this.root = configuredGroup.startsWith(Constants.PATH_SEPARATOR)
                    ? configuredGroup
                    : Constants.PATH_SEPARATOR + configuredGroup;
            zkClient = zookeeperTransporter.connect(url);
            zkClient.addStateListener(new StateListener() {
                @Override
                public void stateChanged(int state) {
                    if (state != RECONNECTED) {
                        return;
                    }
                    // On reconnect, run recover(); a failure is logged rather than propagated.
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            });
        }
      // Per subscribed URL: the ChildListener created for each NotifyListener.
      private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
        /**
         * Subscribes {@code listener} to changes for {@code url}.
         *
         * <p>When the URL's service interface is the wildcard, the root path is watched and every
         * service found under it is subscribed individually. Otherwise each category path of the
         * URL is watched, the current children are converted to URLs, and the listener is
         * notified once with the combined list.
         *
         * @param url the URL being subscribed
         * @param listener receives the notifications
         * @throws RpcException wrapping any Throwable raised while subscribing
         */
        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // Subscribe to every service name not seen before.
                                for (String child : currentChilds) {
                                    child = URL.decode(child);
                                    if (!anyServices.contains(child)) {
                                        anyServices.add(child);
                                        subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                                    }
                                }
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (services != null && !services.isEmpty()) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    // The consumer walkthrough in this article takes this branch.
                    List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                        // Values of toCategoriesPath(url) observed by the author:
                        // 0 = "/dubbo/com.bail.user.service.IUserService/providers"
                        // 1 = "/dubbo/com.bail.user.service.IUserService/configurators"
                        // 2 = "/dubbo/com.bail.user.service.IUserService/routers"
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            // No ChildListener yet for this NotifyListener: register an anonymous one.
                            listeners.putIfAbsent(listener, new ChildListener() {
                                @Override
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            // listener: per the author's debug session, a RegistryDirectory instance.
                            // zkListener: the anonymous ChildListener stored in the map just above.
                            zkListener = listeners.get(listener);
                        }
                        // Second argument is false; per the author this creates a persistent node.
                        zkClient.create(path, false);
                        // Per the author this resolves to AbstractZookeeperClient.addChildListener.
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        // Example observed by the author:
                        // children[0]= dubbo://192.168.137.210:20880/com.bail.user.service.IUserService?anyhost=true&application=user-provider&dubbo=2.6.2&generic=false&getUserById.retries=3&getUserById.timeout=3000&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=12664&retries=2&revision=1.0.0&side=provider&timeout=8000&timestamp=1638340894565&version=1.0.0
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    // One notification carrying the URLs gathered from all category paths.
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
    }
    

    AbstractZookeeperClient

    /**
     * Excerpt of AbstractZookeeperClient showing how a ChildListener is mapped to the
     * client-specific TargetChildListener.
     */
    public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
        /**
         * Registers {@code listener} for children of {@code path}. A target listener is created
         * only when none is cached yet for this (path, listener) pair.
         *
         * <p>The value returned by {@code putIfAbsent} is used directly instead of reading the map
         * again, so the listener passed on is never null even if the entry is removed concurrently
         * between the two calls (NOTE(review): a removal path is not visible in this excerpt).
         *
         * @param path the path whose children are watched
         * @param listener the listener to register
         * @return whatever {@code addTargetChildListener} returns for the path
         */
        public List<String> addChildListener(String path, final ChildListener listener) {
            // Per the author's debug session: the ChildListener comes from ZookeeperRegistry and the
            // TargetChildListener is CuratorZookeeperClient$CuratorWatcherImpl.
            ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
            if (listeners == null) {
                ConcurrentHashMap<ChildListener, TargetChildListener> fresh = new ConcurrentHashMap<ChildListener, TargetChildListener>();
                ConcurrentMap<ChildListener, TargetChildListener> raced = childListeners.putIfAbsent(path, fresh);
                // putIfAbsent returns the previous mapping, or null when ours was installed.
                listeners = raced != null ? raced : fresh;
            }
            TargetChildListener targetListener = listeners.get(listener);
            if (targetListener == null) {
                TargetChildListener created = createTargetChildListener(path, listener);
                TargetChildListener existing = listeners.putIfAbsent(listener, created);
                targetListener = existing != null ? existing : created;
            }
            return addTargetChildListener(path, targetListener);
        }
    }
    

    recover方法是父类FailbackRegistry 中定义的方法。然后将得到的registry放入到AbstractRegistryFactory的Map<String, Registry> REGISTRIES容器中,在得到注册中心后,继续调用refer方法中的后续方法doRefer:doRefer(cluster, registry, type, url),其中的cluster是一个扩展点对象,此方法返回一个invoker对象,接下来看doRefer方法:
    new RegistryDirectory(注册表目录)方法简单的返回了一个directory 对象,简单看一下RegistryDirectory结构,再继续往下走:

    // Registry directory (excerpt).
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        // Three adaptive extensions obtained through ExtensionLoader: Cluster, RouterFactory and ConfiguratorFactory.
        private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
        private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
        private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
    }
    

    然后将注册中心赋值给对象,然后构建了一个subscribeUrl订阅URL,继续调用subscribe(URL url, NotifyListener listener)订阅方法,其中订阅的监听者就是我们传进去的参数NotifyListener。然后调用doSubscribe方法发送一个订阅请求。调用toCategoriesPath(url)方法,根据url得到三个不同的订阅路径,分别是

    0 = "/dubbo/com.bail.user.service.IUserService/providers"
    1 = "/dubbo/com.bail.user.service.IUserService/configurators"
    2 = "/dubbo/com.bail.user.service.IUserService/routers"
    

    ,订阅到不同的三个路径后,调用notify方法,进行通知功能。

    consumer://192.168.31.199/com.bail.user.service.IUserService?application=user-consumer&
    dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList
    &pid=5752&revision=1.0.0&side=consumer&timestamp=1638194873753&version=1.0.0
    

    然后调用FailbackRegistry的register方法,然后调用子类(即ZookeeperRegistry )的doRegister方法,发送一个注册请求给服务端。
    看一下doRefer中属性值的变化, registry.register注册了一个consumer节点到zookeeper;
    cluster.join(directory)方法,根据注册表返回一个接口的invoker,此处默认加载的cluster为FailoverCluster,并且有一个包装类MockClusterWrapper。
    调用提供消费者注册表注册消费者方法ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory)
    至此doRefer执行完毕,返回一个invoker对象

    /**
     * Excerpt of RegistryProtocol showing how a consumer reference is turned into an invoker.
     */
    public class RegistryProtocol implements Protocol {
        /**
         * Creates the directory for {@code type}, optionally registers the consumer URL,
         * subscribes the directory to the providers/configurators/routers categories, and joins
         * the directory into a single invoker through {@code cluster}.
         *
         * @param cluster used to join the directory into one invoker
         * @param registry the registry to register with and subscribe through
         * @param type the service interface
         * @param url the registry URL carrying the reference parameters
         * @return the invoker produced by {@code cluster.join(directory)}
         */
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            // Example value observed by the author:
            // url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&pid=11332&refer=application%3Duser-consumer%26dubbo%3D2.6.2%26interface%3Dcom.bail.user.service.IUserService%26methods%3DgetUserById%2CqueryList%26pid%3D11332%26register.ip%3D192.168.137.210%26revision%3D1.0.0%26side%3Dconsumer%26timestamp%3D1638323468558%26version%3D1.0.0&timestamp=1638323742757
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            // Build the consumer-protocol subscribe URL; the register IP is removed from the
            // parameters and used as the host.
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                // Register the consumer URL; per the author's trace this dispatches to FailbackRegistry.register.
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            // Subscribe the directory to three categories: providers, configurators and routers.
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));

            // Typed rather than raw, so the return needs no unchecked conversion.
            Invoker<T> invoker = cluster.join(directory);

            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    }
    
    

    调用了FailbackRegistry的register,内部调用了 doRegister(url)方法,实际上调用的是AbstractZookeeperClient的create方法

    public abstract class FailbackRegistry extends AbstractRegistry {
        /**
         * Registers {@code url}: records it through the superclass, clears it from both failure
         * sets, then sends the registration via {@code doRegister}.
         *
         * <p>If sending fails, the exception is rethrown as IllegalStateException when the startup
         * check applies or the failure is a SkipFailbackWrapperException. Otherwise it is logged
         * and the URL is added to {@code failedRegistered}.
         *
         * @param url the URL to register
         * @throws IllegalStateException on a failure that must not be retried silently
         */
        public void register(URL url) {
            super.register(url);
            failedRegistered.remove(url);
            failedUnregistered.remove(url);
            try {
                // Sending a registration request to the server side
                doRegister(url);
            } catch (Exception e) {
                // The startup check applies only if enabled on both URLs and the URL is not a consumer URL.
                final boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                final boolean skipFailback = e instanceof SkipFailbackWrapperException;
                // A skip-failback wrapper is unwrapped so that its cause is what gets reported.
                final Throwable reported = skipFailback ? e.getCause() : e;
                if (check || skipFailback) {
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + reported.getMessage(), reported);
                }
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + e.getMessage(), e);

                // Remember the failed registration so it can be retried later.
                failedRegistered.add(url);
            }
        }
    
        /**
         * Subscribes {@code listener} to {@code url}: records it through the superclass, clears
         * any earlier failed-subscription record, then sends the subscription via
         * {@code doSubscribe}.
         *
         * <p>If sending fails and cached URLs exist for {@code url}, the listener is notified
         * with the cached list and the failure is logged. Without cached URLs the failure is
         * rethrown as IllegalStateException when the check is enabled or the failure is a
         * SkipFailbackWrapperException, and logged otherwise. Unless an exception was thrown, the
         * failed subscription is recorded via {@code addFailedSubscribed}.
         *
         * @param url the URL to subscribe to
         * @param listener receives the notifications
         * @throws IllegalStateException on a failure with no cached URLs that must not be retried silently
         */
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                // Per the author's trace this dispatches to ZookeeperRegistry.doSubscribe().
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                // Fall back to the cached URLs, if any, so the listener still receives a list.
                List<URL> urls = getCacheUrls(url);
                if (urls != null && !urls.isEmpty()) {
                    notify(url, listener, urls);
                    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
                } else {
                    // If the startup detection is opened, the Exception is thrown directly.
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if (skipFailback) {
                            t = t.getCause();
                        }
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                    }
                }
    
                // Record the failed subscription request in the failed list, retry regularly
                addFailedSubscribed(url, listener);
            }
        }
    
        /**
         * Delivers {@code urls} to {@code listener} through {@code doNotify}. A failed delivery is
         * stored in {@code failedNotified} under the subscribed URL and logged, not propagated.
         *
         * @param url the subscribed URL; must not be null
         * @param listener the listener to notify; must not be null
         * @param urls the URLs to deliver
         * @throws IllegalArgumentException if {@code url} or {@code listener} is null
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            try {
                doNotify(url, listener, urls);
            } catch (Exception failure) {
                // Keep the undelivered list for this listener so it can be retried later.
                Map<NotifyListener, List<URL>> pendingByListener = failedNotified.get(url);
                if (pendingByListener == null) {
                    failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                    pendingByListener = failedNotified.get(url);
                }
                pendingByListener.put(listener, urls);
                logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + failure.getMessage(), failure);
            }
        }
    
    }
    public abstract class AbstractRegistry implements Registry {
      
        // Listeners registered per subscribed URL.
        private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
        /**
         * Records {@code listener} as a subscriber of {@code url} in {@code subscribed}.
         *
         * @param url the URL being subscribed; must not be null
         * @param listener the listener to record; must not be null
         * @throws IllegalArgumentException if {@code url} or {@code listener} is null
         */
        public void subscribe(URL url, NotifyListener listener) {
            if (url == null) {
                throw new IllegalArgumentException("subscribe url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("subscribe listener == null");
            }
            if (logger.isInfoEnabled()) {
                logger.info("Subscribe: " + url);
            }
            // In the author's walkthrough this is the first subscription for the URL, so the lookup yields null.
            Set<NotifyListener> listeners = subscribed.get(url);
            if (listeners == null) {
                // Create the listener set for this URL; the listener is added to it below
                // (per the author, the listener here is the registry directory).
                subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
                listeners = subscribed.get(url);
            }
            
            listeners.add(listener);
        }
    
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((urls == null || urls.isEmpty())
                    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                saveProperties(url);
                listener.notify(categoryList);
            }
        }
    }
    
    public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
        /**
         * Creates the node at {@code path}. If the parent path does not exist yet, it is
         * created first through a recursive call, always as a persistent node.
         *
         * @param path      the full node path
         * @param ephemeral {@code true} to create the node via {@code createEphemeral},
         *                  {@code false} to create it via {@code createPersistent}
         */
        public void create(String path, boolean ephemeral) {
            // Example path: /dubbo/com.bail.user.service.IUserService/consumers/consumer://192.168.137.210/com.bail.user.service.IUserService?application=user-consumer&category=consumers&check=false&dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
            // NOTE(review): the example is presumably shown URL-decoded; confirm the last
            // segment is encoded at runtime, otherwise its slashes would move the split point.
            final int lastSlash = path.lastIndexOf('/');
            if (lastSlash > 0) {
                // Example parent: /dubbo/com.bail.user.service.IUserService/consumers
                final String parentPath = path.substring(0, lastSlash);
                final boolean parentMissing = !checkExists(parentPath);
                if (parentMissing) {
                    // Missing parents are created with ephemeral == false, i.e. persistent
                    create(parentPath, false);
                }
            }
            if (!ephemeral) {
                createPersistent(path);
                return;
            }
            // ephemeral == true: the node itself is created as an ephemeral node
            createEphemeral(path);
        }
    }
    

    doRefer方法中,构建了一个RegistryDirectory对象,此类中也包含多个扩展点对象,不过都是静态final类型的:

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        // Adaptive extension instances, looked up once through ExtensionLoader
        private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

        private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

        private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();

        /**
         * Builds the directory for {@code serviceType} from the given URL.
         *
         * @param serviceType the service interface class; must not be {@code null}
         * @param url         the URL whose {@code Constants.REFER_KEY} parameter carries the
         *                    encoded reference parameters
         * @throws IllegalArgumentException if {@code serviceType} is {@code null} or the
         *                                  URL's service key is {@code null} or empty
         */
        public RegistryDirectory(Class<T> serviceType, URL url) {
            super(url);
            if (serviceType == null) {
                throw new IllegalArgumentException("service type is null.");
            }
            final String key = url.getServiceKey();
            if (key == null || key.length() == 0) {
                throw new IllegalArgumentException("registry serviceKey is null.");
            }
            this.serviceType = serviceType;
            this.serviceKey = key;
            // The reference parameters are stored, encoded, under REFER_KEY of the URL
            this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            // Directory URL: path = service interface, parameters replaced by the reference
            // parameters, with the monitor parameter removed
            final URL baseUrl = url.setPath(url.getServiceInterface())
                    .clearParameters()
                    .addParameters(queryMap)
                    .removeParameter(Constants.MONITOR_KEY);
            this.directoryUrl = baseUrl;
            this.overrideDirectoryUrl = baseUrl;
            // Multi-group when the group is "*" or a comma-separated list
            final String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
            this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
            final String methods = queryMap.get(Constants.METHODS_KEY);
            this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
        }
    }
    
        /**
         * Stores {@code url} as the consumer URL and subscribes this directory to the
         * registry as the listener for that URL.
         *
         * @param url the consumer URL to subscribe with
         */
        public void subscribe(URL url) {
            // Example url: consumer://192.168.137.210/com.bail.user.service.IUserService?application=user-consumer&category=providers,configurators,routers&dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
            this.setConsumerUrl(url);
            // Delegates to the registry's subscribe method (FailbackRegistry.subscribe in this walkthrough)
            this.registry.subscribe(url, this);
        }
    
    // Obtain the client by calling connect(url) on the transporter
    zkClient = zookeeperTransporter.connect(url);
    // Example url: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=11332&timestamp=1638323742757
    

    cluster.join(directory)

    cluster.join(directory)方法,根据注册表返回一个接口的invoker,此处默认加载的cluster为FailoverCluster,并且有一个包装类MockClusterWrapper。执行完成后返回到doRefer方法:

    /**
     * {@link Cluster} wrapper that places a {@code MockClusterInvoker} in front of the
     * invoker produced by the wrapped cluster.
     */
    public class MockClusterWrapper implements Cluster {

        // The wrapped cluster that builds the underlying invoker
        private Cluster cluster;

        /**
         * @param cluster the cluster to delegate {@code join} to
         */
        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }

        /**
         * Joins the directory through the wrapped cluster and wraps the resulting invoker.
         *
         * @param directory the directory to join
         * @return a {@code MockClusterInvoker} built from the directory and the wrapped cluster's invoker
         * @throws RpcException declared by the {@code Cluster} contract
         */
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            Invoker<T> delegate = cluster.join(directory);
            return new MockClusterInvoker<T>(directory, delegate);
        }

    }
    

    ProviderConsumerRegTable

    调用ProviderConsumerRegTable的registerConsumer方法,保存消费端的调用者:

    public class ProviderConsumerRegTable {
        // Two static maps: one holding provider invoker wrappers, one holding consumer invoker wrappers.
        // consumerInvokers is keyed by the consumer URL's service key (see registerConsumer);
        // the key used for providerInvokers is not shown in this excerpt.
        public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
        public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
    }
       
      
     //调用注册方法,在map容器中保存调用者
        /**
         * Wraps the consumer-side invoker and stores the wrapper in {@code consumerInvokers}
         * under the service key of {@code consumerUrl}.
         *
         * @param invoker           the invoker to register
         * @param registryUrl       the registry URL passed on to the wrapper
         * @param consumerUrl       the consumer URL; its service key is the map key
         * @param registryDirectory the directory passed on to the wrapper
         */
        public static void registerConsumer(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
            // Bundle the invoker with its registry URL, consumer URL and directory
            ConsumerInvokerWrapper wrapper = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
            String serviceKey = consumerUrl.getServiceKey();
            Set<ConsumerInvokerWrapper> registered = consumerInvokers.get(serviceKey);
            if (registered == null) {
                // putIfAbsent keeps whichever set was stored first; re-read to pick it up
                consumerInvokers.putIfAbsent(serviceKey, new ConcurrentHashSet<ConsumerInvokerWrapper>());
                registered = consumerInvokers.get(serviceKey);
            }
            registered.add(wrapper);
        }
    
    ConsumerInvokerWrapper
    /**
     * Holder that keeps a consumer-side invoker together with the URLs and the registry
     * directory it was created with.
     */
    public class ConsumerInvokerWrapper<T> implements Invoker {
        private Invoker<T> invoker;
        private URL originUrl;
        private URL registryUrl;
        private URL consumerUrl;
        private RegistryDirectory registryDirectory;

        /**
         * @param invoker           the wrapped invoker; its URL becomes {@code originUrl}
         * @param registryUrl       the registry URL
         * @param consumerUrl       the consumer URL, stored as passed in
         * @param registryDirectory the registry directory, stored as passed in
         */
        public ConsumerInvokerWrapper(Invoker<T> invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
            this.invoker = invoker;
            // originUrl and registryUrl are rebuilt from their full string form rather than shared
            String invokerUrlText = invoker.getUrl().toFullString();
            this.originUrl = URL.valueOf(invokerUrlText);
            String registryUrlText = registryUrl.toFullString();
            this.registryUrl = URL.valueOf(registryUrlText);
            this.consumerUrl = consumerUrl;
            this.registryDirectory = registryDirectory;
        }
    }
    
  • 相关阅读:
    二分查找 找到了返回位置 没找到返回应该插入的位置
    前端知识结构
    RequireJS
    Java内部类——成员内部类
    Java笔记
    Java单例模式
    Java与C#的语法区别(不断更新中...)
    进制转换
    查找算法——折半查找
    排序算法——冒泡排序
  • 原文地址:https://www.cnblogs.com/nangonghui/p/15620958.html
Copyright © 2011-2022 走看看