zoukankan      html  css  js  c++  java
  • 2. 源码分析---SOFARPC客户端服务引用

    我们先上一张客户端服务引用的时序图。

    我们首先来看看ComsumerConfig的refer方法吧

        public T refer() {
            if (consumerBootstrap == null) {
                //如果服务消费者启动类为空,怎创建一个
                consumerBootstrap = Bootstraps.from(this);
            }
            return consumerBootstrap.refer();
        }
    

    然后我们再看Bootstraps是怎么创建的

        public static <T> ConsumerBootstrap<T> from(ConsumerConfig<T> consumerConfig) {
            String bootstrap = consumerConfig.getBootstrap();
            ConsumerBootstrap consumerBootstrap;
            //如果有传入启动器,那么就使用启动器的参数
            if (StringUtils.isNotEmpty(bootstrap)) {
                consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                    .getExtension(bootstrap,
                        new Class[] { ConsumerConfig.class },
                        new Object[] { consumerConfig });
            } else {
                //没有传入启动器,那么就使用协议的参数
                // default is same with protocol
                bootstrap = consumerConfig.getProtocol();
                ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class);
                ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap);
                //预防性代码,实际不可能为空
                if (extensionClass == null) {
                    // if not exist, use default consumer bootstrap
                    // 为空的话,则是使用默认的启动器DefaultConsumerBootstrap
                    bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP);
                    consumerConfig.setBootstrap(bootstrap);
                    consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                        .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
                } else {
                    consumerConfig.setBootstrap(bootstrap);
                    consumerBootstrap = extensionClass.getExtInstance(
                        new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
                }
            }
            return (ConsumerBootstrap<T>) consumerBootstrap;
        }
    

    这里返回的consumerBootstrap和用的启动器和协议有关,如果用的是bolt那么返回的就是BoltConsumerBootstrap实例。

    从这里可以看出很多功能都是继承自父类DefaultConsumerBootstrap的。

    继续调用consumerBootstrap#refer方法,会直接跳到父类的refer方法中。

        public T refer() {
            if (proxyIns != null) {
                return proxyIns;
            }
            synchronized (this) {
                if (proxyIns != null) {
                    return proxyIns;
                }
                String key = consumerConfig.buildKey();
                String appName = consumerConfig.getAppName();
                // 检查参数
                checkParameters();
                // 提前检查接口类
                if (LOGGER.isInfoEnabled(appName)) {
                    LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}", key, consumerConfig.getId());
                }
    
                // 注意同一interface,同一tags,同一protocol情况
                AtomicInteger cnt = REFERRED_KEYS.get(key); // 计数器
                if (cnt == null) { // 没有发布过
                    cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0));
                }
                int c = cnt.incrementAndGet();
                //同一个服务 的最大引用次数,防止由于代码bug导致重复引用,每次引用都会生成一个代理类对象,-1表示不检查
                int maxProxyCount = consumerConfig.getRepeatedReferLimit();
                if (maxProxyCount > 0) {
                    if (c > maxProxyCount) {
                        cnt.decrementAndGet();
                        // 超过最大数量,直接抛出异常
                        throw new SofaRpcRuntimeException("Duplicate consumer config with key " + key
                            + " has been referred more than " + maxProxyCount + " times!"
                            + " Maybe it's wrong config, please check it."
                            + " Ignore this if you did that on purpose!");
                    } else if (c > 1) {
                        if (LOGGER.isInfoEnabled(appName)) {
                            LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!"
                                + " Maybe it's wrong config, please check it."
                                + " Ignore this if you did that on purpose!", key);
                        }
                    }
                }
    
                try {
                    // build cluster
                    //默认是FailOverCluster
                    cluster = ClusterFactory.getCluster(this);
                    // build listeners
                    consumerConfig.setConfigListener(buildConfigListener(this));
                    consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
                    // init cluster
                    cluster.init();
                    // 构造Invoker对象(执行链)
                    proxyInvoker = buildClientProxyInvoker(this);
                    // 创建代理类
                    proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
                        proxyInvoker);
    
                    //动态配置
                    final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
                    if (StringUtils.isNotBlank(dynamicAlias)) {
                        final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
                            consumerConfig.getAppName(), dynamicAlias);
                        dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
                    }
                } catch (Exception e) {
                    if (cluster != null) {
                        cluster.destroy();
                        cluster = null;
                    }
                    consumerConfig.setConfigListener(null);
                    consumerConfig.setProviderInfoListener(null);
                    cnt.decrementAndGet(); // 发布失败不计数
                    if (e instanceof SofaRpcRuntimeException) {
                        throw (SofaRpcRuntimeException) e;
                    } else {
                        throw new SofaRpcRuntimeException("Build consumer proxy error!", e);
                    }
                }
                if (consumerConfig.getOnAvailable() != null && cluster != null) {
                    cluster.checkStateChange(false); // 状态变化通知监听器
                }
                RpcRuntimeContext.cacheConsumerConfig(this);
                return proxyIns;
            }
        }
    

    这个方法里面除了做校验以外,主要做了如下几件事:

    1. 设置cluster属性,默认是FailOverCluster
    2. 设置监听器
    3. 初始化cluster
      1. 构造路由链表,主要有DirectUrlRouter、RegistryRouter、CustomRouter
      2. 设置loadBalancer属性,默认是RandomLoadBalancer
      3. 设置地址管理器addressHolder
      4. 设置连接管理器connectionHolder
      5. 构造Filter链
      6. 启动重连线程
    4. 设置proxyInvoker属性,如果用的是bolt协议,那么返回的是BoltClientProxyInvoker
    5. 创建代理类

    如果暴露的服务的接口如下:

    public interface HelloService {
        String sayHello(String string);
    }
    

    默认用javagent代理生成代理类:

    public class HelloService_proxy_0 extends Proxy implements HelloService {
        public Invoker proxyInvoker = null;
        private Method method_1;
    
        public HelloService_proxy_0() {
            super(new UselessInvocationHandler());
            this.method_1 = ReflectUtils.getMethod(HelloService.class, "sayHello", new Class[]{String.class, Integer.TYPE});
        }
    
        public String sayHello(String var1, int var2) {
            Class var3 = HelloService.class;
            Method var4 = this.method_1;
            Class[] var5 = new Class[2];
            Object[] var6 = new Object[]{var1, null};
            var5[0] = String.class;
            var6[1] = new Integer(var2);
            var5[1] = Integer.TYPE;
            SofaRequest var7 = MessageBuilder.buildSofaRequest(var3, var4, var5, var6);
            //这里的invoker应该是BoltClientProxyInvoker
            SofaResponse var8 = this.proxyInvoker.invoke(var7);
            if (var8.isError()) {
                throw new SofaRpcException(199, var8.getErrorMsg());
            } else {
                Object var9 = var8.getAppResponse();
                if (var9 instanceof Throwable) {
                    throw (Throwable)var9;
                } else {
                    return (String)var9;
                }
            }
        }
    
        public String toString() {
            return this.proxyInvoker.toString();
        }
    
        public int hashCode() {
            return this.proxyInvoker.hashCode();
        }
    
        public boolean equals(Object var1) {
            return this == var1 || this.getClass().isInstance(var1) && this.proxyInvoker.equals(JavassistProxy.parseInvoker(var1));
        }
    }
    
  • 相关阅读:
    PSP总结报告
    第十三周例行报告
    对团队成员公开感谢
    附加作业 软件工程原则的应用实例分析
    第十二周例行报告
    第十一周例行报告
    第十周例行报告
    第八周例行报告
    第七周例行报告
    第六周例行报告
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11232845.html
Copyright © 2011-2022 走看看