zoukankan      html  css  js  c++  java
  • Dubbo原理和源码解析之“微内核+插件”机制

    github新增仓库 "dubbo-read"(点此查看,集合所有《Dubbo原理和源码解析》系列文章,后续将继续补充该系列,同时将针对Dubbo所做的功能扩展也进行分享。不定期更新,欢迎Follow。

     

    1 框架设计

    在官方《Dubbo 开发指南》框架设计部分提到,Dubbo 服务框架的基本设计原则是:

    • 采用 URL 作为配置信息的统一格式,所有扩展点都通过传递 URL 携带配置信息;
    • 采用 Microkernel + Plugin 模式,Microkernel 只负责组装 Plugin,Dubbo 自身的功能也是通过扩展点实现的,也就是 Dubbo 的所有功能点都可被用户自定义扩展所替换;

    对于第一点比较容易理解,所有的参数都封装成 Dubbo 自定义的 URL 对象进行传递。URL 对象主要包括以下属性:

    • String protocol
    • String host
    • int port
    • String path
    • Map<String, String> parameters

    本文将重点介绍第二点,对 Microkernel + Plugin 机制的实现原理、源码进行分析和跟踪。

    2 API 和 SPI

    框架或组件通常有两类客户,一个是使用者,一个是扩展者。API (Application Programming Interface) 是给使用者用的,而 SPI (Service Provide Interface) 是给扩展者用的。
    我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案、jdbc模块的方案等。面向的对象的设计里,我们一般推荐 模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了 可拔插的原则,如果需要替换一种实现,就需要修改代码。
    为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。JAVA SPI 就提供了这样的一个机制——为某个接口寻找服务实现的机制。有点类似 IOC 的思想,将装配的控制权移到程序之外,在 模块化设计 中这个机制尤其重要。

    3 JAVA SPI

    JAVA SPI 实际上是 ”基于接口编程+策略模式+配置文件“ 组合实现的动态加载机制。具体步骤为:

      1. 定义一个接口;
      2. 编写接口的一个或多个实现;
      3. src/main/resources/ 下建立 /META-INF/services 目录, 新增一个以接口命名的文件,内容是实现类类名;
      4. 使用 ServiceLoader 来加载配置文件中指定的实现。

    假设我们提供了一个“打招呼”的接口,有中文版和英文版两种实现:

    3.1 定义接口

    package com.spi.service;
    
    public interface HelloService {
    
        public String sayHello();
    
    }

    3.2 编写实现

    分别编写中文版、英文版的实现:

    package com.spi.service.impl;
    
    public class ChineseHelloService implements HelloService {
    
        @Override
        public String sayHello() {
            return "你好";
        }
    
    }
    

      

    package com.spi.service.impl;
    
    public class EnglishHelloService implements HelloService {
    
        @Override
        public String sayHello() {
            return "hello";
        }
    
    }
    

      

    编写工厂类,用于封装实现类的获取逻辑:

    public class HelloServiceFactory {
    
        public HelloServiceFactory(){ }
    
        public static HelloService newHelloService(){
            HelloService helloService = null;
            ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class);
            Iterator<HelloService> services = serviceLoader.iterator();
            if(services.hasNext()){
                helloService = services.next();
            }
            return helloService;
        }
    
    }
    

      

    3.3 创建文件

    /src/main/resource/META-INF/services 下创建 com.spi.service.HelloService 文件,内容为两个具体实现类的类名:

    package com.spi.service.impl.EnglishHelloService
    

    package com.spi.service.impl.ChineseHelloService 
    

    3.4 执行测试

    package com.spi;
    
    public class Main {
    
        public static void main(String[] args) {
            HelloService helloService = HelloServiceFactory.newHelloService();
            System.out.println(helloService.sayHello());
        }
    
    }
    

      

    当文件内容为 package com.spi.service.impl.EnglishHelloService 时,执行结果为:

    hello
    

      

    当文件内容为 package com.spi.service.impl.ChineseHelloService 时,执行结果为:

    你好
    

      

    以此类推,如果你把所有实现类类名都写到文件中,由调用者自行选择实现类,那么可以通过以下方式实现(简陋版,纯属举例用):

    com.spi.service.HelloService文件:

    package com.spi.service.impl.EnglishHelloService
    package com.spi.service.impl.ChineseHelloService
    

      

    HelloServiceFactory:

    public class HelloServiceFactory {
    
        private HelloServiceFactory(){
    
        }
    
        public static HelloService newHelloService(String name){
            HelloService helloService = null;
            ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class);
            Iterator<HelloService> services = serviceLoader.iterator();
            while(services.hasNext()){
                HelloService tmp = services.next();
                if(tmp.getClass().toString().contains(name)){
                    helloService = tmp;
                    break;
                }
            }
            return helloService;
        }
    
    }
    

      

    Main:

    public class Main {
    
        public static String name = "com.spi.service.impl.EnglishHelloService";
    
        public static void main(String[] args) {
            HelloService helloService = HelloServiceFactory.newHelloService(name);
            System.out.println(helloService.sayHello());
        }
    
    }
    

      

    4 Dubbo Microkernel + Plugin

    Dubbo “微内核+插件“机制的整体特性如下:

    下面结合源码进行分析

    4.1 ExtensionLoader

    Dubbo 实现 “微内核+插件“机制的核心是 ExtensionLoader,它取代了 JDK 自带的 ServiceLoader。 在 Dubbo 官方文档中提到,ExtensionLoader 改进了 JAVA ServiceLoader 的以下问题:

      1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,没用上也加载,如果有扩展实现初始化很耗时,会很浪费资源。
      2. 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。
      3. 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

    以 LoadBalance 为例,文件 com.alibaba.dubbo.rpc.cluster.LoadBalance 中内容为:

    random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
    roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
    leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
    consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

    用户使用时,在 XML 中配置 loadbalance="random",那么 Dubbo 将加载(且仅加载)RandomLoadBalance

    从源码角度分析,ExtensionLoader 加载扩展点流程如下:

    4.1.1 获取ExtensionLoader

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if(!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if(!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type + 
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }
        
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    

    在获取 ExtensionLoader 时,或判断传入的 Class 是否为 interface 并是否有 @SPI 注解。创建 ExtensionLoader 实例后在内存中缓存,保证每个扩展点具有唯一的 ExtensionLoader 单例。

    4.1.2 获取扩展点

    //根据名字获取扩展点实例
    public T getExtension(String name) {
    		if (name == null || name.length() == 0)
    		    throw new IllegalArgumentException("Extension name == null");
    		if ("true".equals(name)) {
    		    return getDefaultExtension();
    		}
    		Holder<Object> holder = cachedInstances.get(name);
    		if (holder == null) {
    		    cachedInstances.putIfAbsent(name, new Holder<Object>());
    		    holder = cachedInstances.get(name);
    		}
    		Object instance = holder.get();
    		if (instance == null) {
    		    synchronized (holder) {
    	            instance = holder.get();
    	            if (instance == null) {
    	                instance = createExtension(name);
    	                holder.set(instance);
    	            }
    	        }
    		}
    		return (T) instance;
    }
    
    //实例化扩展点
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }
    
    
    private Map<String, Class<?>> getExtensionClasses() {
          Map<String, Class<?>> classes = cachedClasses.get();
          if (classes == null) {
              synchronized (cachedClasses) {
                  classes = cachedClasses.get();
                  if (classes == null) {
                      classes = loadExtensionClasses();
                      cachedClasses.set(classes);
                  }
              }
          }
          return classes;
    }
    
    //从配置文件中加载扩展点
    private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadFile(extensionClasses, DUBBO_DIRECTORY);
        loadFile(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }
    

    获取扩展点时,从内存缓存中获取扩展点实例。扩展点实例在进程中也是个单例。Dubbo 从以下三个路径中读取扩展点配置文件并加载:

    • META-INF/services/
    • META-INF/dubbo/
    • META-INF/dubbo/internal/

    4.2 setter & Wrapper

    在实例化扩展点的代码中,我们可以看到有以下两个处理:

    • setter 注入
    • Wrapper 包装
    //实例化扩展点
    private T createExtension(String name) {
        //......
        
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
            
        //......
    }
    
    //注入扩展点
    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }
    

    setter

    扩展点实现类的成员如果为其它扩展点类型,ExtensionLoader 在会自动注入依赖的扩展点。ExtensionLoader 通过扫描扩展点实现类的所有set方法来判定其成员。

    Wrapper 

    如果扩展点实现类有拷贝构造函数,则认为是包装类。包装类持有实际的扩展点实现类,通过包装类可以把所有扩展点的公共逻辑移到包装类,类似AOP。

    4.3 Adaptive & Activate

    从文件加载扩展点代码如下:

    private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        //......
        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
        try {
            String line = null;
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) line = line.substring(0, ci);
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i + 1).trim();
                        }
                        if (line.length() > 0) {
                            Class<?> clazz = Class.forName(line, true, classLoader);
                            if (! type.isAssignableFrom(clazz)) {
                                throw new IllegalStateException("Error when load extension class(interface: " +
                                        type + ", class line: " + clazz.getName() + "), class " 
                                        + clazz.getName() + "is not subtype of interface.");
                            }
                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                if(cachedAdaptiveClass == null) {
                                    cachedAdaptiveClass = clazz;
                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                            + cachedAdaptiveClass.getClass().getName()
                                            + ", " + clazz.getClass().getName());
                                }
                            } else {
                                try {
                                    clazz.getConstructor(type);
                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                    if (wrappers == null) {
                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                        wrappers = cachedWrapperClasses;
                                    }
                                    wrappers.add(clazz);
                                } catch (NoSuchMethodException e) {
                                    clazz.getConstructor();
                                    if (name == null || name.length() == 0) {
                                        name = findAnnotationName(clazz);
                                        if (name == null || name.length() == 0) {
                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                            } else {
                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                            }
                                        }
                                    }
                                    String[] names = NAME_SEPARATOR.split(name);
                                    if (names != null && names.length > 0) {
                                        Activate activate = clazz.getAnnotation(Activate.class);
                                        if (activate != null) {
                                            cachedActivates.put(names[0], activate);
                                        }
                                        for (String n : names) {
                                            if (! cachedNames.containsKey(clazz)) {
                                                cachedNames.put(clazz, n);
                                            }
                                            Class<?> c = extensionClasses.get(n);
                                            if (c == null) {
                                                extensionClasses.put(n, clazz);
                                            } else if (c != clazz) {
                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            } // end of while read lines
        } finally {
            reader.close();
        }
        //......
    }
    

    简单来说,上面的代码做了以下几件事:

      1. 忽略已注释的行
      2. 解析出名称和扩展点实现类名
      3. 判断是否有@Adaptive注解
      4. 匹配构造函数,判断是否为Wrapper类
      5. 判断是否有@Activate注解

    @Adaptive 

    扩展点自适应,直到扩展点方法执行时才决定调用哪一个扩展点实现。扩展点的调用会有URL 作为参数,通过@Adaptive 注解可以提取约定 key 来决定调用哪个实现的方法。

    @Activate

    扩展点自动激活,指定 URL 中激活扩展点的 key,未指定 key 时表示无条件激活。 比如 LoadBalance

    SPI(RandomLoadBalance.NAME)
    public interface LoadBalance {
    
        @Adaptive("loadbalance")
    	<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    
    }
    

    表示默认使用 Random 负载均衡策略,同时会根据用户在 XML 中配置的 loadbalance 参数来最终决定调用哪个扩展点实现类。

    再比如 AsyncFilter

    @Activate(group = Constants.CONSUMER)
    public class AsyncFilter implements Filter{
    
    }
    

    表示只有在 Consumer 端才会激活。

  • 相关阅读:
    无线安全课堂:手把手教会你搭建伪AP接入点
    转载——开阔自己的视野,勇敢的接触新知识
    关于系统架构的一些总结
    MessageBox.Show()如何换行
    不患寡而患不均
    由CHAR(2)引发的BUG
    DataRow.RowState 属性
    C# 使用TimeSpan计算两个时间差
    利用反射调出其他项目的界面
    DB2 中将date类型的转换成timestamp
  • 原文地址:https://www.cnblogs.com/cyfonly/p/9474125.html
Copyright © 2011-2022 走看看