zoukankan      html  css  js  c++  java
  • Flink 源码(十):阅读 Flink 源码前必会的知识(五)SPI 和 ClassLoader(二) SPI 技术

    来源:https://mp.weixin.qq.com/s/PtmlneRo6AG4Fyb8y-Bvrw

    三、SPI 技术

    1、什么是 SPI,为什么要有 SPI

    SPI 全称(Service Provide Interface),在 JAVA 中是一个比较重要的概念,在框架设计中被广泛使用。

    在框架设计中,要遵循的原则是对扩展开放,对修改关闭,保证框架实现对于使用者来说是黑盒。

    因为框架不可能做好所有的事情,只能把共性的部分抽离出来进行流程化,然后留下一些扩展点让使用者去实现,这样不同的扩展就不用修改源代码或者对框架进行定制。

    也就是我们经常说的面向接口编程。

    我理解的 SPI 用更通俗的话来讲,就是一种可插拔技术。

    最容易理解的就是 USB,定义好 USB 的接口规范,不同的外设厂家根据 USB 的标准去制造自己的外设,如鼠标,键盘等。

    另外一个例子就是 JDBC,Java 定义好了 JDBC 的规范,不同的数据库厂商去实现这个规范。Java 并不会管某一个数据库是如何实现 JDBC 的接口的。

    2、如何实现 SPI

    这里我在 Github 上有一个工程,Flink-Practice,是公众号文章附带的代码,有需要可以下载:

    Flink实战代码

    实现 SPI 的话,要遵循下面的一些规范:

    • 服务提供者提供了接口的具体实现后,需要在资源文件夹中创建 META-INF/services 文件夹,并且新建一个以全类名为名字的文本文件,文件内容为实现类的全名(如图中下面的红框);
    • 接口实现类必须在工程的 classpath 下,也就是 maven 中需要加入依赖或者 jar 包引用到工程里(如图中的 serviceimpl 包,我就放在了当前工程下了,执行的时候,会把类编译成 class 文件放到当前工程的 classpath 下的);
    • SPI 的实现类中,必须有一个不带参数的空构造方法

    执行测试类之后输出如下:

    可以看到,实现了提供方接口的类,都被执行了。

    3、SPI 源码浅析

    入口在 ServiceLoader.load 方法这里

        public static <S> ServiceLoader<S> load(Class<S> service) {
            // 获取当前线程的上下文类加载器。ContextClassLoader 是每个线程绑定的
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            return ServiceLoader.load(service, cl);
        }

      首先需要知道,Thread.currentThread().getContextClassLoader();  使用这个获取的类加载器是 AppClassLoader,因为我们的代码是在 main 函数执行的,而自定义的代码都是 AppClassLoader 加载的。

    可以看到最终这个 classloader 是被传到这个地方

    那么不传这个 loader 进来,就加载不到吗?答案是确实加载不到。

    因为 ServiceLoader 是在 rt.jar 包中,而 rt.jar 包是 BootstrapClassLoader 加载的。

    而实现了接口提供者的接口的类,一般是第三方类,是在 classpath 下的,BootStrapClassLoader 能加载到 classpath 下的类吗?不能, AppClassLoader 才会去加载 classpath 的类。

    所以,这里的上下文类加载器(ContextClassLoader ),它其实是破坏了双亲委派机制的,但是也为程序带来了巨大的灵活性和可扩展性。

    其实 ServiceLoader 核心的逻辑就在这两个方法里

           private boolean hasNextService() {
                if (nextName != null) {
                    return true;
                }
                if (configs == null) {
                    try {
                        // 寻找 META-INF/services/类
                        String fullName = PREFIX + service.getName();
                        if (loader == null)
                            configs = ClassLoader.getSystemResources(fullName);
                        else
                            configs = loader.getResources(fullName);
                    } catch (IOException x) {
                        fail(service, "Error locating configuration files", x);
                    }
                }
                while ((pending == null) || !pending.hasNext()) {
                    if (!configs.hasMoreElements()) {
                        return false;
                    }
                    // 解析这个类文件的所有内容
                    pending = parse(service, configs.nextElement());
                }
                nextName = pending.next();
                return true;
            }
    
            private S nextService() {
                if (!hasNextService())
                    throw new NoSuchElementException();
                String cn = nextName;
                nextName = null;
                Class<?> c = null;
                try {
                    // 加载这个类
                    c = Class.forName(cn, false, loader);
                } catch (ClassNotFoundException x) {
                    fail(service,
                         "Provider " + cn + " not found");
                }
                if (!service.isAssignableFrom(c)) {
                    fail(service,
                         "Provider " + cn  + " not a subtype");
                }
                try {
                    // 初始化这个类
                    S p = service.cast(c.newInstance());
                    providers.put(cn, p);
                    return p;
                } catch (Throwable x) {
                    fail(service,
                         "Provider " + cn + " could not be instantiated",
                         x);
                }
                throw new Error();          // This cannot happen
            }

    通过前面的分析,可以发现,JDK SPI 在查找扩展实现类的过程中,需要遍历 SPI 配置文件中定义的所有实现类,该过程中会将这些实现类全部实例化。

    如果 SPI 配置文件中定义了多个实现类,而我们只需要使用其中一个实现类时,就会生成不必要的对象。

    例如,在 Dubbo 中,org.apache.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol、ThriftProtocol 等多个实现,如果使用 JDK SPI,就会加载全部实现类,导致资源的浪费。

    Dubbo SPI 不仅解决了上述资源浪费的问题,还对 SPI 配置文件扩展和修改。

    首先,Dubbo 按照 SPI 配置文件的用途,将其分成了三类目录。

    META-INF/services/ 目录:该目录下的 SPI 配置文件用来兼容 JDK SPI 。

    META-INF/dubbo/ 目录:该目录用于存放用户自定义 SPI 配置文件。

    META-INF/dubbo/internal/ 目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件。

    然后,Dubbo 将 SPI 配置文件改成了 KV 格式,例如:

    dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

    其中 key 被称为扩展名(也就是 ExtensionName),当我们在为一个接口查找具体实现类时,可以指定扩展名来选择相应的扩展实现。

    例如,这里指定扩展名为 dubbo,Dubbo SPI 就知道我们要使用:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类,只实例化这一个扩展实现即可,无须实例化 SPI 配置文件中的其他扩展实现类。

    四、Flink 源码中使用到 SPI 和 Classloader 的地方

    在 Flink 源码中,有很多这样的 SPI 扩展点

    在 flink-clients 模块中

    执行器工厂的接口,有本地执行器的实现和远程执行器工厂类的实现,这些都是通过 SPI 来实现的。

    另外,在 Flink Clients 入口类 CliFronted 中,也使用了经典的 ContextClassLoader 用法,使用反射的方式来执行用户程序中编写的 main 方法

     

     

     

  • 相关阅读:
    企业大数据-之机器数据
    实践和感悟
    企业级大数据处理方案03-数据流程
    scala数据库工具类
    企业生产环境集群稳定性-HA就行吗?
    企业级大数据处理方案-02.环境决定需求、性能决定选型
    Scala编程之访问修饰符
    spark-submit提交参数设置
    window.location
    [转载]Arguments
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14491809.html
Copyright © 2011-2022 走看看