  • Service reference

    Execution steps

    ReferenceBean.getObject()
        -->ReferenceConfig.get()
        -->init()
    	    -->createProxy(map)
    	      -->refprotocol.refer(interfaceClass, urls.get(0))
    			-->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
    			-->extension.refer(arg0, arg1);
    		   	  -->ProtocolFilterWrapper.refer (three AOP-style wrapper classes)
    		        -->RegistryProtocol.refer
    			 	  -->registryFactory.getRegistry(url) // connects to ZooKeeper, same as when the provider exports a service (code omitted)
    			 	  -->doRefer(cluster, registry, type, url)
    			    	-->FailbackRegistry.register
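    					// creates the ZooKeeper node, same as when the provider exports a service (code omitted)
    					// node name: dubbo/per.qiao.service.TestService/consumers
    			    	-->registry.subscribe // subscribes to the ZooKeeper nodes, same as when the provider exports a service (code omitted)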
    					//dubbo/per.qiao.service.TestService/providers, 
    					//dubbo/per.qiao.service.TestService/configurators
    					//dubbo/per.qiao.service.TestService/routers
    					  -->notify(url, listener, urls);
    				   		-->FailbackRegistry.notify
    				      	   -->doNotify(url, listener, urls);
    					 		-->AbstractRegistry.notify
    					    	   -->saveProperties(url);  
    								// saves the registry data to the cache file (same path rule as on export)
    					       		 -->registryCacheExecutor.execute(new SaveProperties(...)); 
    								// handled by a thread pool
    					           -->listener.notify(categoryList);
    						  	     -->RegistryDirectory.notify
    						     	   -->refreshInvoker(invokerUrls);
    								   	 // converts the URLs to Invokers; the map key is the URL in string form
    									 -->toInvokers(invokerUrls)
                                            -->new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl)
    										  -->Protocol$Adaptive.refer
                                                 -->ExtensionLoader.getExtensionLoader(class)
                                                	.getExtension("dubbo");
    											 -->extension.refer(type, url);
    											   -->QosProtocolWrapper.refer
                                             			// a filter chain is built here:
                                                     	// buildInvokerChain(invoker,
                                                     	// "reference.filter", "consumer")
                                                      -->ProtocolFilterWrapper.refer
                                                     	//return new ListenerInvokerWrapper
                                                     	-->ProtocolListenerWrapper.refer
                                                     	  //return new DubboInvoker
                                                     	  -->DubboProtocol.refer
    									 -->destroyUnusedInvokers(
    										oldUrlInvokerMap,newUrlInvokerMap); 
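    								// closes the Invokers that are no longer used
    								// end goal: refresh the Map<String, Invoker<T>> urlInvokerMap object
    								// and the Map<String, List<Invoker<T>>> methodInvokerMap object
    			    	-->cluster.join(directory) // joins the cluster (routing and fault tolerance)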
    			      	   -->ExtensionLoader.getExtensionLoader(Cluster.class)
    								.getExtension("failover");
    				 		 -->MockClusterWrapper.join
    				   		   -->this.cluster.join(directory)
    				       		 -->FailoverCluster.join
    					  			-->return new FailoverClusterInvoker<T>(directory)
    					  			-->new MockClusterInvoker  // the invoker object that is returned
    --------------------------------------------------------------------------------------------
    	      -->proxyFactory.getProxy(invoker)  // creates the service proxy
    			-->ProxyFactory$Adaptive.getProxy
    		      -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
    							.getExtension("javassist");
    		        -->StubProxyFactoryWrapper.getProxy(invoker)  // adds post-processing around the returned proxy
    			 	  -->AbstractProxyFactory.getProxy
    			     	-->getProxy(invoker, interfaces)
    					  -->Proxy.getProxy(interfaces)
    				   		-->JavassistProxyFactory.getProxy
    				      		-->Proxy.getProxy(interfaces)
    							// the proxy currently implements per.qiao.service.TestService
    							// and interface com.alibaba.dubbo.rpc.service.EchoService
    							-->newInstance(InvokerInvocationHandler(MockClusterInvoker))
    							// this MockClusterInvoker is the invoker object returned by refprotocol.refer above
    							// an InvokerInvocationHandler is created; it implements the JDK's own InvocationHandler
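
    The filter chain that the call tree attributes to buildInvokerChain can be pictured with a small sketch. The Invoker and Filter types below are simplified stand-ins invented for illustration, not Dubbo's real interfaces; only the wrapping order (last filter wrapped first, so the first filter runs first) is meant to mirror the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for Dubbo's Invoker and Filter; not the real interfaces.
class FilterChainSketch {
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wrap the target from the last filter to the first, so that the first
    // filter in the list ends up outermost and runs first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Records the order in which the chain elements are entered.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = request -> {
            trace.add("target");
            return "result:" + request;
        };
        Filter first = (next, request) -> {
            trace.add("first");
            return next.invoke(request);
        };
        Filter second = (next, request) -> {
            trace.add("second");
            return next.invoke(request);
        };
        buildChain(target, Arrays.asList(first, second)).invoke("ping");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, second, target]
    }
}
```

    In the real call path the innermost element is the DubboInvoker; here a lambda plays that role.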
    

    The generated proxy class

    Entry point:

    ReferenceConfig#init,
    ref = createProxy(map);
    

    JavassistProxyFactory#getProxy generates a proxy class

    More precisely, it generates two classes rather than one (see com.alibaba.dubbo.common.bytecode.Proxy#getProxy)

    One is clazz (built by ccp), the other is pc (built by ccm)

    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    

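    Proxy.getProxy generates the two classes and returns a Proxy0 instance. Calling newInstance on it

    invokes Proxy0#newInstance(handler), which finally returns a proxy0 object

    In other words, what ReferenceBean.getObject hands to the caller is this proxy0 object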

    public class Proxy0 extends Proxy {
        @Override
        public Object newInstance() {
            return super.newInstance();
        }
        @Override
        public Object newInstance(java.lang.reflect.InvocationHandler h) {
            return new proxy0(h);
        }
    }
    

    The class that does the actual work (note: the two class names differ only in the case of the first letter)

    import java.lang.reflect.InvocationHandler;
    
    /**
     * Create by IntelliJ Idea 2018.2
     *
     * @author: qyp
     * Date: 2019-05-27 10:46
     */
    public class proxy0 implements com.alibaba.dubbo.rpc.service.EchoService, per.qiao.service.TestService {
    
    
        /**
         * Holds the methods of both interfaces; by index as used below: getData, getList, $echo
         */
        public static java.lang.reflect.Method[] methods;
      	/**
      	 * This handler is the InvokerInvocationHandler that wraps the result of refprotocol.refer in the flow above (MockClusterInvoker)
      	 */
        private java.lang.reflect.InvocationHandler handler;
    
        public proxy0(InvocationHandler h) {
            this.handler = h;
        }
    	// --------- This method comes from EchoService -------------
        @Override
        public Object $echo(java.lang.Object arg0)  {
            Object[] args = new Object[1];
            args[0] = arg0;
            Object ret = null;
            try {
                ret = handler.invoke(this, methods[2], args);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return (java.lang.Object) ret;
        }
    	// ---------- The next two methods come from the referenced service interface -----------
        @Override
        public java.lang.String getData(java.lang.String arg0) {
            Object[] args = new Object[1];
            args[0] = arg0;
            Object ret = null;
            try {
                ret = handler.invoke(this, methods[0], args);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return (java.lang.String) ret;
        }
    
        @Override
        public java.util.List getList() {
            Object[] args = new Object[0];
            Object ret = null;
            try {
                ret = handler.invoke(this, methods[1], args);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return (java.util.List) ret;
        }
    }
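
    The relationship between proxy0, its handler and the invoker can be reproduced with the JDK's own dynamic proxy. This is only a sketch: Dubbo generates proxy0 with Javassist rather than java.lang.reflect.Proxy, and the TestService, Invoker and ForwardingHandler types below are simplified, hypothetical stand-ins.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class ProxySketch {
    // Hypothetical service interface, standing in for per.qiao.service.TestService.
    interface TestService {
        String getData(String key);
    }

    // Simplified stand-in for Dubbo's Invoker (the real one takes an Invocation).
    interface Invoker {
        Object invoke(String methodName, Object[] args);
    }

    // Plays the role of InvokerInvocationHandler: every interface call
    // on the proxy is forwarded to the wrapped invoker.
    static final class ForwardingHandler implements InvocationHandler {
        private final Invoker invoker;

        ForwardingHandler(Invoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally, not forwarded.
                switch (method.getName()) {
                    case "toString":
                        return "proxy for " + invoker;
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return proxy == args[0];
                }
            }
            return invoker.invoke(method.getName(), args);
        }
    }

    // Counterpart of Proxy.getProxy(interfaces).newInstance(handler).
    static TestService refer(Invoker invoker) {
        return (TestService) Proxy.newProxyInstance(
                TestService.class.getClassLoader(),
                new Class<?>[] {TestService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        TestService service = refer((name, params) -> name + ":" + params[0]);
        System.out.println(service.getData("k")); // getData:k
    }
}
```

    The caller only ever sees TestService; the invoker behind the handler can be swapped without touching calling code, which is what lets the MockClusterInvoker sit behind proxy0.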
    

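    Now a closer look at how service reference interacts with ZooKeeper.

    Question: if the server (provider) is already running when the client (consumer) starts, the nodes already exist on ZooKeeper, so no change notification will reach the client. How, then, does the consumer's local service list get refreshed from ZooKeeper?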

    RegistryProtocol

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
      	// Obtain the Registry object for this url
        Registry registry = registryFactory.getRegistry(url);
      	...
        // type is the service interface
        return doRefer(cluster, registry, type, url);
    }
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      		// Create a registry directory
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
              	// Register the consumer node
                registry.register(registeredConsumerUrl);
                directory.setRegisteredConsumerUrl(registeredConsumerUrl);
            }
      		// Subscribe to the three nodes: providers, configurators and routers
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            Invoker invoker = cluster.join(directory);
      		// Record the subscription in the local registry table
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    The cluster is not the focus of this analysis.

    FailbackRegistry

    public FailbackRegistry(URL url) {
            super(url);
            this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
      		// Retry the URLs that failed: by default the first retry runs after 5 seconds, then every 5 seconds
            this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        retry();
                    } catch (Throwable t) {
                        ...
                    }
                }
            }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
        }
    
    public void register(URL url) {
        super.register(url);
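      	// Remove this URL from the failed lists
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Send the registration request to the registry server
            doRegister(url);
        } catch (Exception e) {
            // If startup checking is enabled (the configured check attribute), throw the exception directly
            ...
            }
            // Record the failed registration request in the failed list and retry it periodically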
            failedRegistered.add(url);
        }
    }
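
    The register-then-retry behaviour above can be condensed into a sketch. Everything here is a simplified assumption: URLs are plain strings, the doRegister callback is hypothetical, and retry() is called by hand instead of from the scheduled executor so that the example stays deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Simplified sketch of the failback idea; not the real FailbackRegistry.
class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // hypothetical registry call

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            // Remember the failure instead of propagating it; retry() picks it up.
            failedRegistered.add(url);
        }
    }

    // In a real registry this would run from a scheduled task
    // (the source uses scheduleWithFixedDelay).
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // Still failing: keep the URL for the next round.
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }

    public static void main(String[] args) {
        boolean[] registryUp = {false};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp[0]) {
                throw new IllegalStateException("registry unreachable");
            }
        });
        registry.register("consumer://10.0.0.5/per.qiao.service.TestService");
        System.out.println(registry.pending()); // 1
        registryUp[0] = true;
        registry.retry();
        System.out.println(registry.pending()); // 0
    }
}
```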
    

    ZookeeperRegistry

    Used to create the consumer node

    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            ...
        }
    }
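
    To make the node layout concrete, here is a sketch of how a path such as dubbo/per.qiao.service.TestService/consumers/... could be assembled. It rests on an assumption that is not shown in the code above: that the leaf node name is the URL-encoded full consumer URL. The helper name and its parameters are illustrative and are not the real ZookeeperRegistry#toUrlPath signature.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Illustrative helper; assumes the layout root/interface/category/encoded-url.
class ZkPathSketch {
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            // Encoding the URL keeps its '/' and '?' from being read as path separators.
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath(
                "/dubbo",
                "per.qiao.service.TestService",
                "consumers",
                "consumer://10.0.0.5/per.qiao.service.TestService?category=consumers"));
    }
}
```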
    

    Next, the consumer subscription

    RegistryProtocol.doRefer

    // Subscribe to the three nodes: providers, configurators and routers
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    

    RegistryDirectory.subscribe

    public void subscribe(URL url) {
      	// Set the current subscription URL
        setConsumerUrl(url);
        registry.subscribe(url, this);
    }
    

    FailbackRegistry

    public void subscribe(URL url, NotifyListener listener) {
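    	// Record the callback listener for this subscription
        super.subscribe(url, listener);
        // Remove it from the failed subscription list
        removeFailedSubscribed(url, listener);
        try {
            // Create the three category nodes on ZooKeeper, attach listeners to them, and refresh the local registry table along the way
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;
    		// If the subscription fails, load the subscribed URLs from the local cache file and refresh the registry table with them
          	// Note that the cache only holds data written by earlier notify calls, triggered by the consumer's own subscription or by a ZooKeeper notification
          	// This fallback only runs when the subscription fails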
            List<URL> urls = getCacheUrls(url);
            if (urls != null && !urls.isEmpty()) {
                notify(url, listener, urls);
            } else {
     			...
            }
    
            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }
    

    ZookeeperRegistry

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        // Connect to ZooKeeper
        zkClient = zookeeperTransporter.connect(url);
        // Add a connection state listener
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
    
    @Override
        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                ...
                } else {
                    List<URL> urls = new ArrayList<URL>();
                    // Iterate over the category paths to watch (three of them)
                    for (String path : toCategoriesPath(url)) {
                      // Look up the listeners in the cache
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                      	// Not cached yet, so create the listener
                        if (zkListener == null) {
                            listeners.putIfAbsent(listener, new ChildListener() {
                                @Override
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                  // The listener callback is ZookeeperRegistry#notify
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                      	// Create the category node to watch (one per loop iteration, three in total)
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
              		// After the ZooKeeper child listeners are registered, refresh the local list right away with the children read above
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                ...
            }
        }
    
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            try {
                doNotify(url, listener, urls);
            } catch (Exception t) {
                ...
            }
        }
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
            super.notify(url, listener, urls);
        }
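
    The doSubscribe code above suggests the answer to the earlier question: the children returned when the listener is added are passed to notify immediately, so a consumer that starts late still receives the providers that already exist. A minimal in-memory sketch of that pattern follows; the class and method names are made up for illustration and no real ZooKeeper client is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory stand-in for one watched category node; not a ZooKeeper client.
class SubscribeSketch {
    private final List<String> children = new CopyOnWriteArrayList<>();
    private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();

    // Like addChildListener in the source: registers the listener
    // and returns the children that exist right now.
    List<String> addChildListener(Consumer<List<String>> listener) {
        listeners.add(listener);
        return new ArrayList<>(children);
    }

    // A provider coming online: every registered listener is called back.
    void addChild(String child) {
        children.add(child);
        List<String> snapshot = new ArrayList<>(children);
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(snapshot);
        }
    }

    // The key step: notify once with the current children, without
    // waiting for a change event that may never arrive.
    void subscribe(Consumer<List<String>> listener) {
        List<String> current = addChildListener(listener);
        listener.accept(current);
    }

    public static void main(String[] args) {
        SubscribeSketch providersNode = new SubscribeSketch();
        providersNode.addChild("dubbo://10.0.0.1:20880"); // provider started first
        providersNode.subscribe(urls -> System.out.println("notified: " + urls));
        providersNode.addChild("dubbo://10.0.0.2:20880"); // later change event
    }
}
```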
    

    AbstractRegistry

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        // Iterate over the notified URLs and group them by category into result
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            // Save the notified URLs to the local cache file
            saveProperties(url);
          	// Refresh the local registry table
            listener.notify(categoryList);
        }
    }
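
    The grouping step at the start of AbstractRegistry.notify can be tried in isolation. The sketch below treats URLs as plain strings and parses the category parameter by hand, which is a simplification of Dubbo's URL class; the default of "providers" mirrors the DEFAULT_CATEGORY fallback used in the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// String-based simplification of the category grouping; not Dubbo's URL API.
class CategoryGroupingSketch {
    // Reads the "category" query parameter, defaulting to "providers".
    static String categoryOf(String url) {
        int query = url.indexOf('?');
        if (query >= 0) {
            for (String pair : url.substring(query + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    // One list per category, so each category can be notified separately.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), key -> new ArrayList<>()).add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(groupByCategory(Arrays.asList(
                "dubbo://10.0.0.1:20880/per.qiao.service.TestService",
                "override://0.0.0.0/per.qiao.service.TestService?category=configurators")).keySet());
    }
}
```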
    

    RegistryDirectory

    public synchronized void notify(List<URL> urls) {
      	// These correspond to the providers, routers and configurators nodes respectively
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers
        refreshInvoker(invokerUrls);
    }
    // Refresh the local registry table
    private void refreshInvoker(List<URL> invokerUrls) {
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.methodInvokerMap = null; // Set the method invoker map to null
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
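              	// Convert the URLs to Invoker objects; only URLs from the providers node can produce an Invoker
              	// What comes back is a filter-chain structure of invokers that ends in DubboInvoker
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
              	// Map each method name to its Invokers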
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
                // state change
                // If the calculation is wrong, it is not processed.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                    return;
                }
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try {
                  	// Destroy the Invoker objects that are no longer used
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
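
    The core of refreshInvoker is a swap-and-diff: build the new URL-to-Invoker map, publish it, then destroy whatever was only in the old map. The sketch below shows that idea with simplified types; the Invoker interface and the refer function are hypothetical, and the empty-protocol, cached-URL and method-map handling of the real method is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Simplified swap-and-diff; not the real RegistryDirectory.
class RefreshSketch {
    interface Invoker {
        void destroy();
    }

    private volatile Map<String, Invoker> urlInvokerMap = new HashMap<>();

    void refresh(List<String> providerUrls, Function<String, Invoker> refer) {
        Map<String, Invoker> oldMap = urlInvokerMap; // local reference
        Map<String, Invoker> newMap = new HashMap<>();
        for (String url : providerUrls) {
            // Reuse the invoker of an unchanged URL instead of referring again.
            Invoker existing = oldMap.get(url);
            newMap.put(url, existing != null ? existing : refer.apply(url));
        }
        urlInvokerMap = newMap;
        // Close the invokers whose URL disappeared from the provider list.
        for (Map.Entry<String, Invoker> entry : oldMap.entrySet()) {
            if (!newMap.containsKey(entry.getKey())) {
                entry.getValue().destroy();
            }
        }
    }

    Set<String> urls() {
        return urlInvokerMap.keySet();
    }

    public static void main(String[] args) {
        RefreshSketch directory = new RefreshSketch();
        Function<String, Invoker> refer =
                url -> () -> System.out.println("destroyed " + url);
        directory.refresh(List.of("dubbo://a", "dubbo://b"), refer);
        directory.refresh(List.of("dubbo://b", "dubbo://c"), refer); // destroyed dubbo://a
        System.out.println(directory.urls());
    }
}
```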
    

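    When a client references a service, it registers its consumer information on a ZooKeeper node (other registries can be used as well), subscribes to the providers, configurators and routers nodes along the way, and then calls RegistryDirectory.notify to refresh the local registry table. The returned result (the reference object) is a MockClusterInvoker that contains the RegistryDirectory object.

    Summary:

    1. The consumer registers on ZooKeeper and subscribes to the providers, configurators and routers nodes
    2. The invoker object obtained from refprotocol.refer is a MockClusterInvoker (which by default wraps a FailoverClusterInvoker)
    3. The object returned by ReferenceBean#getObject is the proxy0 object above, which depends on the MockClusterInvoker from (2)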
  • Original article: https://www.cnblogs.com/qiaozhuangshi/p/11007043.html