zoukankan      html  css  js  c++  java
  • Dubbo 扩展点加载机制:从 Java SPI 到 Dubbo SPI

    SPI 全称为 Service Provider Interface,是一种服务发现机制。当程序运行调用接口时,会根据配置文件或默认规则信息加载对应的实现类。所以在程序中并没有直接指定使用接口的哪个实现,而是在外部进行装配。
    要想了解 Dubbo 的设计与实现,其中 Dubbo SPI 加载机制是必须了解的,在 Dubbo 中有大量功能的实现都是基于 Dubbo SPI 实现解耦,同时也使得 Dubbo 获得如此好的可扩展性。

    Java SPI

    通过完成一个 Java SPI 的操作来了解它的机制。

    • 创建一个 AnimalService 接口及 category 方法
    • 创建一个实现类 Cat
    • 创建 META-INF/services 目录,并在该目录下创建一个文件,文件名为 AnimalService 的全限定名作为文件名
    • 在文件中添加实现类 Cat 的全限定名

    Animal 接口

    public interface AnimalService {
        void category();
    }
    

    Cat 实现类

    public class Cat implements AnimalService {
    
        @Override
        public void category() {
            System.out.println("cat: Meow ~");
        }
    }
    

    在 META-INF/services 目录下的 top.ytao.demo.spi.AnimalService 文件中添加:

    top.ytao.demo.spi.Cat
    

    加载 SPI 的实现:

    public class JavaSPITest {
    
        @Test
        public void javaSPI() throws Exception {
            ServiceLoader<AnimalService> serviceLoader = ServiceLoader.load(AnimalService.class);
            // 遍历在配置文件中已配置的 AnimalService 的所有实现类
            for (AnimalService animalService : serviceLoader) {
                animalService.category();
            }
        }
    
    }
    

    执行结果:

    就这样,一个 Java SPI 就实现完成了,通过 ServiceLoader.load 获取加载所有接口已配置的接口实现类,然后可以遍历找出需要的实现。

    Dubbo SPI

    本文 Dubbo 版本为2.7.5
    Dubbo SPI 相较于 Java SPI 更为强大,并且都是由自己实现的一套 SPI 机制。其中主要的改进和优化:

    • 相对于 Java SPI 一次性加载所有实现,Dubbo SPI 是按需加载,只加载需要使用的实现类。同时带有缓存支持。
    • 更为详细的扩展加载失败信息。
    • 增加了对扩展 IOC 和 AOP的支持。

    Dubbo SPI 示例

    Dubbo SPI 的配置文件放在 META-INF/dubbo 下面,并且实现类的配置方式采用 K-V 的方式,key 为实例化对象传入的参数,value 为扩展点实现类全限定名。例如 Cat 的配置文件内容:

    cat = top.ytao.demo.spi.Cat
    

    Dubbo SPI 加载过程中,对 Java SPI 的目录也是可以被兼容的。

    同时需要在接口上增加 @SPI 注解,@SPI 中可以指定 key 值,加载 SPI 如下:

    public class DubboSPITest {
    
        @Test
        public void dubboSPI(){
            ExtensionLoader<AnimalService> extensionLoader = ExtensionLoader.getExtensionLoader(AnimalService.class);
            // 获取扩展类实现
            AnimalService cat = extensionLoader.getExtension("cat");
            System.out.println("Dubbo SPI");
            cat.category();
        }
    
    }
    

    执行结果如下:

    获取 ExtensionLoader 实例

    获取 ExtensionLoader 实例是通过上面 getExtensionLoader 方法,具体实现代码:

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        // 检查 type 必须为接口
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
        }
        // 检查接口是否有 SPI 注解
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type (" + type +
                    ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
        }
        // 缓存中获取 ExtensionLoader 实例
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            // 加载 ExtensionLoader 实例到缓存中
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    

    上面获取扩展类加载器过程主要是检查传入的 type 是否合法,以及从扩展类加载器缓存中是否存在当前类型的接口,如果不存在则添加当前接口至缓存中。
    ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS 是扩展类加载器的缓存,它是以接口作为 key, 扩展类加载器作为 value 进行缓存。

    获取扩展类对象

    获取扩展类对象的方法ExtensionLoader#getExtension,在这里完成扩展对象的缓存及创建工作:

    public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        // 如果传入的参数为 true ,则获取默认扩展类对象操作
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        // 获取扩展对象,Holder 里的 value 属性保存着扩展对象实例
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        // 使用双重检查锁
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    // 创建扩展对象
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }
    

    获取 holder 对象是从缓存ConcurrentMap<String, Holder<Object>> cachedInstances中获取,如果不存在,则以扩展名 key,创建一个 Holder 对象作为 value,设置到扩展对象缓存。
    如果是新创建的扩展对象实例,那么 holder.get() 一定是 null ,扩展对象为空时,经过双重检查锁,创建扩展对象。

    创建扩展对象

    创建扩展对象过程:

    private T createExtension(String name) {
        // 从全部扩展类中,获取当前扩展名对应的扩展类
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            // 从缓存中获取扩展实例,及设置扩展实例缓存
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // 向当前实例注入依赖
            injectExtension(instance);
            // 获取包装扩展类缓存
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    // 创建包装扩展类实例,并向其注入依赖
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            // 初始化扩展对象
            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }
    

    上面创建扩展过程中,里面有个 Wrapper 类,这里使用到装饰器模式,该类是没有具体的实现,而是把通用逻辑进行抽象。
    创建这个过程是从所有扩展类中获取当前扩展名对应映射关系的扩展类,以及向当前扩展对象注入依赖。

    获取所有扩展类:

    private Map<String, Class<?>> getExtensionClasses() {
        // 获取普通扩展类缓存
        Map<String, Class<?>> classes = cachedClasses.get();
        // 如果缓存中没有,通过双重检查锁后进行加载
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    // 加载全部扩展类
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }
    

    检查普通扩展类缓存是否为空,如果不为空则重新加载,真正加载扩展类在loadExtensionClasses中:

    
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
    
    private Map<String, Class<?>> loadExtensionClasses() {
        // 获取 @SPI 上的默认扩展名
        cacheDefaultExtensionName();
    
        Map<String, Class<?>> extensionClasses = new HashMap<>();
        // 先加载 Dubbo 内部的扩展类, 通过 Boolean 值控制
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName(), true);
        // 由于 Dubbo 迁到 apache ,所以包名有变化,会替换之前的 alibaba 为 apache
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"), true);
        
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        return extensionClasses;
    }
    

    上面获取 @SPI 扩展名,以及指定要加载的文件。从上面静态常量中,我们可以看到,Dubbo SPI 也是支持加载 Java SPI 的目录,同时还加载 META-INF/dubbo/internal (该目录为 Dubbo 的内部扩展类目录),在 loadDirectory 加载目录配置文件。

        private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type, boolean extensionLoaderClassLoaderFirst) {
            // 获取文件在项目中的路径,如:META-INF/dubbo/top.ytao.demo.spi.AnimalService
            String fileName = dir + type;
            try {
                Enumeration<java.net.URL> urls = null;
                ClassLoader classLoader = findClassLoader();
                
                // 加载内部扩展类
                if (extensionLoaderClassLoaderFirst) {
                    ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
                    if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                        urls = extensionLoaderClassLoader.getResources(fileName);
                    }
                }
                
                // 加载当前 fileName 文件
                if(urls == null || !urls.hasMoreElements()) {
                    if (classLoader != null) {
                        urls = classLoader.getResources(fileName);
                    } else {
                        urls = ClassLoader.getSystemResources(fileName);
                    }
                }
    
                if (urls != null) {
                    // 迭代加载同名文件的内容
                    while (urls.hasMoreElements()) {
                        java.net.URL resourceURL = urls.nextElement();
                        // 加载文件内容
                        loadResource(extensionClasses, classLoader, resourceURL);
                    }
                }
            } catch (Throwable t) {
                logger.error("Exception occurred when loading extension class (interface: " +
                        type + ", description file: " + fileName + ").", t);
            }
        }
    

    这里获取文件名后加载所有同名文件,然后迭代各个文件,逐个加载文件内容。

    private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
        try {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
                String line;
                // 整行读取文件内容
                while ((line = reader.readLine()) != null) {
                    // 获取当前行中第一个 "#" 的位置索引
                    final int ci = line.indexOf('#');
                    // 如果当前行存在 "#",则去除 "#" 后的内容
                    if (ci >= 0) {
                        line = line.substring(0, ci);
                    }
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
                            // 获取当前行 "=" 的索引
                            int i = line.indexOf('=');
                            // 如果当前行存在 "=",将 "=" 左右的值分开复制给 name 和 line
                            if (i > 0) {
                                name = line.substring(0, i).trim();
                                line = line.substring(i + 1).trim();
                            }
                            if (line.length() > 0) {
                                // 加载扩展类
                                loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                            }
                        } catch (Throwable t) {
                            IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                            exceptions.put(line, e);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error("Exception occurred when loading extension class (interface: " +
                    type + ", class file: " + resourceURL + ") in " + resourceURL, t);
        }
    }
    

    上面代码完成文件内容加载和解析,接下来通过 loadClass 加载扩展类。

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        // 检查当前实现类是否实现了 type 接口
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        
        // 当前实现类是否有 Adaptive 注解
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            cacheAdaptiveClass(clazz);
        // 当前类是否为 Wrapper 包装扩展类 
        } else if (isWrapperClass(clazz)) {
            cacheWrapperClass(clazz);
        } else {
            // 尝试当前类是否有无参构造方法
            clazz.getConstructor();
            
            if (StringUtils.isEmpty(name)) {
                // 如果 name 为空,则获取 clazz 的 @Extension 注解的值,如果注解值也没有,则使用小写类名
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
    
            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
                // 缓存 扩展名和@Activate的缓存
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                    // 缓存 扩展类和扩展名的缓存
                    cacheName(clazz, n);
                    // 将 扩展类和扩展名 保存到extensionClasses 扩展名->扩展类 关系映射中
                    saveInExtensionClass(extensionClasses, clazz, n);
                }
            }
        }
    }
    

    至此,getExtensionClasses() 加载扩展类方法分析完成,接下分析注入依赖 injectExtension() 方法。

    private T injectExtension(T instance) {
        // 
        if (objectFactory == null) {
            return instance;
        }
    
        try {
            for (Method method : instance.getClass().getMethods()) {
                // 遍历当前扩展类的全部方法,如果当前方法不属于 setter 方法,
                // 即不是以 'set'开头的方法名,参数不是一个的,该方法访问级别不是 public 的,则不往下执行
                if (!isSetter(method)) {
                    continue;
                }
                
                // 当前方法是否添加了不要注入依赖的注解
                if (method.getAnnotation(DisableInject.class) != null) {
                    continue;
                }
                Class<?> pt = method.getParameterTypes()[0];
                // 判断当前参数是否属于 八个基本类型或void
                if (ReflectUtils.isPrimitives(pt)) {
                    continue;
                }
    
                try {
                    // 通过属性 setter 方法获取属性名
                    String property = getSetterProperty(method);
                    // 获取依赖对象
                    Object object = objectFactory.getExtension(pt, property);
                    if (object != null) {
                        // 设置依赖
                        method.invoke(instance, object);
                    }
                } catch (Exception e) {
                    logger.error("Failed to inject via method " + method.getName()
                            + " of interface " + type.getName() + ": " + e.getMessage(), e);
                }
    
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }
    

    通过遍历扩展类所有方法,找到相对应的依赖,然后使用反射调用 settter 方法来进行设置依赖。
    objectFactory 对象如图:

    其中找到相应依赖是在 SpiExtensionFactory 或 SpringExtensionFactory 中,同时,这两个 Factory 保存在 AdaptiveExtensionFactory 中进行维护。

    @Adaptive
    public class AdaptiveExtensionFactory implements ExtensionFactory {
    
        private final List<ExtensionFactory> factories;
    
        public AdaptiveExtensionFactory() {
            // ......
        }
    
        @Override
        public <T> T getExtension(Class<T> type, String name) {
            // 通过遍历匹配到 type->name 的映射
            for (ExtensionFactory factory : factories) {
                T extension = factory.getExtension(type, name);
                if (extension != null) {
                    return extension;
                }
            }
            return null;
        }
    
    }
    

    以上是对 Dubbo SPI 扩展类简单加载过程分析完成。

    自适应加载机制

    为 Dubbo 更加灵活的使一个接口不通过硬编码加载扩展机制,而是通过使用过程中进行加载,Dubbo 的另一加载机制——自适应加载。
    自适应加载机制使用 @Adaptive 标注:

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Adaptive {
        String[] value() default {};
    }
    

    Adaptive 的值是一个数组,可以配置多个 key。初始化时,遍历所有 key 进行匹配,如果没有则匹配 @SPI 的值。
    当 Adaptive 注解标注在类上时,则简单对应该实现。如果注解标注在接口方法上时,则会根据参数动态生成代码来获取扩展点的实现。
    类上注解处理还是比较好理解,方法上的注解加载相对比较有研读性。通过调用ExtensionLoader#getAdaptiveExtension来进行获取扩展实现。

    public T getAdaptiveExtension() {
        // 获取实例化对象缓存
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                throw new IllegalStateException("Failed to create adaptive instance: " +
                        createAdaptiveInstanceError.toString(),
                        createAdaptiveInstanceError);
            }
            // 双重检查锁后创建自适应扩展
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        // 创建自适应扩展
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
    
        return (T) instance;
    }
    
    private T createAdaptiveExtension() {
        try {
            // 获取自适应扩展后,注入依赖
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
    
    

    上面代码完成了扩展类对象是否存在缓存中,如果不存在,则通过创建自适应扩展,并将实例注入依赖后,设置在实例化后的自适应扩展对象中。
    其中getAdaptiveExtensionClass是比较核心的流程。

    private Class<?> getAdaptiveExtensionClass() {
        // 加载全部扩展类
        getExtensionClasses();
        // 加载全部扩展类后,如果有 @Adaptive 标注的类,cachedAdaptiveClass 则一定不会为空
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        // 创建自适应扩展类
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    
    private Class<?> createAdaptiveExtensionClass() {
        // 生成自适应扩展代码
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        // 获取扩展类加载器
        ClassLoader classLoader = findClassLoader();
        // 获取编译器类型的实现类
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        // 编译代码,返回该对象
        return compiler.compile(code, classLoader);
    }
    

    这里完成的工作主要是,加载全部扩展类,代表所有扩展接口类的实现类,在其加载过程中,如果有 @Adaptive 标注的类,会保存到 cachedAdaptiveClass 中。通过自动生成自适应扩展代码,并被编译后,获取扩展类实例化对象。
    上面编译器类型是可以指定的,通过 compiler 进行指定,例如:<dubbo:application name="taomall-provider" compiler="jdk" />,该编译器默认使用 javassist 编译器。

    在 generate 方法中动态生成代码:

    public String generate() {
        // 检查当前扩展接口的方法上是否有 Adaptive 注解
        if (!hasAdaptiveMethod()) {
            throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
        }
    
        // 生成代码
        StringBuilder code = new StringBuilder();
        // 生成类的包名
        code.append(generatePackageInfo());
        // 生成类的依赖类
        code.append(generateImports());
        // 生成类的声明信息
        code.append(generateClassDeclaration());
    
        // 生成方法
        Method[] methods = type.getMethods();
        for (Method method : methods) {
            code.append(generateMethod(method));
        }
        code.append("}");
    
        if (logger.isDebugEnabled()) {
            logger.debug(code.toString());
        }
        return code.toString();
    }
    

    上面是生成类信息的方法,生成设计原理是按照已设置好的模板,进行替换操作,生成类。具体信息不代码很多,但阅读还是比较简单。
    自适应加载机制,已简单分析完,咋一眼看,非常复杂,但是了解整体结构和流程,再去细研的话,相对还是好理解。

    总结

    从 Dubbo 设计来看,其良好的扩展性,比较重要的一点是得益于 Dubbo SPI 加载机制。在学习它的设计理念,对可扩展性方面的编码思考也有一定的启发。

    个人博客: https://ytao.top
    关注公众号 【ytao】,更多原创好文
    我的公众号

  • 相关阅读:
    Mysql_常规操作
    三剑客
    Nginx_安全2
    shell_常规小脚本
    redis数据库持久化
    redis数据库操作
    keepalived高可用lvs集群
    ansible的roles角色
    keepalived高可用
    keepalived概念
  • 原文地址:https://www.cnblogs.com/ytao-blog/p/12580462.html
Copyright © 2011-2022 走看看