zoukankan      html  css  js  c++  java
  • Dubbo发布过程中,消费者的初始化过程


    // Excerpt from Spring's bean-creation path; the enclosing method header and the
    // closing braces are elided in this listing.
    // NOTE(review): populateBean/initializeBean are normally declared in
    // AbstractAutowireCapableBeanFactory (doCreateBean) — confirm the class named here.
    public abstract class AbstractBeanFactory
    		try {
    			// populateBean runs first, on the same beanName/mbd/instanceWrapper ...
    			populateBean(beanName, mbd, instanceWrapper);
    			// ... then initializeBean; its return value replaces exposedObject.
    			// Presumably this is where InitializingBean#afterPropertiesSet is triggered — confirm.
    			exposedObject = initializeBean(beanName, exposedObject, mbd);


    /**
     * Excerpt of ReferenceBean's InitializingBean callback. Closing braces and the
     * statements that apply each selected config are elided in this listing.
     */
    public class ReferenceBean<T> extends ReferenceConfig<T> implements InitializingBean
        /**
         * Fills in configuration that was not set explicitly by looking up config beans
         * in the Spring application context (including ancestor contexts).
         *
         * For consumer, application, module and monitor, a bean is a candidate when its
         * isDefault() is null or true; more than one candidate is an error.
         *
         * @throws IllegalStateException when two candidate configs of the same kind are found
         */
        public void afterPropertiesSet() throws Exception {
            // --- consumer: only looked up when none is set on this reference ---
            if (getConsumer() == null) {
                Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
                if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
                    ConsumerConfig consumerConfig = null;
                    for (ConsumerConfig config : consumerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            // A second candidate means the default is ambiguous.
                            if (consumerConfig != null) {
                                throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
                            consumerConfig = config;
                    // The statement applying the selected config is elided here.
                    if (consumerConfig != null) {
            // --- application: skipped when set here or on the consumer ---
            if (getApplication() == null
                    && (getConsumer() == null || getConsumer().getApplication() == null)) {
                Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
                if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                    ApplicationConfig applicationConfig = null;
                    for (ApplicationConfig config : applicationConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (applicationConfig != null) {
                                throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                            applicationConfig = config;
                    if (applicationConfig != null) {
            // --- module: skipped when set here or on the consumer ---
            if (getModule() == null
                    && (getConsumer() == null || getConsumer().getModule() == null)) {
                Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
                if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                    ModuleConfig moduleConfig = null;
                    for (ModuleConfig config : moduleConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (moduleConfig != null) {
                                throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                            moduleConfig = config;
                    if (moduleConfig != null) {
            // --- registries: skipped when this reference, the consumer or the application has any ---
            if ((getRegistries() == null || getRegistries().isEmpty())
                    && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())
                    && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
                Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
                if (registryConfigMap != null && registryConfigMap.size() > 0) {
                    // Unlike the other kinds, registries are gathered into a list
                    // (the add statement is elided), so no duplicate check appears here.
                    List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                    for (RegistryConfig config : registryConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (registryConfigs != null && !registryConfigs.isEmpty()) {
            // --- monitor: skipped when set here, on the consumer or on the application ---
            if (getMonitor() == null
                    && (getConsumer() == null || getConsumer().getMonitor() == null)
                    && (getApplication() == null || getApplication().getMonitor() == null)) {
                Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
                if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                    MonitorConfig monitorConfig = null;
                    for (MonitorConfig config : monitorConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (monitorConfig != null) {
                                throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                            monitorConfig = config;
                    if (monitorConfig != null) {
            // --- init flag: this reference's own value wins, else the consumer's ---
            Boolean b = isInit();
            if (b == null && getConsumer() != null) {
                b = getConsumer().isInit();
            // The action taken when the flag is true is elided
            // (presumably an eager getObject() call — confirm).
            if (b != null && b.booleanValue()) {

    setApplicationContext(ApplicationContext applicationContext)方法主要用来设置上下文对象,方便获取应用上下文中的属性配置;同时会把该ApplicationContext添加到SpringExtensionFactory中,使Dubbo在加载服务扩展点时多了一种获取方式:从Spring上下文中获取对象。

    // Excerpt: ApplicationContextAware callback of ReferenceBean (remaining statements
    // and closing braces are elided in this listing).
    public class ReferenceBean<T> extends ReferenceConfig<T> implements ApplicationContextAware{
        // Stores the Spring context in the applicationContext field for later use.
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;


    // Excerpt: FactoryBean side of ReferenceBean (closing braces are elided in this listing).
    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean
        // The object handed out by this FactoryBean is whatever get() returns.
        public Object getObject() throws Exception {
            return get();


    // Excerpt from inside a ReferenceConfig method whose header is elided
    // (NOTE(review): presumably init() — confirm).
    public class ReferenceConfig<T> extends AbstractReferenceConfig {
            // Build the reference from the collected parameter map and keep it in ref.
            ref = createProxy(map);
            // Register a ConsumerModel for this reference under its unique service name.
            ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref,     interfaceClass.getMethods());
            ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);


    "side" -> "consumer"
    "application" -> "user-consumer"
    "register.ip" -> ""
    "methods" -> "getUserById,queryList"
    "dubbo" -> "2.6.2"
    "pid" -> "7036"
    "interface" -> "com.bail.user.service.IUserService"
    "version" -> "1.0.0"
    "timestamp" -> "1638179225870"
    "revision" -> "1.0.0"


        /**
         * Builds the invoker for this reference and wraps it in a proxy.
         * (Closing braces are elided in this listing.)
         *
         * @param map reference parameters; used to build the temp/local URLs and the
         *            encoded REFER_KEY query string
         * @return the proxy produced by proxyFactory.getProxy(invoker)
         * @throws IllegalStateException when no registry URL could be assembled, or when
         *         the availability check is on and the invoker is not available
         */
        private T createProxy(Map<String, String> map) {
            //tmpUrl = temp://localhost?application=user-consumer&dubbo=2.6.2
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            // Decide between an in-JVM reference and a remote one.
            final boolean isJvmRefer;
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
            } else {
                // An explicit injvm setting overrides the detection above.
                isJvmRefer = isInjvm().booleanValue();
            if (isJvmRefer) {
                // Local case: refer through a LOCAL_PROTOCOL URL on localhost.
                // Note the local `url` (a URL) shadows the String field `url` used in the conditions above.
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
            } else {
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    // Several addresses may be given, separated per SEMICOLON_SPLIT_PATTERN.
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            // Default the path to the interface name when the address has none.
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            // Registry addresses carry the reference parameters encoded under REFER_KEY;
                            // other addresses get the parameters merged in directly.
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                } else { // assemble URL from register center's configuration
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            // The monitor URL, when present, is put into map before map is encoded into REFER_KEY.
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    if (urls == null || urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                // One URL: refer directly. Several URLs: refer each and join them through cluster.
                if (urls.size() == 1) {
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                    if (registryURL != null) { // registry url is available
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
            // Availability check: this reference's own setting, then the consumer's, then true.
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            if (c == null) {
                c = true; // default true
            if (c && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            // create service proxy
            return (T) proxyFactory.getProxy(invoker);


    0 = {Class@5213} "class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper"
    1 = {Class@5226} "class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper"
    2 = {Class@5319} "class com.alibaba.dubbo.qos.protocol.QosProtocolWrapper"

    接下来分析refprotocol.refer(interfaceClass, urls.get(0))的过程。refprotocol本质上是一个Protocol的实现类:此处调用的是Protocol的扩展点实现类,而该扩展点实现类实际调用的是RegistryProtocol。在实例化RegistryProtocol的过程中,可以看到它包含多个扩展点类型的成员变量,如下:

        private Cluster cluster;
        private Protocol protocol;
        private RegistryFactory registryFactory;
        private ProxyFactory proxyFactory;


    url = zookeeper://


     url = zookeeper://


    /**
     * Excerpt of the ZooKeeper-backed registry: constructor and doSubscribe.
     * Closing braces and some statements are elided in this listing.
     */
    public class ZookeeperRegistry extends FailbackRegistry {
        /**
         * Connects to ZooKeeper through the given transporter and derives the root path
         * from the GROUP_KEY parameter (DEFAULT_ROOT when absent).
         *
         * @throws IllegalStateException when url.isAnyHost() is true
         */
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
            // Make sure the root path starts with the path separator.
            if (!group.startsWith(Constants.PATH_SEPARATOR)) {
                group = Constants.PATH_SEPARATOR + group;
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);
            zkClient.addStateListener(new StateListener() {
                public void stateChanged(int state) {
                    if (state == RECONNECTED) {
                        // The body of this try is elided (the article's prose mentions a recover() call);
                        // failures are only logged.
                        try {
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
      // Subscribed URL -> (NotifyListener -> the ChildListener registered on its behalf).
      private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
        /**
         * Registers child listeners in ZooKeeper for the given subscription and pushes the
         * current children to the listener.
         *
         * @throws RpcException wrapping any Throwable raised while subscribing
         */
        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                // Branch 1: the subscription's service interface is ANY_VALUE — watch the root
                // and subscribe to each service found under it.
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    // Get-or-create the listener map for this URL (putIfAbsent, then re-read).
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                for (String child : currentChilds) {
                                    child = URL.decode(child);
                                    // Only services not already in anyServices are subscribed.
                                    if (!anyServices.contains(child)) {
                                        subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                        zkListener = listeners.get(listener);
                    // Root node is created with ephemeral = false.
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (services != null && !services.isEmpty()) {
                        for (String service : services) {
                            service = URL.decode(service);
                            subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                } else {
                    // Branch 2: a concrete service — watch each category path of the URL.
                    List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                    // Debug capture of toCategoriesPath(url) in the traced run: 0 = "/dubbo/com.bail.user.service.IUserService/providers"
    //1 = "/dubbo/com.bail.user.service.IUserService/configurators"
    //2 = "/dubbo/com.bail.user.service.IUserService/routers"
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            // zkListener is null, so register an anonymous ChildListener for this listener.
                            listeners.putIfAbsent(listener, new ChildListener() {
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    // On child changes, re-notify the listener with the converted URLs.
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            // Debug note from the traced run: listener is a RegistryDirectory instance.
                            // zkListener is the anonymous ChildListener stored in the map just above.
                            zkListener = listeners.get(listener);
                        // Category node is created with ephemeral = false.
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
    //children[0]= dubbo://,queryList&pid=12664&retries=2&revision=1.0.0&side=provider&timeout=8000&timestamp=1638340894565&version=1.0.0
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                    // Push the URLs gathered across all category paths in one notification.
                    notify(url, listener, urls);
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);


    // Excerpt of the ZooKeeper client base class (closing braces are elided in this listing).
    public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
        /**
         * Registers a child listener on the given path, creating the client-specific
         * target listener on first use.
         *
         * @param path     node whose children are watched
         * @param listener Dubbo-side listener
         * @return whatever addTargetChildListener returns for the path
         */
        public List<String> addChildListener(String path, final ChildListener listener) {
            // Debug note from the traced run: the ChildListener here is the one created in ZookeeperRegistry,
            // and TargetChildListener is CuratorZookeeperClient$CuratorWatcherImpl.
            ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
            // Get-or-create the per-path map (putIfAbsent, then re-read).
            if (listeners == null) {
                childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
                listeners = childListeners.get(path);
            TargetChildListener targetListener = listeners.get(listener);
            // Same get-or-create pattern for the target listener itself.
            if (targetListener == null) {
                listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
                targetListener = listeners.get(listener);
            return addTargetChildListener(path, targetListener);

    recover方法是父类FailbackRegistry中定义的方法。得到的registry会被放入AbstractRegistryFactory的Map<String, Registry> REGISTRIES容器中。拿到注册中心后,继续调用refer方法中的后续方法doRefer:doRefer(cluster, registry, type, url),其中cluster是一个扩展点对象,该方法返回一个invoker对象。接下来看doRefer方法:
    new RegistryDirectory(注册表目录)只是简单地构造并返回一个directory对象。先简单看一下RegistryDirectory的结构,再继续往下走:

    // Excerpt: static fields of RegistryDirectory. Each one holds the adaptive extension
    // obtained from the ExtensionLoader of its type.
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
        private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
        private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();

    然后将注册中心赋值给directory对象,并构建一个订阅URL(subscribeUrl),继续调用subscribe(URL url, NotifyListener listener)订阅方法,其中订阅的监听者就是传进去的参数NotifyListener。接着调用doSubscribe方法发送订阅请求:先调用toCategoriesPath(url)方法,根据url得到三个不同的订阅路径,分别是:

    0 = "/dubbo/com.bail.user.service.IUserService/providers"
    1 = "/dubbo/com.bail.user.service.IUserService/configurators"
    2 = "/dubbo/com.bail.user.service.IUserService/routers"



    然后调用FailbackRegistry的register方法,该方法再调用子类(即ZookeeperRegistry)的doRegister方法,向服务端发送注册请求。
    看一下doRefer中属性值的变化:registry.register向zookeeper注册了一个consumer节点;
    随后调用ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory),把消费者登记到提供者/消费者注册表中。

    // Excerpt of RegistryProtocol (closing braces and some statements are elided in this listing).
    public class RegistryProtocol implements Protocol {
        /**
         * Creates the directory for the referenced service, registers the consumer URL
         * with the registry, and joins the directory into a cluster invoker.
         *
         * In this listing the two "+ ..._CATEGORY" lines are the tail of a statement whose
         * beginning is elided (NOTE(review): presumably the directory.subscribe(...) call — confirm).
         *
         * @return the invoker produced by cluster.join(directory)
         */
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            //url = zookeeper://
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            // REGISTER_IP_KEY is removed from the parameters and becomes the host of the consumer URL.
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            // Register the consumer unless the interface is ANY_VALUE or REGISTER_KEY is false.
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
            Invoker invoker = cluster.join(directory);
            // Record the consumer invoker in the provider/consumer table.
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;

    调用了FailbackRegistry的register,内部调用了 doRegister(url)方法,实际上调用的是AbstractZookeeperClient的create方法

    /**
     * Excerpt of the registry base class that records failed operations for later retry.
     * Closing braces and some statements are elided in this listing.
     */
    public abstract class FailbackRegistry extends AbstractRegistry {
        /**
         * Registers a URL. The try body is elided here (the article's prose says it calls doRegister(url)).
         *
         * @throws IllegalStateException when registration fails and either the check flags
         *         are on (and the URL is not a consumer URL) or the failure is a
         *         SkipFailbackWrapperException
         */
        public void register(URL url) {
            try {
                // Sending a registration request to the server side
            } catch (Exception e) {
                Throwable t = e;
                // If the startup detection is opened, the Exception is thrown directly.
                // Consumer-protocol URLs never count as "check", so their failures are logged instead.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    // Unwrap so the message and cause refer to the underlying failure.
                    if (skipFailback) {
                        t = t.getCause();
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                // Record a failed registration request to a failed list, retry regularly
        /**
         * Subscribes a listener to a URL; on failure falls back to cached URLs when available.
         *
         * @throws IllegalStateException when subscribing fails, no cached URLs exist, and
         *         either the check flags are on or the failure is a SkipFailbackWrapperException
         */
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            // Clear any earlier failed-subscription record for this pair before trying again.
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
                // Prefer serving the listener from cached URLs over failing.
                List<URL> urls = getCacheUrls(url);
                if (urls != null && !urls.isEmpty()) {
                    notify(url, listener, urls);
                    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
                } else {
                    // If the startup detection is opened, the Exception is thrown directly.
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if (skipFailback) {
                            t = t.getCause();
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                // Record a failed subscription request to a failed list, retry regularly
                addFailedSubscribed(url, listener);
        /**
         * Notifies a listener, recording the notification for retry when doNotify throws.
         *
         * @throws IllegalArgumentException when url or listener is null
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            try {
                doNotify(url, listener, urls);
            } catch (Exception t) {
                // Record a failed notification to a failed list, retry regularly
                Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
                // Get-or-create the per-URL map (putIfAbsent, then re-read).
                if (listeners == null) {
                    failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                    listeners = failedNotified.get(url);
                listeners.put(listener, urls);
                logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    /**
     * Excerpt of the registry base class holding subscription and notification bookkeeping.
     * Closing braces and some statements are elided in this listing.
     */
    public abstract class AbstractRegistry implements Registry {
        // Subscribed URL -> listeners subscribed to it.
        private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
        /**
         * Validates the arguments and looks up (or creates) the listener set for the URL.
         * The statement that adds the listener to the set is elided in this listing.
         *
         * @throws IllegalArgumentException when url or listener is null
         */
        public void subscribe(URL url, NotifyListener listener) {
            if (url == null) {
                throw new IllegalArgumentException("subscribe url == null");
            if (listener == null) {
                throw new IllegalArgumentException("subscribe listener == null");
            if (logger.isInfoEnabled()) {
                logger.info("Subscribe: " + url);
            Set<NotifyListener> listeners = subscribed.get(url);
            // Get-or-create (putIfAbsent, then re-read).
            if (listeners == null) {
                subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
                listeners = subscribed.get(url);
        /**
         * Groups the matching URLs by category and stores each group in the notified map.
         * Several statements (returns after the guards, the add into categoryList, the
         * listener callback) are elided in this listing.
         *
         * @throws IllegalArgumentException when url or listener is null
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            // An empty list is only warned about when the subscription is not for ANY_VALUE.
            if ((urls == null || urls.isEmpty())
                    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            // category -> URLs of that category that match the subscription URL.
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    // URLs without a category parameter fall under DEFAULT_CATEGORY.
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
            if (result.size() == 0) {
            Map<String, List<URL>> categoryNotified = notified.get(url);
            // Get-or-create (putIfAbsent, then re-read).
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                // Replace the stored list for this category with the fresh one.
                categoryNotified.put(category, categoryList);
    // Excerpt of the ZooKeeper client base class (closing braces and branch bodies are elided in this listing).
    public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
        /**
         * Creates the node at path, first creating missing parent nodes recursively.
         *
         * @param path      full node path
         * @param ephemeral selects between the two creation branches at the end
         *                  (their bodies are elided in this listing)
         */
        public void create(String path, boolean ephemeral) {
            //path = /dubbo/com.bail.user.service.IUserService/consumers/consumer://,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
            int i = path.lastIndexOf('/');
            if (i > 0) {
                //parentPath = /dubbo/com.bail.user.service.IUserService/consumers
                String parentPath = path.substring(0, i);
                // Parents are always created with ephemeral = false.
                if (!checkExists(parentPath)) {
                    create(parentPath, false);
            // Debug note from the traced run: ephemeral = true here, i.e. the node Dubbo
            // registers for the consumer is created as an ephemeral node.
            if (ephemeral) {
            } else {


    /**
     * Excerpt of RegistryDirectory: static extension fields, constructor and subscribe.
     * Closing braces are elided in this listing.
     */
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        // Adaptive extensions obtained from the ExtensionLoader of each type.
        private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
        private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
        private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
        /**
         * @param serviceType referenced service interface; must not be null
         * @param url         registry URL carrying the encoded reference parameters under REFER_KEY
         * @throws IllegalArgumentException when serviceType is null or the URL's service key is null/empty
         */
        public RegistryDirectory(Class<T> serviceType, URL url) {
            if (serviceType == null)
                throw new IllegalArgumentException("service type is null.");
            if (url.getServiceKey() == null || url.getServiceKey().length() == 0)
                throw new IllegalArgumentException("registry serviceKey is null.");
            this.serviceType = serviceType;
            this.serviceKey = url.getServiceKey();
            // Decode the reference parameters that were packed into the registry URL under REFER_KEY.
            this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            // Directory URL: path = service interface, parameters replaced by queryMap, MONITOR_KEY removed.
            this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
            String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
            // Multi-group when the group is "*" or a comma-separated list.
            this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
            String methods = queryMap.get(Constants.METHODS_KEY);
            // null when no methods parameter is present; otherwise split per COMMA_SPLIT_PATTERN.
            this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
        // Subscribes this directory itself as the NotifyListener for the given URL.
        public void subscribe(URL url) {
            //url = consumer://,configurators,routers&dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
            registry.subscribe(url, this);
    zkClient = zookeeperTransporter.connect(url);



    // Excerpt: Cluster wrapper that holds another Cluster. The rest of the join
    // expression and the closing braces are elided in this listing.
    public class MockClusterWrapper implements Cluster {
        // The wrapped Cluster.
        private Cluster cluster;
        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        // Returns a MockClusterInvoker built from the directory plus a second argument
        // that is cut off in this listing.
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new MockClusterInvoker<T>(directory,



    // Excerpt: static tables of provider and consumer invoker wrappers, keyed by a String
    // (for consumers, the consumer URL's service key). Closing braces are elided in this listing.
    public class ProviderConsumerRegTable {
        public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
        public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
        /**
         * Wraps the consumer invoker and looks up (or creates) the wrapper set for its service key.
         * The statement that adds the wrapper to the set is elided in this listing.
         */
        public static void registerConsumer(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
            ConsumerInvokerWrapper wrapperInvoker = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
            String serviceUniqueName = consumerUrl.getServiceKey();
            Set<ConsumerInvokerWrapper> invokers = consumerInvokers.get(serviceUniqueName);
            // Get-or-create (putIfAbsent, then re-read).
            if (invokers == null) {
                consumerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ConsumerInvokerWrapper>());
                invokers = consumerInvokers.get(serviceUniqueName);
    // Excerpt: holder that pairs a consumer invoker with its URLs and directory.
    // Remaining members and closing braces are elided in this listing.
    public class ConsumerInvokerWrapper<T> implements Invoker {
        private Invoker<T> invoker;
        private URL originUrl;
        private URL registryUrl;
        private URL consumerUrl;
        private RegistryDirectory registryDirectory;
        public ConsumerInvokerWrapper(Invoker<T> invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
            this.invoker = invoker;
            // originUrl and registryUrl are re-parsed from their full string form, so they are
            // separate URL objects; consumerUrl is stored as passed in.
            this.originUrl = URL.valueOf(invoker.getUrl().toFullString());
            this.registryUrl = URL.valueOf(registryUrl.toFullString());
            this.consumerUrl = consumerUrl;
            this.registryDirectory = registryDirectory;
  • 相关阅读:
    Java web错误汇总
    ueditor 定制工具栏图标
    在mvc返回JSON时出错:序列化类型为“System.Data.Entity.DynamicProxies.Photos....这个会的对象时检测到循环引用 的解决办法
    Entity Framework Extended Library (EF扩展类库,支持批量更新、删除、合并多个查询等)
  • 原文地址:https://www.cnblogs.com/nangonghui/p/15620958.html
Copyright © 2011-2022 走看看