zoukankan      html  css  js  c++  java
  • Dubbo学习笔记7:Dubbo增强SPI与SPI中扩展点自动包装的实现原理

    在Dubbo整体架构分析中介绍了Dubbo中除了Service和Config层为API外,其他各层均为SPI,为SPI意味着下面各层都是组件化可以被替换的,也就是扩展性比较强,这也是Dubbo比较好的一点。

    JDK中标准SPI原理

    Dubbo增强的SPI功能是从JDK标准SPI演化而来的,所以有必要先讲讲标准SPI的原理。

    JDK中的SPI(Service Provider Interface)是面向接口编程的,服务规则提供者会在JRE的核心API里面提供服务访问接口,而具体实现则由其他开发商提供。

    例如规范制定者在rt.jar包里面定义了数据库的驱动接口java.sql.Driver。那么MySQL实现的开发商则会在MySQL的驱动包的META-INF文件夹下建立名称为java.sql.Driver的文件,文件内容就是MySQL对java.sql.Driver接口的实现类,如下图:

    如下代码可知 com.mysql.jdbc.Driver 就是实现了 java.sql.Driver 接口:

    public class com.mysql.jdbc.Driver extends com.mysql.jdbc.NonRegisteringDriver implements java.sql.Driver

    上面讲解了如何使用SPI扩展自定义自己的实现,下面来说说SPI实现原理,我们知道Java核心API,比如 rt.jar 包,是使用 Bootstrap ClassLoader 类加载器加载的,而用户提供的Jar包是由Appclassloader加载。并且我们知道如果一个类由类加载器A加载,那么这个类依赖的类也是由相同的类加载器加载。

    而用来搜索开发商提供的SPI扩展实现类的API类(ServiceLoader)是使用 Bootstrap ClassLoader加载的,那么ServiceLoader里面依赖的类应该也是由 Bootstrap ClassLoader 来加载,那么ServiceLoader里面依赖的类应该也是由 Bootstrap ClassLoader 来加载。而上面说了用户提供的包含SPI实现类的Jar包是由 Appclassloader加载,所以需要一种违反双亲委派模型的方法,线程上下文类加载器 ContextClassLoader 就是为了解决这个问题。

    下面我们写个测试代码,看看具体是如何工作了。

    复制代码
    public static void main(String[] args){
        // (1)
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        // (2)
        Iterator<Driver> iterator = loader.iterator();
        while(iterator.hasNext()){
            Driver driver = (Driver)iterator.next();
        // (3)
            System.out.println("driver:" + driver.getClass() + ",loader:"+driver.getClass().getClassLoa
    der());
        //(4)
        System.out.println("ServiceLoader loader:" + ServiceLoader.class.getClassLoader());
    }
    复制代码

    然后引入MySQL驱动的Jar包,执行结果如下。

    driver:class com.mysql.jdbc.Driver , loader:sun.misc.Launcher$AppClassLoader@4554617c
    current thread contextloader:sun.misc.Launcher$AppClassLoader@4554617c
    ServiceLoader loader:null

    从结果可知找到了MySQL的驱动,如果你在引入Oracle驱动的Jar包后再运行,则会输出找到了MySQL和Oracle的驱动,这也说明了,JDK标准的SPI会同时把SPI接口的所有实现类都提前加载好。

    另外从执行结果可以知道 ServiceLoader 的加载器 Bootstrap ,因为这里输出了null,并且从该类在 rt.jar 里面,也可以证明。

    下面我们来看下 ServiceLoader 的 load 方法源码。

    复制代码
    public final class ServiceLoader<S> implements Iterable<S>{
        public static <S> ServiceLoader<S> load(Class<S> service){
            // (5) 获取当前线程上下文加载器
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            return ServiceLoader.load(service,cl);
        }
    
        public static <S> ServiceLoader<S> load(Class<S> service,ClassLoader loader){
            return new ServiceLoader<>(service,loader);
        }
    
        // (6)
        private ServiceLoader(Class<S> svc,ClassLoader cl){
            service = svc;
            loader = cl;
            reload();
        }
    }
    复制代码

    代码(5)获取了当前线程上下文加载器,这里是AppClassLoader。

    代码(6)传递该类加载器到新构造的ServiceLoader的成员变量loader。那么这个loader什么时候使用的呢?下面我们看下LazyIterator的next()方法。

    复制代码
    public S next(){
        if(acc == null){
            return nextService();
        }else{
            PrivilegedAction<S> action = new PrivilegedAction<S>(){
                public S run(){return nextService();}
            }
            return AccessController.doPrivileged(action,acc);
        }
    }
    复制代码
    复制代码
    private S nextService(){
        ...
        try{
            // (7) 使用loader类加载器加载
            c = Class.forName(cn,false,loader);
        }catch(ClassNotFoundException x){
            fail(service,"Provider "+cn+" not found");
        }
        ...
    }
    复制代码

    代码(7)使用loader也就是AppClassLoader加载具体的驱动实现类。至于cn是怎么来的,读者可以参见 LazyIterator 的 hasNext() 方法。

    Dubbo增强SPI的实现

    Dubbo的扩展点加载机制是基于JDK标准的SPI扩展点发现机制增强而来的,Dubbo解决了JDK标准的SPI的以下问题:

    • JDK标准的SPI会一次性实例化扩展点所有实现,如果由扩展点实现初始化很耗时,但如果没用上也加载,会很浪费资源。
    • 如果扩展点加载失败,就失败了,给用户没有任何通知。比如:JDK标准的ScriptEngine,如果Ruby ScriptEngine因为所依赖的jruby.jar不存在,导致Ruby ScriptEngine类加载失败,这个失败原因被吃掉了,当用户执行ruby脚本时,会报空指针异常,而不是报Ruby ScriptEngine不存在。
    • 增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其他扩展点,也可以对扩展点使用wrapper类进行增强。

    本节我们结合服务提供者配置类ServiceConfig来讲解如何使用增强SPI加载扩展接口Protocol实现类,在ServiceConfig类中,有如下代码:

    public class ServiceConfig<T> extends AbstractServiceConfig{
        private static final long serialVersionUID = 3033787999037024738L;
    
        private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
        ...
    }

    首先SPI扩展接口Protocol的接口定义如下:

    复制代码
    @SPI("dubbo")
    public interface Protocol{
        int getDefaultPort();
    
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        @Adaptive
        <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException;
    
        void destroy();        
    }
    复制代码

     这里ExtensionLoader类似JDK标准SPI里面的ServiceLoader类,代码ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()的作用是获取Protocol接口的适配器类,在Dubbo中每个扩展接口都有一个对应的适配器类,这个适配器类是动态生成的一个类,后面会有讲解如何生成的,这里我们先给出Protocol扩展接口对应的适配器类的代码,如下:

    复制代码
    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy(){
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort(){
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException{
            if(arg1 == null)
                throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol);
            if(extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0,arg1);
        }
    
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0){
            ...
            // (1)
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if(extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            // (2)
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            // (3)
            return extension.export(arg0);    
        }
    }
    复制代码

    所以当我们调用 protocol.export(invoker) 方法的时候实际调用的是动态生成的Protocol$Adaptive 实例的 export(invoker) 方法,其内部代码(1)首先获取参数里面的URL对象,然后从URL对象里面获取用户设置的Protocol的实现类的名称,然后调用代码(2)根据名称获取具体的Protocol协议的实现类(后面我们会知道获取的是实现类被使用Wrapper类增强后的类),最后代码(3)具体调用Protocol协议的实现类 export(invoker) 方法。

    这里应该清楚的知道适配器类的存在目的就相当一个分发器,根据不同的参数,委托不同的实现类来做指定的事情,Dubbo实现上是把所有的参数封装到一URL对象里面,包含用户配置的参数,比如设置使用什么协议。另外这里也可以知道Dubbo并没有一次性加载所有扩展接口Protocol的实现类,而是根据URL里面协议类型只加载当前使用的扩展实现类。

    下面我们结合时序图来讲解ExtensionLoader的getAdaptiveExtension()方法是如何动态生成扩展接口对应的适配器类,以及getExtension方法如何根据扩展实现类的名称找到对应的实现类的:

    • 时序图步骤(1)获取当前扩展接口对应的ExtensionLoader对象,在Dubbo中每个扩展接口对应着自己的ExtensionLoader对象,如下代码,内部通过并发Map来缓存扩展接口与对应的ExtensionLoader的映射,其中key为扩展接口的Class对象,value为对应的ExtensionLoader实例:
    复制代码
    public static <T> ExtensionLoader<T> getExtensionLoader(class<T> type){
        if(type == null)
            throw new IllegalArgumentException("Extension type == null");
        if(!type.isInterface())
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        if(!withExtensionAnnotation(type))
            throw new IllegalArgumentException("Extension type(" + type + ") is not extension,because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        ExtensionLoader<T> loader = (ExtensionLoader<T>)EXTENSION_LOADERS.get(type);
        if(loader == null){
            EXTENSION_LOADERS.putIfAbsent(type,new ExtensionLoader<T>(type));
            loader = (Extension<T>)EXTENSION_LOADERS.get(type);
            return loader;
        }  
    }    
    复制代码
    private static final ConcurrentMap<Class<?>,ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>,ExtensionLoader<?>>();

    可知第一次访问某个扩展接口时候需要new一个对应的ExtensionLoader放入缓存,后面就直接从内存获取。

    • 步骤(2)获取当前扩展接口对应的适配器对象,内部是首先获取该扩展接口所有实现类的Class对象(注意,这里获取的是Class对象,并不是对象实例)。getAdaptiveExtension的代码如下:
    复制代码
    @SuppressWarnings("unchecked")
    public T getAdaptiveExtension(){
        Object instance = cachedAdaptiveInstance.get();
        if(instance == null){
            if(createAdaptiveInstanceError == null){
                sychronized(cachedAdaptiveInstance){
                    instance = cachedAdaptiveInstance.get();
                    if(instance == null){
                        try{
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        }catch(Throwable t){
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(),t);    
                        }
                    }
                }
            }else{
                throw new IllegalStateException("fail to create adaptive instance:" + createAdaptiveInstanceError.toString(),createAdaptiveInstanceError);
            }
        }
        return (T)instance;
    }
    复制代码

    如上代码使用双重检查创建cachedAdaptiveInstance对象,接口对应的适配器对象就保存到了这个对象里面。

    • 我们重点看步骤(3)createAdaptiveExtension方法,因为具体创建适配器对象的是这个方法。createAdaptiveExtension代码如下:
    复制代码
    private T createAdaptiveExtension(){
        try{
            return injectExtension((T)getAdaptiveExtensionClass().newInstance());
        }catch(Exception e){
            throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(),e);
        }  
    }
    复制代码

    可知首先调用了步骤(4)getAdaptiveExtensionClass().newInstance()获取适配器对象的一个实例,然后调用步骤(7)injectExtension方法进行扩展点相互依赖注入。下面首先看下步骤(4)getAdaptiveExtensionClass()是如何动态生成适配器类的Class对象的。

    复制代码
    private Class<?> getAdaptiveExtensionClass(){
        getExtensionClasses();
        if(cachedAdaptiveClass != null){
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    复制代码

    如上代码首先调用了步骤(5)getExtensionClasses获取了该扩展接口所有实现类的class对象,然后调用了步骤(6)createAdaptiveExtensionClass创建具体的适配器对象的Class对象,createAdaptiveExtensionClass代码如下:

    private Class<?> createAdaptiveExtensionClass(){
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code,classLoader);
    }

    其中createAdaptiveExtensionClassCode方法是关键,该方法根据扩展接口生成其对应的适配器类的字符串代码,这里是根据protocol接口的代码生成对应的Protocol$Adaptive的字符串代码存放到变量code中,然后默认调用JavassistCompiler的compile(code,classLoader)根据字符串代码生成适配器的Class对象并返回,然后通过getAdaptiveExtensionClass对象并返回,然后通过getAdaptiveExtensionClass().newInstance()创建适配器类的一个对象实例。至此扩展接口的适配器对象已经创建完毕。

    •  下面我们在步骤(7)前面看看步骤(5)getExtensionClasses如何加载扩展接口的所有实现类的Class对象。其内部最终调用了loadExtensionClasses方法进行加载,loadExtensionClasses代码如下:
    复制代码
    private Map<String,Class<?>> loadExtensionClasses(){
        // 获取扩展接口上SPI注解
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        // 是否存在注解
        if(defaultAnnotation != null){
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0){
                String[] names = NAME_SEPARATOR.split(value);
                if(name.length > 1){
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ":" + Arrays.toString(names));
                }
                // 默认实现类的名称放到cachedDefaultName
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        // 在指定目录的jar里面查找扩展点
        Map<String,Class<?>> extensionClasses = new HashMap<String,Class<?>>();
        loadFile(extensionClasses,DUBBO_INTERNAL_DIRECTORY);    // META/dubbo/internal/
        loadFile(extensionClasses,DUBBO_DIRECTORY);     // META-INF/dubbo/
        loadFile(extensionClasses,SERVICES_DIRECTORY);    // META-INF/services/
        return extensionClasses;
    }
    复制代码

    如上代码,拿Protocol协议来说,这里SPI注解为 @SPI("dubbo") ,那么这里 cachedDefaultName 就是Dubbo。然后 META-INF/dubbo/internal/ 、 META-INF/dubbo 、 META-INF/services/ 目录下去加载具体的扩展实体类,比如Protocol协议默认实现类如下:

    • 步骤(7)injectExtension方法进行扩展点实现类相互依赖自动注入:
    复制代码
    private T injectExtension(T instance){
        try{
            if(objectFactory != null){
                // 遍历扩展点实现类所有的方法
                for(Method method : instance.getClass().getMethods()){
                    // 当前方法public的set方法,并且只有一个入参
                    if(method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())){
                        // 获取参数类型
                        Class<?> pt = method.getParameterTypes()[0];
                        try{
                            String property = method.getName().length() > 3 ? method.getName().substring(3,4).toLowerCase + method.getName().substring(4) : "";
                            // 如果是则反射调用set方法
                            if(object != null){
                                method.invoke(instance,object);
                            }
                        }catch(Exception e){
                            logger.error("fail to inject via method " + method.getName() +  " of  interface " + type.getName() + ":" + e.getMessage() , e);
                        }
                    }
                }
            }  
        }catch(Exception e){
            logger.error(e.getMessage() , e);
        }
        return instance;
    }
    复制代码

    Dubbo增强SPI中扩展点自动包装的实现原理

    在Spring AOP中我们可以使用多个切面对指定类的方法进行增强,在Dubbo中也提供了类似的功能,在Dubbo中你可以指定多个Wrapper类对指定的扩展点的实现类的方法进行增强。

    上节讲下面的调用代码时实际调用的是适配器 Protocol$Adaptive 的 export 方法,如果URL对象里面的 protocol 为 dubbo,那么在没有扩展点自动包装时,protocol.export返回的就是DubboProtocol的对象。

    Exporter<?> exporter = protocol.export(wrapperInvoker);

    而真正情况下Dubbo里面使用了ProtocolFilterWrapper、ProtocolListenerWrapper等Wrapper类对DubboProtocol对象进行了包装增强。

    ProtocolFilterWrapper、ProtocolListenerWrapper、DubboProtocol三个类都有一个拷贝构造函数,这个拷贝构造函数的参数就是扩展接口Protocol,所谓包装是指下面意思:

    复制代码
    public class XxxProtocolWrapper implements Protocol{
        private Protocol impl;
        public XxxProtocolWrapper(Protocol protocol){
            impl = protocol;
        }
        public void export(){
            // ...在调用DubboProtocol的export前做些事情
            impl.export();
            // ...在调用DubboProtocol的export后做些事情
        }
        ...
    }
    复制代码

    比如这里会进行两次包装,第一次可能首先使用ProtocolListenerWrapper类对DubboProtocol进行包装,这时候ProtocolListenerWrapper类里面的impl就是DubboProtocol,然后第二次使用 ProtocolFilterWrapper 对 ProtocolListenerWrapper进行包装,也就是ProtocolFilterWrapper里面的impl是ProtocolListenerWrapper,那么这时候调用适配器Protocol$Adaptive的export方法,如果URL对象里面的protocol为dubbo,那么在没有扩展点自动包装时候,这时候 protocol.export 返回的就是 ProtocolFilterWrapper的实例了。

    下面我们看下Dubbo增强的SPI中如何去收集的这些包装类,以及如何实现的使用包装类对SPI实现类的自动包装。

    借用上节的时序图,其实代码(5)getExtensionClasses 里面的 loadFile 方法除了加载扩展接口的所有实现类的Class对象外还对包装类(wrapper类)进行了收集,具体代码如下:

    复制代码
    private void loadFile(Map<String,Class<?>> extensionClasses,String dir){
        String fileName = dir + type.getName();
        try{
            ...
            if(urls != null){
                while(urls.hasMoreElements()){
                    java.net.URL url = urls.nextElement();
                    try{
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(),"utf-8"));
                        try{
                            String line = null;
                            while((line = reader.readLine()) != null){
                                ...
                                if(line.length() > 0){
                                    try{
                                        ...
                                        if(line.length() > 0){
                                            ...
                                        }else{
                                            // (I) 这里判断SPI实现类是否有扩展接口为参数的拷贝构造函数
                                            try{
                                                clazz.getConstructor(type);
                                                Set<class<?>> wrappers = cachedWrapperClasses;
                                                if(wrappers == null){
                                                    cachedWrapperClasses = new ConcurrentHashset<Class<?>>();
                                                    wrappers = cachedWrapperClasses;
                                                }
                                                wrappers.add(clazz);
                                            }catch(NoSuchMethodException e){
                                                ...
                                            }
                                        }
                                    }catch(Throwable t){
                                        ...
                                    }
                                } 
                            }    // end of while read lines
                        } finally {
                            reader.close();
                        }
                    }catch(){
                        ...
                    }
                } // end of while urls    
            }    
        }catch(Throwable t){
            ...
        }
    }
    复制代码

    如上代码(I)处调用 clazz.getConstructor(type),这里是判断SPI实现类clazz是否有扩展接口 type 为参数的拷贝构造函数,如果没有直接抛异常 NoSuchMethodException,该异常被catch掉了,如果有则说明clazz类为wrapper类,则收集起来放入到 cachedWrapperClasses集合,到这里wrapper类的收集已经完毕。

    而具体对扩展实现类使用收集的wrapper类进行自动包装是在 createExtension 方法里做的:

    复制代码
    private T createExtension(String name){
        ...
        try{
            // cachedWrapperClasses里面有元素,即为wrapper类
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if(wrapperClasses != null && wrapperClasses.size() > 0){
                // 使用循环一层层对包装类进行包装,可以参考上面讲解的使用 ProtocolFilterWrapper / ProtocolListenerWrapper对DubboProtocol进行包装的流程
                for(Class<?> wrapperClass : wrapperClasses){
                    instance = injectExtension((T)wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        }catch(Throwable t){
            ...
        }
    }
    复制代码
  • 相关阅读:
    12.12 怀北滑雪场滑雪记
    datetime类型offset-aware与offset-navie
    Django-models中的choise
    什么是__name__()
    单机Ubuntu安装第二个Mysql服务
    Ubuntu下的Python安装
    设置mysql允许外网访问
    Ubuntu初次设置root密码
    使用VMware+Ubuntu,无法切换大小写的异常处理
    XShell/Xftp 无法连接 Ubuntu20
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/12186955.html
Copyright © 2011-2022 走看看