zoukankan      html  css  js  c++  java
  • Dubbo SPI扩展点(二)

    Dubbo的拓展点

    在 Dubbo 中,拓展点分为以下三类:

    指定名称的扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name")。
    自适应扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()。
    激活扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getActiveExtension。
    

    先来看前半段 :
    a.ExtensionLoader#getExtensionLoader

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
        }
        //首先会校验传入的type:是否为空、是否是interface、是否使用类SPI注解。
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type (" + type +
                                               ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
        }
        // 从缓存中获取该 loader
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            // 如果从缓存中获取不到,则new 一个,并且保存起来
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    

    b. ExtensionLoader 构造函数

    private ExtensionLoader(Class<?> type) {
        this.type = type;
        // 判断类型,很显然 这里会走后面
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
    

    名称的扩展点(getExtension)

    咱们姑且先当这个ExtensionLoader实例 已经存在缓存中,那么我们直接进入到 getExtension("myLoadBalance") 这段代码流程中

    public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        //如果name=true,表示返回一个默认的扩展点
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        //缓存一下,如果实例已经加载过了,直接从缓存读取
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    //根据名称创建实例
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }
    

    createExtension:仍然是根据名称创建扩展

    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                                            type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }
    

    这个方法内主要做了以下三件事:

    • 加载指定路径下的文件内容,保存到集合中
    • 会对存在依赖注入的扩展点进行依赖注入
    • 会对存在Wrapper类的扩展点,实现扩展点的包装

    加载指定路径下的所有文件 getExtensionClasses()

    private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    // 真正加载类的方法
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }
    

    ExtensionLoader#loadExtensionClasses

     private Map<String, Class<?>> loadExtensionClasses() {
        cacheDefaultExtensionName();
    
        Map<String, Class<?>> extensionClasses = new HashMap<>();
        // 这里循环加载,
        for (LoadingStrategy strategy : strategies) {
            // 这里调用两次,可以从下面的参数中得知可能是为了做兼容
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }
    
        return extensionClasses;
    }
    

    我们可以断点看看这个strategies :

    这里对应的三个实现实质上是分别对应的三个拓展点配置目录:

    META-INF/dubbo/internal/
    META-INF/dubbo/
    META-INF/services/
    

    这里通过这 三个路径去把我们的拓展点加载出来并且缓存起来。

    这才使得我们 getExtension("myLoadBalance") 能拿到我们自己的实现.

    我们还需要关注的就是拓展点的包装 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)),我们直接断点看一下:

    2.7.8 源码在此处有些许差别,但是最终也是如此包装。

    Protocol protocol =  ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
    

    这里以 Protocol 为例,发现 cachedWrapperClasses 里面有3个 wrapper类,且返回的 instance 并不是一个 DubboProtocol 这么简单,经过了层层包装。那么为什么呢?我们来看一下 Protocol 拓展点文件:

    这里我们可以得出结论,在加载拓展点指定文件的时候,具有Wrapper 实现的时候,会将Wrapper 缓存到 cachedWrapperClasses 集合中,且会将这些拓展点进行包装。

    扩展:

    • ProtocolListenerWrapper, 用于服务export时候插入监听机制
    • QosprotocolWrapper, 如果当前配置了注册中心,则会启动一个Qos server.qos是dubbo的在线运维命令,dubbo2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222
    • ProtocolFilterWrapper,对invoker进行filter的包装,实现请求的过滤

    自适应扩展点(getAdaptiveExtension)

    在下面这个例子中,我们传入一个Protocol接口,它会返回一个AdaptiveProtocol。这个就叫自适应。

    Protocol protocol =  ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    @SPI("dubbo")
    public interface Protocol {
      
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
       
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
    }
    

    我们可以看到 Protocol这个类的 export方法上面有一个注解@Adaptive。 这个就是一个自适应扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。这两者有什么区别呢? 简单来说,放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码,来进行转发。 拿Protocol这个接口来说,它里面定义了export和refer两个抽象方法,这两个方法分别带有@Adaptive的标识,标识是一个自适应方法。 我们知道Protocol是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢? 取决于我们在使用dubbo的时候所配置的协议名称。而这里的方法层面的Adaptive就决定了当前这个方法会采用何种协议来发布服务。

    ExtensionLoader#getAdaptiveExtension

    这个方法主要就是要根据传入的接口返回一个自适应的实现类:

    public T getAdaptiveExtension() {
        // 先从缓存中获取
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                throw new IllegalStateException("Failed to create adaptive instance: " +
                                                createAdaptiveInstanceError.toString(),
                                                createAdaptiveInstanceError);
            }
           
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
    
        return (T) instance;
    }
    

    ExtensionLoader#createAdaptiveExtension

    这个方法中做两个事情:

    • 获得一个自适应扩展点实例
    • 实现依赖注入
    private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
    

    ExtensionLoader#getAdaptiveExtensionClass

    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    

    getExtensionClasses()这个方法在前面讲过了,会加载当前传入的类型的所有扩展点,保存在一个hashmap中 这里有一个判断逻辑,如果 cachedApdaptiveClas!=null ,直接返回这个cachedAdaptiveClass,这个cachedAdaptiveClass是一个什么?

    cachedAdaptiveClass是在 加载解析/META-INF/dubbo下的扩展点的时候加载进来的。在加载完之后如果这个类有@Adaptive标识,则会赋值赋值而给cachedAdaptiveClass。

    createAdaptiveExtensionClass:动态生成字节码,然后进行动态加载。那么这个时候锁返回的class,如果加载的是Protocol.class,应该是Protocol$Adaptive 这个cachedDefaultName实际上就是扩展点接口的@SPI注解对应的名字,如果此时加载的是Protocol.class,那么cachedDefaultName=dubbo

    private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
    

    如下 ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 生成的自适应拓展点就是:

    import org.apache.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
        public void destroy()  {
            throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
        }
        public int getDefaultPort()  {
            throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
        }
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
        public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            org.apache.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
        public java.util.List getServers()  {
            throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
        }
    }
    

    我们可以看到在export方法中,会先通过String extName = (url.getProtocol() == null ? “dubbo” : url.getProtocol());去获取extName, 在我们之前的例子中,url.getProtocol()方法返回的应该是"myProtocol",所以extName是myProtocol,然后通过下面代码去获取Protocol的实例:

    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    

    关于objectFactory

    在injectExtension这个方法中,我们发现入口出的代码首先判断了objectFactory这个对象是否为空。这个是在哪里初始化的呢?实际上我们在获得ExtensionLoader的时候,就对objectFactory进行了初始化。

    private ExtensionLoader(Class<?> type) {
      this.type = type;
      objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
    

    然后通过 ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()去获得一个自适应的扩展点,进 入 ExtensionFactory 这个接口中,可以看到它是一个扩展点,并且有一个自己实现的自适应扩展点 AdaptiveExtensionFactory;

    @Adaptive
    public class AdaptiveExtensionFactory implements ExtensionFactory {
    
        private final List<ExtensionFactory> factories;
    
        public AdaptiveExtensionFactory() {
            ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
            List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
            for (String name : loader.getSupportedExtensions()) {
                list.add(loader.getExtension(name));
            }
            factories = Collections.unmodifiableList(list);
        }
    
        @Override
        public <T> T getExtension(Class<T> type, String name) {
            for (ExtensionFactory factory : factories) {
                T extension = factory.getExtension(type, name);
                if (extension != null) {
                    return extension;
                }
            }
            return null;
        }
    }
    

    注意:@Adaptive 加载到类上表示这是一个自定义的适配器类,表示我们再调用 getAdaptiveExtension 方法的时候,不需要走上面这么复杂的过程。会直接加载到 AdaptiveExtensionFactory。

    我们可以看到除了自定义的自适应适配器类以外,还有两个实现类,一个是SPI,一个是Spring,AdaptiveExtensionFactory轮询这2个,从一个中获取到就返回。

    激活扩展点(getActiveExtension)

    自动激活扩展点,有点类似 springboot 的时候用到的 conditional,根据条件进行自动激活。但是这里设计的初衷是,对于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化配置我们的配置工作

    @Activate提供了一些配置来允许我们配置加载条件,比如group过滤,比如key过滤。举个例子,我们可以看看org.apache.dubbo.Filter这个类,它有非常多的实现,比如说CacheFilter,这个缓存过滤器,配置信息如下group表示客户端和和服务端都会加载,value表示url中有cache_key的时候:

    @Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
    public class CacheFilter implements Filter {
    
        private CacheFactory cacheFactory;
    
        //...
    }
    

    通过下面这段代码,演示关于 Filter 的自动激活扩展点的效果。

    参考:

    org.apache.dubbo 2.7.7 服务发布注册源码
    Dubbo - Dubbo的SPI机制

  • 相关阅读:
    [Java优化] Java代码细节优化
    [网址] 工具类网址
    [Hadoop] 大数据环境安装博客
    简单说下Netty和RPC吧,大佬绕行
    nginx二级域名配置[CentOS]
    java面试准备-自我介绍
    同步||异步&&阻塞||非阻塞
    关于多线程的知识点-02
    [java进阶]关于多线程的知识点
    [java进阶]关于多线程的知识点
  • 原文地址:https://www.cnblogs.com/snail-gao/p/14166162.html
Copyright © 2011-2022 走看看