zoukankan      html  css  js  c++  java
  • 1. 源码分析---SOFARPC可扩展的机制SPI

    这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了。

    SOFARPC是蚂蚁金服开源的一个RPC框架,相比DUBBO它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码。

    那么为什么要去重写原生的SPI呢?官方给出了如下解释:

    1. 按需加载
    2. 可以有别名
    3. 可以有优先级进行排序和覆盖
    4. 可以控制是否单例
    5. 可以在某些场景下使用编码
    6. 可以指定扩展配置位置
    7. 可以排斥其他扩展点

    整个流程如下:

    我们以ConsumerBootstrap为例:

    先要有一个抽象类:

    @Extensible(singleton = false)
    public abstract class ConsumerBootstrap<T> {
        ....
    }
    

    指定扩展实现类:

    @Extension("sofa")
    public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T> {
        ...
    }
    

    扩展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap

    sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap
    

    当这些准备完成后,直接调用即可。

    ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
    

    接下来我们看看ExtensionLoaderFactory的源码

        /**
         * All extension loader {Class : ExtensionLoader}
         * 这个map里面装的是所有ExtensionLoader
         */
        private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
    
    
        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
            ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
            if (loader == null) {
                //get不到则加上锁
                synchronized (ExtensionLoaderFactory.class) {
                    //防止其他线程操作再get一次
                    loader = LOADER_MAP.get(clazz);
                    if (loader == null) {
                        loader = new ExtensionLoader<T>(clazz, listener);
                        LOADER_MAP.put(clazz, loader);
                    }
                }
            }
            return loader;
        }
    

    然后我们看一下ExtensionLoader这个类的构造器

        protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
            //如果正在执行关闭,则将属性置空后直接返回
            if (RpcRunningState.isShuttingDown()) {
                this.interfaceClass = null;
                this.interfaceName = null;
                this.listener = null;
                this.factory = null;
                this.extensible = null;
                this.all = null;
                return;
            }
            // 接口为空,既不是接口,也不是抽象类
            if (interfaceClass == null ||
                    !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
                throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
            }
            //当前加载的接口类名
            this.interfaceClass = interfaceClass;
            //接口名字
            this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
            this.listener = listener;
            //接口上必须要有Extensible注解
            Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
            if (extensible == null) {
                throw new IllegalArgumentException(
                        "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
            } else {
                this.extensible = extensible;
            }
            // 如果是单例,那么factory不为空
            this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
            //这个属性里面是这个接口的所有实现类
            this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
            if (autoLoad) {
                //获取到扩展点加载的路径
                List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
                for (String path : paths) {
                    //根据路径加载文件
                    loadFromFile(path);
                }
            }
        }
    

    拿到所有的扩展点加载的路径后进入到loadFromFile中进行文件的加载

        protected synchronized void loadFromFile(String path) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path);
            }
            // 默认如果不指定文件名字,就是接口名
            String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
            String fullFileName = path + file;
            try {
                ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
                loadFromClassLoader(classLoader, fullFileName);
            } catch (Throwable t) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName,
                        t);
                }
            }
        }
        
        
        protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {
            Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName)
                : ClassLoader.getSystemResources(fullFileName);
            // 可能存在多个文件。
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    // 读取一个文件
                    URL url = urls.nextElement();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}",
                            interfaceName, classLoader, url);
                    }
                    BufferedReader reader = null;
                    try {
                        reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
                        String line;
                        while ((line = reader.readLine()) != null) {
                            readLine(url, line);
                        }
                    } catch (Throwable t) {
                        if (LOGGER.isWarnEnabled()) {
                            LOGGER.warn("Failed to load extension of extensible " + interfaceName
                                + " from classloader: " + classLoader + " and file:" + url, t);
                        }
                    } finally {
                        if (reader != null) {
                            reader.close();
                        }
                    }
                }
            }
        }
    

    接下来进入到readLine,这个方法主要是读取prop文件里面的每一行记录,并加载该实现类的类文件校验完后将文件添加到all属性中

        protected void readLine(URL url, String line) {
            //读取文件里面的一行记录,并将这行记录用=号分割
            String[] aliasAndClassName = parseAliasAndClassName(line);
            if (aliasAndClassName == null || aliasAndClassName.length != 2) {
                return;
            }
            //别名
            String alias = aliasAndClassName[0];
            //包名
            String className = aliasAndClassName[1];
            // 读取配置的实现类
            Class tmp;
            try {
                tmp = ClassUtils.forName(className, false);
            } catch (Throwable e) {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}",
                        className, interfaceName, ExceptionUtils.toShortString(e, 2));
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e);
                }
                return;
            }
            if (!interfaceClass.isAssignableFrom(tmp)) {
                throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
                    " from file:" + url + ", " + className + " is not subtype of interface.");
            }
            Class<? extends T> implClass = (Class<? extends T>) tmp;
    
            // 检查是否有可扩展标识
            Extension extension = implClass.getAnnotation(Extension.class);
            if (extension == null) {
                throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
                    " from file:" + url + ", " + className + " must add annotation @Extension.");
            } else {
                String aliasInCode = extension.value();
                if (StringUtils.isBlank(aliasInCode)) {
                    // 扩展实现类未配置@Extension 标签
                    throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass +
                        " from file:" + url + ", " + className + "'s alias of @Extension is blank");
                }
                if (alias == null) {
                    // spi文件里没配置,用代码里的
                    alias = aliasInCode;
                } else {
                    // spi文件里配置的和代码里的不一致
                    if (!aliasInCode.equals(alias)) {
                        throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
                            " from file:" + url + ", aliases of " + className + " are " +
                            "not equal between " + aliasInCode + "(code) and " + alias + "(file).");
                    }
                }
                // 接口需要编号,实现类没设置
                if (extensible.coded() && extension.code() < 0) {
                    throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
                        " from file:" + url + ", code of @Extension must >=0 at " + className + ".");
                }
            }
            // 不可以是default和*
            if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
                throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
                    " from file:" + url + ", alias of @Extension must not "default" and "*" at " + className + ".");
            }
            // 检查是否有存在同名的
            ExtensionClass old = all.get(alias);
            ExtensionClass<T> extensionClass = null;
            if (old != null) {
                // 如果当前扩展可以覆盖其它同名扩展
                if (extension.override()) {
                    // 如果优先级还没有旧的高,则忽略
                    if (extension.order() < old.getOrder()) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure, " +
                                "cause by: order of old extension is higher",
                                interfaceName, alias, old.getClazz(), implClass);
                        }
                    } else {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}",
                                interfaceName, alias, old.getClazz(), implClass);
                        }
                        // 如果当前扩展可以覆盖其它同名扩展
                        extensionClass = buildClass(extension, implClass, alias);
                    }
                }
                // 如果旧扩展是可覆盖的
                else {
                    if (old.isOverride() && old.getOrder() >= extension.order()) {
                        // 如果已加载覆盖扩展,再加载到原始扩展
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}",
                                interfaceName, alias, old.getClazz(), implClass);
                        }
                    } else {
                        // 如果不能被覆盖,抛出已存在异常
                        throw new IllegalStateException(
                            "Error when load extension of extensible " + interfaceClass + " from file:" + url +
                                ", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass);
                    }
                }
            } else {
                extensionClass = buildClass(extension, implClass, alias);
            }
            if (extensionClass != null) {
                // 检查是否有互斥的扩展点
                for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {
                    ExtensionClass existed = entry.getValue();
                    if (extensionClass.getOrder() >= existed.getOrder()) {
                        // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
                        String[] rejection = extensionClass.getRejection();
                        if (CommonUtils.isNotEmpty(rejection)) {
                            for (String rej : rejection) {
                                existed = all.get(rej);
                                if (existed == null || extensionClass.getOrder() < existed.getOrder()) {
                                    continue;
                                }
                                ExtensionClass removed = all.remove(rej);
                                if (removed != null) {
                                    if (LOGGER.isInfoEnabled()) {
                                        LOGGER.info(
                                            "Extension of extensible {} with alias {}: {} has been reject by new {}",
                                            interfaceName, removed.getAlias(), removed.getClazz(), implClass);
                                    }
                                }
                            }
                        }
                    } else {
                        String[] rejection = existed.getRejection();
                        if (CommonUtils.isNotEmpty(rejection)) {
                            for (String rej : rejection) {
                                if (rej.equals(extensionClass.getAlias())) {
                                    // 被其它扩展排掉
                                    if (LOGGER.isInfoEnabled()) {
                                        LOGGER.info(
                                            "Extension of extensible {} with alias {}: {} has been reject by old {}",
                                            interfaceName, alias, implClass, existed.getClazz());
                                        return;
                                    }
                                }
                            }
                        }
                    }
                }
    
                loadSuccess(alias, extensionClass);
            }
        }
    

    加载完文件后我们再回到

    ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
    

    进入到getExtension方法中

        public ExtensionClass<T> getExtensionClass(String alias) {
            return all == null ? null : all.get(alias);
        }
    
        public T getExtension(String alias) {
            //从all属性中拿到加载的class
            ExtensionClass<T> extensionClass = getExtensionClass(alias);
            if (extensionClass == null) {
                throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: "" + alias + ""!");
            } else {
                //在加载class的时候,校验了是否是单例,如果是单例,那么factory不为null
                if (extensible.singleton() && factory != null) {
                    T t = factory.get(alias);
                    if (t == null) {
                        synchronized (this) {
                            t = factory.get(alias);
                            if (t == null) {
                                //实例化
                                t = extensionClass.getExtInstance();
                                //放入到factory,单例的class下次直接拿就好了,不需要重新创建
                                factory.put(alias, t);
                            }
                        }
                    }
                    return t;
                } else {
                    //实例化
                    return extensionClass.getExtInstance();
                }
            }
        }
    

    我们进入到ExtensionClass看看getExtInstance方法

        /**
         * 服务端实例对象(只在是单例的时候保留)
         * 用volatile修饰,保证了可见性
         */
        private volatile transient T       instance;
        
        /**
         * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
         *
         * @param argTypes 构造函数参数类型
         * @param args     构造函数参数值
         * @return 扩展点对象实例 ext instance
         */
        public T getExtInstance(Class[] argTypes, Object[] args) {
            if (clazz != null) {
                try {
                    if (singleton) { // 如果是单例
                        if (instance == null) {
                            synchronized (this) {
                                if (instance == null) {
                                    //通过反射创建实例
                                    instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                                }
                            }
                        }
                        return instance; // 保留单例
                    } else {
                        //通过反射创建实例
                        return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                    }
                } catch (Exception e) {
                    throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e);
                }
            }
            throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
        }
    

    看完了SOFARPC的扩展类实现后感觉代码写的非常的整洁,逻辑非常的清晰,里面有很多可以学习的地方,比如线程安全用到了双重检查锁和volatile保证可见性。

  • 相关阅读:
    HDU 1010 Tempter of the Bone(DFS剪枝)
    HDU 1013 Digital Roots(九余数定理)
    HDU 2680 Choose the best route(反向建图最短路)
    HDU 1596 find the safest road(最短路)
    HDU 2072 单词数
    HDU 3790 最短路径问题 (dijkstra)
    HDU 1018 Big Number
    HDU 1042 N!
    NYOJ 117 求逆序数 (树状数组)
    20.QT文本文件读写
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11225066.html
Copyright © 2011-2022 走看看