  • RPC framework dubbo-client (v2.6.3) source code reading (2)

    Following the previous post on dubbo-server, this one looks at how dubbo-client works.

    A Dubbo provider URL has the following structure:
    dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=12720&side=provider&timestamp=1534902103892
    A Dubbo consumer URL has the following structure:
    dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=17400&qos.port=33333&register.ip=192.168.11.6&remote.timestamp=1537448440181&side=consumer&timestamp=1537871015998
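
Both URLs follow the same pattern: protocol, host and port, the service interface as the path, and the configuration as query parameters. As a minimal sketch of that structure, the following uses only the JDK's `java.net.URI` to take such a URL apart. The class name `DubboUrlSketch` and the helper `queryParams` are made up for illustration and are not part of Dubbo, which uses its own URL class.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: splits a dubbo:// URL into its parts with the JDK's URI class.
class DubboUrlSketch {

    /** Splits the raw query string into an ordered key/value map. */
    static Map<String, String> queryParams(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI uri = URI.create("dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService"
                + "?application=demo-consumer&check=false&methods=sayHello&side=consumer");
        Map<String, String> params = queryParams(uri);
        System.out.println("protocol  = " + uri.getScheme());
        System.out.println("address   = " + uri.getHost() + ":" + uri.getPort());
        System.out.println("interface = " + uri.getPath().substring(1));
        System.out.println("side      = " + params.get("side"));
    }
}
```

The `side` parameter is what distinguishes the two URLs above: `provider` in the first, `consumer` in the second.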

    A runnable example from the official site:

    // Provider:
    public class Provider {
        public static void main(String[] args) throws Exception {
            //Prevent to get IPV6 address,this way only work in debug mode
            //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
            System.setProperty("java.net.preferIPv4Stack", "true");
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
            context.start();
            
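            // A neat trick: main blocks here until Enter is pressed, so the JVM stays up and the service keeps waiting for consumers; main itself does not have to run a listening loop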
            System.in.read(); // press any key to exit
        }
    
    }
    
    // Consumer:
    public class Consumer {
        public static void main(String[] args) {
            //Prevent to get IPV6 address,this way only work in debug mode
            //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
            System.setProperty("java.net.preferIPv4Stack", "true");
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
            context.start();
            DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
            String hello = demoService.sayHello("world"); // call remote method; a remote call looks exactly like a local one, which is the main appeal
            System.out.println(hello); // get result
        }
    }

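    The code above runs as-is, but it is still worth setting up a ZooKeeper instance, because production deployments almost always use ZooKeeper. Setup is simple; see: ZooKeeper setup notes

    Context initialization works the same way as in the server walkthrough of the previous post: RPC framework dubbo-server (v2.6.3) source code reading (1)

    // DubboNamespaceHandler: provider and consumer differ mainly in which XML tags are parsed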
    public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    
        static {
            Version.checkDuplicate(DubboNamespaceHandler.class);
        }
    
        @Override
        public void init() {
            registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
            registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
            registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
            registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
            // parses <dubbo:provider>
            registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
            // parses <dubbo:consumer>
            registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
            registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
            registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
            registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
            registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
        }
    
    }
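
The handler above is essentially a lookup table from XML element name to the class that backs it. The sketch below reproduces only that idea with a plain map. `NamespaceRegistrySketch` and the nested stand-in classes are hypothetical and do not use Spring's `NamespaceHandlerSupport` or Dubbo's real config classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a name -> class registry mimicking what init() sets up.
class NamespaceRegistrySketch {

    // Hypothetical stand-ins for Dubbo's config classes.
    static class ServiceBean {}
    static class ReferenceBean {}
    static class ConsumerConfig {}

    private final Map<String, Class<?>> parsers = new LinkedHashMap<>();

    void register(String elementName, Class<?> beanClass) {
        parsers.put(elementName, beanClass);
    }

    /** Returns the class registered for an element such as "reference". */
    Class<?> resolve(String elementName) {
        Class<?> beanClass = parsers.get(elementName);
        if (beanClass == null) {
            throw new IllegalArgumentException("No parser registered for <dubbo:" + elementName + ">");
        }
        return beanClass;
    }

    static NamespaceRegistrySketch defaults() {
        NamespaceRegistrySketch registry = new NamespaceRegistrySketch();
        registry.register("consumer", ConsumerConfig.class);
        registry.register("service", ServiceBean.class);     // provider side
        registry.register("reference", ReferenceBean.class); // consumer side
        return registry;
    }

    public static void main(String[] args) {
        NamespaceRegistrySketch registry = defaults();
        // A consumer XML file uses <dubbo:reference>, so it ends up with a ReferenceBean definition.
        System.out.println(registry.resolve("reference").getSimpleName());
    }
}
```

In the demo, the consumer XML declares a `reference` element and the provider XML declares a `service` element, which is why the two sides end up with different bean types even though they share one handler.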

    // The part to focus on is how the bean is obtained
    DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

     // Spring obtains the bean through org.springframework.beans.factory.support.DefaultListableBeanFactory
        @Override
        public Object getBean(String name) throws BeansException {
            assertBeanFactoryActive();
            return getBeanFactory().getBean(name);
        }
    
            protected <T> T doGetBean(
                final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
                throws BeansException {
    
            final String beanName = transformedBeanName(name);
            Object bean;
    
            // Eagerly check singleton cache for manually registered singletons.
            // this call returns the cached instance behind the proxy, a com.alibaba.dubbo.config.spring.ReferenceBean
            Object sharedInstance = getSingleton(beanName);
            if (sharedInstance != null && args == null) {
                if (logger.isDebugEnabled()) {
                    if (isSingletonCurrentlyInCreation(beanName)) {
                        logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
                                "' that is not fully initialized yet - a consequence of a circular reference");
                    }
                    else {
                        logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
                    }
                }
                bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
            }
    
            else {
                // Fail if we're already creating this bean instance:
                // We're assumably within a circular reference.
                if (isPrototypeCurrentlyInCreation(beanName)) {
                    throw new BeanCurrentlyInCreationException(beanName);
                }
    
                // Check if bean definition exists in this factory.
                BeanFactory parentBeanFactory = getParentBeanFactory();
                if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
                    // Not found -> check parent.
                    String nameToLookup = originalBeanName(name);
                    if (args != null) {
                        // Delegation to parent with explicit args.
                        return (T) parentBeanFactory.getBean(nameToLookup, args);
                    }
                    else {
                        // No args -> delegate to standard getBean method.
                        return parentBeanFactory.getBean(nameToLookup, requiredType);
                    }
                }
    
                if (!typeCheckOnly) {
                    markBeanAsCreated(beanName);
                }
    
                try {
                    final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
                    checkMergedBeanDefinition(mbd, beanName, args);
    
                    // Guarantee initialization of beans that the current bean depends on.
                    String[] dependsOn = mbd.getDependsOn();
                    if (dependsOn != null) {
                        for (String dep : dependsOn) {
                            if (isDependent(beanName, dep)) {
                                throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                        "Circular depends-on relationship between '" + beanName + "' and '" + dep + "'");
                            }
                            registerDependentBean(dep, beanName);
                            try {
                                getBean(dep);
                            }
                            catch (NoSuchBeanDefinitionException ex) {
                                throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                        "'" + beanName + "' depends on missing bean '" + dep + "'", ex);
                            }
                        }
                    }
    
                    // Create bean instance.
                    if (mbd.isSingleton()) {
                        sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() {
                            @Override
                            public Object getObject() throws BeansException {
                                try {
                                    return createBean(beanName, mbd, args);
                                }
                                catch (BeansException ex) {
                                    // Explicitly remove instance from singleton cache: It might have been put there
                                    // eagerly by the creation process, to allow for circular reference resolution.
                                    // Also remove any beans that received a temporary reference to the bean.
                                    destroySingleton(beanName);
                                    throw ex;
                                }
                            }
                        });
                        bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
                    }
    
                    else if (mbd.isPrototype()) {
                        // It's a prototype -> create a new instance.
                        Object prototypeInstance = null;
                        try {
                            beforePrototypeCreation(beanName);
                            prototypeInstance = createBean(beanName, mbd, args);
                        }
                        finally {
                            afterPrototypeCreation(beanName);
                        }
                        bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
                    }
    
                    else {
                        String scopeName = mbd.getScope();
                        final Scope scope = this.scopes.get(scopeName);
                        if (scope == null) {
                            throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
                        }
                        try {
                            Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() {
                                @Override
                                public Object getObject() throws BeansException {
                                    beforePrototypeCreation(beanName);
                                    try {
                                        return createBean(beanName, mbd, args);
                                    }
                                    finally {
                                        afterPrototypeCreation(beanName);
                                    }
                                }
                            });
                            bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
                        }
                        catch (IllegalStateException ex) {
                            throw new BeanCreationException(beanName,
                                    "Scope '" + scopeName + "' is not active for the current thread; consider " +
                                    "defining a scoped proxy for this bean if you intend to refer to it from a singleton",
                                    ex);
                        }
                    }
                }
                catch (BeansException ex) {
                    cleanupAfterBeanCreationFailure(beanName);
                    throw ex;
                }
            }
    
            // Check if required type matches the type of the actual bean instance.
            if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
                try {
                    return getTypeConverter().convertIfNecessary(bean, requiredType);
                }
                catch (TypeMismatchException ex) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed to convert bean '" + name + "' to required type '" +
                                ClassUtils.getQualifiedName(requiredType) + "'", ex);
                    }
                    throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
                }
            }
            return (T) bean;
        }


    // When ApplicationConfig is initialized, the Dubbo config instances are created

        // org.springframework.beans.BeanUtils.instantiateClass instantiates the ReferenceConfig class through reflection,
        // for example by invoking public com.alibaba.dubbo.config.RegistryConfig()
         public static <T> T instantiateClass(Constructor<T> ctor, Object... args) throws BeanInstantiationException {
            Assert.notNull(ctor, "Constructor must not be null");
            try {
                ReflectionUtils.makeAccessible(ctor);
                // create the new instance
                return ctor.newInstance(args);
            }
            catch (InstantiationException ex) {
                throw new BeanInstantiationException(ctor, "Is it an abstract class?", ex);
            }
            catch (IllegalAccessException ex) {
                throw new BeanInstantiationException(ctor, "Is the constructor accessible?", ex);
            }
            catch (IllegalArgumentException ex) {
                throw new BeanInstantiationException(ctor, "Illegal arguments for constructor", ex);
            }
            catch (InvocationTargetException ex) {
                throw new BeanInstantiationException(ctor, "Constructor threw exception", ex.getTargetException());
            }
        }
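
Stripped of Spring's error translation, the method above comes down to `Constructor.newInstance`. The following is a minimal sketch of that step using only `java.lang.reflect`. `InstantiateSketch` and `RegistryConfigStub` are hypothetical names standing in for Spring's `BeanUtils` and a Dubbo config class.

```java
import java.lang.reflect.Constructor;

// Illustrative only: reflective instantiation through a no-arg constructor.
class InstantiateSketch {

    // Hypothetical stand-in for a config class with a no-arg constructor.
    static class RegistryConfigStub {
        final String address;

        RegistryConfigStub() {
            this.address = "N/A";
        }
    }

    /** Creates an instance through the no-arg constructor, wrapping reflection failures. */
    static <T> T instantiate(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true); // same purpose as ReflectionUtils.makeAccessible
            return ctor.newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), ex);
        }
    }

    public static void main(String[] args) {
        RegistryConfigStub config = instantiate(RegistryConfigStub.class);
        System.out.println(config.address);
    }
}
```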

    // getBean eventually reaches ReferenceBean.getObject(), which triggers proxy initialization

        //     org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean triggers bean creation, which in turn calls getObject() to initialize
        protected <T> T doGetBean(
                final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
                throws BeansException {
    
            final String beanName = transformedBeanName(name);
            Object bean;
    
            // Eagerly check singleton cache for manually registered singletons.
            Object sharedInstance = getSingleton(beanName);
            if (sharedInstance != null && args == null) {
                if (logger.isDebugEnabled()) {
                    if (isSingletonCurrentlyInCreation(beanName)) {
                        logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
                                "' that is not fully initialized yet - a consequence of a circular reference");
                    }
                    else {
                        logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
                    }
                }
                bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
            }
    
            else {
                // Fail if we're already creating this bean instance:
                // We're assumably within a circular reference.
                if (isPrototypeCurrentlyInCreation(beanName)) {
                    throw new BeanCurrentlyInCreationException(beanName);
                }
    
                // Check if bean definition exists in this factory.
                BeanFactory parentBeanFactory = getParentBeanFactory();
                if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
                    // Not found -> check parent.
                    String nameToLookup = originalBeanName(name);
                    if (args != null) {
                        // Delegation to parent with explicit args.
                        return (T) parentBeanFactory.getBean(nameToLookup, args);
                    }
                    else {
                        // No args -> delegate to standard getBean method.
                        return parentBeanFactory.getBean(nameToLookup, requiredType);
                    }
                }
    
                if (!typeCheckOnly) {
                    markBeanAsCreated(beanName);
                }
    
                try {
                    final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
                    checkMergedBeanDefinition(mbd, beanName, args);
    
                    // Guarantee initialization of beans that the current bean depends on.
                    String[] dependsOn = mbd.getDependsOn();
                    if (dependsOn != null) {
                        for (String dep : dependsOn) {
                            if (isDependent(beanName, dep)) {
                                throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                        "Circular depends-on relationship between '" + beanName + "' and '" + dep + "'");
                            }
                            registerDependentBean(dep, beanName);
                            try {
                                getBean(dep);
                            }
                            catch (NoSuchBeanDefinitionException ex) {
                                throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                        "'" + beanName + "' depends on missing bean '" + dep + "'", ex);
                            }
                        }
                    }
    
                    // Create bean instance.
                    if (mbd.isSingleton()) {
                        // the bean is created here
                        sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() {
                            @Override
                            public Object getObject() throws BeansException {
                                try {
                                    return createBean(beanName, mbd, args);
                                }
                                catch (BeansException ex) {
                                    // Explicitly remove instance from singleton cache: It might have been put there
                                    // eagerly by the creation process, to allow for circular reference resolution.
                                    // Also remove any beans that received a temporary reference to the bean.
                                    destroySingleton(beanName);
                                    throw ex;
                                }
                            }
                        });
                        bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
                    }
    
                    else if (mbd.isPrototype()) {
                        // It's a prototype -> create a new instance.
                        Object prototypeInstance = null;
                        try {
                            beforePrototypeCreation(beanName);
                            prototypeInstance = createBean(beanName, mbd, args);
                        }
                        finally {
                            afterPrototypeCreation(beanName);
                        }
                        bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
                    }
    
                    else {
                        String scopeName = mbd.getScope();
                        final Scope scope = this.scopes.get(scopeName);
                        if (scope == null) {
                            throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
                        }
                        try {
                            Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() {
                                @Override
                                public Object getObject() throws BeansException {
                                    beforePrototypeCreation(beanName);
                                    try {
                                        return createBean(beanName, mbd, args);
                                    }
                                    finally {
                                        afterPrototypeCreation(beanName);
                                    }
                                }
                            });
                            bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
                        }
                        catch (IllegalStateException ex) {
                            throw new BeanCreationException(beanName,
                                    "Scope '" + scopeName + "' is not active for the current thread; consider " +
                                    "defining a scoped proxy for this bean if you intend to refer to it from a singleton",
                                    ex);
                        }
                    }
                }
                catch (BeansException ex) {
                    cleanupAfterBeanCreationFailure(beanName);
                    throw ex;
                }
            }
    
            // Check if required type matches the type of the actual bean instance.
            if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
                try {
                    return getTypeConverter().convertIfNecessary(bean, requiredType);
                }
                catch (TypeMismatchException ex) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed to convert bean '" + name + "' to required type '" +
                                ClassUtils.getQualifiedName(requiredType) + "'", ex);
                    }
                    throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
                }
            }
            return (T) bean;
        }
    
        
        // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton
        public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) {
            Assert.notNull(beanName, "'beanName' must not be null");
            synchronized (this.singletonObjects) {
                Object singletonObject = this.singletonObjects.get(beanName);
                if (singletonObject == null) {
                    if (this.singletonsCurrentlyInDestruction) {
                        throw new BeanCreationNotAllowedException(beanName,
                                "Singleton bean creation not allowed while singletons of this factory are in destruction " +
                                "(Do not request a bean from a BeanFactory in a destroy method implementation!)");
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Creating shared instance of singleton bean '" + beanName + "'");
                    }
                    beforeSingletonCreation(beanName);
                    boolean newSingleton = false;
                    boolean recordSuppressedExceptions = (this.suppressedExceptions == null);
                    if (recordSuppressedExceptions) {
                        this.suppressedExceptions = new LinkedHashSet<Exception>();
                    }
                    try {
                        singletonObject = singletonFactory.getObject();
                        newSingleton = true;
                    }
                    catch (IllegalStateException ex) {
                        // Has the singleton object implicitly appeared in the meantime ->
                        // if yes, proceed with it since the exception indicates that state.
                        singletonObject = this.singletonObjects.get(beanName);
                        if (singletonObject == null) {
                            throw ex;
                        }
                    }
                    catch (BeanCreationException ex) {
                        if (recordSuppressedExceptions) {
                            for (Exception suppressedException : this.suppressedExceptions) {
                                ex.addRelatedCause(suppressedException);
                            }
                        }
                        throw ex;
                    }
                    finally {
                        if (recordSuppressedExceptions) {
                            this.suppressedExceptions = null;
                        }
                        afterSingletonCreation(beanName);
                    }
                    if (newSingleton) {
                        // ref was observed to be initialized at this point, which looks a bit magical
                        addSingleton(beanName, singletonObject);
                    }
                }
                return (singletonObject != NULL_OBJECT ? singletonObject : null);
            }
        }
    
        // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.addSingleton
        protected void addSingleton(String beanName, Object singletonObject) {
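            // Stepping through this synchronized block appeared to trigger the initialization of ReferenceConfig.ref.
            // What actually ran was AbstractConfig.toString(), most likely called by the debugger to display the object; toString() reflectively invokes getters.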
            synchronized (this.singletonObjects) {
                this.singletonObjects.put(beanName, (singletonObject != null ? singletonObject : NULL_OBJECT));
                this.singletonFactories.remove(beanName);
                this.earlySingletonObjects.remove(beanName);
                this.registeredSingletons.add(beanName);
            }
        }
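
The "magic" noted in the comments is easier to see in isolation. The sketch below assumes a `toString()` that reflectively calls every public no-arg getter, which is the behavior the comment attributes to `AbstractConfig.toString()`. `ToStringPitfallSketch` and `LazyReference` are hypothetical and much simpler than Dubbo's classes.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Illustrative only: a getter-based toString() that triggers lazy initialization as a side effect.
class ToStringPitfallSketch {

    static class LazyReference {
        private Object ref;
        int initCount = 0; // how many times the lazy initialization ran

        /** Lazily creates the "proxy", like ReferenceBean.getObject() -> get() -> init(). */
        public synchronized Object getObject() {
            if (ref == null) {
                initCount++;
                ref = "proxy";
            }
            return ref;
        }

        /** Renders the object by calling every public no-arg getter through reflection. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("<reference");
            for (Method m : getClass().getMethods()) {
                String name = m.getName();
                if (name.startsWith("get") && !"getClass".equals(name)
                        && m.getParameterTypes().length == 0
                        && Modifier.isPublic(m.getModifiers())) {
                    try {
                        Object value = m.invoke(this); // side effect: may run the lazy init
                        sb.append(' ').append(name.substring(3)).append("=\"").append(value).append('"');
                    } catch (ReflectiveOperationException ignored) {
                        // skip getters that cannot be invoked
                    }
                }
            }
            return sb.append(" />").toString();
        }
    }

    public static void main(String[] args) {
        LazyReference reference = new LazyReference();
        System.out.println("init count before toString: " + reference.initCount);
        System.out.println(reference); // e.g. what a debugger does when showing a variable
        System.out.println("init count after toString:  " + reference.initCount);
    }
}
```

With a `toString()` like this, merely displaying the object is enough to run the initialization, so the point at which `ref` appears to be set can depend on where a breakpoint happens to be.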

    The proxy is obtained through ReferenceBean.getObject():

     // com.alibaba.dubbo.config.spring.ReferenceBean.getObject returns the proxy, creating it on first use
        @Override
        public Object getObject() throws Exception {
            return get();
        }
        
     // delegates to the parent class method com.alibaba.dubbo.config.ReferenceConfig.get() to obtain the instance
        public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                // not initialized yet: trigger initialization once
                init();
            }
            return ref;
        }
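
Two ideas meet in these two methods: the container hands out what the factory bean produces rather than the factory bean itself, and the product is created lazily under a lock. The following is a minimal sketch of both. `SimpleFactoryBean`, `LazyReference` and `MiniContainer` are hypothetical simplifications, not the Spring or Dubbo types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative only: factory-bean unwrapping plus lazy, synchronized creation.
class FactoryBeanSketch {

    /** Hypothetical, reduced version of the factory-bean contract. */
    interface SimpleFactoryBean<T> {
        T getObject();
    }

    static class LazyReference<T> implements SimpleFactoryBean<T> {
        private final Supplier<T> creator;
        private T ref;
        private boolean destroyed;
        int initCalls; // counts how often the product was actually created

        LazyReference(Supplier<T> creator) {
            this.creator = creator;
        }

        @Override
        public T getObject() {
            return get();
        }

        synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                initCalls++;
                ref = creator.get(); // stands in for init() / createProxy(map)
            }
            return ref;
        }

        synchronized void destroy() {
            destroyed = true;
            ref = null;
        }
    }

    static class MiniContainer {
        private final Map<String, Object> singletons = new HashMap<>();

        void register(String name, Object bean) {
            singletons.put(name, bean);
        }

        /** Returns the product of a factory bean instead of the factory bean itself. */
        Object getBean(String name) {
            Object bean = singletons.get(name);
            if (bean instanceof SimpleFactoryBean) {
                return ((SimpleFactoryBean<?>) bean).getObject();
            }
            return bean;
        }
    }

    public static void main(String[] args) {
        MiniContainer context = new MiniContainer();
        LazyReference<String> reference = new LazyReference<>(() -> "proxy for DemoService");
        context.register("demoService", reference);
        System.out.println(context.getBean("demoService"));
        System.out.println(context.getBean("demoService"));
        System.out.println("created " + reference.initCalls + " time(s)");
    }
}
```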

    This is where proxy creation really begins:

        // initialize the proxy and store it in the ref field
        private void init() {
            if (initialized) {
                return;
            }
            initialized = true;
            if (interfaceName == null || interfaceName.length() == 0) {
                throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
            }
            // Stack-trace printing added here (not part of the Dubbo source) to trace the call path, as follows
            new Throwable("*******************************ReferenceConfig.init trace dump").printStackTrace();
            new Throwable().printStackTrace();
            // get consumer's global configuration
            checkDefault();
            appendProperties(this);
            if (getGeneric() == null && getConsumer() != null) {
                setGeneric(getConsumer().getGeneric());
            }
            if (ProtocolUtils.isGeneric(getGeneric())) {
                interfaceClass = GenericService.class;
            } else {
                try {
                    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                            .getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                checkInterfaceAndMethods(interfaceClass, methods);
            }
            String resolve = System.getProperty(interfaceName);
            String resolveFile = null;
            if (resolve == null || resolve.length() == 0) {
                resolveFile = System.getProperty("dubbo.resolve.file");
                if (resolveFile == null || resolveFile.length() == 0) {
                    File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
                    if (userResolveFile.exists()) {
                        resolveFile = userResolveFile.getAbsolutePath();
                    }
                }
                if (resolveFile != null && resolveFile.length() > 0) {
                    Properties properties = new Properties();
                    FileInputStream fis = null;
                    try {
                        fis = new FileInputStream(new File(resolveFile));
                        properties.load(fis);
                    } catch (IOException e) {
                        throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
                    } finally {
                        try {
                            if (null != fis) fis.close();
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                    resolve = properties.getProperty(interfaceName);
                }
            }
            if (resolve != null && resolve.length() > 0) {
                url = resolve;
                if (logger.isWarnEnabled()) {
                    if (resolveFile != null) {
                        logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
                    } else {
                        logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
                    }
                }
            }
            if (consumer != null) {
                if (application == null) {
                    application = consumer.getApplication();
                }
                if (module == null) {
                    module = consumer.getModule();
                }
                if (registries == null) {
                    registries = consumer.getRegistries();
                }
                if (monitor == null) {
                    monitor = consumer.getMonitor();
                }
            }
            if (module != null) {
                if (registries == null) {
                    registries = module.getRegistries();
                }
                if (monitor == null) {
                    monitor = module.getMonitor();
                }
            }
            if (application != null) {
                if (registries == null) {
                    registries = application.getRegistries();
                }
                if (monitor == null) {
                    monitor = application.getMonitor();
                }
            }
            checkApplication();
            // check the mock configuration and try to load the mock class
            checkStubAndMock(interfaceClass);
            Map<String, String> map = new HashMap<String, String>();
            Map<Object, Object> attributes = new HashMap<Object, Object>();
            map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
            map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            if (!isGeneric()) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
    
                // get the method names via Wrapper
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
                    map.put("methods", Constants.ANY_VALUE);
                } else {
                    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            map.put(Constants.INTERFACE_KEY, interfaceName);
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, consumer, Constants.DEFAULT_KEY);
            appendParameters(map, this);
            String prefix = StringUtils.getServiceKey(map);
            if (methods != null && !methods.isEmpty()) {
                for (MethodConfig method : methods) {
                    appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    appendAttributes(attributes, method, prefix + "." + method.getName());
                    checkAndConvertImplicitConfig(method, map, attributes);
                }
            }
    
            String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
            if (hostToRegistry == null || hostToRegistry.length() == 0) {
                hostToRegistry = NetUtils.getLocalHost();
            } else if (isInvalidLocalHost(hostToRegistry)) {
                throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
            }
            map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    
            //attributes are stored by system context.
            StaticContext.getSystemContext().putAll(attributes);
            ref = createProxy(map);
            ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
            ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
        }
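The per-method loop in init() rewrites a `<method>.retry=false` entry into `<method>.retries=0`. The following is a minimal, stand-alone sketch of just that rewrite. The class name `RetryKeySketch` and the helper `normalizeRetry` are hypothetical and not part of Dubbo; the real code operates on `MethodConfig` objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RetryKeySketch {

    /** Rewrites "<method>.retry=false" into "<method>.retries=0", like the loop in init(). */
    static void normalizeRetry(Map<String, String> map, String methodName) {
        String retryKey = methodName + ".retry";
        if (map.containsKey(retryKey)) {
            // The legacy key is always removed, whatever its value was.
            String retryValue = map.remove(retryKey);
            if ("false".equals(retryValue)) {
                map.put(methodName + ".retries", "0");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("side", "consumer");
        map.put("sayHello.retry", "false");
        normalizeRetry(map, "sayHello");
        System.out.println(map); // {side=consumer, sayHello.retries=0}
    }
}
```

Note that any other value of `<method>.retry` is simply dropped from the map, so only an explicit `false` has an effect.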
        

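    Before control can be handed to a stub or mock, Dubbo checks whether one is configured and, if so, whether the configuration is valid.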

        // AbstractInterfaceConfig.checkStubAndMock()
        protected void checkStubAndMock(Class<?> interfaceClass) {
            if (ConfigUtils.isNotEmpty(local)) {
                Class<?> localClass = ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() + "Local") : ReflectUtils.forName(local);
                if (!interfaceClass.isAssignableFrom(localClass)) {
                    throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());
                }
                try {
                    ReflectUtils.findConstructor(localClass, interfaceClass);
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());
                }
            }
            if (ConfigUtils.isNotEmpty(stub)) {
                Class<?> localClass = ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() + "Stub") : ReflectUtils.forName(stub);
                if (!interfaceClass.isAssignableFrom(localClass)) {
                    throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());
                }
                try {
                    ReflectUtils.findConstructor(localClass, interfaceClass);
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());
                }
            }
            if (ConfigUtils.isNotEmpty(mock)) {
                if (mock.startsWith(Constants.RETURN_PREFIX)) {
                    String value = mock.substring(Constants.RETURN_PREFIX.length());
                    try {
                        MockInvoker.parseMockValue(value);
                    } catch (Exception e) {
                        throw new IllegalStateException("Illegal mock json value in <dubbo:service ... mock=\"" + mock + "\" />");
                    }
                } else {
                    Class<?> mockClass = ConfigUtils.isDefault(mock) ? ReflectUtils.forName(interfaceClass.getName() + "Mock") : ReflectUtils.forName(mock);
                    if (!interfaceClass.isAssignableFrom(mockClass)) {
                        throw new IllegalStateException("The mock implementation class " + mockClass.getName() + " not implement interface " + interfaceClass.getName());
                    }
                    try {
                        mockClass.getConstructor(new Class<?>[0]);
                    } catch (NoSuchMethodException e) {
                        throw new IllegalStateException("No such empty constructor \"public " + mockClass.getSimpleName() + "()\" in mock implementation class " + mockClass.getName());
                    }
                }
            }
        }
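The stub check above comes down to two conditions: the class must implement the service interface, and it must have a constructor that accepts the interface (so the real proxy can be passed in). Below is a minimal sketch of that rule using plain JDK reflection. `Greeter`, `GreeterStub`, `BadStub` and `isValidStub` are hypothetical names. Dubbo itself uses `ReflectUtils.findConstructor`, which is more lenient than the exact-match `getConstructor` used here.

```java
public class StubCheckSketch {

    public interface Greeter {
        String greet(String name);
    }

    /** A well-formed stub: implements the interface and wraps the real proxy. */
    public static class GreeterStub implements Greeter {
        private final Greeter delegate;

        public GreeterStub(Greeter delegate) {
            this.delegate = delegate;
        }

        @Override
        public String greet(String name) {
            try {
                return delegate.greet(name);
            } catch (RuntimeException e) {
                return "fallback for " + name; // local fallback when the remote call fails
            }
        }
    }

    /** An ill-formed stub: it has no constructor taking a Greeter. */
    public static class BadStub implements Greeter {
        @Override
        public String greet(String name) {
            return "never wired";
        }
    }

    /** Simplified version of the two checks performed for "local"/"stub". */
    static boolean isValidStub(Class<?> iface, Class<?> stubClass) {
        if (!iface.isAssignableFrom(stubClass)) {
            return false;
        }
        try {
            stubClass.getConstructor(iface);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidStub(Greeter.class, GreeterStub.class));
        System.out.println(isValidStub(Greeter.class, BadStub.class));
    }
}
```

A mock class is checked differently: it only needs a public no-argument constructor, because it replaces the remote call instead of wrapping it.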
        

    Obtain the Wrapper for later invocation:

        // com.alibaba.dubbo.common.bytecode.Wrapper
        public static Wrapper getWrapper(Class<?> c) {
            while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
                c = c.getSuperclass();
    
            if (c == Object.class)
                return OBJECT_WRAPPER;
    
            Wrapper ret = WRAPPER_MAP.get(c);
            if (ret == null) {
                ret = makeWrapper(c);
                WRAPPER_MAP.put(c, ret);
            }
            return ret;
        }
        
        private static Wrapper makeWrapper(Class<?> c) {
            if (c.isPrimitive())
                throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
    
            String name = c.getName();
            ClassLoader cl = ClassHelper.getClassLoader(c);
    
            StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
            StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
            StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
    
            c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
            c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
            c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    
            Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types>
            Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance>
            List<String> mns = new ArrayList<String>(); // method names.
            List<String> dmns = new ArrayList<String>(); // declaring method names.
    
            // get all public field.
            for (Field f : c.getFields()) {
                String fn = f.getName();
                Class<?> ft = f.getType();
                if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))
                    continue;
    
                c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
                c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
                pts.put(fn, ft);
            }
    
            Method[] methods = c.getMethods();
            // get all public method.
            boolean hasMethod = hasMethods(methods);
            if (hasMethod) {
                c3.append(" try{");
            }
            for (Method m : methods) {
                if (m.getDeclaringClass() == Object.class) //ignore Object's method.
                    continue;
    
                String mn = m.getName();
                c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
                int len = m.getParameterTypes().length;
                c3.append(" && ").append(" $3.length == ").append(len);
    
                boolean override = false;
                for (Method m2 : methods) {
                    if (m != m2 && m.getName().equals(m2.getName())) {
                        override = true;
                        break;
                    }
                }
                if (override) {
                    if (len > 0) {
                        for (int l = 0; l < len; l++) {
                            c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
                                    .append(m.getParameterTypes()[l].getName()).append("\")");
                        }
                    }
                }
    
                c3.append(" ) { ");
    
                if (m.getReturnType() == Void.TYPE)
                    c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
                else
                    c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
    
                c3.append(" }");
    
                mns.add(mn);
                if (m.getDeclaringClass() == c)
                    dmns.add(mn);
                ms.put(ReflectUtils.getDesc(m), m);
            }
            if (hasMethod) {
                c3.append(" } catch(Throwable e) { ");
                c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
                c3.append(" }");
            }
    
            c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
    
            // deal with get/set method.
            Matcher matcher;
            for (Map.Entry<String, Method> entry : ms.entrySet()) {
                String md = entry.getKey();
                Method method = (Method) entry.getValue();
                if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                    String pn = propertyName(matcher.group(1));
                    c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
                    pts.put(pn, method.getReturnType());
                } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                    String pn = propertyName(matcher.group(1));
                    c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
                    pts.put(pn, method.getReturnType());
                } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                    Class<?> pt = method.getParameterTypes()[0];
                    String pn = propertyName(matcher.group(1));
                    c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
                    pts.put(pn, pt);
                }
            }
            c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
            c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
    
            // make class
            long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
            ClassGenerator cc = ClassGenerator.newInstance(cl);
            cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
            cc.setSuperClass(Wrapper.class);
    
            cc.addDefaultConstructor();
            cc.addField("public static String[] pns;"); // property name array.
            cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
            cc.addField("public static String[] mns;"); // all method name array.
            cc.addField("public static String[] dmns;"); // declared method name array.
            for (int i = 0, len = ms.size(); i < len; i++)
                cc.addField("public static Class[] mts" + i + ";");
    
            cc.addMethod("public String[] getPropertyNames(){ return pns; }");
            cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
            cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
            cc.addMethod("public String[] getMethodNames(){ return mns; }");
            cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
            cc.addMethod(c1.toString());
            cc.addMethod(c2.toString());
            cc.addMethod(c3.toString());
    
            try {
                Class<?> wc = cc.toClass();
                // setup static field.
                wc.getField("pts").set(null, pts);
                wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
                wc.getField("mns").set(null, mns.toArray(new String[0]));
                wc.getField("dmns").set(null, dmns.toArray(new String[0]));
                int ix = 0;
                for (Method m : ms.values())
                    wc.getField("mts" + ix++).set(null, m.getParameterTypes());
                return (Wrapper) wc.newInstance();
            } catch (RuntimeException e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeException(e.getMessage(), e);
            } finally {
                cc.release();
                ms.clear();
                mns.clear();
                dmns.clear();
            }
        }
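In makeWrapper(), the guard generated for each method matches on the method name and argument count. It adds a comparison of parameter type names only when another method shares the same name. The sketch below reproduces that decision with plain reflection and prints the guard as text. `Calc` and `conditionFor` are hypothetical, and the spacing of the emitted string is simplified compared with what Dubbo passes to Javassist (`$2` is the method name, `$3` the parameter type array).

```java
import java.lang.reflect.Method;

public class DispatchConditionSketch {

    /** Hypothetical interface with one overloaded method and one unique method. */
    public interface Calc {
        int add(int a, int b);
        String add(String a, String b);
        void reset();
    }

    /** Builds the guard for one method, following the shape of the c3 code in makeWrapper(). */
    static String conditionFor(Method m, Method[] all) {
        Class<?>[] pts = m.getParameterTypes();
        StringBuilder sb = new StringBuilder();
        sb.append('"').append(m.getName()).append("\".equals($2) && $3.length == ").append(pts.length);

        // A method is treated as overloaded if any other method has the same name.
        boolean overloaded = false;
        for (Method other : all) {
            if (!m.equals(other) && m.getName().equals(other.getName())) {
                overloaded = true;
                break;
            }
        }
        if (overloaded) {
            // Only overloaded methods pay for the extra per-parameter type-name checks.
            for (int i = 0; i < pts.length; i++) {
                sb.append(" && $3[").append(i).append("].getName().equals(\"")
                  .append(pts[i].getName()).append("\")");
            }
        }
        return sb.toString();
    }

    /** Convenience overload that looks the method up and hides the checked exception. */
    static String conditionFor(Class<?> type, String name, Class<?>... params) {
        try {
            return conditionFor(type.getMethod(name, params), type.getMethods());
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(conditionFor(Calc.class, "reset"));
        System.out.println(conditionFor(Calc.class, "add", int.class, int.class));
    }
}
```

For the demo interface with its single `sayHello` method, no method is overloaded, so the generated guard contains only the name and length checks.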
        

    // InvokerInvocationHandler: the handler through which every Dubbo consumer proxy call passes

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }
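The handler above follows the usual dynamic-proxy pattern: methods declared on `Object` are answered locally, and everything else is forwarded to the invoker. Below is a minimal sketch of that pattern, using the JDK's `java.lang.reflect.Proxy` as a stand-in for Dubbo's proxy factory. `SimpleInvoker`, `ForwardingHandler` and `newProxy` are hypothetical names; Dubbo's real `Invoker` takes an `RpcInvocation` and returns a `Result`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyHandlerSketch {

    public interface DemoService {
        String sayHello(String name);
    }

    /** Hypothetical stand-in for Dubbo's Invoker: receives the method name and arguments. */
    public interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Forwards interface calls to the invoker; Object methods are invoked on the invoker itself. */
    static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals never leave the local JVM.
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        }
    }

    static DemoService newProxy(SimpleInvoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake invoker that answers locally instead of going over the network.
        SimpleInvoker fake = (name, a) -> "Hello " + a[0] + " (via " + name + ")";
        DemoService service = newProxy(fake);
        System.out.println(service.sayHello("world")); // Hello world (via sayHello)
    }
}
```

The caller only ever sees `DemoService`; whether the invoker talks to a remote provider or answers locally is hidden behind the handler.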

    // A short aside: log output

    // FailsafeLogger.warn(); a sample message: No spring extension(bean) named:defaultCompiler, try to find an extension(bean) of type java.lang.String, dubbo version: , current host: 192.168.11.6
    
        @Override
        public void warn(String msg, Throwable e) {
            try {
                logger.warn(appendContextMessage(msg), e);
            } catch (Throwable t) {
            }
        }
        private String appendContextMessage(String msg) {
            return " [DUBBO] " + msg + ", dubbo version: " + Version.getVersion() + ", current host: " + NetUtils.getLocalHost();
        }
    
        // ExtensionLoader.createAdaptiveExtensionClass()
        private Class<?> createAdaptiveExtensionClass() {
            String code = createAdaptiveExtensionClassCode();
            ClassLoader classLoader = findClassLoader();
            com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
            return compiler.compile(code, classLoader);
        }

    // wrapper0

        // The generated wrapper class looks like this: com.alibaba.dubbo.common.bytecode.Wrapper0
    package com.alibaba.dubbo.common.bytecode;
    
    import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
    import com.alibaba.dubbo.demo.DemoService;
    import java.lang.reflect.InvocationTargetException;
    import java.util.Map;
    
    public class Wrapper0 extends Wrapper implements DC {
        public static String[] pns;
        public static Map pts;
        public static String[] mns;
        public static String[] dmns;
        public static Class[] mts0;
    
        public String[] getPropertyNames() {
            return pns;
        }
    
        public boolean hasProperty(String var1) {
            return pts.containsKey(var1);
        }
    
        public Class getPropertyType(String var1) {
            return (Class)pts.get(var1);
        }
    
        public String[] getMethodNames() {
            return mns;
        }
    
        public String[] getDeclaredMethodNames() {
            return dmns;
        }
    
        public void setPropertyValue(Object var1, String var2, Object var3) {
            try {
                DemoService var4 = (DemoService)var1;
            } catch (Throwable var6) {
                throw new IllegalArgumentException(var6);
            }
    
            throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.alibaba.dubbo.demo.DemoService.");
        }
    
        public Object getPropertyValue(Object var1, String var2) {
            try {
                DemoService var3 = (DemoService)var1;
            } catch (Throwable var5) {
                throw new IllegalArgumentException(var5);
            }
    
            throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.alibaba.dubbo.demo.DemoService.");
        }
    
        public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
            DemoService var5;
            try {
                var5 = (DemoService)var1;
            } catch (Throwable var8) {
                throw new IllegalArgumentException(var8);
            }
    
            try {
                if ("sayHello".equals(var2) && var3.length == 1) {
                    return var5.sayHello((String)var4[0]);
                }
            } catch (Throwable var9) {
                throw new InvocationTargetException(var9);
            }
    
            throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.alibaba.dubbo.demo.DemoService.");
        }
    
        public Wrapper0() {
        }
    }

    // createProxy

        // com.alibaba.dubbo.common.bytecode.ClassGenerator: builds the Class instance
        public Class<?> toClass() {
            return toClass(ClassHelper.getClassLoader(ClassGenerator.class), getClass().getProtectionDomain());
        }
    
        public Class<?> toClass(ClassLoader loader, ProtectionDomain pd) {
            if (mCtc != null)
                mCtc.detach();
            long id = CLASS_NAME_COUNTER.getAndIncrement();
            try {
                CtClass ctcs = mSuperClass == null ? null : mPool.get(mSuperClass);
                if (mClassName == null)
                    mClassName = (mSuperClass == null || javassist.Modifier.isPublic(ctcs.getModifiers())
                            ? ClassGenerator.class.getName() : mSuperClass + "$sc") + id;
                mCtc = mPool.makeClass(mClassName);
                if (mSuperClass != null)
                    mCtc.setSuperclass(ctcs);
                // Tag the class as generated by Dubbo's ClassGenerator; DC is a marker interface with no methods
                mCtc.addInterface(mPool.get(DC.class.getName())); // add dynamic class tag.
                if (mInterfaces != null)
                    for (String cl : mInterfaces) mCtc.addInterface(mPool.get(cl));
                if (mFields != null)
                    for (String code : mFields) mCtc.addField(CtField.make(code, mCtc));
                if (mMethods != null) {
                    for (String code : mMethods) {
                        if (code.charAt(0) == ':')
                            mCtc.addMethod(CtNewMethod.copy(getCtMethod(mCopyMethods.get(code.substring(1))), code.substring(1, code.indexOf('(')), mCtc, null));
                        else
                            mCtc.addMethod(CtNewMethod.make(code, mCtc));
                    }
                }
                if (mDefaultConstructor)
                    mCtc.addConstructor(CtNewConstructor.defaultConstructor(mCtc));
                if (mConstructors != null) {
                    for (String code : mConstructors) {
                        if (code.charAt(0) == ':') {
                            mCtc.addConstructor(CtNewConstructor.copy(getCtConstructor(mCopyConstructors.get(code.substring(1))), mCtc, null));
                        } else {
                            String[] sn = mCtc.getSimpleName().split("\\$+"); // inner class name include $.
                            mCtc.addConstructor(CtNewConstructor.make(code.replaceFirst(SIMPLE_NAME_TAG, sn[sn.length - 1]), mCtc));
                        }
                    }
                }
                return mCtc.toClass(loader, pd);
            } catch (RuntimeException e) {
                throw e;
            } catch (NotFoundException e) {
                throw new RuntimeException(e.getMessage(), e);
            } catch (CannotCompileException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    
    
        // Create the service proxy
        // ref = createProxy(map); builds a new proxy from parameters such as {methods=sayHello, timestamp=1537527140342, dubbo=2.0.2, register.ip=192.168.11.6, application=demo-consumer, check=false, side=consumer, pid=8776, interface=com.alibaba.dubbo.demo.DemoService, qos.port=33333}
        @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
        private T createProxy(Map<String, String> map) {
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                isJvmRefer = isInjvm().booleanValue();
            }
    
            if (isJvmRefer) {
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    //[registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=8776&qos.port=33333&registry=zookeeper&timeout=2000&timestamp=1537528008773]
                    // The final URL looks like this: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14040&qos.port=33333&refer=application=demo-consumer%26check=false%26dubbo=2.0.2%26interface=com.alibaba.dubbo.demo.DemoService%26methods=sayHello%26pid=14040%26qos.port=33333%26register.ip=192.168.11.6%26side=consumer%26timestamp=1537844804936&registry=zookeeper&timeout=2000&timestamp=1537847505648
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            // Load the monitor URL so that statistics can be reported to it
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
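                    // Protocol$Adaptive is a dynamically generated adaptive class. The refer() call below eventually reaches the constructor of com.alibaba.dubbo.remoting.transport.AbstractClient: the consumer first registers itself with the registry and subscribes, then connects to the provider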
                    /** 
                    "main@1" prio=5 tid=0x1 nid=NA runnable
                      java.lang.Thread.State: RUNNABLE
                          at com.alibaba.dubbo.remoting.transport.AbstractClient.<init>(AbstractClient.java:80)
                          at com.alibaba.dubbo.remoting.transport.netty.NettyClient.<init>(NettyClient.java:59)
                          at com.alibaba.dubbo.remoting.transport.netty.NettyTransporter.connect(NettyTransporter.java:37)
                          at com.alibaba.dubbo.remoting.Transporter$Adaptive.connect(Transporter$Adaptive.java:-1)
                          at com.alibaba.dubbo.remoting.Transporters.connect(Transporters.java:75)
                          at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger.connect(HeaderExchanger.java:39)
                          at com.alibaba.dubbo.remoting.exchange.Exchangers.connect(Exchangers.java:109)
                          at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.initClient(DubboProtocol.java:417)
                          at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getSharedClient(DubboProtocol.java:384)
                          - locked <0xb4f> (a java.lang.Object)
                          at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getClients(DubboProtocol.java:355)
                          at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer(DubboProtocol.java:337)
                          at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:108)
                          at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:67)
                          at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
                          at com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(RegistryDirectory.java:387)
                          at com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(RegistryDirectory.java:253)
                          at com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(RegistryDirectory.java:223)
                          - locked <0xb2f> (a com.alibaba.dubbo.registry.integration.RegistryDirectory)
                          at com.alibaba.dubbo.registry.support.AbstractRegistry.notify(AbstractRegistry.java:414)
                          at com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(FailbackRegistry.java:280)
                          at com.alibaba.dubbo.registry.support.FailbackRegistry.notify(FailbackRegistry.java:266)
                          at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe(ZookeeperRegistry.java:190)
                          at com.alibaba.dubbo.registry.support.FailbackRegistry.subscribe(FailbackRegistry.java:196)
                          at com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe(RegistryDirectory.java:159)
                          at com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:307)
                          at com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:288)
                          at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:106)
                          at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:65)
                          // The call chain starts here
                          at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
                          at com.alibaba.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:395)
                          at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:334)
                          at com.alibaba.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:163)
                          - locked <0x76b> (a com.alibaba.dubbo.config.spring.ReferenceBean)
                          at com.alibaba.dubbo.config.spring.ReferenceBean.getObject(ReferenceBean.java:66)
                          at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:170)
                          at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
                          - locked <0x790> (a java.util.concurrent.ConcurrentHashMap)
                          at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1640)
                          at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254)
                          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
                          at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1080)
                          at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:30)
                        */
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // create service proxy
            return (T) proxyFactory.getProxy(invoker);
        }
        
        // loadRegistries
        
        protected List<URL> loadRegistries(boolean provider) {
            // Before loading, check that the registry config has been initialized
            checkRegistry();
            List<URL> registryList = new ArrayList<URL>();
            if (registries != null && !registries.isEmpty()) {
                for (RegistryConfig config : registries) {
                    String address = config.getAddress();
                    if (address == null || address.length() == 0) {
                        address = Constants.ANYHOST_VALUE;
                    }
                    String sysaddress = System.getProperty("dubbo.registry.address");
                    if (sysaddress != null && sysaddress.length() > 0) {
                        address = sysaddress;
                    }
                    if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                        Map<String, String> map = new HashMap<String, String>();
                        appendParameters(map, application);
                        appendParameters(map, config);
                        map.put("path", RegistryService.class.getName());
                        map.put("dubbo", Version.getProtocolVersion());
                        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                        if (ConfigUtils.getPid() > 0) {
                            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                        }
                        if (!map.containsKey("protocol")) {
                            if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                                map.put("protocol", "remote");
                            } else {
                                map.put("protocol", "dubbo");
                            }
                        }
                        // Parse the list of registry addresses
                        List<URL> urls = UrlUtils.parseURLs(address, map);
                        for (URL url : urls) {
                            url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                            url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                            if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                    || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                                registryList.add(url);
                            }
                        }
                    }
                }
            }
            return registryList;
        }

    // 

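        // com.alibaba.dubbo.rpc.Protocol$Adaptive: a dynamically generated class that reads the protocol
        // from the URL, looks up the matching Protocol extension, and delegates refer/export to it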
        
    public class Protocol$Adaptive implements Protocol {
        public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public Invoker refer(Class var1, URL var2) throws RpcException {
            if (var2 == null) {
                throw new IllegalArgumentException("url == null");
            } else {
                String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
                if (var4 == null) {
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
                } else {
                    Protocol var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4);
                    return var5.refer(var1, var2);
                }
            }
        }
    
        public Exporter export(Invoker var1) throws RpcException {
            if (var1 == null) {
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            } else if (var1.getUrl() == null) {
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            } else {
                URL var2 = var1.getUrl();
                String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
                if (var3 == null) {
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
                } else {
                    Protocol var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);
                    return var4.export(var1);
                }
            }
        }
    
        public Protocol$Adaptive() {
        }
    }
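
    // A minimal sketch of the dispatch idea in the generated class above: take the protocol name
    // from the URL, fall back to "dubbo", and delegate to the extension registered under that name.
    // AdaptiveDispatchSketch, EXTENSIONS and protocolOf are hypothetical names used only for
    // illustration; Dubbo itself resolves extensions through ExtensionLoader, not a static map.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class AdaptiveDispatchSketch {
    // Hypothetical extension table (name -> handler), standing in for ExtensionLoader.
    private static final Map<String, Function<String, String>> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("dubbo", url -> "dubbo extension handles " + url);
        EXTENSIONS.put("registry", url -> "registry extension handles " + url);
    }

    // Returns the scheme before "://", or "dubbo" when the URL carries no scheme.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : "dubbo";
    }

    // Mirrors the shape of refer(): validate, pick the extension by name, delegate.
    static String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String name = protocolOf(url);
        Function<String, String> extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + name);
        }
        return extension.apply(url);
    }

    public static void main(String[] args) {
        System.out.println(refer("registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(refer("dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService"));
    }
}
```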
        // com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe: subscribes to the service
        public void subscribe(URL url) {
            setConsumerUrl(url);
            registry.subscribe(url, this);
        }
        
        
        // Taking ZooKeeper as an example, here is the subscription process: com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe()
        @Override
        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                for (String child : currentChilds) {
                                    child = URL.decode(child);
                                    if (!anyServices.contains(child)) {
                                        anyServices.add(child);
                                        subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                                    }
                                }
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (services != null && !services.isEmpty()) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            listeners.putIfAbsent(listener, new ChildListener() {
                                @Override
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        
        
        // com.alibaba.dubbo.registry.support.AbstractRegistry.notify: receives the provider addresses and passes them to the listener
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((urls == null || urls.isEmpty())
                    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            // Notify the observer (listener), one category at a time
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                saveProperties(url);
                listener.notify(categoryList);
            }
        }
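
    // A minimal sketch of the grouping step in notify() above: URLs are bucketed by their
    // "category" parameter before the listener is called once per bucket. The string parsing and
    // the names CategoryGroupingSketch / categoryOf are hypothetical; the "providers" default is
    // an assumption about the value of Constants.DEFAULT_CATEGORY.
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CategoryGroupingSketch {
    // Reads "category=..." from the query string; assumed default is "providers".
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    // Buckets the URLs by category, keeping first-seen order of the categories.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService",
                "override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators");
        // One listener callback per category, as in the loop at the end of notify().
        for (Map.Entry<String, List<String>> e : groupByCategory(urls).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```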
        
        // Get the Registry; the URL determines which registry implementation is used
        @Override
        public Registry getRegistry(URL url) {
            url = url.setPath(RegistryService.class.getName())
                    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
            String key = url.toServiceString();
            // Lock the registry access process to ensure a single instance of the registry
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                // Create the registry; createRegistry is abstract and must be implemented by a concrete subclass
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }
        
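        // Implemented by the concrete factory (for example ZookeeperRegistryFactory);
        // the generated adaptive class only chooses which factory to call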
        protected abstract Registry createRegistry(URL url);
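
    // A minimal sketch of the caching pattern in getRegistry() above: take the lock, return the
    // cached instance if one exists, otherwise create it, store it, and return it. The names
    // RegistryCacheSketch and creator are hypothetical; creator stands in for createRegistry(url).
```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class RegistryCacheSketch {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, Object> REGISTRIES = new HashMap<>();

    // Returns one instance per key; the creator runs only on the first request for that key.
    static Object getRegistry(String key, Function<String, Object> creator) {
        LOCK.lock();
        try {
            Object registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = creator.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        String key = "zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService";
        Object first = getRegistry(key, k -> new Object());
        Object second = getRegistry(key, k -> new Object());
        System.out.println("same instance: " + (first == second));
    }
}
```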
    
        // The generated adaptive RegistryFactory class looks like this
    package com.alibaba.dubbo.registry;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    
    public class RegistryFactory$Adaptive implements RegistryFactory {
        public Registry getRegistry(URL var1) {
            if (var1 == null) {
                throw new IllegalArgumentException("url == null");
            } else {
                String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol();
                if (var3 == null) {
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])");
                } else {
                    RegistryFactory var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3);
                    return var4.getRegistry(var1);
                }
            }
        }
    
        public RegistryFactory$Adaptive() {
        }
    }
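        // ExtensionLoader.createExtension: instantiates the named extension and applies its wrappers
        // (todo: trace how the xxxRegistry instance is returned)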
        @SuppressWarnings("unchecked")
        private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                        type + ")  could not be instantiated: " + t.getMessage(), t);
            }
        }
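
    // A minimal sketch of the wrapper loop in createExtension() above: create the plain instance,
    // then pass it through each wrapper class via a constructor that takes the extension
    // interface. Greeter, Plain and Logging are hypothetical types invented for this example,
    // and dependency injection (injectExtension) is left out.
```java
class WrapperChainSketch {
    interface Greeter {
        String greet();
    }

    static class Plain implements Greeter {
        public String greet() {
            return "hi";
        }
    }

    // A wrapper is recognised by its constructor taking the extension interface.
    static class Logging implements Greeter {
        private final Greeter inner;

        public Logging(Greeter inner) {
            this.inner = inner;
        }

        public String greet() {
            return "[log]" + inner.greet();
        }
    }

    // Instantiates impl reflectively, then wraps it with each wrapper class in order.
    static Greeter create(Class<? extends Greeter> impl, Class<?>... wrappers) {
        try {
            Greeter instance = impl.getDeclaredConstructor().newInstance();
            for (Class<?> wrapper : wrappers) {
                instance = (Greeter) wrapper.getConstructor(Greeter.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Extension could not be instantiated: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(Plain.class, Logging.class).greet());
    }
}
```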
    
        
        // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer: creates the consumer-side invoker
        @Override
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            // create rpc invoker; getClients(url) sets up the connections used for remote calls
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
        
        // DubboProtocol.getClients: decides whether the service shares a connection or gets its own
        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
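
    // A minimal sketch of the decision in getClients() above: with connections == 0 the service
    // reuses one shared client, otherwise it gets that many dedicated clients. Client, SHARED and
    // the keying of shared clients by address are assumptions made for this example; the body of
    // getSharedClient is not shown in this listing.
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SharedClientSketch {
    // Hypothetical stand-in for ExchangeClient.
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    // Assumed cache of shared clients, one per remote address.
    private static final Map<String, Client> SHARED = new ConcurrentHashMap<>();

    static Client[] getClients(String address, int connections) {
        boolean shareConnect = false;
        // Not configured: share a single connection.
        if (connections == 0) {
            shareConnect = true;
            connections = 1;
        }
        Client[] clients = new Client[connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = shareConnect
                    ? SHARED.computeIfAbsent(address, Client::new)
                    : new Client(address);
        }
        return clients;
    }

    public static void main(String[] args) {
        Client a = getClients("192.168.11.6:20880", 0)[0];
        Client b = getClients("192.168.11.6:20880", 0)[0];
        System.out.println("shared: " + (a == b));
        System.out.println("dedicated count: " + getClients("192.168.11.6:20880", 2).length);
    }
}
```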

    // How the Netty client connects to the remote service

        // Initialize the client; the Netty connection starts here
        private ExchangeClient initClient(URL url) {
    
            // client type setting.
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
            return client;
        }
    
        // com.alibaba.dubbo.remoting.exchange.Exchangers.connect
        public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            return getExchanger(url).connect(url, handler);
        }
        
        // com.alibaba.dubbo.remoting.Transporters.connect
        public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            ChannelHandler handler;
            if (handlers == null || handlers.length == 0) {
                handler = new ChannelHandlerAdapter();
            } else if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().connect(url, handler);
        }
    
    // com.alibaba.dubbo.remoting.transport.netty
    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }
    
        
        // com.alibaba.dubbo.remoting.transport.AbstractClient
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    
            // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
            reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
    
            try {
                // Template method, implemented by subclasses
                doOpen();
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
            try {
                // connect.
                connect();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
                }
            } catch (RemotingException t) {
                if (url.getParameter(Constants.CHECK_KEY, true)) {
                    close();
                    throw t;
                } else {
                    logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
                }
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
    
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
            ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        }
        // Fixes the connection steps in one place and calls the subclass's doConnect
        protected void connect() throws RemotingException {
            connectLock.lock();
            try {
                if (isConnected()) {
                    return;
                }
                initConnectStatusCheckCommand();
                doConnect();
                if (!isConnected()) {
                    throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                                + ", channel is " + this.getChannel());
                    }
                }
                reconnect_count.set(0);
                reconnect_error_log_flag.set(false);
            } catch (RemotingException e) {
                throw e;
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: " + e.getMessage(), e);
            } finally {
                connectLock.unlock();
            }
        }
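
    // A minimal sketch of the template-method shape of connect() above: the base class owns the
    // lock, the already-connected check and the post-condition, and the subclass only supplies
    // doConnect(). AbstractClientSketch, FakeNettyClient and the trace list are hypothetical
    // names for this example; no real network connection is made.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class ConnectTemplateSketch {
    abstract static class AbstractClientSketch {
        private final ReentrantLock connectLock = new ReentrantLock();
        final List<String> trace = new ArrayList<>();
        protected boolean connected;

        // Fixed sequence: lock, skip if connected, delegate, verify, unlock.
        final void connect() {
            connectLock.lock();
            try {
                if (connected) {
                    return;
                }
                doConnect();
                if (!connected) {
                    throw new IllegalStateException("Failed connect to server");
                }
                trace.add("connected");
            } finally {
                connectLock.unlock();
            }
        }

        // The only step a transport-specific subclass has to provide.
        protected abstract void doConnect();
    }

    static class FakeNettyClient extends AbstractClientSketch {
        @Override
        protected void doConnect() {
            trace.add("doConnect");
            connected = true;
        }
    }

    public static void main(String[] args) {
        FakeNettyClient client = new FakeNettyClient();
        client.connect();
        client.connect(); // second call returns early because the client is already connected
        System.out.println(client.trace);
    }
}
```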
        
    
    // com.alibaba.dubbo.remoting.transport.netty.NettyClient
    public class NettyClient extends AbstractClient {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
        // ChannelFactory's closure has a DirectMemory leak, using static to avoid
        // https://issues.jboss.org/browse/NETTY-424
        private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
                Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
                Constants.DEFAULT_IO_THREADS);
        private ClientBootstrap bootstrap;
    
        private volatile Channel channel; // volatile, please copy reference to use
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            bootstrap = new ClientBootstrap(channelFactory);
            // config
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
            bootstrap.setOption("keepAlive", true);
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("connectTimeoutMillis", getTimeout());
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
        }
    
        @Override
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
    
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.getChannel();
                    newChannel.setInterestOps(Channel.OP_READ_WRITE);
                    try {
                        // Close old channel
                        Channel oldChannel = NettyClient.this.channel; // copy reference
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.getCause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            } finally {
                if (!isConnected()) {
                    future.cancel();
                }
            }
        }
    
        @Override
        protected void doDisConnect() throws Throwable {
            try {
                NettyChannel.removeChannelIfDisconnected(channel);
            } catch (Throwable t) {
                logger.warn(t.getMessage());
            }
        }
    
        @Override
        protected void doClose() throws Throwable {
            /*try {
                bootstrap.releaseExternalResources();
            } catch (Throwable t) {
                logger.warn(t.getMessage());
            }*/
        }
    
        @Override
        protected com.alibaba.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isConnected())
                return null;
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    
    }
    
    
        // com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getProxy
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            T proxy = proxyFactory.getProxy(invoker);
            if (GenericService.class != invoker.getInterface()) {
                String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
                if (ConfigUtils.isNotEmpty(stub)) {
                    Class<?> serviceType = invoker.getInterface();
                    if (ConfigUtils.isDefault(stub)) {
                        if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                            stub = serviceType.getName() + "Stub";
                        } else {
                            stub = serviceType.getName() + "Local";
                        }
                    }
                    try {
                        Class<?> stubClass = ReflectUtils.forName(stub);
                        if (!serviceType.isAssignableFrom(stubClass)) {
                            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
                        }
                        try {
                            Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                            proxy = (T) constructor.newInstance(new Object[]{proxy});
                            //export stub service
                            URL url = invoker.getUrl();
                            if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                                url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                                url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                                try {
                                    export(proxy, (Class) invoker.getInterface(), url);
                                } catch (Exception e) {
                                    LOGGER.error("export a stub service error.", e);
                                }
                            }
                        } catch (NoSuchMethodException e) {
                            throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
                        }
                    } catch (Throwable t) {
                        LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
                        // ignore
                    }
                }
            }
            return proxy;
        }
    /**
     * AbstractProxyFactory com.alibaba.dubbo.rpc.proxy.AbstractProxyFactory
     */
    public abstract class AbstractProxyFactory implements ProxyFactory {
    
        @Override
        public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            return getProxy(invoker, false);
        }
    
        @Override
        public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
            Class<?>[] interfaces = null;
            String config = invoker.getUrl().getParameter("interfaces");
            if (config != null && config.length() > 0) {
                String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
                if (types != null && types.length > 0) {
                    interfaces = new Class<?>[types.length + 2];
                    interfaces[0] = invoker.getInterface();
                    interfaces[1] = EchoService.class;
                    for (int i = 0; i < types.length; i++) {
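                        // NOTE: as quoted, index i + 1 overwrites the EchoService slot at index 1 and leaves the last slot null; i + 2 looks like the intended index.
                        interfaces[i + 1] = ReflectUtils.forName(types[i]);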
                    }
                }
            }
            if (interfaces == null) {
                interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
            }
    
            if (!invoker.getInterface().equals(GenericService.class) && generic) {
                int len = interfaces.length;
                Class<?>[] temp = interfaces;
                interfaces = new Class<?>[len + 1];
                System.arraycopy(temp, 0, interfaces, 0, len);
                interfaces[len] = GenericService.class;
            }
    
            return getProxy(invoker, interfaces);
        }
    
        public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
    
    }
    
    // com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory: delegates the actual method call to a Wrapper
    public class JavassistProxyFactory extends AbstractProxyFactory {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    
    }
    
        // Save the call path: register the ConsumerModel under its service name
        public static boolean initConsumerModel(String serviceName, ConsumerModel consumerModel) {
            if (consumedServices.putIfAbsent(serviceName, consumerModel) != null) {
                logger.warn("Already register the same consumer:" + serviceName);
                return false;
            }
            return true;
        }

    // String hello = demoService.sayHello("world"); // call remote method,
    // Invoking this remote method actually calls a proxy class that is generated dynamically

    /** 
    "main@1" prio=5 tid=0x1 nid=NA runnable
      java.lang.Thread.State: RUNNABLE
          at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:38)
          at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
          at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:35)
    */
    // Calling a method on the dynamic proxy class really calls InvocationHandler.invoke
    public class proxy0 implements DC, EchoService, DemoService {
        public static Method[] methods;
        private InvocationHandler handler;
    
        public String sayHello(String var1) {
            Object[] var2 = new Object[]{var1};
            Object var3 = this.handler.invoke(this, methods[0], var2);
            return (String)var3;
        }
    
        public Object $echo(Object var1) {
            Object[] var2 = new Object[]{var1};
            Object var3 = this.handler.invoke(this, methods[1], var2);
            return (Object)var3;
        }
    
        public proxy0() {
        }
    
        public proxy0(InvocationHandler var1) {
            this.handler = var1;
        }
    }
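The generated proxy0 above forwards every interface method to its InvocationHandler. The same routing can be sketched with the JDK's own java.lang.reflect.Proxy, which is a different mechanism from Dubbo's Javassist-based com.alibaba.dubbo.common.bytecode.Proxy. GreetingService and the handler body are hypothetical stand-ins for illustration, not Dubbo code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class DynamicProxySketch {

    /** Hypothetical service interface, standing in for DemoService. */
    interface GreetingService {
        String sayHello(String name);
    }

    /** Creates a proxy whose every method call is routed to the given handler. */
    static GreetingService newProxy(InvocationHandler handler) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        // The handler receives the Method and the arguments; an RPC client
        // would turn them into a request and send it to a provider here.
        InvocationHandler handler = (proxy, method, methodArgs) ->
                "handled " + method.getName() + "(" + methodArgs[0] + ")";
        GreetingService service = newProxy(handler);
        System.out.println(service.sayHello("world")); // prints: handled sayHello(world)
    }
}
```

The caller only ever sees GreetingService; where the call goes is decided entirely inside the handler, which is the role InvokerInvocationHandler plays in Dubbo.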
    // com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke: wraps the reflected Method and its arguments in an RpcInvocation and hands it to the invoker
    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // Everything else goes through the generic invoker.invoke path
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }
    
    
        // com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                //no mock: call the real invoker
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                        }
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }
    
        // com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke
        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
            LoadBalance loadbalance = null;
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && !invokers.isEmpty()) {
                // Look up the load-balance strategy; the default is random
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            }
            // For an async call, attach an invocation id up front so it is available later if needed
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
        
        // Template method
        protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    
    // Failover invoker: when a call fails, retry on another provider
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
    
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                // Pick an invoker through the load-balancing logic in AbstractClusterInvoker.select
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + invocation.getMethodName()
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                    + invocation.getMethodName() + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
        }
    
    }
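The retry loop above can be reduced to a small sketch. It assumes providers are plain functions and simply rotates through them, whereas FailoverClusterInvoker re-lists the directory, load-balances each pick and rethrows business exceptions at once. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {

    /** Tries providers in turn until one succeeds, making at most retries + 1 attempts. */
    static <R> R invoke(List<Function<String, R>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        int attempts = Math.max(retries + 1, 1);
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            // Simplification: rotate through the list instead of load-balancing.
            Function<String, R> provider = providers.get(i % providers.size());
            try {
                return provider.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempts", last);
    }

    public static void main(String[] args) {
        List<Function<String, String>> providers = Arrays.asList(
                request -> { throw new IllegalStateException("provider-1 is down"); },
                request -> "Hello " + request + " from provider-2");
        System.out.println(invoke(providers, "world", 2));
    }
}
```

As in the original, the attempt count is retries + 1, so a retries value of 2 allows up to three calls in total.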

    // Load balancing: com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.select
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.isEmpty())
                return null;
            String methodName = invocation == null ? "" : invocation.getMethodName();
    
            boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
            {
                //ignore overloaded method
                if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                    stickyInvoker = null;
                }
                //ignore concurrency problem
                if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                    if (availablecheck && stickyInvoker.isAvailable()) {
                        return stickyInvoker;
                    }
                }
            }
            Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
            if (sticky) {
                stickyInvoker = invoker;
            }
            return invoker;
        }
    
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.isEmpty())
                return null;
            // With a single provider there is nothing to balance
            if (invokers.size() == 1)
                return invokers.get(0);
            if (loadbalance == null) {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
            }
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
            if ((selected != null && selected.contains(invoker))
                    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                try {
                    Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if (rinvoker != null) {
                        invoker = rinvoker;
                    } else {
                        //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                        int index = invokers.indexOf(invoker);
                        try {
                            //Avoid collision
                            invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                        } catch (Exception e) {
                            logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                        }
                    }
                } catch (Throwable t) {
                    logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
                }
            }
            return invoker;
        }
    
    // Selection algorithm of the random load-balance strategy
    public class RandomLoadBalance extends AbstractLoadBalance {
    
        public static final String NAME = "random";
    
        private final Random random = new Random();
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            int length = invokers.size(); // Number of invokers
            int totalWeight = 0; // The sum of weights
            boolean sameWeight = true; // Every invoker has the same weight?
            for (int i = 0; i < length; i++) {
                int weight = getWeight(invokers.get(i), invocation);
                totalWeight += weight; // Sum
                if (sameWeight && i > 0
                        && weight != getWeight(invokers.get(i - 1), invocation)) {
                    sameWeight = false;
                }
            }
            if (totalWeight > 0 && !sameWeight) {
                // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
                int offset = random.nextInt(totalWeight);
                // Return a invoker based on the random value.
                for (int i = 0; i < length; i++) {
                    offset -= getWeight(invokers.get(i), invocation);
                    if (offset < 0) {
                        return invokers.get(i);
                    }
                }
            }
            // If all invokers have the same weight value or totalWeight=0, return evenly.
            return invokers.get(random.nextInt(length));
        }
    
    }
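The weighted pick in doSelect can be tried in isolation. The sketch below keeps the same offset-subtraction idea but works on a plain int[] of weights instead of invokers; the class name and the sample weights are assumptions made for illustration.

```java
import java.util.Random;

final class WeightedRandomSketch {

    /** Returns an index in [0, weights.length), chosen with probability proportional to its weight. */
    static int pick(int[] weights, Random random) {
        int totalWeight = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            totalWeight += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // Draw a point in [0, totalWeight) and walk the weight segments
            // until the point falls inside one of them.
            int offset = random.nextInt(totalWeight);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        // Equal weights (or all zero): plain uniform choice.
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        int[] weights = {100, 200, 700};
        int[] hits = new int[weights.length];
        Random random = new Random(42);
        for (int n = 0; n < 100_000; n++) {
            hits[pick(weights, random)]++;
        }
        // The shares should come out near 10% / 20% / 70%.
        for (int i = 0; i < hits.length; i++) {
            System.out.println("provider " + i + ": " + hits[i]);
        }
    }
}
```

A provider with weight 0 is never chosen on the weighted path, because subtracting 0 cannot push the offset below zero.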
    
        // Build the filter chain (com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.buildInvokerChain)
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
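buildInvokerChain walks the filter list backwards so that the first filter ends up as the outermost wrapper. A minimal sketch of that wrapping order, using hypothetical string-based Invoker and Filter interfaces in place of Dubbo's:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {

    /** Hypothetical stand-in for Dubbo's Invoker: takes a request, returns a result. */
    interface Invoker {
        String invoke(String request);
    }

    /** Hypothetical stand-in for Dubbo's Filter: may act before and after calling next. */
    interface Filter {
        String invoke(Invoker next, String request);
    }

    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        // Wrap from the last filter to the first so that filters.get(0) ends up outermost.
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** A filter that records when it is entered and left. */
    static Filter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name + ":before");
            String result = next.invoke(request);
            trace.add(name + ":after");
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Invoker target = request -> {
            trace.add("target");
            return "Hello " + request;
        };
        Invoker chain = buildChain(target, Arrays.asList(tracing("A", trace), tracing("B", trace)));
        System.out.println(chain.invoke("world"));
        // Trace order: A:before, B:before, target, B:after, A:after
        System.out.println(trace);
    }
}
```

Each wrapper only knows its own filter and the next invoker, so the call order is fixed once, when the chain is built, rather than on every invocation.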

    // Filter call chain

    @Activate(group = Constants.CONSUMER, order = -10000)
    public class ConsumerContextFilter implements Filter {
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(),
                            invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation) invocation).setInvoker(invoker);
            }
            try {
                // Continue down the chain (the call may be sync or async)
                RpcResult result = (RpcResult) invoker.invoke(invocation);
                RpcContext.getServerContext().setAttachments(result.getAttachments());
                return result;
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    
    }
    
        // com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke
    @Activate(group = Constants.CONSUMER)
    public class FutureFilter implements Filter {
    
        protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
    
        @Override
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
    
            fireInvokeCallback(invoker, invocation);
            // need to configure if there's return value before the invocation in order to help invoker to judge if it's
            // necessary to return future.
            // This is a chained call: the monitor filter runs first, then the concrete DubboInvoker
            Result result = invoker.invoke(invocation);
            if (isAsync) {
                // If the URL parameters mark the call as async, register the async callback
                asyncCallback(invoker, invocation);
            } else {
                // Otherwise fire the callbacks synchronously
                syncCallback(invoker, invocation, result);
            }
            return result;
        }
    
        private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
            if (result.hasException()) {
                fireThrowCallback(invoker, invocation, result.getException());
            } else {
                fireReturnCallback(invoker, invocation, result.getValue());
            }
        }
    
        private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
            Future<?> f = RpcContext.getContext().getFuture();
            if (f instanceof FutureAdapter) {
                ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
                future.setCallback(new ResponseCallback() {
                    @Override
                    public void done(Object rpcResult) {
                        if (rpcResult == null) {
                            logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                            return;
                        }
                        ///must be rpcResult
                        if (!(rpcResult instanceof Result)) {
                            logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                            return;
                        }
                        Result result = (Result) rpcResult;
                        // Notify the callbacks of the result
                        if (result.hasException()) {
                            fireThrowCallback(invoker, invocation, result.getException());
                        } else {
                            fireReturnCallback(invoker, invocation, result.getValue());
                        }
                    }
    
                    @Override
                    public void caught(Throwable exception) {
                        fireThrowCallback(invoker, invocation, exception);
                    }
                });
            }
        }
    
        private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
            final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
            final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
    
            if (onInvokeMethod == null && onInvokeInst == null) {
                return;
            }
            if (onInvokeMethod == null || onInvokeInst == null) {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
            if (!onInvokeMethod.isAccessible()) {
                onInvokeMethod.setAccessible(true);
            }
    
            Object[] params = invocation.getArguments();
            try {
                onInvokeMethod.invoke(onInvokeInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
    
        private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
            final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
            final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
    
            // onreturn callback not configured: nothing to do
            if (onReturnMethod == null && onReturnInst == null) {
                return;
            }
    
            if (onReturnMethod == null || onReturnInst == null) {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
            if (!onReturnMethod.isAccessible()) {
                onReturnMethod.setAccessible(true);
            }
    
            Object[] args = invocation.getArguments();
            Object[] params;
            Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
            if (rParaTypes.length > 1) {
                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                    params = new Object[2];
                    params[0] = result;
                    params[1] = args;
                } else {
                    params = new Object[args.length + 1];
                    params[0] = result;
                    System.arraycopy(args, 0, params, 1, args.length);
                }
            } else {
                params = new Object[]{result};
            }
            try {
                onReturnMethod.invoke(onReturnInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
    
        private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
            final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
            final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
    
            //onthrow callback not configured
            if (onthrowMethod == null && onthrowInst == null) {
                return;
            }
            if (onthrowMethod == null || onthrowInst == null) {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
            if (!onthrowMethod.isAccessible()) {
                onthrowMethod.setAccessible(true);
            }
            Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
            if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
                try {
                    Object[] args = invocation.getArguments();
                    Object[] params;
    
                    if (rParaTypes.length > 1) {
                        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                            params = new Object[2];
                            params[0] = exception;
                            params[1] = args;
                        } else {
                            params = new Object[args.length + 1];
                            params[0] = exception;
                            System.arraycopy(args, 0, params, 1, args.length);
                        }
                    } else {
                        params = new Object[]{exception};
                    }
                    onthrowMethod.invoke(onthrowInst, params);
                } catch (Throwable e) {
                    logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
                }
            } else {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
            }
        }
    }
        

    // The invoker class that performs the actual remote call
    public class DubboInvoker<T> extends AbstractInvoker<T> {
    
        private final ExchangeClient[] clients;
    
        private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    
        private final String version;
    
        private final ReentrantLock destroyLock = new ReentrantLock();
    
        private final Set<Invoker<?>> invokers;
    
        public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
            this(serviceType, url, clients, null);
        }
    
        public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
            super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
            this.clients = clients;
            // get version.
            this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
            this.invokers = invokers;
        }
    
        // The actual invocation
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    // One-way call: send the request and return immediately, without waiting for a response
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    // Async call: return immediately; the result is received later through the future stored in RpcContext
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    // The usual case: a synchronous call that blocks until the response arrives or the timeout expires
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        @Override
        public boolean isAvailable() {
            if (!super.isAvailable())
                return false;
            for (ExchangeClient client : clients) {
                if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
                    //cannot write == not Available ?
                    return true;
                }
            }
            return false;
        }
    
    }
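
The client selection and the three call modes in doInvoke above can be sketched in isolation. This is a minimal illustration, not Dubbo code: FakeClient is a hypothetical stand-in for ExchangeClient, CompletableFuture stands in for ResponseFuture, and the bit mask is an assumed substitute for what AtomicPositiveInteger provides (a counter that never goes negative).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ExchangeClient: answers each request on another thread.
final class FakeClient {
    private final String name;

    FakeClient(String name) {
        this.name = name;
    }

    CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> name + " handled " + payload);
    }
}

final class InvocationModesSketch {
    private final FakeClient[] clients;
    private final AtomicInteger index = new AtomicInteger();

    InvocationModesSketch(FakeClient... clients) {
        this.clients = clients;
    }

    // Round-robin selection; the mask keeps the index non-negative after the counter overflows.
    FakeClient select() {
        if (clients.length == 1) {
            return clients[0];
        }
        return clients[(index.getAndIncrement() & Integer.MAX_VALUE) % clients.length];
    }

    // One-way: send the request and ignore whatever comes back.
    void invokeOneway(String payload) {
        select().request(payload);
    }

    // Async: return the future right away; the caller collects the result later.
    CompletableFuture<String> invokeAsync(String payload) {
        return select().request(payload);
    }

    // Sync: block until the response arrives or the timeout expires.
    String invokeSync(String payload, long timeoutMillis) {
        try {
            return select().request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Invoke timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the response", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Remote call failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        InvocationModesSketch invoker =
                new InvocationModesSketch(new FakeClient("c0"), new FakeClient("c1"));
        invoker.invokeOneway("ping");
        CompletableFuture<String> pending = invoker.invokeAsync("async-call");
        System.out.println(invoker.invokeSync("sync-call", 1000));
        System.out.println(pending.join());
    }
}
```

As in the original, the synchronous path is just the asynchronous path followed by a blocking get with a timeout; the modes differ only in who waits for the response.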

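         Dubbo is still a solid RPC framework. It kept a large user base even after going unmaintained for years, and there are good reasons for that.

              Some of the design choices worth borrowing:

                 1. Interface-oriented programming: RPC methods are called through proxies everywhere, so a remote call is as transparent as a local method call;

                 2. The template method pattern: an abstract parent class implements the framework-level flow and calls into subclasses for the specific steps, which gives better encapsulation;

                 3. The facade pattern: several classes with different implementations are exposed through one unified interface, which is easier for callers to use;

                 4. Bytecode generation: proxy classes are generated dynamically, which makes the implementation more flexible;

                 5. Lazy loading and the singleton pattern;

                 6. Bean management through Spring, which matches what most users expect;

                 7. The observer pattern: provider and consumer changes are pushed as notifications, so every listener that needs to know about a change hears about it;

                 8. Filters, an application of the chain-of-responsibility pattern, which make it easy to add auxiliary features;

                 9. The strategy pattern: the load-balancing implementations are dispatched through one common entry point, and the same idea shows up throughout the SPI mechanism;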
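
Points 1 and 8 can be made concrete with a small sketch. It is an illustration under stated assumptions rather than Dubbo's implementation: GreetingService, SimpleInvoker, SimpleFilter, and ProxyChainSketch are hypothetical names, the "remote" end is a local lambda, and a JDK dynamic proxy is used here for brevity in place of generated bytecode.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical service interface that the consumer programs against.
interface GreetingService {
    String sayHello(String name);
}

// Minimal invoker abstraction: one entry point for every call.
interface SimpleInvoker {
    Object invoke(String method, Object[] args) throws Exception;
}

// A filter sees the call, may act before and after it, and passes it to the next invoker.
interface SimpleFilter {
    Object invoke(SimpleInvoker next, String method, Object[] args) throws Exception;
}

final class ProxyChainSketch {
    static final List<String> LOG = new ArrayList<>();

    // Wrap the target from the last filter to the first, so filters.get(0) runs first.
    static SimpleInvoker buildChain(SimpleInvoker target, List<SimpleFilter> filters) {
        SimpleInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = (method, args) -> filter.invoke(next, method, args);
        }
        return last;
    }

    // Every interface method call is funneled into the invoker chain.
    // (A real proxy would handle toString/equals/hashCode locally; this sketch does not.)
    @SuppressWarnings("unchecked")
    static <T> T proxy(Class<T> type, SimpleInvoker invoker) {
        InvocationHandler handler = (self, method, args) -> invoker.invoke(method.getName(), args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    static GreetingService demo() {
        // Stands in for the network call that a real invoker would make.
        SimpleInvoker remote = (method, args) -> "Hello " + args[0];
        SimpleFilter logging = (next, method, args) -> {
            LOG.add("before " + method);
            Object result = next.invoke(method, args);
            LOG.add("after " + method);
            return result;
        };
        SimpleFilter upperCase = (next, method, args) ->
                next.invoke(method, args).toString().toUpperCase();
        return proxy(GreetingService.class, buildChain(remote, Arrays.asList(logging, upperCase)));
    }

    public static void main(String[] args) {
        GreetingService service = demo();
        System.out.println(service.sayHello("dubbo"));
        System.out.println(LOG);
    }
}
```

The caller only ever holds a GreetingService, so filters can be added or removed without touching the calling code.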

  • Original article: https://www.cnblogs.com/yougewe/p/9702607.html