zoukankan      html  css  js  c++  java
  • Dubbo解析之SPI和自适应扩展

    在继续深入Dubbo之前,必须先要明白Dubbo中的SPI机制。

    一、背景

    1、来源

    Dubbo 的扩展点加载从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制加强而来。但还有所不同,它改进了JDK标准的 SPI的以下问题:
    • JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
    • 如果扩展点加载失败,连扩展点的名称都拿不到。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。
    • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

    2、约定

    在扩展类的 jar 包内,放置扩展点配置文件 META-INF/dubbo/接口全限定名,内容为:配置名=扩展实现类全限定名,多个实现类用换行符分隔。

    3、配置文件

    Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,几乎所有的功能都有扩展点实现。
    0
    以Protocol接口为例,它里面有很多实现。
    0 

    二、Dubbo SPI

    通过上图我们可以看到,Dubbo SPI和JDK SPI配置的不同,在Dubbo SPI中可以通过键值对的方式进行配置,这样就可以按需加载指定的实现类。 Dubbo SPI的相关逻辑都被封装到ExtensionLoader类中,通过ExtensionLoader我们可以加载指定的实现类,一个扩展接口就对应一个ExtensionLoader对象,在这里我们把它亲切的称为:扩展点加载器。
    我们先看下它的属性:
    public class ExtensionLoader<T> {
        
        //扩展点配置文件的路径,可以从3个地方加载到扩展点配置文件
        private static final String SERVICES_DIRECTORY = "META-INF/services/";
        private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
        private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";   
        //扩展点加载器的集合
        private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
        //扩展点实现的集合
        private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
        //扩展点名称和实现的映射缓存
        private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
        //拓展点实现类集合缓存
        private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
        //扩展点名称和@Activate的映射缓存
        private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
        //扩展点实现的缓存
        private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
    }
    ExtensionLoader会把不同的扩展点配置和实现都缓存起来。同时,Dubbo在官网上也给了我们提醒:扩展点使用单一实例加载(请确保扩展实现的线程安全性),缓存在 ExtensionLoader中。下面我们看几个重点方法。

    1、获取扩展点加载器

    我们首先通过ExtensionLoader.getExtensionLoader() 方法获取一个 ExtensionLoader 实例,它就是扩展点加载器。然后再通过 ExtensionLoader 的 getExtension 方法获取拓展类对象。这其中,getExtensionLoader 方法用于从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例。
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    比如可以通过下面这样,来获取Protocol接口的ExtensionLoader实例:
    ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
    就可以拿到扩展点加载器的对象实例:
    com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]

    2、获取扩展类对象

    上一步我们已经拿到加载器,然后可以根据加载器实例,通过扩展点的名称获取扩展类对象。
    public T getExtension(String name) {
        //校验扩展点名称的合法性
        if (name == null || name.length() == 0)
            throw new IllegalArgumentException("Extension name == null");
        // 获取默认的拓展实现类
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        //用于持有目标对象
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }
     
    它先尝试从缓存中获取,未命中则创建扩展对象。那么它的创建过程是怎样的呢?
    private T createExtension(String name) {
        //从配置文件中获取所有的扩展类,Map数据结构
        //然后根据名称获取对应的扩展类
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            //通过反射创建实例,然后放入缓存
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            //注入依赖
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                // 包装为Wrapper实例
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }
     
    这里的重点有两个,依赖注入和Wrapper包装类,它们是Dubbo中IOC 与 AOP 的具体实现。

    2.1、依赖注入

    向拓展对象中注入依赖,它会获取类的所有方法。判断方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public,就通过反射设置属性值。所以说,Dubbo中的IOC仅支持以setter方式注入。
    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

    2.2、Wrapper

    它会将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量。说起来可能比较绕,我们直接看下它最后生成的对象就明白了。 我们以DubboProtocol为例,它包装后的对象为:
    0
     综上所述,如果我们获取一个扩展类对象,最后拿到的就是这个Wrapper类的实例。 就像这样:
    ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
    Protocol extension = extensionLoader.getExtension("dubbo");
    System.out.println(extension);
    输出为:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper@4cdf35a9

    3、获取所有的扩展类

    在通过名称获取扩展类对象之前,首先需要根据配置文件解析出所有的扩展类,它是一个扩展点名称和扩展类的映射表Map<string, class<?="">>。首先,还是从缓存中cachedClasses获取,如果没有就调用loadExtensionClasses从配置文件中加载。配置文件有三个路径:
    • META-INF/services/
    • META-INF/dubbo/
    • META-INF/dubbo/internal/
    先尝试从缓存中获取。
    private Map<String, Class<?>> getExtensionClasses() {
        //从缓存中获取
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    //加载扩展类
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    } 
     
    如果没有,就调用loadExtensionClasses从配置文件中读取。
    private Map<String, Class<?>> loadExtensionClasses() {
        //获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if ((value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if (names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension 
                        name on extension " + type.getName()+ ": " + Arrays.toString(names));
                }
                //设置默认的扩展名称,参考getDefaultExtension 方法
                //如果名称为true,就是调用默认扩赞类
                if (names.length == 1) cachedDefaultName = names[0];
            }
        }
        //加载指定路径的配置文件
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadDirectory(extensionClasses, DUBBO_DIRECTORY);
        loadDirectory(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }
    以Protocol接口为例,获取到的实现类集合如下,我们就可以根据名称加载具体的扩展类对象。
    {
        registry=class com.alibaba.dubbo.registry.integration.RegistryProtocol
        injvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
        thrift=class com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
        mock=class com.alibaba.dubbo.rpc.support.MockProtocol
        dubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
        http=class com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
        redis=class com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
        rmi=class com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
    }

    三、自适应扩展机制

    在Dubbo中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。这些扩展并非在框架启动阶段就被加载,而是在扩展方法被调用的时候,根据URL对象参数进行加载。 那么,Dubbo就是通过自适应扩展机制来解决这个问题。
    自适应拓展机制的实现逻辑是这样的: 首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类,在代理类中,就可以通过URL对象的参数来确定到底调用哪个实现类。

    1、Adaptive注解

    在开始之前,我们有必要先看一下与自适应拓展息息相关的一个注解,即 Adaptive 注解。
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Adaptive {
        String[] value() default {};
    }
    从上面的代码中可知,Adaptive 可注解在类或方法上。
    • 标注在类上

        Dubbo 不会为该类生成代理类。

    • 标注在方法上

        Dubbo 则会为该方法生成代理逻辑,表示当前方法需要根据 参数URL 调用对应的扩展点实现。

    2、获取自适应拓展类

    getAdaptiveExtension 方法是获取自适应拓展的入口方法。
    public T getAdaptiveExtension() {
        // 从缓存中获取自适应拓展
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    //未命中缓存,则创建自适应拓展,然后放入缓存
                    if (instance == null) {
                        try {
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create 
                                                      adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            }
        }
        return (T) instance;
    } 
    getAdaptiveExtension方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension方法创建自适应拓展。
    private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("
                Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
    这里的代码较少,调用 getAdaptiveExtensionClass方法获取自适应拓展 Class 对象,然后通过反射实例化,最后调用injectExtension方法向拓展实例中注入依赖。 获取自适应扩展类过程如下:
    private Class<?> getAdaptiveExtensionClass() {
        //获取当前接口的所有实现类
        //如果某个实现类标注了@Adaptive,此时cachedAdaptiveClass不为空
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //以上条件不成立,就创建自适应拓展类
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
     
    在上面方法中,它会先获取当前接口的所有实现类,如果某个实现类标注了@Adaptive,那么该类就被赋值给cachedAdaptiveClass变量并返回。如果没有,就调用createAdaptiveExtensionClass创建自适应拓展类。

    3、创建自适应拓展类

    createAdaptiveExtensionClass方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。
    private Class<?> createAdaptiveExtensionClass() {
        //构建自适应拓展代码
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        // 获取编译器实现类 这个Dubbo默认是采用javassist 
        Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
        //编译代码,返回类实例的对象
        return compiler.compile(code, classLoader);
    } 
    在生成自适应扩展类之前,Dubbo会检查接口方法是否包含@Adaptive。如果方法上都没有此注解,就要抛出异常。
    if (!hasAdaptiveAnnotation){
        throw new IllegalStateException(
            "No adaptive method on extension " + type.getName() + ", 
              refuse to create the adaptive class!");
    }
    我们还是以Protocol接口为例,它的export()和refer()方法,都标注为@Adaptive。destroy和 getDefaultPort未标注 @Adaptive注解。Dubbo 不会为没有标注 Adaptive 注解的方法生成代理逻辑,对于该种类型的方法,仅会生成一句抛出异常的代码。
    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.extension.Adaptive;
    import com.alibaba.dubbo.common.extension.SPI;
    
    @SPI("dubbo")
    public interface Protocol {
        int getDefaultPort();
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
        void destroy();
    } 
    所以说当我们调用这两个方法的时候,会先拿到URL对象中的协议名称,再根据名称找到具体的扩展点实现类,然后去调用。下面是生成自适应扩展类实例的源代码:
    package com.viewscenes.netsupervisor.adaptive;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    import com.alibaba.dubbo.rpc.Exporter;
    import com.alibaba.dubbo.rpc.Invoker;
    import com.alibaba.dubbo.rpc.Protocol;
    import com.alibaba.dubbo.rpc.RpcException;
    
    public class Protocol$Adaptive implements Protocol {
        public void destroy() {
            throw new UnsupportedOperationException(
                    "method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
        }
        public int getDefaultPort() {
            throw new UnsupportedOperationException(
                    "method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
        }
        public Exporter export(Invoker invoker)throws RpcException {
            if (invoker == null) {
                throw new IllegalArgumentException("Invoker argument == null");
            }
            if (invoker.getUrl() == null) {
                throw new IllegalArgumentException("Invoker argument getUrl() == null");
            }
                
            URL url = invoker.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null) {
                throw new IllegalStateException("Fail to get extension(Protocol) name from url("
                        + url.toString() + ") use keys([protocol])");
            }
                
            Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
            return extension.export(invoker);
        }
        public Invoker refer(Class clazz,URL ur)throws RpcException {
            if (ur == null) {
                throw new IllegalArgumentException("url == null");
            }
            URL url = ur;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null) {
                throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");
            }
            Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
            return extension.refer(clazz, url);
        }
    }
    综上所述,当我们获取某个接口的自适应扩展类,实际就是一个Adaptive类实例。
    ExtensionLoader<Protocol> extensionLoader = 
                      ExtensionLoader.getExtensionLoader(Protocol.class);            
    Protocol adaptiveExtension = extensionLoader.getAdaptiveExtension();
    System.out.println(adaptiveExtension);
    输出为: com.alibaba.dubbo.rpc.Protocol$Adaptive@47f6473

    四、实例

    看完以上流程之后,如果想写一套自己的逻辑替换Dubbo中的流程,就变得很简单。Dubbo默认使用dubbo协议来暴露服务。可以搞一个自定义的协议来替换它。

    1、实现类

    首先,我们创建一个MyProtocol类,它实现Protocol接口。
    package com.viewscenes.netsupervisor.protocol;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.rpc.Exporter;
    import com.alibaba.dubbo.rpc.Invoker;
    import com.alibaba.dubbo.rpc.Protocol;
    import com.alibaba.dubbo.rpc.RpcException;
    import com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol;
    
    public class MyProtocol extends DubboProtocol implements Protocol{
    
        public int getDefaultPort() {
            return 28080;
        }
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 
            URL url = invoker.getUrl();
            System.out.println("自定义协议,进行服务暴露:"+url);    
            return super.export(invoker);
        }
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            return super.refer(type, url);
        }
        public void destroy() {}
    }

    2、扩展点配置文件

    然后,在自己的项目中META-INF/services创建com.alibaba.dubbo.rpc.Protocol文件,文件内容为:
    myProtocol=com.viewscenes.netsupervisor.protocol.MyProtocol

    3、修改Dubbo配置文件

    最后修改生产者端的配置文件:
    <!-- 用自定义协议在20880端口暴露服务 -->
    <dubbo:protocol name="myProtocol" port="20880"/>   
    这样在启动生产者端项目的时候,Dubbo在进行服务暴露的过程中,就会调用到我们自定义的MyProtocol类,完成相应的逻辑处理。
  • 相关阅读:
    buildroot编译文件系统,出现mount: you must be root. can't open /dev/console: Permission denied这种错误。
    Ubuntu 18.04实现实时显示网速
    git 第一次push 出现fatal: 无法读取远程仓库。
    Sql2012 AlwaysOn
    SCVMM 安装
    POJ 3537 Crosses and Crosses
    HDOJ 1524 A Chess Game
    HDOJ 1907 John
    HDOJ 1848 Fibonacci again and again
    HDOJ 1536 S-Nim
  • 原文地址:https://www.cnblogs.com/johnvwan/p/15650115.html
Copyright © 2011-2022 走看看