代码和示例的部分来自官方文档,本文只是经过润色
概述
SPI (Service Provider Interface)是JDK里面的扩展点发现机制。这个机制存在的动机是什么呢?服务发现。 往往我们的系统某个模块底层都有不同的实现,例如数据库的底层实现有 MySQL ,NoSQL 等,而 SPI 根据配置加载具体的实现,一个重要的作用就是解耦。
dubbo 中的 SPI
dubbo 中 SPI 改进的功能 :
- 按需进行初始化,类似于spring 的
@Condition
注解。 - 解决JDK的 SPI 中的异常抛出,连扩展点的名称也拿不到
- 增加 AOP 和 IOC
SPI 使用示例
java 中 SPI 使用示例
public interface Robot { void sayHello(); }
接下来定义两个实现类,分别为 OptimusPrime 和 Bumblebee。
public class OptimusPrime implements Robot { @Override public void sayHello() { System.out.println("Hello, I am Optimus Prime."); } } public class Bumblebee implements Robot { @Override public void sayHello() { System.out.println("Hello, I am Bumblebee."); } }
接下来 META-INF/services 文件夹下创建一个文件,名称为 Robot 的全限定名 org.apache.spi.Robot。文件内容为实现类的全限定的类名,如下:
org.apache.spi.OptimusPrime org.apache.spi.Bumblebee
做好所需的准备工作,接下来编写代码进行测试。
public class JavaSPITest { @Test public void sayHello() throws Exception { ServiceLoader<Robot> serviceLoader = ServiceLoader.load(Robot.class); System.out.println("Java SPI"); serviceLoader.forEach(Robot::sayHello); } }
我们这里可以看到SPI只要定义指定某个类具体类,最终某个具体类就会被加载进来,于是Dubbo 利用这个特性,在加载的过程中进行了扩展。后文会介绍如何扩展的
Dubbo 中 SPI 使用示例
Dubbo 并未使用 Java SPI,而是重新实现了一套功能更强的 SPI 机制。Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,我们可以加载指定的实现类。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,配置内容如下。
optimusPrime = org.apache.spi.OptimusPrime bumblebee = org.apache.spi.Bumblebee
与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。另外,在测试 Dubbo SPI 时,需要在 Robot 接口上标注 @SPI 注解。下面来演示 Dubbo SPI 的用法:
public class DubboSPITest { @Test public void sayHello() throws Exception { ExtensionLoader<Robot> extensionLoader = ExtensionLoader.getExtensionLoader(Robot.class); Robot optimusPrime = extensionLoader.getExtension("optimusPrime"); optimusPrime.sayHello(); Robot bumblebee = extensionLoader.getExtension("bumblebee"); bumblebee.sayHello(); } }
Dubbo SPI 机制源码分析
开始之前我们理清一下SPI 使用思路 :
- SPI 注解
- 指定具体实现类
- 生成类
涉及到 Dubbo SPI 一个重要的类就是 ExtensionLoader ,那么我们知道了这个类我们如何入手分析呢?在 dubbo 的测试案例中我们看到 ExtensionLoader 常见的用法如下 :
final private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
可以看到通过这个静态方法就是获取得到 protocol 的具体对象。
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(); private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(); //这个type 在 ExtensionLoader 构造方法中赋值,它指的是加载什么类型的 Extension,在 ServiceConfig有调用 : // private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private final Class<?> type; //保存多个 Extension 的对象 private final ExtensionFactory objectFactory; private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>(); private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>(); // cache Activate class which is annotated with <code>Activate</code> private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>(); // 实体 private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>(); // cache Activate class which is annotated with <code>Activate</code> private final Holder<Object> cachedAdaptiveInstance = new Holder<>(); private volatile Class<?> cachedAdaptiveClass = null; private String cachedDefaultName; //可以看到上面的字段都是为了缓存 ExtensionLoader 和对应的实例 //返回一个 ExtensionLoader 对象 public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) { throw new IllegalArgumentException("Extension type == null"); } if (!type.isInterface()) { throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!"); } if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; } public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if (createAdaptiveInstanceError != null) { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { //缓存不存在,创建一个 AdaptiveExtension,并放进缓存 instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } } return (T) instance; } private T createAdaptiveExtension() { try { //分两步 1.injectExtension 方法调用 2.getAdaptiveExtensionClass方法调用 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } } private Class<?> getAdaptiveExtensionClass() { //先是根据策略加载了extension getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } //// 创建自适应拓展类 //假如没有存在 adaptiveClass 类的,什么情况没有? //比如该方法可以获取 Protocol 接口的 DubboProtocol、HttpProtocol、InjvmProtocol 等实现类。 // 在获取实现类的过程中,如果某个实现类被 Adaptive 注解修饰了,那么该类就会被赋值给 cachedAdaptiveClass 变量。 // 此时,上面步骤中的第二步条件成立(缓存不为空),直接返回 cachedAdaptiveClass 即可。 // 如果所有的实现类均未被 Adaptive 注解修饰,那么执行第三步逻辑,创建自适应拓展类。 return cachedAdaptiveClass = createAdaptiveExtensionClass(); } //getExtensionClasses 这个方法用于获取某个接口的所有实现类。比如该方法可以获取 Protocol 接口的 DubboProtocol、HttpProtocol、InjvmProtocol 等实现类。在获取实现类的过程中,如果某个实现类被 Adaptive 注解修饰了,那么该类就会被赋值给 cachedAdaptiveClass 变量。 private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; } /** * synchronized in getExtensionClasses * 该方法根据策略加载了 extension 类,并将 adaptive 类 active类都放进了缓存里 * */ private Map<String, Class<?>> loadExtensionClasses() { /* * 先是缓存带有 SPI 注释的类 * 例如 : * @SPI("dubbo") * public interface Protocol { * ... * } * */ cacheDefaultExtensionName(); Map<String, Class<?>> extensionClasses = new HashMap<>(); for (LoadingStrategy strategy : strategies) { loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.excludedPackages()); loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.excludedPackages()); } return extensionClasses; } private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type, boolean extensionLoaderClassLoaderFirst, String... excludedPackages) { String fileName = dir + type; try { .... if (urls != null) { while (urls.hasMoreElements()) { java.net.URL resourceURL = urls.nextElement(); //核心逻辑 loadResource(extensionClasses, classLoader, resourceURL, excludedPackages); } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class (interface: " + type + ", description file: " + fileName + ").", t); } } .... //最终调用到了这里 private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException { if (!type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error occurred when loading extension class (interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + " is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { cacheAdaptiveClass(clazz); } else if (isWrapperClass(clazz)) { cacheWrapperClass(clazz); } else { //既没有注释 Adaptive 也不是包装类,仅仅是创建一个对象,然后进行保存而已 clazz.getConstructor(); if (StringUtils.isEmpty(name)) { name = findAnnotationName(clazz); if (name.length() == 0) { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL); } } String[] names = NAME_SEPARATOR.split(name); if (ArrayUtils.isNotEmpty(names)) { cacheActivateClass(clazz, names[0]); for (String n : names) { cacheName(clazz, n); saveInExtensionClass(extensionClasses, clazz, n); } } } }
到了这里方法还没有结束哦, injectExtension 的方法如下 (从方法名就可以知道是依赖注入):
private T injectExtension(T instance) { if (objectFactory == null) { return instance; } try { for (Method method : instance.getClass().getMethods()) { //检查是否是 set 方法 if (!isSetter(method)) { continue; } /** * Check {@link DisableInject} to see if we need auto injection for this property */ if (method.getAnnotation(DisableInject.class) != null) { continue; } Class<?> pt = method.getParameterTypes()[0]; if (ReflectUtils.isPrimitives(pt)) { continue; } try { //依赖注入 String property = getSetterProperty(method); Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); } } catch (Exception e) { logger.error("Failed to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; }
ok,到此为止我们走完了Extension的 getAdaptiveExtension 的调用过程,而我们常常还会看看 getExtension 方法的调用。
看一下 getExtension 方法
public T getExtension(String name) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException("Extension name == null"); } if ("true".equals(name)) { //获取默认的拓展实现类 return getDefaultExtension(); } final Holder<Object> holder = getOrCreateHolder(name); //单例模式 Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { // instance = createExtension(name); holder.set(instance); } } } return (T) instance; } /** * 第三步和第四步是为了日后 Dubbo 的 AOP 和 Ioc 是使用 * @param name * @return */ @SuppressWarnings("unchecked") private T createExtension(String name) { //No.1 从配置文件中加载, 例如 name 传进来的是 spi Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { //No.2 反射创建示例 EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //No.3 向示例注入依赖 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { //No.4 循环创建 wrapper 实例 for (Class<?> wrapperClass : wrapperClasses) { // 这个地方需要注意!! // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。 // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量 // 所以返回的 instance 一定是一层嵌套一层的 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); } }
已 DubboProtocol 为例,返回来是这样的
Dubbo 目前提供了两种 ExtensionFactory,分别是 SpiExtensionFactory 和 SpringExtensionFactory。前者用于创建自适应的拓展,后者是用于从 Spring 的 IOC 容器中获取所需的拓展。
总结
本文从源码分析了 dubbo 的 SPI 机制和生成类的过程。
参考资料
https://zhuanlan.zhihu.com/p/44523895 学习的博客 https://www.cnkirito.moe/dubbo-nacos-stability/
dubbo 的 example ,下载来看一下 https://github.com/apache/dubbo-samples/tree/master/java