zoukankan      html  css  js  c++  java
  • Dubbo(四):深入理解Dubbo源码之如何实现服务引用

    一、前言

      前面讲了服务是如何导出到注册中心的。其实Dubbo做的一件事就是将服务的URL发布到注册中心上。那现在我们聊一聊消费者一方如何从注册中心订阅服务并进行远程调用的。

    二、引用服务时序图

      首先总的来用文字说一遍内部的大致机制

      Actor:可以当做我们的消费者。当我们使用@Reference注解将对应服务注入到其他类中这时候Spring会第一时间调用getObject方法,而getObject中只有一个方法就是get()。这里可以理解为消费者开始引入服务了

        饿汉式:在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务。

        懒汉式:在 ReferenceBean 对应的服务被注入到其他类中时引用。Dubbo默认使用懒汉式。

      ReferenceConfig:通过get方法其实是进入到ReferenceConfig类中执行init()方法。在这个方法里主要做了下面几件事情:

        1,、对@Reference标注的接口查看是否合法,检查该接口是不是存在泛型

        2、在系统中拿到dubbo.resolve.file这个文件,这个文件是进行配置consumer的接口的。将配置好的consumer信息存到URL中

        3、将配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消费者的IP地址存到系统的上下文中

        4、接下来开始创建代理对象进入到ReferenceConfig的createProxy这里还是在ReferenceConfig类中。上面的那些配置统统传入该方法中。上面有提到resolve解析consumer为URL,现在就根据这个URL首先判断是否远程调用还是本地调用。

          4.1若是本地调用,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例

          4.2若是远程调用,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类即RegistryProtocol类或者DubboProtocol构建 Invoker 实例接口这得看URL前面的是registry://开头还是以dubbo://若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并即merge多个 Invoker,最后调用 ProxyFactory 生成代理类

      RegistryProtocol:在refer方法中首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务消费者链接,通过registry.register方法向注册中心注册消费者的链接,然后通过directory.subscribe向注册中心订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。同样Invoker创建过程先不分析,后面会拿一章专门介绍。

      ProxyFactory:Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为的getProxy。获取需要创建的接口列表,组合成数组。而后将该接口数组传入 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。可以理解为AOP或拦截器。也就是在获取该对象之前会调用到Proxy实例而不会调用到服务提供者对应的类。至于如何创建proxy实例,请看后面源码的注释。

    三、Dubbo源码

      服务引用入口源码ReferenceBean的getObject方法:

     1 public Object getObject() throws Exception {
     2     return get();
     3 }
     4 
     5 public synchronized T get() {
     6     if (destroyed) {
     7         throw new IllegalStateException("Already destroyed!");
     8     }
     9     // 检测 ref 是否为空,为空则通过 init 方法创建
    10     if (ref == null) {
    11         // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
    12         init();
    13     }
    14     return ref;
    15 }
    View Code

      ReferenceConfig 的 init 进行消费者一方的配置:

        对源码进行了分割,方便理清逻辑

      1 private void init() {
      2     // 避免重复初始化
      3     if (initialized) {
      4         return;
      5     }
      6     initialized = true;
      7     // 检测接口名合法性
      8     if (interfaceName == null || interfaceName.length() == 0) {
      9         throw new IllegalStateException("interface not allow null!");
     10     }
     11 
     12     // 检测 consumer 变量是否为空,为空则创建
     13     checkDefault();
     14     appendProperties(this);
     15     if (getGeneric() == null && getConsumer() != null) {
     16         // 设置 generic
     17         setGeneric(getConsumer().getGeneric());
     18     }
     19 
     20     // 检测是否为泛化接口
     21     if (ProtocolUtils.isGeneric(getGeneric())) {
     22         interfaceClass = GenericService.class;
     23     } else {
     24         try {
     25             // 加载类
     26             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
     27                     .getContextClassLoader());
     28         } catch (ClassNotFoundException e) {
     29             throw new IllegalStateException(e.getMessage(), e);
     30         }
     31         checkInterfaceAndMethods(interfaceClass, methods);
     32     }
     33     
     34     // -------------------------------分割线1------------------------------
     35 
     36     // 从系统变量中获取与接口名对应的属性值
     37     String resolve = System.getProperty(interfaceName);
     38     String resolveFile = null;
     39     if (resolve == null || resolve.length() == 0) {
     40         // 从系统属性中获取解析文件路径
     41         resolveFile = System.getProperty("dubbo.resolve.file");
     42         if (resolveFile == null || resolveFile.length() == 0) {
     43             // 从指定位置加载配置文件
     44             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
     45             if (userResolveFile.exists()) {
     46                 // 获取文件绝对路径
     47                 resolveFile = userResolveFile.getAbsolutePath();
     48             }
     49         }
     50         if (resolveFile != null && resolveFile.length() > 0) {
     51             Properties properties = new Properties();
     52             FileInputStream fis = null;
     53             try {
     54                 fis = new FileInputStream(new File(resolveFile));
     55                 // 从文件中加载配置
     56                 properties.load(fis);
     57             } catch (IOException e) {
     58                 throw new IllegalStateException("Unload ..., cause:...");
     59             } finally {
     60                 try {
     61                     if (null != fis) fis.close();
     62                 } catch (IOException e) {
     63                     logger.warn(e.getMessage(), e);
     64                 }
     65             }
     66             // 获取与接口名对应的配置
     67             resolve = properties.getProperty(interfaceName);
     68         }
     69     }
     70     if (resolve != null && resolve.length() > 0) {
     71         // 将 resolve 赋值给 url
     72         url = resolve;
     73     }
     74     
     75     // -------------------------------分割线2------------------------------
     76     if (consumer != null) {
     77         if (application == null) {
     78             // 从 consumer 中获取 Application 实例,下同
     79             application = consumer.getApplication();
     80         }
     81         if (module == null) {
     82             module = consumer.getModule();
     83         }
     84         if (registries == null) {
     85             registries = consumer.getRegistries();
     86         }
     87         if (monitor == null) {
     88             monitor = consumer.getMonitor();
     89         }
     90     }
     91     if (module != null) {
     92         if (registries == null) {
     93             registries = module.getRegistries();
     94         }
     95         if (monitor == null) {
     96             monitor = module.getMonitor();
     97         }
     98     }
     99     if (application != null) {
    100         if (registries == null) {
    101             registries = application.getRegistries();
    102         }
    103         if (monitor == null) {
    104             monitor = application.getMonitor();
    105         }
    106     }
    107     
    108     // 检测 Application 合法性
    109     checkApplication();
    110     // 检测本地存根配置合法性
    111     checkStubAndMock(interfaceClass);
    112     
    113     // -------------------------------分割线3------------------------------
    114     
    115     Map<String, String> map = new HashMap<String, String>();
    116     Map<Object, Object> attributes = new HashMap<Object, Object>();
    117 
    118     // 添加 side、协议版本信息、时间戳和进程号等信息到 map 中
    119     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    120     map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    121     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    122     if (ConfigUtils.getPid() > 0) {
    123         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    124     }
    125 
    126     // 非泛化服务
    127     if (!isGeneric()) {
    128         // 获取版本
    129         String revision = Version.getVersion(interfaceClass, version);
    130         if (revision != null && revision.length() > 0) {
    131             map.put("revision", revision);
    132         }
    133 
    134         // 获取接口方法列表,并添加到 map 中
    135         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    136         if (methods.length == 0) {
    137             map.put("methods", Constants.ANY_VALUE);
    138         } else {
    139             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    140         }
    141     }
    142     map.put(Constants.INTERFACE_KEY, interfaceName);
    143     // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中
    144     appendParameters(map, application);
    145     appendParameters(map, module);
    146     appendParameters(map, consumer, Constants.DEFAULT_KEY);
    147     appendParameters(map, this);
    148     
    149     // -------------------------------分割线4------------------------------
    150     
    151     String prefix = StringUtils.getServiceKey(map);
    152     if (methods != null && !methods.isEmpty()) {
    153         // 遍历 MethodConfig 列表
    154         for (MethodConfig method : methods) {
    155             appendParameters(map, method, method.getName());
    156             String retryKey = method.getName() + ".retry";
    157             // 检测 map 是否包含 methodName.retry
    158             if (map.containsKey(retryKey)) {
    159                 String retryValue = map.remove(retryKey);
    160                 if ("false".equals(retryValue)) {
    161                     // 添加重试次数配置 methodName.retries
    162                     map.put(method.getName() + ".retries", "0");
    163                 }
    164             }
    165  
    166             // 添加 MethodConfig 中的“属性”字段到 attributes
    167             // 比如 onreturn、onthrow、oninvoke 等
    168             appendAttributes(attributes, method, prefix + "." + method.getName());
    169             checkAndConvertImplicitConfig(method, map, attributes);
    170         }
    171     }
    172     
    173     // -------------------------------✨ 分割线5 ✨------------------------------
    174 
    175     // 获取服务消费者 ip 地址
    176     String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    177     if (hostToRegistry == null || hostToRegistry.length() == 0) {
    178         hostToRegistry = NetUtils.getLocalHost();
    179     } else if (isInvalidLocalHost(hostToRegistry)) {
    180         throw new IllegalArgumentException("Specified invalid registry ip from property..." );
    181     }
    182     map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    183 
    184     // 存储 attributes 到系统上下文中
    185     StaticContext.getSystemContext().putAll(attributes);
    186 
    187     // 创建代理类
    188     ref = createProxy(map);
    189 
    190     // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
    191     // 并将 ConsumerModel 存入到 ApplicationModel 中
    192     ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    193     ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    194 }
    View Code

      ReferenceConfig 的 createProxy 创建代理对象:

        但是不是在这个方法内创建proxy实例,而是对URL进行解析后分三种创建Invoker线路,包括InjvmProtocol中的refer、DubboProtocol的refer与RegistryProtocol中的refer,最后再调用ProxyFactory来对proxy实例进行创建:

      1 private T createProxy(Map<String, String> map) {
      2     URL tmpUrl = new URL("temp", "localhost", 0, map);
      3     final boolean isJvmRefer;
      4     if (isInjvm() == null) {
      5         // url 配置被指定,则不做本地引用
      6         if (url != null && url.length() > 0) {
      7             isJvmRefer = false;
      8         // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
      9         // 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
     10         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
     11             isJvmRefer = true;
     12         } else {
     13             isJvmRefer = false;
     14         }
     15     } else {
     16         // 获取 injvm 配置值
     17         isJvmRefer = isInjvm().booleanValue();
     18     }
     19 
     20     // 本地引用
     21     if (isJvmRefer) {
     22         // 生成本地引用 URL,协议为 injvm
     23         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
     24         // 调用 refer 方法构建 InjvmInvoker 实例
     25         invoker = refprotocol.refer(interfaceClass, url);
     26         
     27     // 远程引用
     28     } else {
     29         // url 不为空,表明用户可能想进行点对点调用
     30         if (url != null && url.length() > 0) {
     31             // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
     32             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
     33             if (us != null && us.length > 0) {
     34                 for (String u : us) {
     35                     URL url = URL.valueOf(u);
     36                     if (url.getPath() == null || url.getPath().length() == 0) {
     37                         // 设置接口全限定名为 url 路径
     38                         url = url.setPath(interfaceName);
     39                     }
     40                     
     41                     // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
     42                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
     43                         // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
     44                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
     45                     } else {
     46                         // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
     47                         // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
     48                         // 最后将合并后的配置设置为 url 查询字符串中。
     49                         urls.add(ClusterUtils.mergeUrl(url, map));
     50                     }
     51                 }
     52             }
     53         } else {
     54             // 加载注册中心 url
     55             List<URL> us = loadRegistries(false);
     56             if (us != null && !us.isEmpty()) {
     57                 for (URL u : us) {
     58                     URL monitorUrl = loadMonitor(u);
     59                     if (monitorUrl != null) {
     60                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
     61                     }
     62                     // 添加 refer 参数到 url 中,并将 url 添加到 urls 中
     63                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
     64                 }
     65             }
     66 
     67             // 未配置注册中心,抛出异常
     68             if (urls.isEmpty()) {
     69                 throw new IllegalStateException("No such any registry to reference...");
     70             }
     71         }
     72 
     73         // 单个注册中心或服务提供者(服务直连,下同)
     74         if (urls.size() == 1) {
     75             // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
     76             invoker = refprotocol.refer(interfaceClass, urls.get(0));
     77             
     78         // 多个注册中心或多个服务提供者,或者两者混合
     79         } else {
     80             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
     81             URL registryURL = null;
     82 
     83             // 获取所有的 Invoker
     84             for (URL url : urls) {
     85                 // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
     86                 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
     87                 invokers.add(refprotocol.refer(interfaceClass, url));
     88                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
     89                     registryURL = url;
     90                 }
     91             }
     92             if (registryURL != null) {
     93                 // 如果注册中心链接不为空,则将使用 AvailableCluster
     94                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
     95                 // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
     96                 invoker = cluster.join(new StaticDirectory(u, invokers));
     97             } else {
     98                 invoker = cluster.join(new StaticDirectory(invokers));
     99             }
    100         }
    101     }
    102 
    103     Boolean c = check;
    104     if (c == null && consumer != null) {
    105         c = consumer.isCheck();
    106     }
    107     if (c == null) {
    108         c = true;
    109     }
    110     
    111     // invoker 可用性检查
    112     if (c && !invoker.isAvailable()) {
    113         throw new IllegalStateException("No provider available for the service...");
    114     }
    115 
    116     // 生成代理类
    117     return (T) proxyFactory.getProxy(invoker);
    118 }
    View Code

        同样Invoker的创建后面会专门拿一篇来讲。暂时先把Invoker创建看成一个黑盒,只要我们调用即可。

      RegistryProtocol中的refer:

     1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
     2     // 取 registry 参数值,并将其设置为协议头
     3     url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
     4     // 获取注册中心实例
     5     Registry registry = registryFactory.getRegistry(url);
     6     if (RegistryService.class.equals(type)) {
     7         return proxyFactory.getInvoker((T) registry, type, url);
     8     }
     9 
    10     // 将 url 查询字符串转为 Map
    11     Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    12     // 获取 group 配置
    13     String group = qs.get(Constants.GROUP_KEY);
    14     if (group != null && group.length() > 0) {
    15         if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
    16                 || "*".equals(group)) {
    17             // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
    18             return doRefer(getMergeableCluster(), registry, type, url);
    19         }
    20     }
    21     
    22     // 调用 doRefer 继续执行服务引用逻辑
    23     return doRefer(cluster, registry, type, url);
    24 }
    25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    26     // 创建 RegistryDirectory 实例
    27     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    28     // 设置注册中心和协议
    29     directory.setRegistry(registry);
    30     directory.setProtocol(protocol);
    31     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    32     // 生成服务消费者链接
    33     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    34 
    35     // 注册服务消费者,在 consumers 目录下新节点
    36     if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
    37             && url.getParameter(Constants.REGISTER_KEY, true)) {
    38         registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
    39                 Constants.CHECK_KEY, String.valueOf(false)));
    40     }
    41 
    42     // 订阅 providers、configurators、routers 等节点数据
    43     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
    44             Constants.PROVIDERS_CATEGORY
    45                     + "," + Constants.CONFIGURATORS_CATEGORY
    46                     + "," + Constants.ROUTERS_CATEGORY));
    47 
    48     // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
    49     Invoker invoker = cluster.join(directory);
    50     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    51     return invoker;
    52 }
    View Code

        在Invoker创建完后会返回到ReferenceConfig中,然后进入ProxyFactory中的getProxy方法。

      ProxyFactory中的getProxy方法:

     1 public <T> T getProxy(Invoker<T> invoker) throws RpcException {
     2     // 调用重载方法
     3     return getProxy(invoker, false);
     4 }
     5 
     6 public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
     7     Class<?>[] interfaces = null;
     8     // 获取接口列表
     9     String config = invoker.getUrl().getParameter("interfaces");
    10     if (config != null && config.length() > 0) {
    11         // 切分接口列表
    12         String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
    13         if (types != null && types.length > 0) {
    14             interfaces = new Class<?>[types.length + 2];
    15             // 设置服务接口类和 EchoService.class 到 interfaces 中
    16             interfaces[0] = invoker.getInterface();
    17             interfaces[1] = EchoService.class;
    18             for (int i = 0; i < types.length; i++) {
    19                 // 加载接口类
    20                 interfaces[i + 1] = ReflectUtils.forName(types[i]);
    21             }
    22         }
    23     }
    24     if (interfaces == null) {
    25         interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    26     }
    27 
    28     // 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827
    29     if (!invoker.getInterface().equals(GenericService.class) && generic) {
    30         int len = interfaces.length;
    31         Class<?>[] temp = interfaces;
    32         // 创建新的 interfaces 数组
    33         interfaces = new Class<?>[len + 1];
    34         System.arraycopy(temp, 0, interfaces, 0, len);
    35         // 设置 GenericService.class 到数组中
    36         interfaces[len] = GenericService.class;
    37     }
    38 
    39     // 调用重载方法
    40     return getProxy(invoker, interfaces);
    41 }
    42 
    43 public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
    View Code

        在上面的代码主要是获取接口数组再通过抽象的getProxy进入到Proxy中的getProxy。

    1 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    2     // 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
    3     return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    4 }
    View Code

        代码具体分析上面有写。接下来进入到Proxy子类的getProxy中

      1 public static Proxy getProxy(Class<?>... ics) {
      2     // 调用重载方法
      3     return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
      4 }
      5 
      6 public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
      7     if (ics.length > 65535)
      8         throw new IllegalArgumentException("interface limit exceeded");
      9 
     10     StringBuilder sb = new StringBuilder();
     11     // 遍历接口列表
     12     for (int i = 0; i < ics.length; i++) {
     13         String itf = ics[i].getName();
     14         // 检测类型是否为接口
     15         if (!ics[i].isInterface())
     16             throw new RuntimeException(itf + " is not a interface.");
     17 
     18         Class<?> tmp = null;
     19         try {
     20             // 重新加载接口类
     21             tmp = Class.forName(itf, false, cl);
     22         } catch (ClassNotFoundException e) {
     23         }
     24 
     25         // 检测接口是否相同,这里 tmp 有可能为空
     26         if (tmp != ics[i])
     27             throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
     28 
     29         // 拼接接口全限定名,分隔符为 ;
     30         sb.append(itf).append(';');
     31     }
     32 
     33     // 使用拼接后的接口名作为 key
     34     String key = sb.toString();
     35 
     36     Map<String, Object> cache;
     37     synchronized (ProxyCacheMap) {
     38         cache = ProxyCacheMap.get(cl);
     39         if (cache == null) {
     40             cache = new HashMap<String, Object>();
     41             ProxyCacheMap.put(cl, cache);
     42         }
     43     }
     44 
     45     Proxy proxy = null;
     46     synchronized (cache) {
     47         do {
     48             // 从缓存中获取 Reference<Proxy> 实例
     49             Object value = cache.get(key);
     50             if (value instanceof Reference<?>) {
     51                 proxy = (Proxy) ((Reference<?>) value).get();
     52                 if (proxy != null) {
     53                     return proxy;
     54                 }
     55             }
     56 
     57             // 并发控制,保证只有一个线程可以进行后续操作
     58             if (value == PendingGenerationMarker) {
     59                 try {
     60                     // 其他线程在此处进行等待
     61                     cache.wait();
     62                 } catch (InterruptedException e) {
     63                 }
     64             } else {
     65                 // 放置标志位到缓存中,并跳出 while 循环进行后续操作
     66                 cache.put(key, PendingGenerationMarker);
     67                 break;
     68             }
     69         }
     70         while (true);
     71     }
     72 
     73     long id = PROXY_CLASS_COUNTER.getAndIncrement();
     74     String pkg = null;
     75     ClassGenerator ccp = null, ccm = null;
     76     try {
     77         // 创建 ClassGenerator 对象
     78         ccp = ClassGenerator.newInstance(cl);
     79 
     80         Set<String> worked = new HashSet<String>();
     81         List<Method> methods = new ArrayList<Method>();
     82 
     83         for (int i = 0; i < ics.length; i++) {
     84             // 检测接口访问级别是否为 protected 或 privete
     85             if (!Modifier.isPublic(ics[i].getModifiers())) {
     86                 // 获取接口包名
     87                 String npkg = ics[i].getPackage().getName();
     88                 if (pkg == null) {
     89                     pkg = npkg;
     90                 } else {
     91                     if (!pkg.equals(npkg))
     92                         // 非 public 级别的接口必须在同一个包下,否者抛出异常
     93                         throw new IllegalArgumentException("non-public interfaces from different packages");
     94                 }
     95             }
     96             
     97             // 添加接口到 ClassGenerator 中
     98             ccp.addInterface(ics[i]);
     99 
    100             // 遍历接口方法
    101             for (Method method : ics[i].getMethods()) {
    102                 // 获取方法描述,可理解为方法签名
    103                 String desc = ReflectUtils.getDesc(method);
    104                 // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
    105                 // A 接口和 B 接口中包含一个完全相同的方法
    106                 if (worked.contains(desc))
    107                     continue;
    108                 worked.add(desc);
    109 
    110                 int ix = methods.size();
    111                 // 获取方法返回值类型
    112                 Class<?> rt = method.getReturnType();
    113                 // 获取参数列表
    114                 Class<?>[] pts = method.getParameterTypes();
    115 
    116                 // 生成 Object[] args = new Object[1...N]
    117                 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
    118                 for (int j = 0; j < pts.length; j++)
    119                     // 生成 args[1...N] = ($w)$1...N;
    120                     code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
    121                 // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
    122                 // Object ret = handler.invoke(this, methods[1...N], args);
    123                 code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
    124 
    125                 // 返回值不为 void
    126                 if (!Void.TYPE.equals(rt))
    127                     // 生成返回语句,形如 return (java.lang.String) ret;
    128                     code.append(" return ").append(asArgument(rt, "ret")).append(";");
    129 
    130                 methods.add(method);
    131                 // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中 
    132                 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
    133             }
    134         }
    135 
    136         if (pkg == null)
    137             pkg = PACKAGE_NAME;
    138 
    139         // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
    140         String pcn = pkg + ".proxy" + id;
    141         ccp.setClassName(pcn);
    142         ccp.addField("public static java.lang.reflect.Method[] methods;");
    143         // 生成 private java.lang.reflect.InvocationHandler handler;
    144         ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
    145 
    146         // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
    147         // porxy0(java.lang.reflect.InvocationHandler arg0) {
    148         //     handler=$1;
    149         // }
    150         ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
    151         // 为接口代理类添加默认构造方法
    152         ccp.addDefaultConstructor();
    153         
    154         // 生成接口代理类
    155         Class<?> clazz = ccp.toClass();
    156         clazz.getField("methods").set(null, methods.toArray(new Method[0]));
    157 
    158         // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
    159         String fcn = Proxy.class.getName() + id;
    160         ccm = ClassGenerator.newInstance(cl);
    161         ccm.setClassName(fcn);
    162         ccm.addDefaultConstructor();
    163         ccm.setSuperClass(Proxy.class);
    164         // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
    165         // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
    166         //     return new org.apache.dubbo.proxy0($1);
    167         // }
    168         ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
    169         // 生成 Proxy 实现类
    170         Class<?> pc = ccm.toClass();
    171         // 通过反射创建 Proxy 实例
    172         proxy = (Proxy) pc.newInstance();
    173     } catch (RuntimeException e) {
    174         throw e;
    175     } catch (Exception e) {
    176         throw new RuntimeException(e.getMessage(), e);
    177     } finally {
    178         if (ccp != null)
    179             // 释放资源
    180             ccp.release();
    181         if (ccm != null)
    182             ccm.release();
    183         synchronized (cache) {
    184             if (proxy == null)
    185                 cache.remove(key);
    186             else
    187                 // 写缓存
    188                 cache.put(key, new WeakReference<Proxy>(proxy));
    189             // 唤醒其他等待线程
    190             cache.notifyAll();
    191         }
    192     }
    193     return proxy;
    194 }
    View Code

        ccp 用于为服务接口生成代理类,比如我们有一个 DemoService 接口,这个接口代理类就是由 ccp 生成的。ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。

        这里需要重点讲一下,因为用到了并发控制。机制是这样的,synchronized中首先获取缓存中 Reference<Proxy> 实例。因为缓存是HashMap结构来存取。key是Reference<Proxy> 实例对应的接口名称,value就是Reference<Proxy> 实例,注意的是接口列表进行拼接了。当第一个线程进入时,key对应的是实例而不是PendingGenerationMarker。所以会进入到else中,else中则设置key的对应的value为标志位PendingGenerationMarker。这样其他线程只能等待,而后对服务接口生产代理类和抽象类的子类。在最后释放资源时,会唤醒其他线程,并且把已经生成过的Reference实例标志成弱引用对象,代表可以回收了。(注:弱引用,比软引用更弱一点,被弱引用关联的对象只能生存到下一次垃圾收集发生之前。当垃圾收集发生时无论内存是否足够,都会回收弱引用对象。具体可以看JVM垃圾回收算法

    四、总结:

      到这里应该算是讲完了Dubbo内的服务引用机制。对于Invoker后面会再单独讲。这里还要补充一句如果是集群的话会启动服务降级以及负载均衡每次只选择一个Invoker调用,同样这个后面会再做单独介绍。

  • 相关阅读:
    Sql Server 收缩日志文件原理及always on 下的实践
    SQL Agent服务无法启动如何破
    Sql Server Always On 读写分离配置方法
    SQL SERVER 9003错误解决方法 只适用于SQL2000
    SQL Server 数据库分离与附加
    SQL SERVER 的模糊查询 LIKE
    sqlserver 时间格式函数详细
    浅谈SQL Server中的三种物理连接操作
    谈一谈SQL Server中的执行计划缓存(下)
    asp.net获取当前系统的时间
  • 原文地址:https://www.cnblogs.com/Cubemen/p/12301188.html
Copyright © 2011-2022 走看看